Open teskje opened 1 year ago
Regarding possible solutions:
- `Hello` responses could be sent after successful connection, to inform the controller about the state of installed dataflows in each of the workers. The controller could use that information to identify partially installed dataflows and push their creation after the initialization stage, to ensure they always get dropped and re-created by the replica. That doesn't work though, because partially installed dataflows cause a mismatch in allocated Timely operator/channel IDs on different workers, and having all workers reinstall a dataflow doesn't fix this mismatch. The controller would have to somehow tell all workers to discard Timely IDs until they are consistent again.
- A worker that receives a `CreateDataflow` command waits until it has heard back from all other workers that they are also ready to install the dataflow. Only when all workers report that they are ready is the dataflow installed.
Reproducing that thread here before it gets auto-expired:
Frank McSherry Hi folks! In chewing on reconciliation, I think we have a fairly significant architectural bug with multi-process replicas. Reconciliation at least is based on the principle that the workers in the replica can recover from a known state to some target state. This is great when there is a known state, but .. with multi-process replicas and partial failures we can get ourselves into not having a known state. :thread:
Frank McSherry Let's say that we have two processes in the replica, p1 and p2. The environment controller wants to construct dataflow A, and says as much to each replica. However, before the message gets through to p2, the controller crashes.
Frank McSherry When the controller comes back online, the replicas are not in a known state. At least, the controller is not really sure whether the dataflow A command reached both replicas, and .. if it hasn't it can't really be sure what to do next. The `computed`s should restart in that case, as they are weirdly out of sync, but .. who actually knows this?
Frank McSherry There are a few plausible remedies:
- On connection, each process advertises its received command history back to the coordinator, from which the coordinator can at least learn if everyone is in sync (and if not, restart the group).
- The controller just speaks with one process, who broadcasts the commands to other workers using timely's `Sequencer` dataflow (a built-in broadcast, whose failure results in the mesh failing).
- The controller uses a more reliable mechanism to deliver the commands, such as recording them in a DB or putting them in Kafka.
Lukas Humbel [[ I'm probably missing something here, but: ]] Shouldn't the rehydrate mechanism take care of this case? When the controller restarts, it will cause `ActiveReplication::add_replica` to happen, which will rehydrate with the whole command history. Using that mechanism the `computed`s should catch up.
Frank McSherry The issue I think is that the rehydration will tell them the goal state, but reconciliation is what takes each worker from its initial state to this goal state. If the initial state is not shared by all workers, we are in a bit of a mess. Moreover, it is hard for workers to know if they are in a shared initial state.
Lukas Humbel Conceptually (I'm not claiming that it is actually working like this right now), if the rehydration + reconciliation brings all the workers to the goal state, the initial state shouldn't matter? That of course requires "state" to really capture all state...
Frank McSherry Here's a concrete example:
- Controller builds a dataflow, maybe does a TAIL against it for a while, and then allows it to drop.
- Worker 1 sees "build, drop".
- Worker 2 sees "build".
- Controller restarts and asks to rebuild the dataflow because it needs it for a TAIL.
Worker 2 has a live running dataflow that worker 1 has closed. Worker 1 .. doesn't have a dataflow, and may need to create one. However, if it does it will wait indefinitely because worker 2 isn't about to create a new dataflow.
Frank McSherry The issue is that reconciliation isn't a purely local action. "Adding a dataflow" requires participation of the other workers to actually result in a running dataflow, as opposed to workers 1 and 2 each having half-formed dataflows.
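(Editorial sketch, not part of the original thread.) The example above can be modeled in a few lines of Rust. Everything here is hypothetical: `Worker`, `next_id`, and `reconcile` are illustrative names, and the per-worker counter is only a simplified stand-in for however Timely allocates identifiers. The point it tries to show is that purely local reconciliation leaves both workers "at the goal state" while disagreeing about which instance of the dataflow they hold.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of one worker; not a Materialize or Timely type.
#[derive(Default)]
struct Worker {
    /// Next identifier this worker would allocate (simplified assumption).
    next_id: u64,
    /// Installed dataflows by name, mapped to the identifier they were built with.
    installed: BTreeMap<String, u64>,
}

impl Worker {
    /// Install a dataflow locally, consuming one identifier.
    fn build(&mut self, name: &str) {
        self.installed.insert(name.to_string(), self.next_id);
        self.next_id += 1;
    }

    /// Drop a dataflow locally; the identifier is not reused.
    fn drop_dataflow(&mut self, name: &str) {
        self.installed.remove(name);
    }

    /// Purely local reconciliation: keep what is in the goal, build what is missing.
    fn reconcile(&mut self, goal: &[&str]) {
        self.installed.retain(|name, _| goal.contains(&name.as_str()));
        for name in goal {
            if !self.installed.contains_key(*name) {
                self.build(name);
            }
        }
    }
}
```

With worker 1 having seen "build, drop" and worker 2 only "build", reconciling both against the goal `["A"]` makes worker 1 build a second instance while worker 2 keeps its first one, so the two halves never pair up.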
Jan Re option 1: Would processes need to advertise their entire received history or would it be enough to send, e.g., the last received command, or the sequence number of the last received command, or the total count of commands received? The entire history can grow unboundedly large, no? Re option 2: Without any additional context this sounds like it would simplify the controller (as it only needs to talk to a single process per replica now) without adding complexity anywhere else (as everything exists already in timely), which would be great! I’m sure that’s not really the case though
Lukas Humbel I see. In your example: Worker 2 is not able to recognize that the other half of the dataflow is gone?
Frank McSherry
- There is a compacted representation of the history they could send, or they could just hash it I think. The goal isn't for the controller to know what each worker has installed so much as know that they are all in sync.
Frank McSherry
- I think this is good too, but it also introduces a bottleneck at one worker if the commands are large.
(TBC)
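(Editorial sketch, not part of the original thread.) The hashed variant of option 1 discussed above could look roughly like this. The `Command` enum, `history_digest`, and `in_sync` are hypothetical names, not the real compute protocol types, and the history is hashed uncompacted for brevity. `DefaultHasher` is used only to keep the sketch dependency-free; its output is not guaranteed stable across Rust releases, so a real implementation would presumably need a hash that is stable across processes and builds.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a compute command; not the real protocol type.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum Command {
    CreateDataflow(u64),
    DropDataflow(u64),
}

/// Digest of the commands a process has received, in order.
/// Assumption: all processes compute this with the same hasher.
fn history_digest(history: &[Command]) -> u64 {
    let mut hasher = DefaultHasher::new();
    history.hash(&mut hasher);
    hasher.finish()
}

/// Controller side: true if every process advertised the same digest.
/// On `false` the controller would restart the whole group.
fn in_sync(digests: &[u64]) -> bool {
    digests.windows(2).all(|pair| pair[0] == pair[1])
}
```

This only tells the controller that the processes agree with each other, not what they have installed, which matches the stated goal of knowing "that they are all in sync".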
In the current compute command protocol, compute commands (except for `CreateTimely`) are distributed in an asymmetric fashion: Each command is only sent to the first process of the replica, which then distributes the command to the other processes using a dataflow.
We introduced this distribution method to solve the issue that during reconciliation the compute controller would not know if any dataflows were only partially installed across the replica’s processes. By distributing the commands through timely, partially installed dataflows are avoided because the command distribution either succeeds, or the timely cluster crashes. (More context in Slack.)
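A minimal sketch of the "succeeds or the cluster crashes" property, using only standard-library channels in place of a Timely dataflow. `Command`, `ClusterCrashed`, and `broadcast` are hypothetical names for illustration; the real distribution runs through timely and is not implemented like this.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for a compute command.
#[derive(Clone, Debug, PartialEq)]
enum Command {
    CreateDataflow(u64),
}

/// Marker error: the whole cluster is treated as failed.
#[derive(Debug, PartialEq)]
struct ClusterCrashed;

/// Runs on the first process: forward one command to every other process.
/// If any peer is unreachable, the error stands for tearing down the whole
/// cluster, so no process keeps a partially distributed command.
fn broadcast(cmd: &Command, peers: &[Sender<Command>]) -> Result<(), ClusterCrashed> {
    for peer in peers {
        peer.send(cmd.clone()).map_err(|_| ClusterCrashed)?;
    }
    Ok(())
}
```

The forwarding loop itself is not atomic. The guarantee in this model comes from treating any failed send as fatal for every process, so state that only some peers received is discarded along with the cluster.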
The asymmetric distribution pattern introduces a number of edge-cases to our implementation:
- The `CreateTimely` command is distributed differently from all the others.
- A `FrontierUpper([])` response is required for all collections that are dropped, even if semantically incorrect, because protocol clients need to know when they can clean up their frontier tracking state for dropped collections.
We should think about alternative approaches to solving the above reconciliation problem that avoid having to introduce all these edge-cases.