apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[prism]: unsupported feature "WindowStrategy.OutputTime" set with value (LATEST|EARLIEST)_IN_PANE #31462

Closed damondouglas closed 1 month ago

damondouglas commented 3 months ago

Several Java tests fail on Prism with errors:

unsupported feature "WindowStrategy.OutputTime" set with value LATEST_IN_PANE
unsupported feature "WindowStrategy.OutputTime" set with value EARLIEST_IN_PANE

Steps to reproduce:

  1. Run the Prism server.
./gradlew :runners:prism:runServer
  2. Execute :runners:portability:java:ulrLoopbackValidatesRunnerTests on Prism:

Many tests fail with these errors, but to see the output of one quickly:

TEST=org.apache.beam.sdk.transforms.ReshuffleTest
./gradlew :runners:portability:java:ulrLoopbackValidatesRunnerTests -PjobEndpoint=localhost:8073 --tests="$TEST"

Parent tracker: #28187 Related: #31438

lostluck commented 1 month ago

Re-opening since the feature isn't fully implemented at this time.

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L343

LATEST_IN_PANE and EARLIEST_IN_PANE must also take the rest of the aggregation into account. That means combining each element's timestamp with the previously computed time for the aggregation, not simply using the most recently seen element timestamp.

Assuming the aggregation struct exists, kt.time contains the previous value during the aggregation.

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L350
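
To illustrate the point about reusing the previously computed time, here is a minimal sketch of how an output timestamp could be folded across an aggregation. It is not the Prism code: the `outputTime` constants, the `keyState` type, and the `update` method are hypothetical names chosen for this example, and only the idea of a stored `time` field is taken from the `kt.time` mentioned above.

```go
package main

// outputTime is a hypothetical enum mirroring the OutputTime settings
// discussed in this issue; it is not the type Prism uses.
type outputTime int

const (
	endOfWindow outputTime = iota
	earliestInPane
	latestInPane
)

// keyState is a hypothetical stand-in for the per-key aggregation state.
// Only the previously computed output time is tracked here.
type keyState struct {
	time int64 // previously computed output timestamp (e.g. millis)
	seen bool  // false until the first element for this key arrives
}

// update folds one element timestamp into the running output time.
// The stored value is compared against the new element instead of
// being overwritten by it.
func (ks *keyState) update(ot outputTime, elemTime, windowMax int64) {
	switch ot {
	case endOfWindow:
		// The output time is the window's maximum timestamp.
		ks.time = windowMax
	case earliestInPane:
		// Keep the minimum timestamp seen so far.
		if !ks.seen || elemTime < ks.time {
			ks.time = elemTime
		}
	case latestInPane:
		// Keep the maximum timestamp seen so far.
		if !ks.seen || elemTime > ks.time {
			ks.time = elemTime
		}
	}
	ks.seen = true
}
```

With element timestamps 30, 10, 20 arriving in that order, this sketch yields 10 for EARLIEST_IN_PANE and 30 for LATEST_IN_PANE, whereas taking only the most recently seen timestamp would give 20 in both cases.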

lostluck commented 1 month ago

Looks like the Go SDK doesn't yet support OutputTime configuration in the windowing strategy:

#20436 tracks allowing and correctly implementing OutputTime on the SDK side (accounting for CombinerLifting too).

This will be implemented and then validated via the Java tests.