It seems that this isn't unique to Kafka sources. Load generator sources, for example, exhibit the same behavior.
I believe the heart of the issue is that no path currently exists for ingestion dataflows to perform graceful shutdown upon termination of their backing replica. Since each one is responsible (at least implicitly) for emitting health statuses, sending a final `HealthStatusUpdate` before shutting down seems like a possible path forward.
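For what it's worth, here's a rough sketch of that idea, assuming a tokio runtime; `Status`, `HealthStatusUpdate`, and the channel are made-up stand-ins for the real types in `storage::healthcheck`:

```rust
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;

// Stand-ins for the real health-reporting types; names are illustrative.
#[derive(Debug)]
enum Status {
    Running,
    Paused,
}

#[derive(Debug)]
struct HealthStatusUpdate {
    status: Status,
    error: Option<String>,
}

async fn run_ingestion(status_tx: mpsc::Sender<HealthStatusUpdate>) {
    let mut sigterm =
        signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");

    // ... drive the ingestion dataflow here, emitting periodic statuses ...

    // When the orchestrator asks the replica to terminate, emit one final
    // update so the status table doesn't report `running` forever.
    sigterm.recv().await;
    let _ = status_tx
        .send(HealthStatusUpdate {
            status: Status::Paused,
            error: None,
        })
        .await;
}
```

This only works if the process actually receives SIGTERM rather than being killed outright, which is exactly the caveat below.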
I noticed that the process orchestrator kills any subprocesses it manages, so the above solution won't quite work here unless that behavior is tweaked. I imagine the default behavior is different in k8s, but I'm not sure.
Would love to try and tackle this, but definitely need some guidance on the above!
> I believe the heart of the issue is that no path currently exists for ingestion dataflows to perform graceful shutdown upon termination of their backing replica.
Exactly right.
> Since each one is responsible (at least implicitly) for emitting health statuses, sending a final `HealthStatusUpdate` before shutting down seems like a possible path forward. I noticed that the process orchestrator kills any subprocesses it manages, so the above solution won't quite work here unless that behavior is tweaked. I imagine this would be different by default in k8s, but not sure.
Relying on graceful shutdown has at least two pitfalls:
1. If a source or sink has multiple replicas, every replica writes status updates to the same history table, so the replicas' updates clobber one another.
2. When one replica of several shuts down gracefully, it would write a final status of `paused`, which is wrong as there is still a replica running.

(1) is a problem irrespective of this work. Perhaps we need to introduce a `replica_id` column to the `mz_{source|sink}_status_history` table, and then each source/sink replica can write statuses only for itself.
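For illustration, a row might look roughly like this once a `replica_id` is added (field names are assumptions, not the actual catalog schema):

```rust
// Hypothetical row for mz_{source|sink}_status_history after adding a
// replica_id column. Each replica appends rows tagged with its own id,
// so concurrent replicas no longer clobber one another's statuses.
struct StatusHistoryRow {
    occurred_at: u64,      // wall-clock timestamp of the update, in ms
    object_id: String,     // id of the source or sink this row describes
    replica_id: String,    // NEW: id of the replica reporting the status
    status: String,        // e.g. "starting", "running", "paused"
    error: Option<String>, // populated when the status is an error
}
```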
(2) is trickier. We need to find a way to set the source/sink status to `paused` iff the number of replicas drops to zero, without races if replicas are rapidly created/dropped. The storage controller is the place that is best positioned to make this determination, but even it doesn't have perfect information. Dropping a replica is asynchronous; the storage controller doesn't know when that drop has completed. If we had the storage controller set the status to `paused` at the moment a replica was dropped, it'd still be possible for the following sequence of events to occur: 1) replica drop command accepted, 2) controller kills process/pod, 3) storage controller sets status to `paused`, 4) clusterd process sets status to `starting`, 5) clusterd process receives kill signal and exits.
I wonder if our best bet is to revisit this design decision:
[Screenshot from internal design document.]
These problems seem much easier in a world where the storage controller mediates all status writes. At the moment that the controller processes a `drop_replica` command, it can set the status of the source to `paused` and know that that status won't be overwritten, since any future status updates would have to flow through the controller, but the controller has terminated all contact with that replica.
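A simplified sketch of why that closes the race; `replica_conns` and `record_status` are invented names for controller internals:

```rust
use std::collections::BTreeMap;

// Invented stand-ins for controller state; illustrative only.
type ReplicaId = u64;
struct ReplicaConn; // handle for the controller's channel to one replica

struct StorageController {
    replica_conns: BTreeMap<ReplicaId, ReplicaConn>,
}

impl StorageController {
    fn drop_replica(&mut self, id: ReplicaId) {
        // Sever the connection first: any status update still in flight
        // from this replica is dropped on the floor, so nothing can
        // overwrite the `paused` status written below.
        self.replica_conns.remove(&id);
        if self.replica_conns.is_empty() {
            self.record_status("paused");
        }
    }

    fn record_status(&self, status: &str) {
        // In the real system this would append to the
        // mz_{source,sink}_status_history shard, off the main thread.
        println!("status -> {status}");
    }
}
```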
Requesting thoughts from @bkirwi and @aljoscha. I think this is probably a change that deserves a design doc.
> I wonder if our best bet is to revisit this design decision [...]
I think this is likely, yes. The current many-writers approach has scaled fine - at least on the write path - but the data rates we see in practice don't seem prohibitive for managing this state upstream. And there are new reasons to prefer writing this data from a central location; aside from the multiple-source-replica question you mention, it's also table stakes if we ever want to expose historical data using the `as of`-type approach discussed here: https://github.com/MaterializeInc/materialize/issues/18823.
We should do some thinking about how to make sure to bound the resource pressures on `environmentd`, and to make sure that this doesn't prevent us from sharding it out in the future. A design doc sounds like a good idea!
> (1) is a problem irrespective of this work. Perhaps we need to introduce a `replica_id` column to the `mz_{source|sink}_status_history` table, and then each source/sink replica can write statuses only for itself.
Makes sense to me for that relation! For `mz_source_statuses` it might be worth it to obscure that and display a status derived from all replica statuses. E.g. if one is paused we could display something like `status: degraded`.
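For example, a rollup along these lines; the policy and the `degraded` variant are illustrations, not a settled design:

```rust
#[derive(Clone, Copy, PartialEq)]
enum ReplicaStatus {
    Starting,
    Running,
    Paused,
    Stalled,
}

// One possible policy for deriving an object-level status from
// per-replica statuses.
fn derive_status(replicas: &[ReplicaStatus]) -> &'static str {
    if replicas.is_empty() {
        // No replicas at all: the source cannot make progress.
        "paused"
    } else if replicas.iter().all(|s| *s == ReplicaStatus::Running) {
        "running"
    } else if replicas.iter().any(|s| *s == ReplicaStatus::Running) {
        // At least one healthy replica, but not all: surface a warning.
        "degraded"
    } else {
        "stalled"
    }
}
```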
Overall, I'm definitely in agreement that it would be simplest for status updates to be written by the storage controller, although I don't have context on the current load on `environmentd`. I would think there's some leeway to tweak the frequency of status updates or writes if that becomes an issue.
Would be great if I can help out where it makes sense (the design doc, reviews, etc.). Assuming it's not a burden on the team 😄
> For `mz_source_statuses` it might be worth it to obscure that and display a status derived from all replica statuses. E.g. if one is paused we could display something like `status: degraded`.
Exactly what I was thinking too!
> We should do some thinking about how to make sure to bound the resource pressures on `environmentd`, and to make sure that this doesn't prevent us from sharding it out in the future. A design doc sounds like a good idea!
I'm not too worried about this myself, for what it's worth. Each `clusterd` process sends back `FrontierUppers` to the storage controller on every tick (same for the compute controller), and it's a totally reasonable amount of traffic. Source/sink status updates should be much lower volume than `FrontierUppers` responses in the steady state. I think we'd just need to make sure that a crash-looping source doesn't send back updates in a hot loop. Seems quite manageable though.
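Something as simple as a per-object rate limit would probably do; a sketch with invented names:

```rust
use std::time::{Duration, Instant};

// Illustrative rate limit: forward at most one status update per object
// per `min_interval`, dropping the rest. A crash-looping source cycles
// through the same few statuses, so the dropped updates carry little
// extra signal.
struct StatusThrottle {
    last_sent_at: Option<Instant>,
    min_interval: Duration,
}

impl StatusThrottle {
    fn new(min_interval: Duration) -> Self {
        StatusThrottle { last_sent_at: None, min_interval }
    }

    /// Returns true if an update may be sent now, recording the send time.
    fn should_send(&mut self) -> bool {
        let due = self
            .last_sent_at
            .map_or(true, |t| t.elapsed() >= self.min_interval);
        if due {
            self.last_sent_at = Some(Instant::now());
        }
        due
    }
}
```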
> Would be great if I can help out where it makes sense (the design doc, reviews, etc.). Assuming it's not a burden on the team 😄
Absolutely. I think if you wanted to take a stab at writing the design doc, you should! Our design doc process is detailed here: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md
I would cover:

- The new `StorageResponse` variant, probably something like `StorageResponse::ObjectStatus` (see the sketch after this list).
- How the `storage::healthcheck` module will evolve to send back a `StorageResponse::ObjectStatus` to the controller rather than writing the status directly to the status shard.
- How the storage controller will handle `ObjectStatus` responses and write the results to the `mz_{source,sink}_status_history` shard. (This needs to happen off the main thread.)
- How to ensure the `paused` status is emitted even in the face of adversarial orderings:
  - A stale `ObjectStatus` arrives at the controller indicating the source is starting after the last replica has been dropped. (It may be possible to structure the code to ensure this cannot happen.)
  - `environmentd` crashes and reboots before the controller writes the `paused` status to the status shard.
- How to evolve the `mz_{source,sink}_status_history` schemas to include a `replica_id`.
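To make the first bullet concrete, the new variant might look roughly like this; every name below is a guess rather than the actual protocol:

```rust
// Illustrative stand-ins for real types in the storage protocol.
type GlobalId = u64;

struct HealthStatusUpdate {
    status: String,        // e.g. "starting", "running", "paused"
    error: Option<String>, // details when the object is unhealthy
}

// The response enum that clusterd sends back to the storage controller,
// extended with a status variant so the controller (not the replica)
// performs the write to the status shard.
enum StorageResponse {
    // ...existing variants such as FrontierUppers elided...
    ObjectStatus(GlobalId, HealthStatusUpdate),
}
```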
You'll probably find that you'll need to prototype as you go in order to write the design document. I find prototyping and writing the design document in parallel to be a powerful technique. The prototype validates the design; writing down the design improves the code; and the mentality of "prototype, not production code" keeps you from getting too attached to the code before other folks have had a chance to weigh in on the design.
Appreciate the advice! I’m definitely down to give it a shot.
Just want to call out that I'll have less free time this coming month, so this work may take me a little longer. But happy to get started!
No problem! We appreciate the help. If this work ends up being urgent we can always have you hand it off to someone here at Materialize.
This should be done now: https://github.com/MaterializeInc/materialize/pull/23222
What version of Materialize are you using?
v0.50.3 (d48f33fa2)
How did you install Materialize?
Materialize Cloud
What is the issue?
When all replicas of a source cluster are dropped, the source can no longer ingest new data. However, `mz_internal.mz_source_statuses` still reports that the source is `running`.

Steps to reproduce: create a source cluster and add a source to the cluster using `IN CLUSTER`. Drop the replica from the source cluster. Enjoy.

I would have expected the source status to report `stalled` or something similar.
Relevant log output
No response