apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Possibly unnecessary prefetch during GroupIntoBatches #26395

Open nbali opened 1 year ago

nbali commented 1 year ago

What happened?

As I was inspecting why the Dataflow "Total streaming data processed" metric was so much higher (2-4× depending on the pipeline) than the actual data being processed, even though the pipeline has only one shuffle/GBK/GIB-type transform, I stumbled upon something I couldn't justify being there: https://github.com/apache/beam/blob/f549fd33abdc672143ccbe3f0f66104995d30fe6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L566-L569

I might be missing some context here, but I can't see what benefit this provides for a generic GroupIntoBatches transform, and it seems especially useless when called on a GroupIntoBatches.WithShardedKey (where all the data a batch contains already comes from the same worker).
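
For reference, the pattern in question looks roughly like the sketch below. This is only an illustration of a stateful DoFn prefetching its bag state before emitting a batch; the class name, thresholds, and the element counting are made up, and the actual code is at the GroupIntoBatches.java lines linked above.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Illustrative only: buffer elements per key and, once the buffer is "almost full",
// call readLater() so the runner can start fetching the buffered elements before the
// batch is read and emitted.
class PrefetchingBatchSketchFn extends DoFn<KV<String, String>, Iterable<String>> {

  private static final long BATCH_SIZE = 1_000;        // illustrative
  private static final long PREFETCH_THRESHOLD = 900;  // illustrative

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  // The real transform keeps this count in state; a field is used only to keep the sketch short.
  private long seenInCurrentBatch = 0;

  @ProcessElement
  public void process(ProcessContext ctx, @StateId("buffer") BagState<String> buffer) {
    buffer.add(ctx.element().getValue());
    seenInCurrentBatch++;
    if (seenInCurrentBatch == PREFETCH_THRESHOLD) {
      buffer.readLater(); // the questioned prefetch: hint the runner to fetch the bag contents early
    }
    if (seenInCurrentBatch >= BATCH_SIZE) {
      ctx.output(buffer.read()); // emit the batch
      buffer.clear();
      seenInCurrentBatch = 0;
    }
  }
}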

I would appreciate any insight before I submit a PR to change this.

Issue Priority

Priority: 3 (minor)

Issue Components

nbali commented 1 year ago

.

Abacn commented 1 year ago

Thanks for reporting this. This if block has been there since GroupIntoBatches was first implemented (#2610); I'm assuming there was some reason for that. WithShardedKey was added some time later (still about three years ago). I haven't looked into the details yet, but I would like to dig into it.

"Total streaming data processed" metric was so much higher (2-4× depending on the pipeline) than the actual data

Have you tested that this is caused by the code path you pointed to? It would be nice if there were some benchmark data to share.

nbali commented 1 year ago

"assuming there was some reason for that" I assume too, but there is literally no comment or discussion about it anywhere I could find. Meanwhile when I just think it through I can't find any justification for it.

"Have you tested that this is caused by the pointed code path? Would be nice if there is some benchmark data to share with" Kinda. I didn't launch any pipeline with modified code to pinpoint this at 100% certainty though.

Also, I don't think my pipeline results are good enough for benchmarking in terms of consistency and straightforwardness.

... but basically what I did was run the pipelines concurrently on the same Kafka streams with different configs. So theoretically the "shuffled" data amount should be equal. It wasn't. Meanwhile, everything else (data read from Kafka, data written into BQ, etc.) was essentially identical. My original goal wasn't to test this, but to optimize the pipelines by using non-default config values, as the defaults seemed rather unoptimized based on the GCS storage usage pattern and the average batch size, and sometimes the pipeline couldn't even scale down due to the estimated backlog size (when CPU was clearly available).

In order to do that I was playing with these configs:

options.setMaxBufferingDurationMilliSec(...);
options.setGcsUploadBufferSizeBytes(...);
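
For context, this is roughly how I set them: a minimal sketch with placeholder values, where I'm assuming setMaxBufferingDurationMilliSec lives on BigQueryOptions and setGcsUploadBufferSizeBytes on GcsOptions.

import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch only: the values are placeholders and the options interfaces are assumptions.
public class BatchingConfigSketch {
  public static void main(String[] args) {
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
    options.setMaxBufferingDurationMilliSec(60_000);   // e.g. buffer batches for up to ~1 minute
    options.as(GcsOptions.class)
        .setGcsUploadBufferSizeBytes(8 * 1024 * 1024); // e.g. 8 MB GCS upload buffer
    // ... build and run the Kafka -> BigQuery pipeline with these options
  }
}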

When I increased them, the behaviour, CPU utilization, storage usage pattern, etc. changed, as expected, in a way that corresponds to having bigger batches, but I noticed increased costs due to the "processed data amount". So it was obvious that for some reason it handles data differently, so I started checking the code for what might cause it, and this seems like the only thing that could.

For example, one run like I mentioned:

Pipeline 1: [metrics screenshot]

Pipeline 2: [metrics screenshot]

nbali commented 1 year ago

@Abacn I have created two PRs for the two most likely implementations; feel free to pick whichever you prefer :)

Abacn commented 1 year ago

Thanks for sharing the benchmark results! Let me help find people with experience to review.

Abacn commented 1 year ago

Also, thanks for being persistent. I am looking into it in the meantime as well, and the benchmark results look impressive. To my understanding, there is at least no harm in adding a parameter to the GroupIntoBatches PTransform specifically, something like GroupIntoBatches.disablePreFetch.

nbali commented 1 year ago

Before I created the PRs I was thinking about which option to choose, and I decided against having a specific method on GroupIntoBatches (GIB from now on) itself for various reasons:

So to finally sum it up: either stateful processing needs it, in which case we need it everywhere, or it doesn't, in which case we don't need it anywhere. I saw no reason not to remove it, so I would have just removed it completely, but I also created the experiment-flag version so that we can release/use it without potentially breaking a release.
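
For illustration, the experiment-flag variant can gate the prefetch with something like the sketch below; the flag name and the helper are hypothetical, not necessarily what the linked PRs use.

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;

// Hypothetical sketch: keep the current behavior by default and skip the prefetch only
// when an experiment flag is set. The flag name below is made up for illustration.
final class PrefetchGate {
  private static final String DISABLE_PREFETCH_EXPERIMENT = "disable_groupintobatches_prefetch";

  static void maybePrefetch(PipelineOptions options, BagState<?> batch) {
    if (!ExperimentalOptions.hasExperiment(options, DISABLE_PREFETCH_EXPERIMENT)) {
      batch.readLater(); // current behavior: ask the runner to start fetching the buffered batch
    }
  }
}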

Abacn commented 1 year ago

Yeah, thanks, got it. An experimental flag sounds good in terms of not changing the behavior of the default config.

I am also trying to run some tests this weekend.

Abacn commented 1 year ago

CC: @kennknowles (from file history) may have insight

Abacn commented 1 year ago

Update: I tested #26618 with a test pipeline and found no noticeable change with the switch on/off. Basically, I am generating a steady stream of 100k elements per second, each 1 kB, so it is about 100 MB/s of throughput.

pipeline option provided: -Dexec.args="--project=*** --tempLocation=gs://***/temp --region=us-central1 --runner=DataflowRunner --numWorkers=5 --autoscalingAlgorithm=NONE --dataflowWorkerJar=/Users/***beam/runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-legacy-worker-2.48.0-SNAPSHOT.jar --experiments=enable_streaming_engine"

and then added the disable-prefetch experiments option for the second run.

Both jobs were drained after 15 min.

Metric | prefetch disabled | prefetch enabled (current master)
Elapsed time | 19 min 23 sec | 19 min 10 sec
elements_read | 28,003,335 | 54,323,517
Total streaming data | 84.76 GB | 167.62 GB
ratio (kB / element) | 3.03 | 3.09

Both settings have a similar ratio (about 3 kB per element, which is 3 times the actual data size); however, disabling the prefetch shows a significant regression in throughput and causes a surging backlog:

Backlog:

(prefetch disabled) [backlog chart]

(prefetch enabled) [backlog chart]

@nbali Have you been able to test your changed code with a pipeline? To test modified java-core code with your pipeline, one can build both the sdks-java-core jar and the Dataflow worker jar (for runner v1) with something like

./gradlew -Ppublishing  sdks:java:core:jar
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar

and then use the compiled jar as the java-core dependency and pass the shadow worker jar to Dataflow via the pipeline option shown above.


test pipeline:


import java.util.Objects;
import java.util.Random;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class GroupIntoBatchesTest {

  public static void main(String[] argv) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // 100k elements/s, 1 kB each (~100 MB/s), hashed onto 10 keys and batched per key.
    p.apply(GenerateSequence.from(0).withRate(100000, Duration.standardSeconds(1)))
        .apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply(MapElements.via(new MapToKVFn(1000)))
        .apply(GroupIntoBatches.ofSize(50000))
        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<byte[]>>, Void>() {
          @ProcessElement
          public void process(ProcessContext ctx) {
            System.out.println("grouped key:" + Objects.requireNonNull(ctx.element()).getKey());
          }
        }));
    p.run();
  }

  /** Maps each sequence number to a KV of (hash of the number mod 10, random payload of valueSize bytes). */
  static class MapToKVFn extends SimpleFunction<Long, KV<Integer, byte[]>> {

    private transient Random rd;

    private final int valueSize;

    public MapToKVFn(int valueSize) {
      this.valueSize = valueSize;
    }

    @Override
    public KV<Integer, byte[]> apply(Long input) {
      if (rd == null) {
        rd = new Random();
      }
      byte[] data = new byte[valueSize];
      rd.nextBytes(data);
      return KV.of(Long.hashCode(input) % 10, data);
    }
  }
}

nbali commented 1 year ago

I haven't had the time yet (I will try to make time in the upcoming days), but your example has a theoretical maximum of 100k elements/sec, executed for 900 s, yet it only read 28M and 54M elements, which works out to 30-60k elements/sec. The hashing splits that onto 10 keys, so a single key received about 3-6k msg/sec. The windowing essentially closed every batch after 10 s, so a window for a key received 30-60k elements, which indicates a totally different amount of prefetching between the two runs (even though both used the master version), yet the per-element data amount is the same. That seems odd to me. Do you have the input/output stats of the GIB transform? Also, over a minute of delay in both cases with 10 s windowing? It seems CPU-limited. I mean, isn't this essentially single-threaded?

nbali commented 1 year ago

@Abacn Well, I only just got some time at work, and only a little. I haven't tried running it with the modified runner yet, only with modified parameters.

First of all, I'm not saying your tests are invalid in any way, but I also ran mine and saw huge differences. I didn't mention it originally, as I thought it was irrelevant and should happen with every GIB (my bad), but where I noticed this is the GroupIntoBatches inside BigQueryIO. More specifically this one: https://github.com/apache/beam/blob/bb044f4863cd151214f61174ac3c097bea724098/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L686-L689

This is what I use on the BQ.Write:

.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withTriggeringFrequency(...)
.withAutoSharding()
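
Put together, the write looks roughly like this; a sketch with placeholder destination, frequency, and dispositions, where rows stands for the PCollection<TableRow> produced by the Kafka read (elided here).

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

// Sketch only: destination, triggering frequency, and dispositions are placeholders.
// 'rows' is the PCollection<TableRow> coming out of the Kafka read + transform (elided).
rows.apply(
    "WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(5))
        .withAutoSharding() // this is the path that goes through GroupIntoBatches.WithShardedKey in BatchLoads
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));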

For the benchmark I removed some irrelevant parts from my pipeline but kept the core logic of Kafka.Read -> BigQuery.Write. I first deployed a flex template, then launched the template with different configurations to start consuming.

About the configurations:

That gives me 4 scenarios.

The source is a Kafka stream with 12 partitions, with about 1k elements/sec and 1 MB/sec for every partition.

Reaching the prefetchSize (500'000 / 5 = 100'000 elements) typically takes almost 2 minutes at roughly 1k elements/sec per key, so if we trigger the batches more often we never call the "prefetch" code. The 2 MB limit most likely triggers the batch within a few seconds, so we should never "prefetch". The 100 MB limit most likely triggers 1 prefetch (100 MB / ~1 kB per element ≈ 100'000 elements).

After 22 min of runtime:

As you can see, there are huge differences in the results, and also compared to your test pipeline above; for example, you don't use .withShardedKey() and you trigger it at a much higher frequency.

For the next run I increased the 100 MB option to 256 MB to trigger prefetching more often (and also switched from n1-standard-8 to n1-highmem-8 just to be sure to avoid an OOME). Everything else is the same.

Drained after 40 min:

My only guess for why you couldn't see it is that you fire too quickly, so the data might still be cached? I would modify the rate from 100'000/sec to 10'000/sec, for example (so with the key creation it will be 1'000/sec for every key on average).
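
Concretely, that would just mean changing the generator line in your test pipeline above to something like:

// suggested lower source rate for the test pipeline above (10k elements/s instead of 100k/s)
p.apply(GenerateSequence.from(0).withRate(10000, Duration.standardSeconds(1)))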

When I have time again, I will try with a modified runner.

nbali commented 1 year ago

Quick update: runtime 11 h, same as the previous example except that the message size has been reduced to ~1/10, but I process every message 10 times, so it is still 1 MB/sec for every partition, but 10k elements/sec.

nbali commented 1 year ago

@Abacn I tested with the modified GroupIntoBatches transform too, and it didn't influence the processed data amount for me either, in any of the tested scenarios. Something is still causing a non-linear increase in the processed data amount, but this isn't it.

Abacn commented 10 months ago

We identified a cause of the large (shuffled data) / (processed data) ratio: #27283, which was recently fixed in #29517. It may or may not be related to this use case.

nbali commented 10 months ago

@Abacn I see how that would cause a similar pattern of unexpected shuffled data amounts, and I also get how that change fixes that issue, but what I can't see is where we use an Iterable in the tested scenarios that would cause this. Checking GroupIntoBatchesDoFn, the obvious pick would be the BagState, but that is what this test was about, and we clearly didn't read it. Looking at the other states (CombiningState, ValueState), nothing really sticks out for me... OR is it that registerByteObserver() was called regularly on the BagState's Iterable content by the framework, so that although we clearly didn't explicitly read it, it was read as an unintended side effect, therefore causing the costs?

EDIT: Never mind, I did a quick test. The shuffled data is still non-linear. The difference between 1 MB + 1 minute and 256 MB + 1 hour is over 2×.