apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[prism] TestStream support (Elements + Watermarks) #29917

Closed: lostluck closed this 6 months ago

lostluck commented 7 months ago

With Event Time timers largely sorted out in #29900, the next thing to work on is TestStream, since it should inform correct handling of ProcessingTime in general.

As largely described in this blog post, TestStream allows pipeline authors to validate their pipelines and code deterministically under a variety of conditions. It does this by providing a sequence of three event types: ElementEvents, WatermarkEvents, and ProcessingTime events.

WRT the implementation in protos, TestStream is a Primitive Transform configured by a TestStreamPayload.

The TestStream must enact a single event at a time, and only proceed to the next event after the runner has reached quiescence. That is, all "side effects" of that event have occurred, and processing has come to a standstill. This requires integration with the runner's watermark and bundle tracking infrastructure.

While in principle a single pipeline can have multiple TestStream transforms, in practice this will not yield correct results for ElementEvents, since it's not possible to specify a total ordering of events across multiple test streams without belabored alternations with WatermarkEvents, which are the only real global constraint on ordering.

As such, this initial pass will only permit a single TestStream in a pipeline, and will reject pipelines that have more than one TestStream instance at job submission time, i.e. in the Prepare request to the JobManagement service.
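A sketch of what the Prepare-time check could look like. It assumes the pipeline's transforms have already been flattened into a slice of (ID, URN) pairs; the real pipeline proto keeps transforms in a map in its components. `transform` and `validateSingleTestStream` are hypothetical names. `beam:transform:teststream:v1` is the standard Beam URN for TestStream.

```go
package main

import "fmt"

// testStreamURN is the standard Beam URN for the TestStream primitive transform.
const testStreamURN = "beam:transform:teststream:v1"

// transform is a hypothetical, pared-down view of a pipeline transform: its ID and spec URN.
type transform struct {
	id  string
	urn string
}

// validateSingleTestStream rejects a pipeline containing more than one TestStream.
func validateSingleTestStream(transforms []transform) error {
	var ids []string
	for _, t := range transforms {
		if t.urn == testStreamURN {
			ids = append(ids, t.id)
		}
	}
	if len(ids) > 1 {
		return fmt.Errorf("pipeline has %d TestStream transforms %v; only one is supported", len(ids), ids)
	}
	return nil
}
```

Failing at Prepare surfaces the problem to the user before any bundles run, rather than producing silently misordered results later.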


Prism already knows when it has "reached quiescence", and currently treats this as an error case to avoid hanging indefinitely. In particular, in ElementManager.Bundles, after watermarks have refreshed, we check whether there are no further in-progress bundles or scheduled watermark refreshes. This is a natural point to trigger/insert any events from TestStream. If there are no test stream events,
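The quiescence rule can be illustrated with a toy Go model. This is a hypothetical simplification, not Prism's ElementManager: `elementManager`, `step`, and the bundle and refresh counters are invented for illustration, and each injected event simply schedules a fixed number of bundles. What it demonstrates is the ordering constraint: a TestStream event is injected only when no bundles are in progress and no watermark refreshes are scheduled. Stopping when nothing is left to inject is an assumption of the sketch.

```go
package main

// event is a hypothetical TestStream event; bundles is how many bundles its side effects schedule.
type event struct {
	name    string
	bundles int
}

// elementManager is a hypothetical, heavily simplified model of Prism's ElementManager.
type elementManager struct {
	inProgressBundles int      // bundles scheduled or executing
	pendingRefreshes  int      // watermark refreshes still queued
	events            []event  // remaining TestStream events, in order
	log               []string // what happened, for illustration
}

// quiescent reports whether every side effect of the previous event has settled.
func (em *elementManager) quiescent() bool {
	return em.inProgressBundles == 0 && em.pendingRefreshes == 0
}

// step performs one unit of work and reports whether anything happened.
func (em *elementManager) step() bool {
	if !em.quiescent() {
		// Still settling the previous event: finish a bundle first, then refresh watermarks.
		if em.inProgressBundles > 0 {
			em.inProgressBundles--
			em.pendingRefreshes++ // a finished bundle schedules a watermark refresh
			em.log = append(em.log, "bundle")
		} else {
			em.pendingRefreshes--
			em.log = append(em.log, "refresh")
		}
		return true
	}
	if len(em.events) == 0 {
		return false // quiescent with nothing left to inject: stop (assumed behaviour)
	}
	// Quiescent: inject exactly one TestStream event.
	next := em.events[0]
	em.events = em.events[1:]
	em.inProgressBundles += next.bundles
	em.log = append(em.log, "inject "+next.name)
	return true
}

// run drives the manager until it is quiescent with no TestStream events left.
func (em *elementManager) run() {
	for em.step() {
	}
}
```

With two events, the log shows the second injection only after every bundle and refresh caused by the first has drained.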


The protos indicate it's possible to "tag" Watermark and Element events, in order to specify the total ordering of events and avoid adding multiple TestStreams to a pipeline in the first place. This facility is currently only implemented in the Python SDK, to help support the Interactive Beam runner.

Implementing TestStream tags in the Go SDK is out of scope and will not block this work. So while tags will be kept in mind during the prism implementation, the feature cannot be deemed complete until the Python ValidatesRunner tests are executed against prism.

As such, there is useful code, along with tests, in the Python SDK to look at for validating this feature set.

Tests: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/test_stream_test.py

The Python direct runner implementation: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/test_stream_impl.py

The WatermarkControllerEvaluator (which the direct runner substitutes into the above implementation to control the watermark precisely): https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L377


Structurally, the Prism implementation will be similar, but without the additional "transform" wrappings the Python runners use. Prism will approach TestStream like impulse, in that there will be a single TestStreamHandler within the ElementManager. Each output tag will have a synthetic stage associated with it, permitting it to accept TestStreamEvents.

The synthetic stage approach allows tag-level control of the output watermarks, and avoids adding special-case handling for output watermarks, which are usually all the same for a given stage (i.e. a stage normally has a single output watermark rather than one per output).

When quiescence is reached, the TestStreamHandler can direct the next event to the appropriate synthetic stage, which then updates its state via special methods on the stageState.

The overall pipeline execution should be fairly normal otherwise. TestStream can update "pending" elements and similar just as the ElementManager already does for ordinary data and timer handling.


TestStream also has the ability to connect to a TestStreamService. IIUC this is to allow for an external handler of test stream state for Interactive Beam. This is out of scope for Prism at this time, and test streams with the service configured will fail at pipeline submission.
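A minimal sketch of the submission-time rejection. `testStreamPayload` here is a hypothetical subset of the proto holding only the service endpoint URL (the real TestStreamPayload carries an optional `endpoint` ApiServiceDescriptor), and `validateNoTestStreamService` is an illustrative name.

```go
package main

import "fmt"

// testStreamPayload is a hypothetical subset of TestStreamPayload: only the
// TestStreamService endpoint URL matters for this check.
type testStreamPayload struct {
	endpointURL string // empty when no TestStreamService is configured
}

// validateNoTestStreamService rejects TestStreams that rely on an external TestStreamService.
func validateNoTestStreamService(transformID string, p testStreamPayload) error {
	if p.endpointURL != "" {
		return fmt.Errorf("TestStream %q uses a TestStreamService at %q, which is not supported",
			transformID, p.endpointURL)
	}
	return nil
}
```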

lostluck commented 6 months ago

#30072 has the PR for this. TIL that locking comments also prevents the automatic GitHub bot from referencing it here.

lostluck commented 6 months ago

PR submitted. Processing time handling is being tracked in #30083.