jonhoo / async-bincode

Asynchronous access to a bincode-encoded item stream.
Apache License 2.0
68 stars 9 forks source link

Only call poll_flush inside poll_close if needed #11

Closed xmakro closed 1 year ago

xmakro commented 1 year ago

Currently poll_close calls poll_flush on every call (see writer.rs:184). Some other implementations of AsyncWrite (e.g. async-compression) expect that poll_flush is not called after poll_close is called for the first time. Should we make the call to poll_flush in this crate conditional, or should we fix the external implementations?

jonhoo commented 1 year ago

It seems reasonable to me for us to ensure that we don't call flush after we've called close :+1: Want to take a stab at a PR?

xmakro commented 1 year ago

Thanks for the quick response Jon. Please let me know if you see issues with the PR

xmakro commented 1 year ago

I found that the send logic is actually a bit more fiddly. The AynscBincodeWriter has an unbounded buffer size which can lead to OOM errors in common scenarios. Example:

let stream = AsyncBincodeReader::<_, Example>::from(BufReader::new(File::open("inputfile").await?));
let sink = AsyncBincodeWriter::from(BufWriter::new(File::create("outputfile").await?)).for_async();
stream.forward(sink).await?

For large files, if the stream produces items quicker than the sink can write, then the buffer of the AsyncBincodeWriter grows until the program runs OOM.

Instead we should rely on the buffer of the BufWriter for this. I changed the PR that the buffer of the AsyncBincodeWriter only contains one serialized item at a time.

jonhoo commented 1 year ago

The OOM is unfortunate, though I also worry about this being a decently-sized performance regression for anyone currently using the crate, as they'll now need to add a BufWriter in front of their output if they want to batch write operations to the underlying writer. May still be worth doing, I just worry about it.

As for #12, I guess the thinking is that changing the call of poll_flush to poll_ready in poll_close is fine because in order to get to the first call to poll_close, we must have already emptied the buffer, and thus the call to poll_ready must no longer be calling poll_write on the underlying writer on re-entry? It does raise the question about whether it's okay to never call poll_flush as part of poll_close though. I thought the proposal was to still call poll_flush in our poll_close, but to only do so if it hasn't already returned Ok (i.e., if we haven't already called the underlying poll_close at least once already)?

xmakro commented 1 year ago

Thank you for the thorough review Jon. I added some documentation about batching write operations. Unfortunately, I do not see any way around this change since the AsyncBincodeWriter must stall instead of growing memory. Introducing a buffer size argument for AsyncBincodeWriter seems wrong, as that is the job of BufWriter. Maybe we can do a major version increase to 0.8 so users have to opt-in to this change?

Re poll_ready in poll_close: Yes, this is my thinking, I added your description as a comment.

Re poll_flush in poll_close: I think this is not needed, the documentation of futures::Sink::poll_close describes that the stream is flushed when poll_close is called.

jonhoo commented 1 year ago

I agree, I think this is the correct change. And I'm okay with not bumping the major version — this isn't a real breaking change, and I'd rather people get the improvement :+1:

And thanks for updating the docs + comments in there!