feat: event based non-blocking logger

Authored by Chance on 2025-04-15 20:08:58 +00:00; committed by BitSyndicate
parent 37abc3f52d
commit f215c10d0e
Signed by: bitsyndicate
GPG key ID: 443E4198D6BBA6DE
8 changed files with 534 additions and 671 deletions

11
subcrates/zlog/Cargo.toml Normal file

@@ -0,0 +1,11 @@
[package]
name = "zlog"
version = "0.1.0"
edition = "2024"

[dependencies]
tracing = "0.1.41"
tracing-subscriber = "0.3.19"

[dev-dependencies]
pretty_assertions = "1.4.1"

58
subcrates/zlog/src/config.rs Normal file

@@ -0,0 +1,58 @@
use crate::LogLevel;
use std::path::{Path, PathBuf};
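/// Configuration for the logger; start from `LoggerConfig::default()` and
/// adjust it with the builder-style setters below.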
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
pub(crate) log_level: LogLevel,
pub(crate) log_to_file: bool,
pub(crate) log_file_path: PathBuf,
pub(crate) log_to_stdout: bool,
pub(crate) stdout_color: bool,
pub(crate) stdout_include_time: bool,
pub(crate) file_include_time: bool,
}
impl LoggerConfig {
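// Builder-style setters: each takes `self` by value and returns the updated
// config, so calls can be chained (e.g. `LoggerConfig::default().level(LogLevel::Info)`).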
pub fn level(mut self, level: LogLevel) -> Self {
self.log_level = level;
self
}
pub fn log_to_file(mut self, f: bool) -> Self {
self.log_to_file = f;
self
}
pub fn colored_stdout(mut self, c: bool) -> Self {
self.stdout_color = c;
self
}
pub fn log_to_stdout(mut self, s: bool) -> Self {
self.log_to_stdout = s;
self
}
pub fn log_path<P: AsRef<Path>>(mut self, p: P) -> Self {
self.log_file_path = p.as_ref().to_path_buf();
self
}
pub fn stdout_include_time(mut self, i: bool) -> Self {
self.stdout_include_time = i;
self
}
pub fn file_include_time(mut self, i: bool) -> Self {
self.file_include_time = i;
self
}
}
impl Default for LoggerConfig {
fn default() -> Self {
Self {
log_level: LogLevel::Debug,
log_to_file: true,
log_file_path: "app.log".into(),
log_to_stdout: true,
stdout_color: true,
stdout_include_time: false,
file_include_time: false,
}
}
}

284
subcrates/zlog/src/lib.rs Normal file

@@ -0,0 +1,284 @@
pub mod config;
pub mod query;
#[cfg(test)]
mod tests;
use config::LoggerConfig;
use query::LogQuery;
use std::{
fmt,
fs::OpenOptions,
io::{BufWriter, Write},
str::FromStr,
sync::{
Arc, RwLock,
mpsc::{self, Sender},
},
thread,
time::SystemTime,
};
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::{
layer::{Context, Layer, SubscriberExt},
registry::LookupSpan,
util::SubscriberInitExt,
};
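/// Message passed from the capturing layer to each sink thread: either a new
/// log entry or a request to shut that sink down.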
#[derive(Debug, Clone, PartialEq, Eq)]
enum LogEvent {
Log(LogEntry),
Shutdown,
}
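/// A single captured log record: when it happened, at what level, and the
/// rendered message text.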
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogEntry {
timestamp: SystemTime,
level: Level,
message: String,
}
impl PartialOrd for LogEntry {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for LogEntry {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Entries are ordered by timestamp; `SystemTime` implements `Ord` directly,
// so there is no need to unwrap `duration_since` (which panics for
// pre-epoch timestamps).
self.timestamp.cmp(&other.timestamp)
}
}
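/// `tracing` layer that captures every event into the shared in-memory buffer
/// and fans it out to the sink threads over mpsc channels.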
struct BufferLayer {
log_entries: Arc<RwLock<Vec<LogEntry>>>,
senders: Vec<Sender<LogEvent>>,
}
impl BufferLayer {
pub fn new(log_entries: Arc<RwLock<Vec<LogEntry>>>, senders: Vec<Sender<LogEvent>>) -> Self {
Self {
log_entries,
senders,
}
}
}
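// Dropping the layer tells every sink thread to stop consuming.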
impl Drop for BufferLayer {
fn drop(&mut self) {
for tx in &self.senders {
// Ignore send errors: a sink thread that has already exited would
// otherwise cause a panic inside this Drop.
let _ = tx.send(LogEvent::Shutdown);
}
self.senders.clear();
}
}
impl<S> Layer<S> for BufferLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let metadata = event.metadata();
let level = *metadata.level();
let timestamp = SystemTime::now();
let mut message = String::new();
let mut visitor = LogVisitor::new(&mut message);
event.record(&mut visitor);
let entry = LogEntry {
timestamp,
level,
message,
};
// Record the entry in the shared buffer so it can be queried later.
if let Ok(mut buf) = self.log_entries.write() {
buf.push(entry.clone());
}
// Fan the event out to every sink thread.
let event = LogEvent::Log(entry);
for tx in &self.senders {
let _ = tx.send(event.clone());
}
}
}
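/// Field visitor that flattens an event's fields into a single message string.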
struct LogVisitor<'msg> {
message: &'msg mut String,
}
impl<'msg> LogVisitor<'msg> {
fn new(message: &'msg mut String) -> Self {
Self { message }
}
}
impl tracing::field::Visit for LogVisitor<'_> {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn fmt::Debug) {
use std::fmt::Write;
if field.name() == "message" {
write!(self.message, "{:?}", value).unwrap();
} else {
write!(self.message, "{}={:?} ", field.name(), value).unwrap();
}
}
}
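/// Verbosity levels accepted by `LoggerConfig::level`.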
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum LogLevel {
Error,
#[default]
Info,
Warn,
Debug,
Trace,
}
impl From<LogLevel> for tracing::Level {
fn from(level: LogLevel) -> Self {
match level {
LogLevel::Error => Level::ERROR,
LogLevel::Info => Level::INFO,
LogLevel::Debug => Level::DEBUG,
LogLevel::Trace => Level::TRACE,
LogLevel::Warn => Level::WARN,
}
}
}
impl fmt::Display for LogLevel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl FromStr for LogLevel {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"trace" => Ok(Self::Trace),
"debug" => Ok(Self::Debug),
"info" => Ok(Self::Info),
"error" => Ok(Self::Error),
"warn" => Ok(Self::Warn),
_ => Err(()),
}
}
}
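/// Event-based, non-blocking logger.
///
/// [`Logger::new`] installs a `tracing` subscriber as the current thread's
/// default and spawns one consumer thread per enabled sink (stdout and/or
/// file), so the `tracing` macros never block on sink I/O. A minimal usage
/// sketch, assuming the crate is used under its package name `zlog`:
///
/// ```no_run
/// use zlog::{Logger, config::LoggerConfig};
///
/// let logger = Logger::new(LoggerConfig::default().log_to_stdout(true));
/// tracing::info!("hello from zlog");
/// drop(logger); // signals the sinks to shut down and joins their threads
/// ```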
pub struct Logger {
subscriber_guard: Option<tracing::subscriber::DefaultGuard>,
log_entries: Arc<RwLock<Vec<LogEntry>>>,
_handles: Vec<thread::JoinHandle<()>>,
}
impl Logger {
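/// Builds the logger from `config`: spawns the enabled sink threads, wires
/// them to the capturing layer, and installs that layer as this thread's
/// default subscriber.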
pub fn new(config: LoggerConfig) -> Self {
let log_entries = Arc::new(RwLock::new(Vec::new()));
let mut senders = Vec::new();
let mut handles = Vec::new();
if config.log_to_stdout {
let (tx, rx) = mpsc::channel();
senders.push(tx);
let config_clone = config.clone();
let handle = thread::spawn(move || {
for msg in rx {
match msg {
LogEvent::Log(entry) => {
println!(
"{}",
format_entry(
&entry,
config_clone.stdout_color,
config_clone.stdout_include_time
)
);
}
LogEvent::Shutdown => break,
}
}
});
handles.push(handle);
}
if config.log_to_file {
let (tx, rx) = mpsc::channel();
senders.push(tx);
let path = config.log_file_path.clone();
let include_time = config.file_include_time;
let handle = thread::spawn(move || {
let file = OpenOptions::new()
.append(true)
.create(true)
.open(&path)
.expect("Failed to open log file");
let mut writer = BufWriter::new(file);
for msg in rx {
match msg {
LogEvent::Log(entry) => {
let line = format_entry(&entry, false, include_time);
writeln!(writer, "{}", line).expect("Failed to write to log file");
writer.flush().expect("Failed to flush log file");
}
LogEvent::Shutdown => break,
}
}
});
handles.push(handle);
}
let buffer_layer = BufferLayer::new(log_entries.clone(), senders);
let subscriber = tracing_subscriber::registry().with(buffer_layer);
let guard = subscriber.set_default();
Logger {
subscriber_guard: Some(guard),
log_entries,
_handles: handles,
}
}
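/// Returns copies of the buffered entries that match `query`.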
pub fn get_logs(&self, query: LogQuery) -> Vec<LogEntry> {
let guard = self.log_entries.read().unwrap();
match query {
LogQuery::All => guard.clone(),
LogQuery::From(t) => guard.iter().filter(|e| e.timestamp >= t).cloned().collect(),
}
}
}
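// Dropping the subscriber guard releases the BufferLayer, whose own Drop
// signals the sinks to shut down; the consumer threads are then joined so
// queued messages are written before the logger goes away.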
impl Drop for Logger {
fn drop(&mut self) {
let _ = self.subscriber_guard.take();
let handles = std::mem::take(&mut self._handles);
for handle in handles {
handle.join().unwrap_or_else(|e| {
eprintln!("Logger thread panicked: {:?}", e);
});
}
}
}
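// Renders one entry as "LEVEL message", with ANSI colors when requested.
// Note: the trailing `_include_time` flag is accepted but not applied yet, so
// the stdout/file time options currently have no effect on the output.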
fn format_entry(entry: &LogEntry, use_color: bool, _include_time: bool) -> String {
let lvl = if use_color {
match entry.level {
Level::ERROR => "\x1b[31mERROR\x1b[0m",
Level::WARN => "\x1b[33mWARN\x1b[0m",
Level::INFO => "\x1b[32mINFO\x1b[0m",
Level::DEBUG => "\x1b[36mDEBUG\x1b[0m",
Level::TRACE => "\x1b[34mTRACE\x1b[0m",
}
} else {
entry.level.as_str()
};
format!("{} {}", lvl, entry.message)
}

14
subcrates/zlog/src/query.rs Normal file

@@ -0,0 +1,14 @@
use std::time::SystemTime;
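/// Filter used by `Logger::get_logs`: return everything, or only the entries
/// recorded at or after a given point in time.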
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Default)]
pub enum LogQuery {
#[default]
All,
From(SystemTime),
}
impl LogQuery {
pub fn since(time: SystemTime) -> Self {
LogQuery::From(time)
}
}

90
subcrates/zlog/src/tests.rs Normal file

@@ -0,0 +1,90 @@
use super::*;
use pretty_assertions::assert_eq;
use tracing::Level;
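// Logs a large batch of messages across every level and checks that the
// in-memory buffer saw each one exactly once, with the expected level and
// message and with non-decreasing timestamps.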
#[test]
fn test_logger_sequential_consistency() {
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::{debug, error, info, trace, warn};
let config = LoggerConfig::default()
.log_to_stdout(true)
.log_to_file(true);
let logger = Logger::new(config);
static COUNTER: AtomicUsize = AtomicUsize::new(0);
for i in 0..4096 * 128 {
let count = COUNTER.fetch_add(1, Ordering::SeqCst);
match i % 5 {
0 => error!("Error message {}", count),
1 => warn!("Warning message {}", count),
2 => info!("Info message {}", count),
3 => debug!("Debug message {}", count),
_ => trace!("Trace message {}", count),
}
}
let logs = logger.get_logs(LogQuery::All);
assert_eq!(
logs.len(),
4096 * 128,
"Should have exactly 5000 log entries"
);
for (i, log) in logs.iter().enumerate() {
let expected_count = i;
let expected_level = match i % 5 {
0 => Level::ERROR,
1 => Level::WARN,
2 => Level::INFO,
3 => Level::DEBUG,
_ => Level::TRACE,
};
assert_eq!(
log.level, expected_level,
"Log {} has incorrect level: {:?}",
i, log.level
);
let expected_msg = match expected_level {
Level::ERROR => format!("Error message {}", expected_count),
Level::WARN => format!("Warning message {}", expected_count),
Level::INFO => format!("Info message {}", expected_count),
Level::DEBUG => format!("Debug message {}", expected_count),
Level::TRACE => format!("Trace message {}", expected_count),
};
assert_eq!(
log.message, expected_msg,
"Log {} has incorrect message. Expected: '{}', Got: '{}'",
i, expected_msg, log.message
);
if i > 0 {
assert!(
log.timestamp >= logs[i - 1].timestamp,
"Log {} has timestamp out of order. Current: {:?}, Previous: {:?}",
i,
log.timestamp,
logs[i - 1].timestamp
);
}
}
let mut counts: Vec<usize> = logs
.iter()
.map(|log| {
log.message
.split_whitespace()
.last()
.unwrap()
.parse::<usize>()
.unwrap()
})
.collect();
counts.sort();
counts.dedup();
assert_eq!(counts.len(), 4096 * 128, "Found duplicate log entries");
}