
[Epic] Permit moving sources/sinks/materialized views between clusters #17417

Open benesch opened 1 year ago

benesch commented 1 year ago

Product outcome

Users can move sources, sinks, indexes, and materialized views between clusters.

Discovery

As part of the original cluster unification work (https://github.com/MaterializeInc/cloud/issues/4929), we added support for creating sources and sinks in a cluster:

```sql
CREATE SOURCE src IN CLUSTER clstr ...;
CREATE SINK snk IN CLUSTER clstr ...;
```

We don't support moving a source between clusters, though. You have to drop and recreate it in the desired cluster. This is problematic for sources, because there are almost certainly downstream dependencies of the source that need to be recreated. This is problematic for sinks, because you'll have to recreate the topic in the downstream Kafka broker.

Come to think of it, this is problematic for materialized views too, for the same reason as sources. You may wish to migrate a materialized view between clusters without having to recreate all downstream dependencies on the new materialized view.

The first step here is to design the syntax for this feature. Here's a strawman:

```sql
ALTER SOURCE src SET CLUSTER clstr;
```

Note that this is intentionally lacking parens (ALTER SOURCE src SET (CLUSTER = clstr);). This is for consistency with the analogous PostgreSQL command ALTER TABLE ... SET {SCHEMA|TABLESPACE} {schema|tblspc}. The rule is that only parameters configured in the WITH block go inside the parens; parameters that have dedicated syntax, like schemas, tablespaces, and clusters, get dedicated ALTER ... SET syntax too.

Work items

### `ALTER ... SET CLUSTER`
- [ ] Materialized views (#20725)
- [ ] Ingestion sources
- [ ] Webhook sources
- [ ] Sinks
- [ ] Indexes (we might not want to support this, as we can have local dependencies on the export occurring in non-increasing order)

Decision log

morsapaes commented 1 year ago

Should this be scoped with #12972 in mind? It seems to address much of the heavy lifting required to handle blue-green deployments.

benesch commented 1 year ago

That's a good question. I was thinking of this as something that could be designed and implemented independently of #12972, since it's so tightly scoped and IMO has a very obvious and straightforward design. #12972 is nebulous, and you're totally right that we'd probably want to adjust its design in light of this feature.

rjnn commented 1 year ago

Is this even doable? I have no idea how technically feasible it is.

benesch commented 1 year ago

Yeah, the code path isn’t fundamentally any more complicated than recovering from a crash. On reboot we already assume that the prior incarnation may still be around and need to be fenced out.

So, in theory, supporting this is as simple as sending a DropSource command to the original cluster, not dropping any of the shards associated with the source, and then sending a CreateSource command to the new cluster. I’m sure there’s some practical complexity I’m overlooking but I think the theory is sound.
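
For concreteness, here's a minimal sketch of that idea, using made-up type and command names rather than Materialize's actual controller API. The two points it encodes are that the persist shards are left untouched and that no waiting happens between the two commands:

```rust
// Hypothetical types and command names, for illustration only.
#[derive(Clone, Copy, Debug)]
struct GlobalId(u64);

#[derive(Debug)]
enum Command {
    /// Stop running the ingestion dataflow for this ID. The persist shards
    /// holding the source's data are deliberately left alone.
    DropSource(GlobalId),
    /// Start running the ingestion dataflow for this ID on another cluster,
    /// writing to the same persist shards as before.
    CreateSource(GlobalId),
}

struct ClusterHandle {
    name: &'static str,
}

impl ClusterHandle {
    fn send(&self, cmd: Command) {
        // In the real system this is an asynchronous command stream; here we
        // only log what would be sent.
        println!("sending {:?} to cluster {}", cmd, self.name);
    }
}

/// Move a source from `old` to `new` without dropping its persist shards.
fn move_source(old: &ClusterHandle, new: &ClusterHandle, id: GlobalId) {
    old.send(Command::DropSource(id));
    // No waiting: if the old cluster keeps the dataflow running for a while,
    // the new incarnation must fence it out, just as after a crash.
    new.send(Command::CreateSource(id));
}

fn main() {
    let old = ClusterHandle { name: "old_cluster" };
    let new = ClusterHandle { name: "new_cluster" };
    move_source(&old, &new, GlobalId(42));
}
```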

aljoscha commented 1 year ago

> Yeah, the code path isn't fundamentally any more complicated than recovering from a crash. On reboot we already assume that the prior incarnation may still be around and need to be fenced out.
>
> So, in theory, supporting this is as simple as sending a DropSource command to the original cluster, not dropping any of the shards associated with the source, and then sending a CreateSource command to the new cluster. I'm sure there's some practical complexity I'm overlooking but I think the theory is sound.

I believe that's true as well! The devil's in the details, probably, though... :sweat_smile:

petrosagg commented 11 months ago

Recording a conversation with @dseisun-materialize on this topic:

Daniel

do you have any thoughts about the level of effort for moving sources across clusters?

Petros

The level of effort consists of:

I deleted a big block of text here that is related to the following:

So far the controller, for the most part, does not synchronize its actions with clusters. It sends asynchronous commands that declare what the target state should be, and receives asynchronous responses carrying whatever feedback clusters want to communicate back.

What I described above changes that: there is now a clear request/response protocol in which the controller must wait for the old cluster to turn off the dataflow before moving the source to a new cluster.

This is related to the fact that ingestions are not yet HA-compatible, and so we must avoid having two ingestions running concurrently on two clusters.

One way of avoiding that is by waiting, as I said above. The other way would be to make ingestions HA-compatible.

If we lump HA-compatible ingestions into this work, then I'd say it's a large amount of work, because we'd need to make the following components HA:

If we choose to coordinate the dataflows between clusters then it's much easier, but it has the potential downside that you can't start the ingestion on the new cluster, wait for it to rehydrate, and then cut over, which I assume is something desirable. Any ingestion with upsert state will have downtime during the switch.
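
To illustrate the "wait" option, here's a minimal sketch with hypothetical names (the real controller/cluster protocol looks different). The only point it makes is that the controller blocks on feedback from the old cluster before starting the ingestion elsewhere, because two concurrent ingestions of the same source are not yet safe:

```rust
use std::sync::mpsc;

// Hypothetical types, for illustration only.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct GlobalId(u64);

#[derive(Debug)]
enum Response {
    IngestionDropped(GlobalId),
}

/// Wait for the old cluster to confirm the ingestion dataflow is gone, then
/// start it on the new cluster. This turns the usual fire-and-forget command
/// flow into a request/response exchange.
fn move_ingestion_sequentially(
    old_cluster_feedback: &mpsc::Receiver<Response>,
    start_on_new_cluster: impl FnOnce(GlobalId),
    id: GlobalId,
) {
    loop {
        match old_cluster_feedback.recv().expect("old cluster disconnected") {
            Response::IngestionDropped(dropped) if dropped == id => break,
            _other => continue, // unrelated feedback; keep waiting
        }
    }
    start_on_new_cluster(id);
}

fn main() {
    let (feedback_tx, feedback_rx) = mpsc::channel();
    let id = GlobalId(7);
    // Simulate the old cluster acknowledging that the dataflow was dropped.
    feedback_tx.send(Response::IngestionDropped(id)).unwrap();
    move_ingestion_sequentially(
        &feedback_rx,
        |id| println!("starting ingestion {:?} on the new cluster", id),
        id,
    );
}
```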

benesch commented 11 months ago

> This is related to the fact that ingestions are not yet HA-compatible, and so we must avoid having two ingestions running concurrently on two clusters.

What goes wrong if we don't wait for the old cluster to turn off the ingestion before starting the ingestion on the new cluster? It should be safe (i.e., correct), right? The two clusters will just fight with each other over the ingestion until the old cluster receives the "DropDataflow" command, right?

> If we choose to coordinate the dataflows between clusters then it's much easier, but it has the potential downside that you can't start the ingestion on the new cluster, wait for it to rehydrate, and then cut over, which I assume is something desirable. Any ingestion with upsert state will have downtime during the switch.

I think it's fine not to do graceful cutover for v1 of this feature. "Downtime" for a source is just staleness, and many customers can tolerate that staleness.

antiguru commented 11 months ago

We've encountered a blocker for this within compute, which I'll explain next, along with potential solutions. However, this is not a fully baked analysis, because I don't yet know enough about the controller and the rest of the system it interacts with to determine what gaps remain.

Up to now, the controller could rely on the invariant that any ID it saw would be created at some point and dropped at a later point, after which it would never see the ID again. This allows it to tag state by ID, because there is only ever one object that an ID describes. The controller maintains write frontiers, read holds, and other related state per ID. Moving objects between clusters, and thus between controllers, breaks this invariant.

Imagine we move an item from cluster A to B and back to A. In this scenario, A learns about an ID, is instructed to drop it, and then B learns about it. Later, we instruct B to drop it and create the item on A again. At this point we might run into problems: the controller's state is only eventually consistent, because it shares some of that state over the network with its cluster processes.

The invariant that IDs aren't reused solves this asynchronous state management problem: we can distinguish fresh results from stale ones by checking whether we were instructed to forget about an ID. That no longer works once we reuse IDs, because then fresh and stale results are indistinguishable.
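
To make that concrete, here is a toy illustration (made-up types, with a plain u64 standing in for frontier state, not the real controller) of why never reusing an ID lets the controller safely discard late responses, and why reuse breaks that:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical types, for illustration only.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct GlobalId(u64);

#[derive(Default)]
struct Controller {
    /// Per-ID state, standing in for write frontiers, read holds, etc.
    state: BTreeMap<GlobalId, u64>,
    /// IDs we were instructed to forget about.
    dropped: BTreeSet<GlobalId>,
}

impl Controller {
    fn handle_frontier_update(&mut self, id: GlobalId, frontier: u64) {
        if self.dropped.contains(&id) {
            // Must be a stale response from a dataflow that is still shutting
            // down. This reasoning is only sound if `id` is never created
            // again: once IDs are reused (move A -> B -> A), such a response
            // could just as well belong to the *new* incarnation, and fresh
            // can no longer be told apart from stale.
            return;
        }
        self.state.insert(id, frontier);
    }

    fn drop_collection(&mut self, id: GlobalId) {
        self.state.remove(&id);
        self.dropped.insert(id);
    }
}

fn main() {
    let mut ctl = Controller::default();
    ctl.handle_frontier_update(GlobalId(1), 10);
    ctl.drop_collection(GlobalId(1));
    // A late update from the old incarnation is (correctly) ignored...
    ctl.handle_frontier_update(GlobalId(1), 11);
    // ...but if GlobalId(1) had been recreated on another cluster, this same
    // update would be wrongly ignored too.
    assert!(ctl.state.get(&GlobalId(1)).is_none());
}
```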

We can think of a few different solutions to this, discussed in turn below: CollectionId, Sequential, and Rewrite.

CollectionId

At the boundary between coordinator and controller, we change GlobalIds to CollectionIds, which we then use throughout. We maintain a mapping from GlobalId to CollectionId to provide consistency for the coordinator.
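
Here's a minimal sketch of that idea (made-up types, not a concrete API proposal): the coordinator keeps using stable GlobalIds, while the controller allocates a fresh CollectionId for every incarnation of an object, so the IDs exchanged with clusters are never reused even when an object moves between clusters.

```rust
use std::collections::BTreeMap;

// Hypothetical types, for illustration only.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct GlobalId(u64);

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct CollectionId(u64);

#[derive(Default)]
struct IdMap {
    next: u64,
    /// The *current* incarnation of each user-visible object.
    current: BTreeMap<GlobalId, CollectionId>,
}

impl IdMap {
    /// Called when an object is (re)created, e.g. after a move to another
    /// cluster: the GlobalId stays the same, the CollectionId does not.
    fn new_incarnation(&mut self, id: GlobalId) -> CollectionId {
        let cid = CollectionId(self.next);
        self.next += 1;
        self.current.insert(id, cid);
        cid
    }

    /// Responses from clusters are tagged with CollectionIds, so a response
    /// for an old incarnation simply fails this check and can be discarded.
    fn is_current(&self, id: GlobalId, cid: CollectionId) -> bool {
        self.current.get(&id) == Some(&cid)
    }
}

fn main() {
    let mut ids = IdMap::default();
    let mv = GlobalId(7);
    let first = ids.new_incarnation(mv); // created on cluster A
    let second = ids.new_incarnation(mv); // moved to cluster B
    assert!(!ids.is_current(mv, first));
    assert!(ids.is_current(mv, second));
}
```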

Sequential

The coordinator delays creating a new instance of the same item until the old instance does not exist anymore. This requires a new API to communicate the state and state changes from the controller to the coordinator.

Rewrite

The coordinator assigns a new GlobalId to items when moving between clusters. Clusters do not need to know about the configuration change.

teskje commented 10 months ago

Here is another blocker for moving materialized views from one compute cluster to another: compaction of index dependencies.

Let me give an example. Suppose we have this setup: a source source; a cluster cluster1 with an index index1 on source and a materialized view mv that reads from index1; and a second cluster cluster2 with another index, index2, on source.

mv is healthily hydrated and reading from index1. Now suppose the user wants to move it to cluster2. Because cluster2 also has an index on source, they expect mv to come up healthy there too.

But... what if index2 has been allowed to compact beyond the write frontier of mv? When mv is moved to cluster2, index2 won't be able to provide all the historical times mv still needs, and mv would end up with gaps in its output.
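
A toy version of that check, with plain u64 timestamps standing in for real frontiers: the move is gap-free only if index2's compaction ("since") frontier has not passed mv's write ("upper") frontier.

```rust
/// The target index can serve every time the materialized view has not yet
/// written only if its compaction ("since") frontier is at or below the
/// view's write ("upper") frontier.
fn can_move_without_gaps(index2_since: u64, mv_upper: u64) -> bool {
    index2_since <= mv_upper
}

fn main() {
    // mv has written up to time 100 and index2 is compacted up to 90: fine.
    assert!(can_move_without_gaps(90, 100));
    // But if index2 has compacted up to 150, the times 100..150 are gone and
    // mv would have a gap in its output after the move.
    assert!(!can_move_without_gaps(150, 100));
}
```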

I can think of a couple ways to "solve" this, none of them great:

  1. We decide that mv cannot use index2 and needs to read from source directly. This could make the cluster fall over (most likely due to an OOM).
  2. We automatically drop index2 and re-create it at an earlier time so it can fulfill mv's requirements. That would make for very surprising UX and might introduce unavailability for other things depending on index2.
  3. We delay the move, or disallow it, until mv has caught up to index2. But there is no guarantee that this will ever happen.
  4. We skip times in mv's output. This would be a correctness bug whose seriousness depends on the assumptions downstream consumers make about our output.

Relevant Slack discussion.

antiguru commented 10 months ago

Implementing support for ALTER MATERIALIZED VIEW .. SET CLUSTER is currently on hold, given the implications it has for timestamp selection and the effort required to change the controller to handle multiple incarnations of the same object. We might resume this work if there's more need. This is independent of supporting ALTER .. SET CLUSTER for sources and sinks, which may or may not have the same problems.