During scheduling, the controller sends the task assignments to the workers, then waits for the tasks to start up. Each worker engine then constructs its graph and starts up the "local nodes," i.e., the ones that it is responsible for running.
On startup, each operator follows these steps:

1. Notify the controller of task startup
2. Call `on_start`
3. Wait for all other operators to have started
4. Call `run`
If any of these steps panics, a `TaskFailed` message is sent to the controller.
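A minimal sketch of that startup sequence, assuming a tokio-based engine; the names here (`ControlMessage`, `operator_task`, `spawn_operator`, `on_start`, `run`) are illustrative stand-ins, not the engine's actual API:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Barrier};

enum ControlMessage {
    TaskStarted { task_id: usize },
    TaskFailed { task_id: usize },
}

// One operator's startup path, matching the numbered steps above.
async fn operator_task(
    task_id: usize,
    controller_tx: mpsc::Sender<ControlMessage>,
    start_barrier: Arc<Barrier>,
) {
    // Step 1: notify the controller of task startup (sent up front,
    // before any real initialization has happened).
    let _ = controller_tx
        .send(ControlMessage::TaskStarted { task_id })
        .await;

    // Step 2: operator-specific initialization; a panic here is the
    // problematic case described below.
    on_start(task_id).await;

    // Step 3: wait until every local operator has finished on_start.
    start_barrier.wait().await;

    // Step 4: the main processing loop.
    run(task_id).await;
}

// The worker wraps each operator so that a panic in any step is
// reported to the controller as TaskFailed.
fn spawn_operator(
    task_id: usize,
    controller_tx: mpsc::Sender<ControlMessage>,
    start_barrier: Arc<Barrier>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let tx = controller_tx.clone();
        let result =
            tokio::spawn(operator_task(task_id, controller_tx, start_barrier)).await;
        // A panic in any step surfaces here as a JoinError.
        if result.is_err() {
            let _ = tx.send(ControlMessage::TaskFailed { task_id }).await;
        }
    })
}

async fn on_start(task_id: usize) {
    println!("operator {task_id}: on_start");
}

async fn run(task_id: usize) {
    println!("operator {task_id}: running");
}
```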
However, if an operator panicked in step 2 (`on_start`), the pipeline could end up stuck while the controller considered it healthy and in the running state.
Why?
1. The controller determined that the pipeline was ready once it had received all of the task start notifications, but those were sent up front, before `on_start`.
2. A failed operator closes its queues, which should cause a cascade of panics that shuts down the pipeline cleanly. However, if it panics during `on_start`, no other operator starts producing: they are all still waiting on the barrier (step 3), so they never try to write to the closed queues and never panic themselves (see the sketch after this list).
3. The failed task does send a `TaskFailed` message to the controller, but because this is a `RunningMessage`, it is ignored while the controller is in the scheduling state.
For the problem to occur, all three issues are required.
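To see why the second issue leaves the pipeline stuck rather than crashing it, here is a small, self-contained demonstration using a `tokio::sync::Barrier`; this is a hypothetical reproduction, not the engine's code. One operator panics before reaching the barrier, and the survivors wait forever instead of hitting the closed-queue panic cascade:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let n_operators = 3;
    let barrier = Arc::new(Barrier::new(n_operators));

    let mut handles = Vec::new();
    for task_id in 0..n_operators {
        let barrier = barrier.clone();
        handles.push(tokio::spawn(async move {
            // Simulate a panic in on_start for one operator.
            if task_id == 0 {
                panic!("operator {task_id}: on_start failed");
            }
            // The surviving operators never get past this point, because
            // the barrier still expects n_operators arrivals.
            barrier.wait().await;
            println!("operator {task_id}: running");
        }));
    }

    // The healthy operators are now stuck on the barrier; since they never
    // produce to any queue, the usual close-the-queues panic cascade never
    // happens either.
    for (task_id, handle) in handles.into_iter().enumerate() {
        match timeout(Duration::from_secs(1), handle).await {
            Ok(Ok(())) => println!("operator {task_id}: finished"),
            Ok(Err(_)) => println!("operator {task_id}: panicked"),
            Err(_) => println!("operator {task_id}: stuck waiting on the barrier"),
        }
    }
}
```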
This PR fixes the first and third issues, ensuring that a pipeline will either reach a true running state or fail and be restarted by the controller:
- We now send `TaskStarted` notifications only after `on_start` completes, once the operators are actually running; this means the controller won't transition to running until that is true (see the sketch after this list).
- The scheduling state now handles `TaskFailed` notifications and restarts scheduling on receiving one.
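A sketch of what the two fixes look like, using the same illustrative names as the earlier sketches; this is an assumption about the shape of the code, not the actual diff:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Barrier};

enum ControlMessage {
    TaskStarted { task_id: usize },
    TaskFailed { task_id: usize },
}

// Operator side (fix 1): TaskStarted now means "on_start finished",
// not "task spawned".
async fn operator_task(
    task_id: usize,
    controller_tx: mpsc::Sender<ControlMessage>,
    start_barrier: Arc<Barrier>,
) {
    on_start(task_id).await;

    // Moved here: sent only after on_start has succeeded.
    let _ = controller_tx
        .send(ControlMessage::TaskStarted { task_id })
        .await;

    start_barrier.wait().await;
    run(task_id).await;
}

// Controller side (fix 2): the scheduling state no longer ignores
// TaskFailed; a failure restarts scheduling instead of leaving the
// pipeline half-started.
enum SchedulingOutcome {
    Running,
    Restart,
}

async fn wait_for_startup(
    mut rx: mpsc::Receiver<ControlMessage>,
    expected_tasks: usize,
) -> SchedulingOutcome {
    let mut started = 0;
    while let Some(msg) = rx.recv().await {
        match msg {
            ControlMessage::TaskStarted { .. } => {
                started += 1;
                if started == expected_tasks {
                    return SchedulingOutcome::Running;
                }
            }
            ControlMessage::TaskFailed { task_id } => {
                eprintln!("task {task_id} failed during startup; rescheduling");
                return SchedulingOutcome::Restart;
            }
        }
    }
    // All senders dropped before startup completed; treat as a failure.
    SchedulingOutcome::Restart
}

async fn on_start(task_id: usize) {
    println!("operator {task_id}: on_start");
}

async fn run(task_id: usize) {
    println!("operator {task_id}: running");
}
```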
Fixing the second issue—for example by allowing the barrier to be canceled on panic—is left as a future improvement.