damccorm opened this issue 2 years ago (status: Open)
I can reproduce this issue after migrating a GroupByKey.create() to GroupIntoBatches.ofSize(size).withShardedKey():
java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@8ffcc6c received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@71ffb928 that is before the appropriate cleanup time 294247-01-10T04:00:54.776Z
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:397)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:79)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:460)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:483)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:359)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:94)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
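For context, the migration referenced above looks roughly like the following. This is a minimal sketch, assuming a streaming pipeline with KV<String, String> elements and a batch size of 100; the element types, variable names, and size are illustrative, not taken from the reporter's pipeline:

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Before: plain GroupByKey over KV<String, String>
PCollection<KV<String, Iterable<String>>> grouped =
    input.apply("GroupByKey", GroupByKey.create());

// After: GroupIntoBatches with sharded keys; note the key type changes to ShardedKey<K>
PCollection<KV<ShardedKey<String>, Iterable<String>>> batched =
    input.apply("GroupIntoBatches",
        GroupIntoBatches.<String, String>ofSize(100).withShardedKey());

GroupIntoBatches is a stateful transform (it buffers elements in per-key state and flushes on timers), which is presumably why the stateful-DoFn state cleanup timer path shows up in the trace above after the migration.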
Is there any update regarding this issue?
@bvolpato Can this be fixed by not using GroupIntoBatches?
Is there any update regarding this issue?
https://github.com/apache/beam/pull/33037 should fix this in 2.61.0
Hello, I have a use case with two sets of PCollections (RecordA and RecordB) coming from a real-time streaming source like Kafka. Both records are correlated by a common key, say KEY. The purpose is to enrich RecordA with RecordB's data, for which I am using CoGroupByKey. Since RecordA and RecordB for a common key can arrive within 1-2 minutes of each other in event time, I maintain a sliding window for both records and then apply CoGroupByKey to both PCollections. Whichever sliding window finds both RecordA and RecordB for a common key KEY emits the enriched output. Now, since multiple sliding windows can emit the same output, I finally remove duplicate results by feeding the aforementioned outputs into a global window, where I maintain state to check whether an output has already been processed or not. Since it is a global window, I maintain a Timer on the state (for GC) to let it expire 10 minutes after the state has been written. This works perfectly w.r.t. the expected results. However, I am unable to stop the job gracefully, i.e. drain it. I see the following exception:

java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@4316932b received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

My code snippet:

PCollection<KV<MyKey, RecordA>> windowedRecordA = incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
    Window.<KV<MyKey, RecordA>>into(SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.standardSeconds(90))
        .discardingFiredPanes());

PCollection<KV<MyKey, RecordB>> windowedRecordB = recordBLogs.apply("Applying_Sliding_Window_RecordB",
    Window.<KV<MyKey, RecordB>>into(SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.standardSeconds(90))
        .discardingFiredPanes());

PCollection<KV<MyKey, CoGbkResult>> coGbkRecords = KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
    .and(TagRecordB, windowedRecordB)
    .apply("CoGroupByKey", CoGroupByKey.create());

PCollection<RecordA> enrichedRecordA = coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());

class EnrichIncompleteRecordA extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {

  @Override
  public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
    logger.info("Enriching Incomplete RecordA with RecordB");
    return input
        .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
        .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
        .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
  }
  private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {

    @Setup
    public void setup() { }

    @StartBundle
    public void startBundle() { }

    @ProcessElement
    public void processElement(@Element KV<MyKey, CoGbkResult> input, OutputReceiver<KV<MyKey, RecordA>> out) {
      Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
      Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
      /* There should be max 1 RecordB per MyKey */
      if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
        RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
        for (RecordA recordA : allRecordALogs) {
          if (null != recordB) {
            logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
            // <code to populate recordA object with recordB data>
            out.output(KV.of(input.getKey(), recordA));
          } else {
            logger.error("No recordB available for recordA log [{}]", recordA);
          }
        }
      } else {
        logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
      }
    }

    @FinishBundle
    public void finishBundle() { }

    @Teardown
    public void teardown() { }
  }
  private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {

    @Setup
    public void setup() { }

    @StartBundle
    public void startBundle() { }

    @StateId("processedRecordA")
    private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
        StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));

    @TimerId("stateExpiry")
    private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void processElement(@Element KV<MyKey, RecordA> input, OutputReceiver<RecordA> out,
        @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
        @TimerId("stateExpiry") Timer stateExpiryTimer) {
      // << code to check if recordA has already been processed by checking state >> (see the sketch after this snippet)
      if (recordA needs to be emitted) {
        processedRecordAState.write(processedRecordASet);
        stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
        logger.info("Emitting unique recordA {} for myKey {}", recordA, myKey);
        out.output(input.getValue());
      }
    }

    @OnTimer("stateExpiry")
    public void onExpiry(OnTimerContext context,
        @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
      logger.info("Expiring State after timer expiry");
      processedRecordAState.clear();
    }

    @FinishBundle
    public void finishBundle() { }

    @Teardown
    public void teardown() { }
  }
}
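The elided dedup check in EmitUniqueRecordA.processElement could look roughly like this. This is a minimal sketch, not the original code: it assumes RecordA implements equals()/hashCode(), that the state holds the set of records already emitted for the key, and it needs java.util.Set/HashSet and org.joda.time.Duration imports in addition to the Beam state and timer types already used above.

    @ProcessElement
    public void processElement(@Element KV<MyKey, RecordA> input, OutputReceiver<RecordA> out,
        @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
        @TimerId("stateExpiry") Timer stateExpiryTimer) {
      RecordA recordA = input.getValue();
      // Read the set of records already emitted for this key; null on the first element seen.
      Set<RecordA> processedRecordASet = processedRecordAState.read();
      if (processedRecordASet == null) {
        processedRecordASet = new HashSet<>();
      }
      // add() returns false if the record was already emitted, so duplicates are dropped here.
      if (processedRecordASet.add(recordA)) {
        processedRecordAState.write(processedRecordASet);
        // Reset the GC timer so the state lives 10 minutes past the latest write.
        stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
        out.output(recordA);
      }
    }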
Imported from Jira BEAM-10053. Original Jira may contain additional context. Reported by: mohilkhare.