haydnv / destream_json

Rust library for encoding and decoding a JSON stream
Apache License 2.0
7 stars, 3 forks

Big chunks cause performance issues #7

Open AliBasicCoder opened 1 week ago

AliBasicCoder commented 1 week ago

Hi, I'm interested in this repo and was curious about its performance, so I wrote the following benchmark:

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::stream;
use std::hint::black_box;

// a randomly generated 13.8 MB JSON file
const DATA: &str = include_str!("generated.json");

fn destream_json(c: &mut Criterion) {
    use criterion::async_executor::FuturesExecutor;
    use futures::StreamExt;
    static KB: usize = 1_000;

    let mut group = c.benchmark_group("destream_json");

    async fn test(chunk_size: usize) {
        let source = stream::iter(DATA.as_bytes().iter().copied())
            .chunks(chunk_size)
            .map(Bytes::from)
            .map(Result::<Bytes, destream_json::de::Error>::Ok);

        #[allow(unused_variables)]
        let result: Result<destream_json::Value, destream_json::de::Error> =
            black_box(destream_json::try_decode((), source).await);
    }

    group.sample_size(10);
    for size in [10, 1 * KB, 10 * KB, 30 * KB, 50 * KB, 100 * KB].iter() {
        group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
            b.to_async(FuturesExecutor).iter(|| test(size))
        });
    }
    group.finish();
}

criterion_group!(benches, destream_json);
criterion_main!(benches);

Results:

destream_json/10        time:   [466.36 ms 468.73 ms 471.58 ms]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
destream_json/1000      time:   [365.20 ms 365.41 ms 365.60 ms]
                        change: [+0.1766% +0.2495% +0.3275%] (p = 0.00 < 0.05)
                        Change within noise threshold.
destream_json/10000     time:   [489.91 ms 490.05 ms 490.18 ms]
                        change: [-1.0070% -0.9566% -0.9109%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Benchmarking destream_json/30000: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 7.9s.
destream_json/30000     time:   [786.69 ms 787.16 ms 787.70 ms]
                        change: [-0.9410% -0.8630% -0.7688%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
Benchmarking destream_json/50000: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 13.1s.
destream_json/50000     time:   [1.3026 s 1.3043 s 1.3059 s]
                        change: [-1.3058% -1.1579% -1.0197%] (p = 0.00 < 0.05)
                        Performance has improved.
Benchmarking destream_json/100000: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 25.6s.
destream_json/100000    time:   [2.5586 s 2.5616 s 2.5649 s]
                        change: [-0.6785% -0.3662% -0.0861%] (p = 0.02 < 0.05)
                        Change within noise threshold.

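As you can see, decoding gets much slower as the chunks get bigger: about 365 ms with 1 KB chunks versus about 2.56 s with 100 KB chunks, even though the input is the same 13.8 MB file.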
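A possible explanation, offered as a hypothesis rather than a diagnosis (it has not been checked against destream_json's source): from 30 KB upward the timings are roughly proportional to chunk size, about 26 ms per KB (787 ms / 30, 1304 ms / 50, 2562 ms / 100). With a fixed input of N bytes split into N/c chunks of c bytes, total time growing like N·c is what you would expect if each chunk cost O(c²), since (N/c)·c² = N·c. One common way to end up with O(c²) per chunk is consuming bytes from the front of a growable buffer by shifting the remainder each time. The std-only sketch below uses hypothetical helpers (`drain_front`, `drain_cursor`, `run`) to contrast that pattern with a read cursor; with the front-removal variant, total time should grow with chunk size even though the total input stays fixed.

```rust
use std::time::Instant;

// Hypothetical strategy A (an assumption, not destream_json's code):
// consume bytes by popping them off the front of a Vec. Each `remove(0)`
// shifts every remaining byte left, so draining a chunk of length c costs O(c^2).
fn drain_front(chunk: &[u8]) -> u64 {
    let mut buf = chunk.to_vec();
    let mut sum = 0u64;
    while !buf.is_empty() {
        sum += u64::from(buf.remove(0));
    }
    sum
}

// Hypothetical strategy B: leave the chunk in place and advance a read cursor,
// so draining a chunk of length c costs O(c).
fn drain_cursor(chunk: &[u8]) -> u64 {
    let mut pos = 0;
    let mut sum = 0u64;
    while pos < chunk.len() {
        sum += u64::from(chunk[pos]);
        pos += 1;
    }
    sum
}

// Feed all of `data` through `consume`, `chunk_size` bytes at a time.
fn run(data: &[u8], chunk_size: usize, consume: fn(&[u8]) -> u64) -> u64 {
    data.chunks(chunk_size).map(consume).sum()
}

fn main() {
    // Fixed total input; only the chunk size varies, as in the benchmark.
    let data = vec![b'x'; 200_000];
    for &size in &[100usize, 1_000, 10_000] {
        let start = Instant::now();
        let a = run(&data, size, drain_front);
        let front = start.elapsed();

        let start = Instant::now();
        let b = run(&data, size, drain_cursor);
        let cursor = start.elapsed();

        assert_eq!(a, b); // both strategies see the same bytes
        println!("chunk {size:>6} B: remove(0) {front:?}, cursor {cursor:?}");
    }
}
```

If a profile of the real decoder showed time concentrated in buffer shifting or copying per consumed byte, that would support this hypothesis; otherwise it can be discarded.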

My machine: Intel Core i3-8100 CPU, no discrete GPU, running Ubuntu.

cargo 1.81.0 (2dbb1af80 2024-08-20)
rustc 1.81.0 (eeb90cda1 2024-09-04)
haydnv commented 5 days ago

Hi Ali, thank you for this contribution! I suspect the discrepancy may be due to the memory allocation time for the chunks. Could you try allocating a single Vec at initialization time and reusing it for each chunk, or using borrowed slices as chunks instead? You could do this by calling futures::stream::iter(data.chunks(chunk_size)).

cf. https://doc.rust-lang.org/std/primitive.slice.html#method.chunks https://docs.rs/futures/latest/futures/stream/fn.iter.html
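To illustrate the suggestion: `slice::chunks` yields sub-slices that borrow from the original buffer, so no bytes are copied and nothing is allocated per chunk, and the last chunk is shorter when the length is not a multiple of the chunk size. By contrast, `StreamExt::chunks` in the original benchmark collects each chunk into a new `Vec`. The std-only sketch below uses hypothetical helper names (`borrowed_chunks`, `owned_chunks`); in the benchmark itself, the borrowed slices would then be wrapped with `Bytes::from_static`, which requires a `&'static [u8]`.

```rust
// Hypothetical helper: split `data` into borrowed chunks with `slice::chunks`.
// Each chunk is a view into `data`; no bytes are copied and nothing is
// allocated per chunk (only the Vec holding the slice references here).
fn borrowed_chunks(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    data.chunks(chunk_size).collect()
}

// Hypothetical helper for comparison: copy every chunk into its own Vec,
// which allocates once per chunk.
fn owned_chunks(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    data.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

fn main() {
    let data: Vec<u8> = (0..=255u8).cycle().take(1_000).collect();

    let borrowed = borrowed_chunks(&data, 300);
    // The last chunk holds the remainder when the length is not a multiple of 300.
    let lens: Vec<usize> = borrowed.iter().map(|c| c.len()).collect();
    println!("chunk lengths: {lens:?}");
    // Borrowed chunks point into the original buffer.
    assert!(std::ptr::eq(borrowed[1].as_ptr(), data[300..].as_ptr()));

    // Owned chunks hold the same bytes, but in separate allocations.
    let owned = owned_chunks(&data, 300);
    assert_eq!(owned.concat(), data);
    assert!(!std::ptr::eq(owned[1].as_ptr(), data[300..].as_ptr()));
}
```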

AliBasicCoder commented 5 days ago

I tried this:

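// DATA is assumed to be a &'static [u8] here (e.g. from include_bytes!);
// with the &str from include_str! above, call DATA.as_bytes().chunks(...) instead
let source = stream::iter(DATA.chunks(chunk_size))
    .map(Bytes::from_static)
    .map(Result::<Bytes, destream_json::de::Error>::Ok);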

It had no effect.

I also tried collecting the chunks up front, outside the timed closure:

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::stream;
use std::hint::black_box;

const DATA: &'static [u8] = include_bytes!("generated.json");

fn destream_json(c: &mut Criterion) {
    use bytes::Bytes;
    use criterion::async_executor::FuturesExecutor;
    use futures::StreamExt;
    static KB: usize = 1_000;

    let mut group = c.benchmark_group("destream_json");

    async fn test(chunks: Vec<Result<Bytes, destream_json::de::Error>>, chunk_size: usize) {
        let source = stream::iter(chunks);

        #[allow(unused_variables)]
        let result: destream_json::Value =
            black_box(destream_json::try_decode((), source).await.unwrap());
    }

    group.sample_size(10);
    for size in [10, 1 * KB, 10 * KB, 30 * KB, 50 * KB, 100 * KB].iter() {
        let mut chunks: Vec<Result<Bytes, destream_json::de::Error>> = DATA
            .chunks(*size)
            .map(Bytes::from_static)
            .map(Result::<Bytes, destream_json::de::Error>::Ok)
            .collect();

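        group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
            b.to_async(FuturesExecutor)
                // note: mem::take leaves `chunks` empty after the first call,
                // and Criterion runs this routine many times
                .iter(|| test(std::mem::take(&mut chunks), size))
        });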
    }
    group.finish();
}

It panics with "unexpected end of stream".
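One thing worth ruling out before treating this panic as a decoder bug: Criterion calls the routine passed to `iter` many times (during warm-up and for every sample), and `std::mem::take(&mut chunks)` replaces `chunks` with an empty `Vec` on the first call, so every later call decodes an empty stream, which would plausibly produce the reported error. The std-only sketch below reproduces that pattern with hypothetical stand-ins (`decode`, `bench`, `DATA`) and contrasts it with giving each call its own clone of the chunk list. In the real benchmark, the equivalent would be keeping a `Vec<Bytes>` outside the closure and wrapping each chunk in `Ok` inside it; cloning a `Bytes` does not copy the underlying data.

```rust
// Hypothetical stand-in for the benchmarked decode: it sums chunk lengths and,
// like a JSON decoder given no bytes at all, fails on an empty input.
fn decode(chunks: Vec<&'static [u8]>) -> Result<usize, String> {
    if chunks.is_empty() {
        return Err("unexpected end of stream".to_string());
    }
    Ok(chunks.iter().map(|c| c.len()).sum())
}

// Hypothetical harness: call `routine` `iterations` times, as Criterion does.
fn bench<F>(iterations: usize, mut routine: F) -> Vec<Result<usize, String>>
where
    F: FnMut() -> Result<usize, String>,
{
    (0..iterations).map(|_| routine()).collect()
}

const DATA: &[u8] = br#"{"key": [1, 2, 3]}"#;

fn main() {
    // Pattern from the benchmark: the first call takes the Vec and leaves
    // an empty one behind, so later calls decode nothing.
    let mut chunks: Vec<&'static [u8]> = DATA.chunks(4).collect();
    let taken = bench(3, || decode(std::mem::take(&mut chunks)));
    println!("mem::take: {taken:?}");

    // Alternative: keep the chunks and give each call its own copy of the Vec.
    // Only the slice references are cloned, not the bytes they point to.
    let chunks: Vec<&'static [u8]> = DATA.chunks(4).collect();
    let cloned = bench(3, || decode(chunks.clone()));
    println!("clone:     {cloned:?}");
}
```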

haydnv commented 4 days ago

Ok in that case it does seem like there's an issue. Could you submit a PR with your code for debugging purposes? Please don't include the "generated.json" file itself but consider including the code used to generate it.