apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

Support for Async CSV Writer #3740

Closed metesynnada closed 1 year ago

metesynnada commented 1 year ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I want an async CSV writer so I can use it in an async context. The current implementation is blocking even if we are in a tokio environment.

Describe the solution you'd like

A writer using tokio::io::AsyncWrite might be the solution.

Describe alternatives you've considered

NA

Additional context

NA

tustvold commented 1 year ago

I wonder if this could be achieved by simply writing a batch to an in-memory Vec using the current "blocking" writer, and then flushing the output to an async output. This would be more flexible, and likely significantly faster than an approach that integrates async at a lower level.

metesynnada commented 1 year ago

I wonder if this could be achieved by simply writing a batch to an in-memory Vec using the current "blocking" writer, and then flushing the output to an async output. This would be more flexible, and likely significantly faster than an approach that integrates async at a lower level.

I think this requires constantly creating a "blocking" writer for each record batch since it will own the in-memory Vec. The API for reaching the internal buffer is self.writer.into_inner() which uses mem::replace(self, None).

I couldn't think of a way to keep ownership of the buffer while writing with the usual Writer. Do you have any idea how I could code that?

Btw, I verified the performance degradation. I agree with you that CPU-bound computations like serialization shouldn't be async, since there is no gain. I am trying to isolate the IO-bound operation (the flush) as async, as you said.

tustvold commented 1 year ago

I couldn't think of a way to keep ownership of the buffer while writing with the usual Writer

Perhaps something like (not tested)

let mut buffer = Vec::with_capacity(1024);

while let Some(batch) = stream.next().await.transpose()? {
    // Encode the batch synchronously into the in-memory buffer
    let mut writer = Writer::new(&mut buffer);
    writer.write(&batch)?;
    // Drop the writer to release its borrow of the buffer
    std::mem::drop(writer);
    // Flush the encoded bytes asynchronously, then reuse the buffer
    flush(&buffer).await?;
    buffer.clear();
}

Whilst creating the Writer each time is perhaps a little unfortunate, I am skeptical it will be relevant when amortised over the cost of encoding an entire batch - CSV encoding is very expensive.

metesynnada commented 1 year ago

I made a benchmark for record batch writing with 3 cases: the ordinary (sync) writer, an async writer, and a buffered async writer (the current discussion).
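For reference, a rough sketch of the kind of criterion harness this describes (a sketch only, not the actual benchmark code: the make_batches helper and the use of tokio::io::sink() as the async destination are assumptions):

use criterion::{criterion_group, criterion_main, Criterion};
use tokio::io::AsyncWriteExt;

fn bench_writers(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let batches = make_batches(10, 1000); // hypothetical helper: batch size 10, 1000 batches

    // sync: encode straight into an in-memory Vec with the blocking writer
    c.bench_function("sync", |b| {
        b.iter(|| {
            let mut writer = arrow::csv::Writer::new(Vec::new());
            for batch in &batches {
                writer.write(batch).unwrap();
            }
        })
    });

    // async_buffered: encode into a Vec, then flush it to an async sink
    c.bench_function("async_buffered", |b| {
        b.iter(|| {
            rt.block_on(async {
                let mut out = tokio::io::sink();
                let mut buffer = Vec::with_capacity(4096);
                for batch in &batches {
                    {
                        let mut writer = arrow::csv::Writer::new(&mut buffer);
                        writer.write(batch).unwrap();
                    }
                    out.write_all(&buffer).await.unwrap();
                    buffer.clear();
                }
            })
        })
    });
}

criterion_group!(benches, bench_writers);
criterion_main!(benches);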

For a small batch size (10) and batch count (1000) with the usual schema:

          Running benches/buffer_bench.rs (target/release/deps/buffer_bench-f7b991c50b44d14d)
async                   time:   [18.417 ms 18.942 ms 19.526 ms]
                        change: [-75.407% -74.625% -73.930%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

async_buffered          time:   [17.519 ms 17.718 ms 18.016 ms]
                        change: [-75.844% -75.540% -75.034%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
  8 (8.00%) high mild
  7 (7.00%) high severe

sync                    time:   [11.640 ms 12.062 ms 12.532 ms]
                        change: [-85.176% -84.617% -83.999%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 22 outliers among 100 measurements (22.00%)
  5 (5.00%) high mild
  17 (17.00%) high severe

For a larger batch size (1000) and batch count (100) with the usual schema:

Benchmarking async: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.7s, or reduce sample count to 60.
async                   time:   [74.173 ms 74.648 ms 75.173 ms]
                        change: [+490.63% +496.74% +502.77%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

Benchmarking async_buffered: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.3s, or reduce sample count to 60.
async_buffered          time:   [72.195 ms 72.435 ms 72.695 ms]
                        change: [+623.27% +627.13% +630.93%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 13 outliers among 100 measurements (13.00%)
  1 (1.00%) low mild
  8 (8.00%) high mild
  4 (4.00%) high severe

Benchmarking sync: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.9s, or reduce sample count to 60.
sync                    time:   [78.149 ms 78.415 ms 78.772 ms]
                        change: [+730.79% +735.48% +740.48%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 11 outliers among 100 measurements (11.00%)
  4 (4.00%) low mild
  7 (7.00%) high severe

For the usual batch size (4096) and batch count (100) with the usual schema:

     Running benches/buffer_bench.rs (target/release/deps/buffer_bench-f7b991c50b44d14d)
Benchmarking async: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 30.4s, or reduce sample count to 10.
async                   time:   [300.46 ms 301.73 ms 303.03 ms]
                        change: [+1443.9% +1493.0% +1538.6%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

Benchmarking async_buffered: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 28.1s, or reduce sample count to 10.
async_buffered          time:   [279.85 ms 281.20 ms 282.96 ms]
                        change: [+1459.3% +1487.1% +1508.2%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
  6 (6.00%) high severe

Benchmarking sync: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 31.1s, or reduce sample count to 10.
sync                    time:   [310.66 ms 311.43 ms 312.36 ms]
                        change: [+2385.1% +2481.8% +2576.3%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe

I think the buffered version also extends immediately to JSON and Avro:

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

pub struct BufferWriter<W: AsyncWrite + Unpin + Send> {
    writer: BufWriter<W>,
    buffer: Vec<u8>,
}

impl<W: AsyncWrite + Unpin + Send> BufferWriter<W> {
    pub fn new(writer: W) -> Self {
        BufferWriter {
            writer: BufWriter::new(writer),
            buffer: Vec::with_capacity(4096),
        }
    }

    pub async fn write(&mut self, batch: &RecordBatch, header: bool) -> Result<(), ArrowError> {
        {
            // Encode the batch synchronously into the in-memory buffer;
            // the scope drops the inner writer and releases its borrow.
            let mut inner_writer = Writer::new_with_header(&mut self.buffer, header);
            inner_writer.write(batch)?;
        }
        // Hand the encoded bytes to the async sink, then reuse the buffer
        self.writer.write_all(&self.buffer).await?;
        self.writer.flush().await?;
        self.buffer.clear();
        Ok(())
    }
}
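
For illustration, a hypothetical usage of the BufferWriter above, assuming a tokio::fs::File sink and a stream of Result<RecordBatch, ArrowError> (a sketch only, not part of the proposal):

use futures::StreamExt;

let file = tokio::fs::File::create("out.csv").await?;
let mut writer = BufferWriter::new(file);

// Write the CSV header only for the first batch
let mut header = true;
while let Some(batch) = stream.next().await.transpose()? {
    writer.write(&batch, header).await?;
    header = false;
}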

If you are also satisfied with the results of the buffered version, I will add this functionality to CSV and JSON. cc @tustvold

tustvold commented 1 year ago

If you are also satisfied with the results of the buffered version

The performance across all three seems basically comparable. It would be interesting to see a profile, but I suspect the difference is in the sizing of the intermediate buffer, and the optimal size will depend heavily on the destination sink.

If you are also satisfied with the results of the buffered version, I will add this functionality to CSV and JSON

Thus far we have managed to avoid async within arrow-rs, and I think this encourages a nice separation of compute and IO. What do you think about adding this functionality instead to DataFusion and perhaps just adding a doc comment to arrow-rs showing how it can be done?

e.g. something like (not tested)

async fn write_async<I, F, Fut>(batches: I, flush: F) -> Result<(), ArrowError>
where
    I: IntoIterator<Item = RecordBatch>,
    F: Fn(&[u8]) -> Fut,
    Fut: Future<Output = Result<(), ArrowError>>,
{
    let mut buffer = Vec::with_capacity(4096);
    for batch in batches {
        {
            // Encode synchronously; the inner scope releases the borrow of the buffer
            let mut writer = Writer::new(&mut buffer);
            writer.write(&batch)?;
        }
        // Flush the encoded bytes through the caller-supplied async callback
        flush(&buffer).await?;
        buffer.clear();
    }
    Ok(())
}

metesynnada commented 1 year ago

I am OK with the separation. The main idea behind adding AsyncWrite support was to use the object store's put and put_multipart APIs in DataFusion.

https://github.com/apache/arrow-rs/blob/350867436ab3477bafd7008355286378ab37045f/object_store/src/lib.rs#L191-L217

I was planning to add a new API like put_multipart that returns only a Box<dyn AsyncWrite + Unpin + Send>, to support single files without multipart upload. What do you think about this feature?

It would look like

async fn put_singlepart(
    &self,
    location: &Path,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>>;
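
For illustration only, this hypothetical API would be consumed roughly as follows, with store, path and encoded_csv assumed to exist:

use tokio::io::AsyncWriteExt;

let mut out = store.put_singlepart(&path).await?;
out.write_all(&encoded_csv).await?;
out.shutdown().await?;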

Consider the put_multipart implementation for the local file system, which generates a temporary file and finalizes its name upon reaching the LocalUploadState::ShuttingDown stage to complete the writing process. However, when dealing with FIFO cases, what would you recommend? Would it still be appropriate to use the object_store for individual files? In DataFusion, my approach would be to verify whether the underlying source is unbounded and, if so, use put_singlepart().

tustvold commented 1 year ago

What do you think about this feature?

Store-specific functionality, let alone OS-specific functionality, doesn't seem like a great fit for the object_store crate. Python's fsspec, which is more filesystem focused, doesn't support this either.

I'm not familiar with your use-case, but trying to shoehorn streaming through OS-specific filesystem APIs seems a little odd to me. Is there a reason you opted for such an approach over a custom operator? That would also allow generalising to streaming brokers like Kafka, Kinesis, etc., and potentially using things like Unix domain sockets, which have better async stories.

metesynnada commented 1 year ago

I'm not familiar with your use case, but trying to shoehorn streaming through OS-specific filesystem APIs seems a little odd to me.

Suppose you read a ListingTable in DataFusion. You need to use the object_store crate to read the data. More specifically, you would use the get API, which produces a ByteStream (a futures::Stream) that you can consume.
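
For context, a minimal sketch of that read path (assuming the object_store API of the time, where get returns a GetResult with an into_stream method):

use futures::StreamExt;

let result = store.get(&path).await?;
let mut stream = result.into_stream();
while let Some(bytes) = stream.next().await.transpose()? {
    // each item is a Bytes chunk; feed it into a CSV/JSON decoder, etc.
}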

I was looking for a put API with similar logic, i.e. an async writer. As you said, we will be looking to extend this feature to streaming brokers like Kafka, Kinesis, etc. FIFO was a way to unify the stream and batch approaches. This is why I was looking for ways of extending the object-store crate.

Overall, we believe that overfitting the batch solutions is mostly avoidable. However, I understand your concern.

tustvold commented 1 year ago

we believe that overfitting the batch solutions is mostly avoidable

I don't think this is avoidable: arrow is a columnar data format, and it fundamentally assumes batching to amortise dispatch overheads. Row-based streaming would require a completely different architecture, likely using a JIT?

FWIW kafka and kinesis aren't really streaming APIs, under the hood they rely on aggressive batching for throughput

Suppose you read a ListingTable in Datafusion. You need to use the object-store crate to read the data

I am aware; I wrote a lot of that logic. My confusion is why this is the point of integration, instead of, say, a custom operator or TableProvider? That would be more flexible and would avoid all these issues.

ListingTable is intended for the case of ad-hoc querying files in the absence of a catalog, I would expect service workloads to always make use of some sort of catalog be it Hive MetaStore, Glue, Delta, Lakehouse, etc...

ozankabak commented 1 year ago

I don't think this is avoidable: arrow is a columnar data format, and it fundamentally assumes batching to amortise dispatch overheads. Row-based streaming would require a completely different architecture, likely using a JIT?

@tustvold, I think there is maybe some terminology-related confusion going on here w.r.t. batching. I am sure @metesynnada was not trying to say he wants to avoid batching in its entirety. I think what he envisions (albeit maybe not conveyed clearly) is simply an API that operates with an async writer so that non-IO operations can carry on when the actual write to the object store is taking place.

The current API (i.e. the put function) is already async and it performs the actual write in a separate thread, AFAICT. If this is indeed true, it already doesn't block other non-IO operations. Given that we want to serialize synchronously for performance reasons, it doesn't really matter where we do it -- the API seems sufficient to me as is. I just had a discussion with @metesynnada on this; he seems to agree and can comment further if I'm missing something.
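
As a concrete illustration of "serialize synchronously, await only the upload", a minimal sketch, assuming the object_store put signature of the time (which takes Bytes) and pre-existing store, path and batches values:

use bytes::Bytes;

// Encode synchronously into an in-memory buffer
let mut buffer = Vec::with_capacity(4096);
{
    let mut writer = arrow::csv::Writer::new(&mut buffer);
    for batch in &batches {
        writer.write(batch)?;
    }
}
// Only the upload itself is awaited; object_store handles the IO
store.put(&path, Bytes::from(buffer)).await?;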

Given that we are analyzing this part of the code, one good thing we can do is to investigate whether avoiding the new IO thread and using async primitives to do the actual writing within the same thread makes sense. I am not entirely sure what the advantages/disadvantages of doing that will be. @metesynnada can do some measurements to quantify this. Maybe you can share the reasoning behind the current choice?

tustvold commented 1 year ago

using async primitives to do the actual writing within the same thread

FWIW tokio doesn't support non-blocking filesystem IO (tokio-uring is still experimental), so it will always dispatch to a separate blocking threadpool. This is what I was alluding to when I suggested sockets might be a more compelling primitive than filesystem APIs, as they support epoll.

The current API (i.e. the put function) is already async

This is true, but each put creates a new file, overwriting anything already present, which I suspect will be a problem?

ozankabak commented 1 year ago

OK, so we will need proper tokio support before we can investigate thread vs. async primitives distinction w.r.t. files, so let's wait on that. For handling streaming brokers, a custom operator leveraging a socket-based approach makes sense, I will think about that.

This is true, but each put creates a new file, overwriting anything already present, which I suspect will be a problem?

Right. A simple mechanism to choose between overwrite/append behaviors should be enough for @metesynnada's purposes. Everything is already async. Any suggestions on how we can add this capability?

alamb commented 1 year ago

For more context, I believe the use case for this feature may be related to https://github.com/apache/arrow-datafusion/issues/5130

I can see a "native" async CSV writer in arrow-rs being a compelling UX feature (i.e. it would make the library easier to use and integrate). However, I don't know how much easier it would be and how much maintenance it would require.

I really like the idea, mentioned in a few places on this thread, of creating an async CSV writer by wrapping the existing blocking / batching one outside of arrow-rs initially (I don't think there is any reason this can not be done).

Once we have the actual code and an example use of it, we will be in a much better position to evaluate how it performs compared to a "native" async writer, how it is actually used, and whether we should reconsider incorporating it into arrow-rs.

metesynnada commented 1 year ago

cc @tustvold , I introduced a new API for objects that support append. What do you think about it?

tustvold commented 1 year ago

What do you think about it

I'm not a massive fan of introducing an abstraction to the object_store crate that few, if any, actual object stores support... Perhaps @alamb might be able to facilitate a synchronous chat, if you would be amenable? I think we may be able to support your use-case without needing to graft it onto filesystem APIs.

Edit: Whilst Azure does have an Append Blob concept, I think we would need to bifurcate the API, as the way you interact with them is different. The general approach of object_store has been to expose the common functionality across multiple backends; it is unclear how we could do this in a portable manner.

metesynnada commented 1 year ago

A synchronous chat would be OK.

Meanwhile, you can check https://github.com/apache/arrow-datafusion/issues/5130 for more insight into the use case. In short, we want to support appending to files (only those that support it).

tustvold commented 1 year ago

label_issue.py automatically added labels {'arrow'} from #2

tustvold commented 1 year ago

label_issue.py automatically added labels {'object-store'} from #2