BurntSushi / rust-csv

A CSV parser for Rust, with Serde support.

api: expose writer flush_buf #333

Closed. okjodom closed this issue 1 year ago.

okjodom commented 1 year ago

Useful for scenarios where manually flushing the buffer while keeping the Writer is desirable.

BurntSushi commented 1 year ago

Please provide a use case. I don't understand why you would want to do this.

okjodom commented 1 year ago

Please provide a use case. I don't understand why you would want to do this.

I'm writing a long-running simulator that writes its results to a CSV file. To monitor the progress of the simulation, I tail the results written to the file. I need to periodically write (complete) results to the file without flushing the writer. Flushing the writer and re-instantiating it is not ideal, because then I'd have to buffer the data I want to write elsewhere before serializing it to the writer.

If I have to wait for the writer to auto-flush when the buffer eventually fills up, I'm forced to reduce the buffer size so that the auto-flush happens more often on some predictable schedule. Still, this is not ideal: estimating a buffer size is imprecise, and an auto-flush will most likely cut off the last line written to the file, making the tailed output unusable.

A flush_buf API is perfect because I can control when to write to the file, and eventually flush when I need to close the writer. The only downside I can think of in using this pattern is that the buffer might be underutilized (but users can always reduce the allocated buffer size if necessary).

BurntSushi commented 1 year ago

I need to periodically write (complete) results to the file without flushing the writer.

This doesn't make any sense to me?

It sounds to me like you want Writer::flush, which already exists and is public.

Flushing the writer and re-instantiating it

I don't know where "re-instantiating it" is coming from. Calling csv::Writer::flush just flushes the buffer and then the underlying writer. There is no re-instantiation happening. (Although I'm still not sure what that means in this context.)

You might need to provide a minimal code example demonstrating your issue.

As you mention, tail is indeed usually the magic word indicating that you want to call flush. But what's unclear to me is specifically why flush is inappropriate for you.
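
For illustration, here's a minimal sketch of that point (not from the thread; the record values are arbitrary): the same csv::Writer is flushed and then written to again, with no re-creation.

use std::io;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut wtr = csv::Writer::from_writer(io::stdout());

    wtr.write_record(&["a", "1"])?;
    wtr.flush()?; // buffered bytes are pushed down to stdout; `wtr` stays usable

    wtr.write_record(&["b", "2"])?; // keep writing with the same writer
    wtr.flush()?; // flush again before the writer is dropped

    Ok(())
}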

okjodom commented 1 year ago

It sounds to me like you want Writer::flush, which already exists and is public.

This is the opposite of what I want in my scenario. I want to keep the writer, even as I flush the buffer in a controlled manner.

Flushing the writer and re-instantiating it

I don't know where "re-instantiating it" is coming from. Calling csv::Writer::flush just flushes the buffer and then the underlying writer. There is no re-instantiation happening. (Although I'm still not sure what that means in this context.)

By this, I meant that if I used csv::Writer::flush, I'd have to create a new Writer, and that is undesirable in my context.

Once I start streaming data and serializing it to the writer, it's desirable that I keep the writer for as long as possible. It's also desirable that I control when to flush the writer's buffer to the file, so that I always have complete lines written in the CSV.

BurntSushi commented 1 year ago

keep the writer for as long as possible

csv::Writer::flush doesn't have anything to do with "keeping the writer."

I'm sorry, but I can't keep doing this back and forth. There is some kind of communication barrier between us as I do not understand why you want this. In order to move forward, I think you're going to have to demonstrate why you want this with a minimal code sample.

okjodom commented 1 year ago

A scenario like this:

use std::{error::Error, sync::mpsc, time::SystemTime};

/// Receives data from a streaming channel.
/// Writes to the file after every 100 data events.
/// Does this without flushing the underlying writer.
fn print_stream(rx: mpsc::Receiver<Data>) -> Result<(), Box<dyn Error>> {
    let mut writer = csv::WriterBuilder::new().from_path(format!(
        "example_{}.csv",
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    ))?;

    let batch_size = 100;
    let mut counter = 1;

    while let Ok(data) = rx.recv() {
        // keep serializing data to the writer, assuming a large buffer
        writer.serialize(data)?;

        // flush the buffer if we have reached our batch size;
        // we don't necessarily wait for the buffer to fill up
        if batch_size == counter {
            // keep the writer alive
            writer.flush_buf()?; // <-- the proposed API
            counter = 1;
        } else {
            counter += 1;
        }
    }

    // flush buffer and writer when the stream is done
    writer.flush()?;

    Ok(())
}

BurntSushi commented 1 year ago

That isn't minimal because I can't run it. And this part:

            // keep the writer alive
            writer.flush_buf().unwrap();

doesn't make any sense to me. writer.flush() also keeps the writer "alive."

okjodom commented 1 year ago

That isn't minimal because I can't run it.

Ack. TIL how to write a minimal reproducible example, though this doesn't seem quite minimal:

use std::{io, error::Error};

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Data {
    foo: String,
    bar: String,
}

#[tokio::main]
async fn main() {
    // data channel
    let (tx, rx) = mpsc::channel(100);

    // generate and stream data
    tokio::spawn(async move {
        loop {
            let data = Data {
                foo: "Hello".into(),
                bar: "World".into(),
            };

            if let Err(_) = tx.send(data).await {
                println!("receiver dropped");
                break;
            }

            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });

    print_data_stream(rx).await.unwrap()
}

async fn print_data_stream(mut rx: mpsc::Receiver<Data>) -> Result<(), Box<dyn Error>> {
    let mut wtr = csv::Writer::from_writer(io::stdout());

    let batch_size = 100;
    let mut counter = 1;

    while let Some(data) = rx.recv().await {
        wtr.serialize(data)?;

        // flush the buffer if we have reached our batch size
        if batch_size == counter {
            wtr.flush()?;
            // wtr.flush_buf()?; <---- should I be doing this instead?
            counter = 1;
        } else {
            counter += 1;
        }
    }
    wtr.flush()?;

    Ok(())
}

And this part:

            // keep the writer alive
            writer.flush_buf().unwrap();

doesn't make any sense to me. writer.flush() also keeps the writer "alive."

I see what you mean now. I was able to use Writer::flush() to satisfy my scenario, whereas I was previously convinced I needed Writer::flush_buf.

From the method docs,

"Note that this also flushes the underlying writer."

made it seem like the writer would be "destroyed" on flush(). Still, I should have validated this anyway.

BurntSushi commented 1 year ago

Still, I should have validated this anyway.

Yes indeed. Please at least try the suggestion from the maintainer next time.

seemed like the writer would be "destroyed" on flush()

"flush the writer" does not mean "destroy the writer." If the writer were destroyed, then what would happen on the next write? csv::Writer doesn't know how to create a writer at that point. It requires the caller to provide a writer.

"flush the writer" literally just means taking whatever contents are in memory and shoving them down a layer of abstraction.