apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug][Go SDK]: GrowableTracker TrySplit() with fraction math.NaN() #29210

Closed: johannaojeling closed this issue 1 year ago

johannaojeling commented 1 year ago

What happened?

Problem

In SDFs (splittable DoFns), ProcessSizedElementsAndRestrictions.GetProgress() can return math.NaN() when done and remaining are both 0:

https://github.com/apache/beam/blob/714badb821b7acfb9801a69dd0aff08bc84f6b4f/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L925-L926

    d, r := n.rt.GetProgress() // d, r := 0, 0
    frac := d / (d + r) // frac := 0 / (0 + 0)

This causes downstream problems when an offsetrange.GrowableTracker is used with an unbounded offsetrange.Restriction, because math.NaN() is then passed as the fraction to TrySplit(). TrySplit() returns a primary restriction whose End is smaller than its Start. This in turn makes Size() return a negative value, which causes the pipeline to fail.

Example use case

Reading messages from a stream fails at checkpointing if no new messages have been published and read since the last checkpoint.

Given

Initial state

Rest: {Start: 1, End: math.MaxInt64} Tracker: {attempted: -1, claimed: 0} Estimated end: 6

At checkpoint 1

Rest: {Start: 1, End: math.MaxInt64} Tracker: {attempted: 5, claimed: 5} Estimated end: 6

→ fraction: 0.5 → splitPt: 6 → primary: {Start: 1, End: 6} → residual: {Start: 6, End: math.MaxInt64}

At checkpoint 2

Rest: {Start: 6, End: math.MaxInt64} Tracker: {attempted: -1, claimed: 5} Estimated end: 6

→ fraction: math.NaN() → splitPt: 5 → primary: {Start: 6, End: 5} → residual: {Start: 5, End: math.MaxInt64}

→ primary.Size() < 0 → error
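The size arithmetic behind the failure can be sketched as follows, assuming an offset range's size is End - Start. The restriction type and size method are hypothetical stand-ins for offsetrange.Restriction, kept only to show the numbers from the two checkpoints above.

```go
package main

import "fmt"

// restriction is a hypothetical stand-in for offsetrange.Restriction,
// keeping only the Start/End fields used in the example.
type restriction struct {
	Start, End int64
}

// size assumes the size of an offset range is End - Start.
func (r restriction) size() float64 {
	return float64(r.End - r.Start)
}

func main() {
	checkpoint1 := restriction{Start: 1, End: 6} // primary after checkpoint 1
	checkpoint2 := restriction{Start: 6, End: 5} // primary after checkpoint 2

	fmt.Println(checkpoint1.size()) // 5
	fmt.Println(checkpoint2.size()) // -1
}
```

The -1 matches the "size returned expected to be non-negative but received -1" message in the logs.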

Logs

```
2023/10/31 07:48:33 WARN SDK Error from split, aborting splits bundle.ID=inst004 bundle.stage=stage-003 bundle.watermark=1969-12-31T23:59:59.999Z error="split[inst004] error from SDK: unable to split inst004: \tAttempting split in ProcessSizedElementsAndRestrictions\n\tSDF.ProcessSizedElementsAndRestrictions[natsio.readFn] UID:4 Out:[2]\nsize returned expected to be non-negative but received -1."
2023/10/31 07:48:33 ERROR unable to split inst004: Attempting split in ProcessSizedElementsAndRestrictions SDF.ProcessSizedElementsAndRestrictions[natsio.readFn] UID:4 Out:[2]
```

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


johannaojeling commented 1 year ago

@lostluck I'm happy to fix this but want to align with you on the implementation:

johannaojeling commented 1 year ago

.take-issue

lostluck commented 1 year ago

OK, I think the best answer for now is "if 0 work has been done, fraction is 0". This avoids the NaN condition. With a growable tracker, there's the potential for more work to appear later, so this also avoids issues with the fraction of progress going backwards as work appears (e.g. from 1.0 back to 0). (This can still happen if the amount of work grows faster than we're processing it, but that's a different issue for runners to deal with.)

I think it's a safe enough behavior to add to ProcessSizedElementsAndRestrictions.GetProgress.