Open pabloem opened 1 year ago
@lukecwik could you take a quick look at this and let me know if my assessment is correct?
The expectation is that the runner will checkpoint the bundle allowing the SDF to be checkpointed if it detects that the bundle has been running long enough or has done enough meaningful work.
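To make the checkpointing expectation above concrete, here is a standalone sketch (no Beam dependency) of what a runner-initiated checkpoint does to an SDF's remaining work: it splits the restriction into a primary part (finished in the current bundle) and a residual part (handed back to the runner to resume later). The class and method names here (`OffsetRange`, `trySplitAtCheckpoint`) are illustrative, not Beam's exact API.

```java
// Standalone sketch of restriction splitting at a checkpoint.
final class OffsetRange {
    final long from; // inclusive
    final long to;   // exclusive
    OffsetRange(long from, long to) { this.from = from; this.to = to; }
}

final class CheckpointSketch {
    /**
     * Split `range` at `position`: the primary covers work to be finished in
     * the current bundle, the residual is deferred and resumed later.
     * Returns null when there is nothing to split off.
     */
    static OffsetRange[] trySplitAtCheckpoint(OffsetRange range, long position) {
        if (position <= range.from || position >= range.to) {
            return null; // no work done yet, or no work left to defer
        }
        return new OffsetRange[] {
            new OffsetRange(range.from, position), // primary
            new OffsetRange(position, range.to)    // residual
        };
    }
}
```

Because a plain DoFn has no restriction to split, the runner has no residual to hand back, which is the root of the problem described below.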
Also the conversion only outputs one record at a time, is there a DoFn buffering all the intermediate output downstream?
After looking through the code, the issue is that https://github.com/apache/beam/blob/6ba647333c4c69fb6dfc65929456c7c11570382f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L184 is a normal DoFn which forces outputting all the records that are part of the split. This doesn't support checkpointing, which causes OOMs in Dataflow since it buffers all the output for the bundle.
https://github.com/apache/beam/blob/6ba647333c4c69fb6dfc65929456c7c11570382f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L455 is an SDF wrapper for unbounded sources which could be useful as part of a solution here.
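The contrast between the two approaches can be sketched in plain Java (again, no Beam dependency; `claimLimit` stands in for a runner-initiated checkpoint, and the names are illustrative). A normal DoFn must emit every record in the split before its bundle can finish, so a runner that buffers bundle output holds all of it in memory; an SDF claims positions one at a time and can stop as soon as the tracker refuses a claim.

```java
import java.util.ArrayList;
import java.util.List;

final class SdfVsDoFnSketch {
    /** Plain-DoFn style: no way to stop early; the whole split is produced. */
    static List<Long> emitAll(long from, long to) {
        List<Long> out = new ArrayList<>();
        for (long i = from; i < to; i++) {
            out.add(i);
        }
        return out;
    }

    /** SDF style: each position must be claimed; a refused claim (modeled by
     *  claimLimit) ends the bundle, leaving the rest as a residual. */
    static List<Long> emitUntilCheckpoint(long from, long to, long claimLimit) {
        List<Long> out = new ArrayList<>();
        long claimed = 0;
        for (long i = from; i < to; i++) {
            if (claimed >= claimLimit) {
                break; // models tracker.tryClaim(i) returning false
            }
            claimed++;
            out.add(i);
        }
        return out;
    }
}
```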
If this has never worked on SDF, is it P1? It does seem like trouble that it cannot be used in portable runners in some cases. How widespread is the problem it causes?
It is primarily used for benchmarks and for slowly changing side input patterns.
This is only broken for streaming GenerateSequence with both a `to` and a `from` parameter, and only for very large sequences. When it's used for slowly changing side inputs it usually doesn't have a `to` parameter (I think?), so in that case it is not broken.
I would say since this has been broken for a long time without too much complaint P2 should be OK. We don't need it included in our alert email of urgent things being neglected.
What happened?
The transform `GenerateSequence.from(0).to(...).withRate(...)`, which is expanded into a streaming source (that is bounded, as there's a maximum number of records to read), is broken after it was migrated from an `UnboundedSource`-based implementation. For smaller data sizes, the source works as expected, and a sequence is generated properly.
For larger data sizes, the transform causes workers to OOM and never make progress.
Reasons:
The expansion buffers all of the bundle's output in a single DoFn; this is what causes OOMs for workers before they can make any progress.
(here's the expansion for GenerateSequence: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java#L242-L258)
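The expansion decision can be summarized in a standalone sketch. This is my reading of the linked `GenerateSequence.expand` code, not a verbatim copy: a plain bounded range reads from a bounded counting source, but once a rate (or max read time) is involved the unbounded source is used, and adding a `to` bound then wraps it in `BoundedReadFromUnboundedSource`, the broken path. The strings returned are illustrative labels only.

```java
// Illustrative sketch of which source GenerateSequence expands to.
final class ExpansionSketch {
    static String expansionFor(Long to, boolean hasRateOrMaxReadTime) {
        boolean bounded = (to != null);
        if (!hasRateOrMaxReadTime) {
            // No streaming behavior requested: a bounded range can use a
            // plain bounded source directly.
            return bounded ? "bounded CountingSource" : "unbounded CountingSource";
        }
        // A rate/maxReadTime forces the unbounded source; a `to` bound then
        // wraps it in BoundedReadFromUnboundedSource -- the broken path.
        return bounded ? "BoundedReadFromUnboundedSource" : "unbounded CountingSource";
    }
}
```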
I found that the expansion always became a `BoundedReadFromUnboundedSource` if we had `from` and `to` parameters.

Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components