apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Canceling a timer by ID is not yet supported in InMemoryTimerInternals #21378

Closed damccorm closed 2 years ago

damccorm commented 2 years ago

I got the following exception with the Spark runner and the GroupIntoBatches transform:

Caused by: java.lang.UnsupportedOperationException: Canceling a timer by ID is not yet supported.
    at org.apache.beam.runners.core.InMemoryTimerInternals.deleteTimer(InMemoryTimerInternals.java:158)
    at org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.clear(SimpleDoFnRunner.java:1198)
    at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:474)

The InMemoryTimerInternals.deleteTimer overload that takes a TimeDomain is not supported:

    public void deleteTimer(
        StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
      throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
    }

However, this function is called by SimpleDoFnRunner$TimerInternalsTimer.clear:

    public void clear() {
      this.timerInternals.deleteTimer(
          this.namespace, this.timerId, this.timerFamilyId, this.spec.getTimeDomain());
    }

clear() is in turn called by GroupIntoBatches$GroupIntoBatchesDoFn.processElement.
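To illustrate what canceling by ID involves, here is a minimal sketch, assuming timers are indexed by the same four values that deleteTimer receives. SketchTimerStore, its Domain enum, and the isSet helper are hypothetical names invented for this example. This is not Beam's InMemoryTimerInternals, and it is not necessarily how the issue was fixed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an in-memory timer store; NOT Beam's InMemoryTimerInternals. */
final class SketchTimerStore {
  /** Hypothetical stand-in for a time domain; plain strings replace StateNamespace. */
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  // One entry per (namespace, timerFamilyId, timerId, domain); the value is the firing time.
  private final Map<List<Object>, Long> timers = new HashMap<>();

  // Lists compare element-wise, so they can serve as a composite map key.
  private static List<Object> key(
      String namespace, String timerId, String timerFamilyId, Domain domain) {
    return Arrays.<Object>asList(namespace, timerFamilyId, timerId, domain);
  }

  /** Sets the timer, overwriting any timer with the same identity. */
  void setTimer(
      String namespace, String timerId, String timerFamilyId, Domain domain, long fireAtMillis) {
    timers.put(key(namespace, timerId, timerFamilyId, domain), fireAtMillis);
  }

  /** Removes the timer if present; returns false instead of throwing when nothing was set. */
  boolean deleteTimer(String namespace, String timerId, String timerFamilyId, Domain domain) {
    return timers.remove(key(namespace, timerId, timerFamilyId, domain)) != null;
  }

  /** Reports whether a timer with this identity is currently set. */
  boolean isSet(String namespace, String timerId, String timerFamilyId, Domain domain) {
    return timers.containsKey(key(namespace, timerId, timerFamilyId, domain));
  }
}
```

Because the lookup key includes the time domain, a delete only succeeds when the caller passes the same domain the timer was set with. Whether a real store should behave that way is a design choice the sketch does not settle.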

 

Imported from Jira BEAM-14018. Original Jira may contain additional context. Reported by: wenqing.yang.

mosche commented 2 years ago

Same issue apparently exists for the Flink runner.

From user@beam

When running on Flink, it eventually hits an unsupported operation exception, "Canceling a timer by ID is not yet supported.", on this line [1]. The source inputs are Avro files for testing (batch), but Kafka topics (streaming) will be used when deployed.

This happens when the batch is filled (10 items) and the max buffering time timer needs to be cancelled.
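The size-or-timeout behavior described here can be modeled in a few lines. This is a minimal sketch, assuming a batch is flushed either when it reaches its size limit or when the max-buffering timer fires. BatchingSketch and BufferingTimer are hypothetical names, and the order of clear() and flush is an assumption. It is not the code of GroupIntoBatches.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of size-or-timeout batching; NOT Beam's GroupIntoBatches code. */
final class BatchingSketch {
  /** Hypothetical timer handle with the two operations this sketch needs. */
  interface BufferingTimer {
    void set(long fireAtMillis);
    void clear();
  }

  private final int batchSize;
  private final long maxBufferingMillis;
  private final BufferingTimer timer;
  private final List<String> buffer = new ArrayList<>();
  private final List<List<String>> emitted = new ArrayList<>();

  BatchingSketch(int batchSize, long maxBufferingMillis, BufferingTimer timer) {
    this.batchSize = batchSize;
    this.maxBufferingMillis = maxBufferingMillis;
    this.timer = timer;
  }

  void processElement(String element, long nowMillis) {
    if (buffer.isEmpty()) {
      // First element of a new batch: arm the max-buffering timer.
      timer.set(nowMillis + maxBufferingMillis);
    }
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      // Batch filled before the deadline: the pending timer is no longer needed.
      // With a timer store that cannot delete by ID, this call is where the failure surfaces.
      timer.clear();
      flush();
    }
  }

  /** Called when the max-buffering timer fires: emit whatever has accumulated. */
  void onTimer() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    emitted.add(new ArrayList<>(buffer));
    buffer.clear();
  }

  List<List<String>> emitted() {
    return emitted;
  }
}
```

In this model, a batch size of 10 means the tenth element triggers timer.clear(), which matches the report that the exception appears only once a batch fills up.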

Anyone else observed this issue?

On a related note, it looks like FlinkStatefulDoFnFunction [2] uses InMemoryTimerInternals. Why is FlinkTimerInternals not used? I would guess there is a difference between batch and streaming requirements.

[1] https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L157
[2] https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java#L136
[3] https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1314

TheNeuralBit commented 2 years ago

Is this not exercised in any ValidatesRunner (VR) tests?

mosche commented 2 years ago

Unfortunately not. I'll address that as well.