Open 7ERr0r opened 1 year ago
I suspect there are better ways to implement this. I imagine very few applications get anywhere close to the 128k default buffer limit, so creating that huge collection of objects right off the bat is pretty wasteful. I ran into this issue because I was creating a number of child processes and giving each its own non-blocking instance to copy stdout into a log file, which turned my 2 MB app into an 80 MB app.
I wrote a quick-and-dirty implementation with a tokio mpsc channel, which takes up virtually no memory when it's empty:
```rust
use std::{
    io::{self, Write},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    thread,
};

use tokio::sync::mpsc::{self, error::TrySendError};

/// NonblockingWriter writes to some underlying writer without blocking.
///
/// This is similar to the result of `tracing_appender::NonBlocking`, but takes
/// up considerably less memory.
#[derive(Debug, Clone)]
pub struct NonblockingWriter {
    /// Transmitter used to send lines to the log thread.
    tx: mpsc::Sender<Box<[u8]>>,
    /// Count of total messages all writers have dropped.
    total_dropped: Arc<AtomicUsize>,
}

/// Nonblocking is the actor half of the NonblockingWriter.
pub struct Nonblocking {
    /// Transmitter used to send lines to the log thread.
    tx: mpsc::Sender<Box<[u8]>>,
    /// Count of total messages all writers have dropped.
    total_dropped: Arc<AtomicUsize>,
    join_handle: std::thread::JoinHandle<io::Result<()>>,
}

impl Nonblocking {
    /// Create a new nonblocking writer.
    pub fn new<T: Write + Send + 'static>(
        mut writer: T,
        max_buffered_messages: usize,
        name: &str,
    ) -> Nonblocking {
        let (tx, mut rx) = mpsc::channel::<Box<[u8]>>(max_buffered_messages);
        let total_dropped = Arc::new(AtomicUsize::new(0));
        let join_handle = thread::Builder::new()
            .name(format!("log-{name}"))
            .spawn(move || {
                while let Some(data) = rx.blocking_recv() {
                    writer.write_all(&data)?;
                    writer.flush()?;
                }
                Ok(())
            })
            .expect("should be able to create threads");
        Nonblocking {
            tx,
            total_dropped,
            join_handle,
        }
    }

    /// Get a count of the total number of writes that have been dropped.
    pub fn total_dropped(&self) -> usize {
        self.total_dropped.load(Ordering::Relaxed)
    }

    /// Get a new writer for this Nonblocking instance.
    pub fn writer(&self) -> NonblockingWriter {
        NonblockingWriter {
            tx: self.tx.clone(),
            total_dropped: self.total_dropped.clone(),
        }
    }

    /// Wait until all NonblockingWriters have been dropped and all data written
    /// to the underlying writer.
    pub async fn close(self) -> io::Result<()> {
        let Self {
            tx, join_handle, ..
        } = self;
        drop(tx);
        // Wait for the underlying thread to exit.
        tokio::task::spawn_blocking(move || join_handle.join().unwrap())
            .await
            .unwrap()
    }
}

impl NonblockingWriter {
    /// Get a count of the total number of writes that have been dropped.
    pub fn total_dropped(&self) -> usize {
        self.total_dropped.load(Ordering::Relaxed)
    }
}

impl Write for NonblockingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let size = buf.len();
        match self.tx.try_send(buf.to_vec().into_boxed_slice()) {
            Err(TrySendError::Closed(_)) => {
                return Err(io::Error::new(
                    io::ErrorKind::BrokenPipe,
                    "log thread has exited",
                ));
            }
            Err(TrySendError::Full(_)) => {
                self.total_dropped.fetch_add(1, Ordering::Relaxed);
            }
            Ok(_) => {
                // Success!
            }
        }
        Ok(size)
    }

    fn flush(&mut self) -> io::Result<()> {
        // Do nothing.
        Ok(())
    }
}
```
Another idea here would be to use a buffer to store messages-to-be-written, and lock-and-swap it out using a mutex (something similar to the solution proposed here). The buffer could start small and grow as needed, so it uses little memory in well-behaved programs. This also has the advantage that we could set an upper limit on the number of bytes buffered instead of on the number of "lines" buffered, which is probably a better limit, especially on memory-constrained systems.
The buffer may be needlessly large and probably should be configurable. Or we could investigate another solution.
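To make the lock-and-swap idea above concrete, here is a rough, untested sketch. None of this is tracing-appender's actual code; `SwapBuffer`, `drain_to`, and `MAX_BUFFERED_BYTES` are made-up names for illustration:

```rust
// Untested sketch of the lock-and-swap idea; names and the byte cap are made up.
use std::{
    io::{self, Write},
    mem,
    sync::{Arc, Mutex},
};

const MAX_BUFFERED_BYTES: usize = 1 << 20; // hypothetical byte cap

/// Shared, growable buffer that producers append to under a short-lived lock.
#[derive(Clone, Default)]
struct SwapBuffer {
    inner: Arc<Mutex<Vec<u8>>>,
}

impl SwapBuffer {
    /// Append a message; returns false (message dropped) if the byte cap is hit.
    fn push(&self, msg: &[u8]) -> bool {
        let mut buf = self.inner.lock().unwrap();
        if buf.len() + msg.len() > MAX_BUFFERED_BYTES {
            return false;
        }
        buf.extend_from_slice(msg);
        true
    }

    /// Called by the writer thread: swap the accumulated bytes out for an
    /// empty buffer, so the lock is held only for the swap, not for the I/O.
    fn take(&self) -> Vec<u8> {
        mem::take(&mut *self.inner.lock().unwrap())
    }
}

/// Writer-thread side: drain whatever has accumulated and write it out.
fn drain_to<W: Write>(buf: &SwapBuffer, writer: &mut W) -> io::Result<()> {
    let pending = buf.take();
    if !pending.is_empty() {
        writer.write_all(&pending)?;
        writer.flush()?;
    }
    Ok(())
}
```

Producers only hold the lock long enough to append, and the writer thread only holds it for the swap, so neither side ever waits on the actual I/O.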
I don't think the mutex solution is viable here because there would be too much contention between producers. We'd either have to `try_lock`, and then we'd be losing messages even when the buffer is actually mostly empty but another thread happens to be holding the lock, or we'd have to give up on the non-blocking property, as the producers would have to wait their turn for the mutex.
That's what I'd have thought too, but that article seems to suggest the opposite: that you get better performance with a simple mutex than with mpsc (although there are many variations on mpsc out there with different performance characteristics). No one ever holds the mutex for very long, so your odds of blocking on the mutex are low.
If we want a quick-and-easy fix here (with the caveat that I haven't actually tried this to verify it empirically), at a glance it looks like `futures::channel::mpsc` doesn't allocate a huge amount of memory at startup the way crossbeam does, and it's async-runtime-agnostic. It might be a good drop-in replacement.
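As an untested sketch of what that swap could look like, reusing the shape of the snippet above (the `spawn_log_thread` and `try_write` names are made up, and this hasn't been verified as a drop-in for tracing-appender):

```rust
use std::{io::Write, thread};

use futures::{channel::mpsc, executor::block_on, StreamExt};

/// Spawn the log thread; the bounded futures channel allocates lazily instead
/// of reserving `capacity * size_of::<Msg>()` bytes up front.
fn spawn_log_thread<T: Write + Send + 'static>(
    mut writer: T,
    max_buffered_messages: usize,
) -> (mpsc::Sender<Box<[u8]>>, thread::JoinHandle<std::io::Result<()>>) {
    let (tx, mut rx) = mpsc::channel::<Box<[u8]>>(max_buffered_messages);
    let handle = thread::spawn(move || {
        // The receiver is a Stream, so drive it with futures' small executor.
        block_on(async {
            while let Some(data) = rx.next().await {
                writer.write_all(&data)?;
                writer.flush()?;
            }
            Ok(())
        })
    });
    (tx, handle)
}

/// Write path: `try_send` keeps the non-blocking, drop-on-full policy.
fn try_write(tx: &mut mpsc::Sender<Box<[u8]>>, buf: &[u8]) -> bool {
    match tx.try_send(buf.to_vec().into_boxed_slice()) {
        Ok(()) => true,
        Err(e) if e.is_full() => false, // channel full: count the message as dropped
        Err(_) => false,                // receiver gone: the log thread has exited
    }
}
```

One wrinkle worth flagging: futures' bounded channel reserves one extra slot per sender, so with many cloned writers the effective capacity is a bit larger than `max_buffered_messages`.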
I read the article, but my concern isn't throughput, it's guarantees. When a producer fails to lock the mutex, it can either block (not something that should happen here) or discard the message (or retry a few more times without really blocking, but taking longer from an outside point of view). I don't think spuriously losing messages should happen unless it's really necessary in order not to block.
## Feature Request

Hi, so currently starting a program with only a single `non_blocking` subscriber allocates 4 MB of RAM (that's a LOT!). A channel with capacity `128_000` allocates a lot of `Msg` by default.

Allocation stacktrace: `128_000 * 24 bytes = 3 MB`, but `dhat` says it uses 4 MB:

```
PP 1.1.1/2 {
  Total:     4,096,000 bytes (3.9%, 101,907.94/s) in 1 blocks (0%, 0.02/s), avg size 4,096,000 bytes, avg lifetime 40,193,109 µs (100% of program duration)
  Max:       4,096,000 bytes in 1 blocks, avg size 4,096,000 bytes
  At t-gmax: 4,096,000 bytes (44.64%) in 1 blocks (0.02%), avg size 4,096,000 bytes
  At t-end:  4,096,000 bytes (45.34%) in 1 blocks (0.03%), avg size 4,096,000 bytes
  Allocated at {
    ^1:
```

### Crates

`tracing-appender`

### Motivation

A program using 8 MB of RAM just after it starts isn't good. Two (stdout and file output) of those channels/buffers use more than 90% of my program's memory. Reducing the `Msg` size 3 times would save a lot of useful memory space.

### Proposal

Represent `Line` as `Box<...>`. Note that the whole enum then shrinks to only 8 bytes, just like `Option` (see the sketch after the alternatives below).

### Alternatives

- Using the `unbounded()` crossbeam channel, which is implemented as a dynamically allocated linked list of blocks of 31 elements: https://docs.rs/crossbeam-channel/latest/crossbeam_channel/fn.unbounded.html
- Using `Vec32` from the `mediumvec` crate on 64-bit architectures:
  - `Msg` size goes from 24 to 16 bytes
  - `Box` of `Vec` on the heap

  https://docs.rs/mediumvec/latest/mediumvec/vec32/struct.Vec32.html
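For reference, a tiny sketch of the size argument behind the proposal. `MsgToday` and `MsgBoxed` are stand-ins made up for illustration; the real `Msg` enum lives in tracing-appender's internals and may differ in detail:

```rust
// Stand-in types to illustrate the size argument; not tracing-appender's code.
enum MsgToday {
    Line(Vec<u8>), // Vec<u8> is three usizes: 24 bytes on 64-bit targets
    Shutdown,
}

enum MsgBoxed {
    Line(Box<Vec<u8>>), // a single non-null pointer, so the discriminant fits in its niche
    Shutdown,
}

fn main() {
    // Prints 24 and 8 on typical 64-bit targets.
    println!("today: {} bytes", std::mem::size_of::<MsgToday>());
    println!("boxed: {} bytes", std::mem::size_of::<MsgBoxed>());
    // Same niche optimization that keeps Option<Box<Vec<u8>>> at 8 bytes.
    println!("option: {} bytes", std::mem::size_of::<Option<Box<Vec<u8>>>>());
}
```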