apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[prism] Programmatic Cancel, and Drain #29669

Open lostluck opened 9 months ago

lostluck commented 9 months ago

Under #29650.

Why is this needed

While most pipelines should naturally run to termination, successfully or not, it should be possible to abort pipeline execution in Prism in two similar but distinct ways: canceling and draining.

The goal is to only leverage the JobManagement API as the external surface through which these pipeline level operations are activated. However, at present, the JobManagement Service only has a Cancel request.

A Drain and/or Update request would need to be designed and added, as there doesn't appear to be an existing hook for it. Alternatively, a Prism-specific prototype service method could be added for experimentation, to avoid blocking on consensus and to avoid cluttering the JobManagement API before that consensus is reached.

What problem do Cancel and Drain solve?

During local development, developers need to cancel or drain pipelines submitted to Prism. Currently there is no way to stop submitted pipelines running via Prism that have remaining elements to process unless they terminate the Prism executable, consequently terminating all other running Jobs.

Expected behavior

The following is the in-scope expected behavior and purposely leaves out implementation details.

Feature: Terminate a Job

Scenario: Developer terminates Job from Prism's UI


    Given Job state is <before_state>
     When I click <action> button
     Then I see the Job state as <after_state>
      And the cancel and drain buttons are disabled
      And Prism stops reading from input sources
      And Prism <continue_or_stops> processing remaining elements in the pipeline

    | before_state | action | after_state | continue_or_stops |
    | RUNNING      | cancel | CANCELLING  | stops             |
    | RUNNING      | drain  | DRAINING    | continues         |

Scenario: Developer sends a terminate request via API call

    Given Job state is <before_state>
     When I send <request>
     Then the <response> state property is <after_state>

    | before_state         | request                  | response                  | after_state          |
    | RUNNING              | CancelJobRequest         | CancelJobResponse         | CANCELLING           |
    | RUNNING              | DrainJobRequest          | DrainJobResponse          | DRAINING             |
    | DRAINING|DRAINED     | (Cancel|Drain)JobRequest | (Cancel|Drain)JobResponse | DRAINING|DRAINED     |
    | CANCELLING|CANCELLED | (Cancel|Drain)JobRequest | (Cancel|Drain)JobResponse | CANCELLING|CANCELLED |
    | STOPPED              | (Cancel|Drain)JobRequest | (Cancel|Drain)JobResponse | STOPPED              |
    | Does not exist       | (Cancel|Drain)JobRequest | nil                       |                      |
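
The API scenario above amounts to a small state-transition rule: only a RUNNING job changes state, any other state is echoed back unchanged, and an unknown job yields no response. A minimal Go sketch of that rule follows. The `State`, `Request`, `next`, and `terminate` names are hypothetical and are not Prism's actual types; the real service would use the generated JobState enum.

```go
package main

import "fmt"

// State is a simplified stand-in for the Beam job state enum (assumption).
type State string

const (
	Running    State = "RUNNING"
	Cancelling State = "CANCELLING"
	Cancelled  State = "CANCELLED"
	Draining   State = "DRAINING"
	Drained    State = "DRAINED"
	Stopped    State = "STOPPED"
)

// Request identifies which terminate request was received (hypothetical type).
type Request int

const (
	CancelRequest Request = iota
	DrainRequest
)

// next returns the state to report for a terminate request.
// Only RUNNING jobs change state; every other state is returned as-is,
// so repeated requests are idempotent.
func next(cur State, req Request) State {
	if cur != Running {
		return cur
	}
	if req == DrainRequest {
		return Draining
	}
	return Cancelling
}

// terminate looks up the job and applies next. The bool is false when the
// job does not exist, mirroring the nil response in the last table row.
func terminate(jobs map[string]State, id string, req Request) (State, bool) {
	cur, found := jobs[id]
	if !found {
		return "", false
	}
	jobs[id] = next(cur, req)
	return jobs[id], true
}

func main() {
	jobs := map[string]State{"job-1": Running, "job-2": Draining}
	cases := []struct {
		id  string
		req Request
	}{
		{"job-1", CancelRequest},
		{"job-1", DrainRequest}, // already CANCELLING, so unchanged
		{"job-2", CancelRequest},
		{"missing", DrainRequest},
	}
	for _, c := range cases {
		s, ok := terminate(jobs, c.id, c.req)
		fmt.Printf("%s -> %q exists=%v\n", c.id, s, ok)
	}
}
```

Keeping the rule in one pure function makes each row of the table directly checkable in a unit test.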

Additional comments

Pipeline Cancel is relatively straightforward: get hold of the job handle, call the job's CancelFn function, and ultimately update the state to "CANCELLED". Canceling a job doesn't require that the job terminate gracefully, only that all active workers are stopped. While this is the purpose of the CancelFn, it should be verified that cleanup has occurred.

Ideally, the state should transition through CANCELLING, and the job should only be set to CANCELLED once all extant environments have been verified as terminated.
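
As a rough illustration of that two-step transition, here is a minimal Go sketch. The `job` struct, its fields, and `startJob` are hypothetical simplifications and not Prism's actual job type; the goroutines stand in for worker environments, and a `sync.WaitGroup` stands in for whatever mechanism confirms they have exited.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// job is a hypothetical, simplified job handle (not Prism's real type).
type job struct {
	mu       sync.Mutex
	state    string
	cancelFn context.CancelFunc
	workers  sync.WaitGroup // one count per live environment
}

func (j *job) setState(s string) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.state = s
}

// State returns the current job state.
func (j *job) State() string {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.state
}

// cancel moves the job to CANCELLING, signals the workers to stop, and
// only reports CANCELLED once every worker has actually exited.
func (j *job) cancel() {
	j.setState("CANCELLING")
	j.cancelFn()
	j.workers.Wait()
	j.setState("CANCELLED")
}

// startJob launches numWorkers goroutines that run until cancelled.
func startJob(numWorkers int) *job {
	ctx, cancel := context.WithCancel(context.Background())
	j := &job{state: "RUNNING", cancelFn: cancel}
	for i := 0; i < numWorkers; i++ {
		j.workers.Add(1)
		go func() {
			defer j.workers.Done()
			<-ctx.Done() // stand-in for an environment shutting down
		}()
	}
	return j
}

func main() {
	j := startJob(3)
	fmt.Println(j.State()) // RUNNING
	j.cancel()
	fmt.Println(j.State()) // CANCELLED
}
```

In a real server the wait would likely happen off the request path, so the Cancel response can report CANCELLING immediately while cleanup completes in the background.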

This would be implemented in Prism's JobManagement server, as the Cancel method:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go

Cancel can be used for both Batch pipelines and Streaming pipelines, not that Prism makes a distinction.


Drain is not a full part of the Beam Model as of yet (see https://beam.apache.org/documentation/runners/capability-matrix/additional-common-features-not-yet-part-of-the-beam-model/). That said, it remains a useful thing to implement.

Note: It has been noted that Dataflow's implementation of drain is unable to complete if there are event-time timer loops (that is, an event-time timer callback schedules another event-time timer callback, preventing the watermark hold from progressing and the pipeline from ultimately terminating). It is sufficient to be at parity with this implementation.

Outside of the JobManagement portion, Drain is similar in goal (job termination) but takes a different path, in particular to avoid data loss from the user pipeline. It's largely meaningful for streaming pipelines. (The Google Cloud Dataflow runner does not allow batch pipelines to drain, for example.)

Pipeline Drain needs to cut short any upstream sources, and allow all inflight data to be processed to termination. Any data pending to be processed needs to be processed, and then the pipeline should gracefully terminate if it's able to. In principle, this is a matter of requesting that the SDF's SDK instance truncates the restrictions of any SDF elements before processing them again, and then completing processing.

Beam's mechanism for this is sending a bundle descriptor to the SDK worker with the TRUNCATE_SIZED_RESTRICTIONS URN, prior to the normal PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS bundle.

While this initial description doesn't aim to prescribe a specific implementation, the following should be useful pointers into the Prism code to be aware of.

Prism notes the TRUNCATE_SIZED_RESTRICTIONS transform in a comment in handlepardo.go, but doesn't generate it. However, it should not be any different payload-wise from PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS, and the coders for the input and output are identical. Therefore it isn't hard to synthesize the transform later on if needed, using the existing pending elements for the input PCollection.

Prism builds BundleDescriptors for a given fused stage in the buildDescriptor function in stage.go.

A possible approach would be to detect when a stage contains a PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS in the root position, and generate a modified deep clone of the stage's ProcessBundleDescriptor in which the root DataSource transform's output PCollection is consumed by the Truncate transform, and the Truncate's output is consumed by the Process transform. No other changes should be required on the descriptor.
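
The rewiring described above could look roughly like the following Go sketch. It uses hypothetical, heavily simplified `transform` and `descriptor` types rather than the real ProcessBundleDescriptor protos: inputs and outputs are plain slices of PCollection IDs instead of named maps, and PCollection and coder entries are omitted. The `_truncate` and `_truncated` ID suffixes are made up for illustration.

```go
package main

import "fmt"

// transform is a simplified stand-in for a PTransform in a bundle descriptor.
type transform struct {
	URN     string
	Inputs  []string // PCollection IDs consumed
	Outputs []string // PCollection IDs produced
}

// descriptor is a simplified stand-in for a ProcessBundleDescriptor.
type descriptor struct {
	Transforms map[string]*transform
}

const (
	urnProcess  = "PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS"
	urnTruncate = "TRUNCATE_SIZED_RESTRICTIONS"
)

// withTruncate returns a deep copy of d in which a truncate transform sits
// between the process transform's original input and the process transform.
// The original descriptor is left untouched.
func withTruncate(d *descriptor, processID string) (*descriptor, error) {
	p, ok := d.Transforms[processID]
	if !ok || p.URN != urnProcess || len(p.Inputs) != 1 {
		return nil, fmt.Errorf("transform %q is not a single-input process transform", processID)
	}
	out := &descriptor{Transforms: make(map[string]*transform, len(d.Transforms)+1)}
	for id, t := range d.Transforms {
		out.Transforms[id] = &transform{
			URN:     t.URN,
			Inputs:  append([]string(nil), t.Inputs...),
			Outputs: append([]string(nil), t.Outputs...),
		}
	}
	src := p.Inputs[0]
	truncated := src + "_truncated" // hypothetical ID for the truncate output
	out.Transforms[processID+"_truncate"] = &transform{
		URN:     urnTruncate,
		Inputs:  []string{src},
		Outputs: []string{truncated},
	}
	out.Transforms[processID].Inputs = []string{truncated}
	return out, nil
}

func main() {
	d := &descriptor{Transforms: map[string]*transform{
		"source":  {URN: "DATA_SOURCE", Outputs: []string{"pc0"}},
		"process": {URN: urnProcess, Inputs: []string{"pc0"}, Outputs: []string{"pc1"}},
	}}
	drain, err := withTruncate(d, "process")
	if err != nil {
		panic(err)
	}
	fmt.Println(drain.Transforms["process"].Inputs) // [pc0_truncated]
	fmt.Println(d.Transforms["process"].Inputs)     // [pc0]
}
```

Cloning rather than mutating keeps the normal descriptor available for stages that are not draining.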

Prism's bundles are scheduled by the ElementManager, which feeds ready-to-process bundles to the execution stack through a channel. For streaming/process continuation bundles, there can be ResidualRoots returned with the ProcessBundleResponse. These are fed back into the stage's input PCollection for subsequent reprocessing. Prism should skip doing so if the stage is handling a drain/truncation, since the goal is to cease processing.
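
A minimal Go sketch of that residual-handling rule follows. The `stage` type, its `draining` flag, and `handleResiduals` are hypothetical and do not reflect the ElementManager's real data structures; residuals are modeled as plain strings.

```go
package main

import "fmt"

// stage is a hypothetical simplification of a stage's pending-input state.
type stage struct {
	draining bool
	pending  []string // elements waiting to be scheduled into a bundle
}

// handleResiduals is called with the residual roots returned by a bundle.
// Normally they are requeued for later processing; during a drain they are
// skipped so the stage can run out of work and finish.
// It returns the number of residuals that were requeued.
func (s *stage) handleResiduals(residuals []string) int {
	if s.draining {
		return 0
	}
	s.pending = append(s.pending, residuals...)
	return len(residuals)
}

func main() {
	s := &stage{}
	fmt.Println(s.handleResiduals([]string{"r1", "r2"})) // 2
	s.draining = true
	fmt.Println(s.handleResiduals([]string{"r3"})) // 0
	fmt.Println(len(s.pending))                    // 2
}
```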


A reasonable test for correct handling of drain would be ensuring that Prism can handle draining pipelines with multiple unbounded SDFs as sources. It feels unlikely that a pipeline would have an unbounded PCollection as the input into an unbounded SDF, but ideally this case is verified as well.


While being able to do all this from the UI is sufficiently useful, actual programmatic access from each of the SDKs is out of scope for this issue.

That is, unless it's simple to add some kind of handle or hook to an SDK so a unit test can programmatically trigger the cancel or drain, it's acceptable to take a simpler path, e.g., forcing a manual specification of a JobID, or hacking a simplified/separable job submission pathway so that the JobID is known and the operation can be applied to the job on the side.

This is largely noted because the Go SDK doesn't currently have a simple hook for actuating job management API code. It would be a separate issue and task to do something like update the PipelineResults interface to support these operations.

lostluck commented 2 months ago

It looks like Python does have some drain tests already authored, so the big question is whether we can make them work in loopback mode. They're currently skipped in the portable_runner_test.py structures we're inheriting from for prism_runner_test in #31583.

Granted, they do seem to be based on executing in the PythonPortable runner, so they may be a non-starter as is, since the Drain call is relatively new.

damondouglas commented 2 months ago

Are these Python drain tests part of the runner validations?

lostluck commented 2 months ago

Technically yes, but marked as skipped by the hierarchy, since portable runners couldn't trigger drain previously. The "pre work" would be making it so the Python SDK could call drain through to the actual runner. I think there's a handle that could have that method. When I find it again I'll post a link to it here.