apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.81k stars 4.23k forks source link

[Bug]: Unable to drain Flink job when RequiresStableInput is used #28554

Closed kkdoon closed 11 months ago

kkdoon commented 1 year ago

What happened?

Issue: Flink pipeline does not get drained when RequiresStableInput annotation is used. Stack-trace when drain is triggered: Caused by: java.lang.RuntimeException: There are still watermark holds left when terminating operator KVStoreTransform/GroupIntoBatches/ParDo(GroupIntoBatches)/ParMultiDo(GroupIntoBatches) -> PersistenceTransform/WriteSpendStateToDb/ParMultiDo(WriteSpendStateDbStreaming) (1/1)#0 Watermark held 1695147850921 at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.flushData(DoFnOperator.java:631) at org.apache.beam.runners.flink.translation.wrappers.streaming.AbstractStreamOperatorCompat.finish(AbstractStreamOperatorCompat.java:67) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:239) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:213) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:186) ... 13 more

Cause: When drain is triggered, MAX_WATERMARK gets emitted before the last checkpoint barrier. This helps in triggering all the registered event-time timers and flushing out any state. Therefore, the expectation is that when flushData is finally invoked in DoFnOperator, all the event timers should be fired and the watermark should proceed.

However, when RequiresStableInput annotation is used, the behavior is to process the DoFn after the checkpoint operation is complete. Since, flush is invoked when the final checkpoint/savepoint operation is in progress, the watermark is held by the DoFn with the RequiresStableInput annotation and is waiting for the checkpoint to complete.

Potential Solution: Skip this check in case the DoFn has RequiresStableInput set, since we know that all the final pending data will be processed after the final savepoint operation completes.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

je-ik commented 1 year ago

@dmvk Is it possible that this is bug in Flink? I think there should be checkpoint barrier strictly after emitting MAX_WATERMARK, does it make sense?

je-ik commented 1 year ago

@kkdoon can you please clarify how you do the drain? According to the docs, using stop and drain with savepoint should do all the necessary steps for the pipeline to terminate. Is this not working in your case?

kkdoon commented 1 year ago

I have tried stopping the job using both the CLI (with --drain) as well as via flink RestClusterClient library where advanceToEndOfTime is set to True to drain the job.

Note that when drain is set to false, pipeline is able to successfully stop with savepoint and DoFn with RequiresStableInput annotation does emit the buffered results. There's no error since flushData is not invoked & watermark does not proceed to MAX_WATERMARK.

So this issue is only for the specific scenario when drain flag is used to advance the watermark and permanently stop the pipeline.

je-ik commented 1 year ago

I think I see the problem.

From my understanding, using stop with --drain should result in the following sequence: a) sources stop emitting data b) MAX_WATERMARK is emitted c) savepoint is taken d) job is terminated

The problem is that after the savepoint is taken in step c) we process the data and this results in more elements being sent downstream after the savepoint barrier has passed. This problem should arise only in case when there is a chain of at least two transforms with @RequiresStableInput. Is this your case?

kkdoon commented 1 year ago

My job only has 1 DoFn using the @RequiresStableInput annotation (at the terminal node). Job graph looks as follows:

KafkaIO -> DoFn -> DoFn with RequiresStableInput

The job always fails during the final savepoint operation with the watermark hold error and not after the savepoint is taken (fails between step b and c).

je-ik commented 1 year ago

This feels strange. Can you try to log the elements arriving at the stable DoFn after the savepoint to see where are they coming from? I would be surprised if this would be coming from the KafkaIO (provided it stops sending data before triggering the checkpoint).

kkdoon commented 1 year ago

These events are the buffered events that are emitted from DoFn1. I tried the following steps:

  1. Publish 10 elements to Kafka topic and stop the producer job.
  2. Verified that the 10 elements were processed by DoFn1 and Stable DoFn did not receive them.
  3. Drained the job and it failed to execute the savepoint (and did not emit any of the buffered 10 elements).

From my experience, the only instance drain operation works is if the buffered elements are fully processed (via checkpoint trigger) and buffer is empty at time of drain execution.

je-ik commented 1 year ago

Sorry for late reaction, I was OOO.

I once again walked though the code and it seems to make sense now. Here is the problem: a) flushData method is wired up to https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/StreamOperator.html#finish-- (https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#close-- to Flink 1.12) b) this method is apparently called prior to call to notifyCheckpointComplete, which means we still have buffered data

The only solution then seems to make sure that in call to flushData je clean the buffer (process all buffered data and emit outputs, clearing the watermark hold). All this should happen immediately before the last checkpoint is completed and thus it should not break the stable input DoFn's contract.

kkdoon commented 1 year ago

No worries and thanks for your response.

I wanted to double check that if we flush the buffer within flushData AND before final checkpoint operation is complete, won't that violate stable input DoFn contract (where stable DoFn should only be invoked once state is check-pointed)? Or is it ok in case of drain operation? I might be misunderstanding the sequence of operations here.

Issue i see is that once we flush the buffer, it would trigger the downstream DoFn processing. And in the scenario where the downstream stable DoFn operation fails (say its writing to a sink and the sink is unavailable and/or checkpoint itself has timeout), our final checkpoint and drain operation will fail, which could lead to inconsistent state.

je-ik commented 1 year ago

Yes, if the final checkpoint fails (for whatever reason), then the retry can yield different result than the first run. I think we have only two options: a) best-effort, i.e. flushing our buffer in flushData, which is what it was intended for, or b) fail the pipeline, as draining is incompatible with stable DoFns.

Currently, we do b), which is actually semantically correct. We can change the exception to be more explanatory or even better throw exception unconditionally, if DoFn has stable input (currently it might fail non-deterministically).

Because switching between a) and b) might be subject to user decision, we might introduce a flag to FlinkPipelineOptions that will opt-in to from b) to a).

kkdoon commented 1 year ago

ok, adding a flag for drain behavior sounds reasonable.

Lastly, I just wanted to double check with what issues do you see if we instead skip the watermark hold check inside flushData and do it inside notifyCheckpointComplete (only when drain is initiated) ? That way we still maintain the contract of flushing after checkpoint is completed. And in this case if final checkpoint fails, its still ok since drain will fail and pipeline will continue running (and user can retry again later). And if checkpoint is successful but stable DoFn fails, then drain will finish successfully with a warning message (to restore last savepoint and re-run the pipeline since some elements were not processed).

If watermark skip approach doesn't seem correct, then i will update my PR to implement approach A.

je-ik commented 1 year ago

There are two problems: a) there can be other reasons why there are watermark holds besides stable input buffering, so skipping the checks would hide those as well b) draining pipeline first updates watermark to infinity, which (as documented) would result in incorrect output if the pipeline is later resumed from the final savepoint (it would move watermark back in time, which is correctness bug). Draining is meant only for cases when the pipeline will not be resumed later.

kkdoon commented 1 year ago

Thanks for the clarification! Makes sense to make this operation best-effort in that case.

je-ik commented 11 months ago

Closed via #29102