delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

perf: batch json decode checkpoint actions when writing to parquet #2983

Closed alexwilcoxson-rel closed 2 weeks ago

alexwilcoxson-rel commented 2 weeks ago

Description

This change pushes more serialized JSON actions into the decoder before flushing. For a log with tens of thousands of actions, the current implementation took ~18 seconds; this change dropped it to ~3 seconds.
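The batching idea can be sketched with the standard library alone. This is an illustrative sketch, not the PR's code: the arrow-json decoder call is elided, and `batch_actions` is a hypothetical helper; the 2500-action chunk size mirrors the value used in the discussion below.

```rust
// Sketch (std only): instead of handing the JSON decoder one tiny buffer per
// action, group the serialized actions into fixed-size chunks and concatenate
// each chunk into a single buffer. The decoder then sees a few large decode
// calls rather than tens of thousands of small ones.

/// Split `actions` into chunks of `chunk_size` and concatenate each chunk
/// into one newline-delimited JSON buffer. (`batch_actions` is a
/// hypothetical name for illustration, not an API in delta-rs.)
fn batch_actions(actions: &[String], chunk_size: usize) -> Vec<Vec<u8>> {
    actions
        .chunks(chunk_size)
        .map(|chunk| {
            // One buffer per chunk, holding newline-delimited JSON.
            let mut buf = Vec::new();
            for json in chunk {
                buf.extend_from_slice(json.as_bytes());
                buf.push(b'\n');
            }
            buf
        })
        .collect()
}

fn main() {
    // 10,000 serialized actions -> 4 buffers of 2500 actions each.
    let actions: Vec<String> = (0..10_000).map(|i| format!("{{\"id\":{i}}}")).collect();
    let buffers = batch_actions(&actions, 2500);
    assert_eq!(buffers.len(), 4);
    // Every action appears exactly once across the buffers.
    let total: usize = buffers
        .iter()
        .map(|b| b.iter().filter(|&&c| c == b'\n').count())
        .sum();
    assert_eq!(total, 10_000);
    println!("ok");
}
```

Each resulting buffer would then be passed to a single `decode` call, amortizing per-call overhead across the whole chunk.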

Related Issue(s)

n/a

Documentation

https://docs.rs/arrow-json/53.2.0/arrow_json/reader/struct.Decoder.html#method.decode

codecov[bot] commented 2 weeks ago

Codecov Report

Attention: Patch coverage is 66.66667% with 3 lines in your changes missing coverage. Please review.

Project coverage is 72.27%. Comparing base (7a3b3ec) to head (4add802). Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| crates/core/src/protocol/checkpoints.rs | 66.66% | 0 Missing and 3 partials :warning: |
Additional details and impacted files

```diff
@@           Coverage Diff           @@
##             main    #2983   +/-   ##
=======================================
  Coverage   72.26%   72.27%
=======================================
  Files         128      128
  Lines       40329    40334    +5
  Branches    40329    40334    +5
=======================================
+ Hits        29143    29150    +7
+ Misses       9334     9331    -3
- Partials     1852     1853    +1
```


hntd187 commented 2 weeks ago

Does it make sense to tie it to record batch size? Could we instead pull that into its own configuration, or at least a constant?

alexwilcoxson-rel commented 2 weeks ago

> Does it make sense to tie it to record batch size? Could we instead pull that into its own configuration, or at least a constant?

I think if it's less than the checkpoint batch size, then the decode and flush will yield a batch smaller than the checkpoint batch size constant. For example:

    const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

    let mut decoder = ReaderBuilder::new(arrow_schema)
        .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
        .build_decoder()?;

    // Count of actions
    let mut total_actions = 0;

    for chunk in &jsons.chunks(2500) {
        let mut buf = Vec::new();
        // write 2500 serialized json objects to buffer
        for j in chunk {
            serde_json::to_writer(&mut buf, &j?)?;
            total_actions += 1;
        }
        // internally buffers 2500 objects and returns because
        // buf is exhausted even though 2500 < 5000
        let _ = decoder.decode(&buf)?;
        // flush yields 2500 row batch and writes it
        while let Some(batch) = decoder.flush()? {
            writer.write(&batch)?;
        }
    }

hntd187 commented 2 weeks ago

I understand. I guess what I'm asking is: is there any reason a user might want to configure this independently from the batch size, as above? If we don't think that will ever happen, then we can forgo the configuration.