TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust

Mark drop_dataflow as stable #519

Open benesch opened 1 year ago

benesch commented 1 year ago

cc @teskje @antiguru @frankmcsherry

Just wanted to queue this up since MaterializeInc/materialize#18442 is inching closer to completeness. If there are remaining concerns about stabilizing drop_dataflow (beyond 2673e8c), I figured this would be a good way to draw them out. Marked as a draft, since we obviously shouldn't merge this until all those concerns are resolved.


Between 2673e8c60 and ff516c81, drop_dataflow appears to be working well, based on testing in Materialize (MaterializeInc/materialize#18442). So remove the "public beta" warning from the docstring.

Fix #306.

frankmcsherry commented 1 year ago

This is not going to merge without a great deal more attention, I'm afraid!

benesch commented 1 year ago

Great! :D

I am personally quite nervous about using drop_dataflow in production in Materialize without a clearer articulation of the remaining problems with drop_dataflow. I'd love to find a way to fix these problems (or justify why they won't be problems in Materialize's use of timely) that doesn't involve us discovering these problems the hard way in production!

frankmcsherry commented 1 year ago

For me, the issue is with the framing of "the remaining problems": things in timely (and many distributed systems) work not because we have an enumeration of remaining problems that we burn down, but because they have specific reasons they are meant to work. We then debug any non-adherence to those specific reasons, but the reasons need to exist first.

Here, drop_dataflow is somewhat like de-allocating a Rust object behind a shared reference. Other folks in the system still have its name, still plan on doing things with it, and it isn't clear what sorts of assumptions get violated if it ceases to exist. The principle used to be "a dataflow is dropped once all participants have both created and eventually confirmed completion of the dataflow", with the progress tracking mechanism as the "reason" this is safe. With drop_dataflow that reason no longer applies, and it's unknown what class of thing might go wrong, and whether / how to fix it.
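
A loose Rust illustration of the analogy (a sketch, not timely's actual mechanism): safe Rust refuses to let one holder of a shared reference free the data out from under the others, which is precisely the situation drop_dataflow creates for the other dataflow participants.

use std::rc::Rc;

fn main() {
    let shared = Rc::new(vec![1, 2, 3]);
    // Another "participant" keeps a handle to the same data.
    let observer = Rc::clone(&shared);
    // One participant walks away; Rc keeps the data alive for `observer`.
    drop(shared);
    println!("{:?}", observer);
    // drop_dataflow is more like forcibly freeing the allocation here,
    // which Rust forbids precisely because `observer` would then be
    // acting on state that no longer exists.
}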

So, foremost of the remaining problems is the absence of an explanation for on what basis drop_dataflow should be safe to call, what the new dataflow lifecycle looks like (can you call drop_dataflow mid dataflow construction?), and what this means for folks who want to correctly use timely by relying on operator shutdown.

benesch commented 1 year ago

So, foremost of the remaining problems is the absence of an explanation for on what basis drop_dataflow should be safe to call, what the new dataflow lifecycle looks like (can you call drop_dataflow mid dataflow construction?), and what this means for folks who want to correctly use timely by relying on operator shutdown.

Thanks. This is really helpful.

petrosagg commented 1 year ago

So, foremost of the remaining problems is the absence of an explanation for on what basis drop_dataflow should be safe to call, what the new dataflow lifecycle looks like

Here is a starting point for an explanation that we can fortify or reject if it leaks from anywhere. drop_dataflow is safe to call on the basis that timely operates safely in the presence of unwinding panics and asynchronous networks. In other words, I think an explanation for drop_dataflow can be derived by reducing its functionality to an equivalent set of events that we already deem to be safe.

To construct the equivalence of drop_dataflow to a set of already-safe operations, let's first consider a timely cluster running a single dataflow. This is a big assumption, but we will come back to it at the end. In this setting, when a worker panics (either during or after dataflow construction) all of its operator state will be dropped and destructors will run. Additionally, we can hide this event from all the other workers by assuming that the network is down for an arbitrary amount of time. Therefore we can imagine a drop_dataflow implementation like so:

// `disable_network()` stands in for "the network is down indefinitely
// from the other workers' point of view"; the unwinding panic then
// drops all operator state on this worker.
fn drop_dataflow_panic() {
    disable_network();
    panic!();
}

We can now split our safety argument into two subgoals:

a. When a worker calls drop_dataflow_panic, the rest of the workers do not take any action that violates safety.
b. When a worker calls drop_dataflow_panic, that worker itself does not take any action that violates safety.

Property a follows from the existing assumption that timely is panic safe, as already described. Importantly, property a holds regardless of whether the other worker actually panicked or not! In other words, for all behaviors of a worker W that are indistinguishable from drop_dataflow_panic, the non-panicking workers won't violate safety.

This gives us latitude to change drop_dataflow_panic as we please: as long as its behavior is equivalent when observed over the network, we won't violate property a.

Property b is trivially true for the current implementation of drop_dataflow_panic because the process halts and takes no further actions. If a process takes no actions, then it cannot take a safety-violating action, since there are none taken.

This split is important because it turns changing drop_dataflow_panic to an equivalent implementation from a distributed-systems analysis problem into a single-threaded program analysis problem. In other words, given an alternative implementation, we only need to show that its single-threaded execution produces the same observable behavior over the network as disable_network(); panic!().

Timely's abstraction of inter-worker communication is provided by the Allocate trait, and each worker is aware of all the network channels allocated for a given dataflow. Therefore we can replace the drop_dataflow_panic from above with something like this (pseudocode), which I would argue behaves identically to drop_dataflow_panic when observed over the network:

/// Disables the network channels of the given dataflow and immediately drops it.
/// Panics if the dataflow is still under construction.
fn drop_dataflow_real(&mut self, dataflow_id: usize) {
    // To the other workers this is indistinguishable from this worker's
    // network going down for this dataflow.
    for channel in &mut self.dataflow_channels[dataflow_id] {
        channel.disable();
    }
    // `dataflow_under_construction` stands in for "this worker has not
    // finished building this dataflow".
    if dataflow_under_construction {
        panic!();
    } else {
        // Dropping the dataflow runs operator destructors, just as an
        // unwinding panic would.
        self.dataflows.remove(dataflow_id);
    }
}

We have almost arrived at the conclusion we want, but we have made a huge assumption, namely that the timely cluster is running a single dataflow. If the cluster is running more than one dataflow, then drop_dataflow_panic and drop_dataflow_real are no longer equivalent, because in that setting drop_dataflow_real blocks only part of the network and drops only part of the state.

As far as I understand, this is the crux of the issue. If we can say that timely clusters run multiple dataflows only as an optimization, and that the programmer should assume each dataflow is its own failure domain that can crash independently, then drop_dataflow is safe.

If instead we want to guarantee to timely programmers that when a dataflow panics on a worker no other dataflow gets to run on that worker, further elaboration is needed and it's unclear whether drop_dataflow is safe.

On its face, having each dataflow be its own failure domain sounds like a fine model to program against. The programmer can imagine that every call to worker.dataflow(..) spins up a separate timely cluster that will run that single dataflow. Admittedly this puts some burden on the programmer, who needs to be careful when using primitives that share state across dataflows (e.g. exporting an arrangement), but the model is clear and can be reasoned about.
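
To make that model concrete, here is a minimal sketch (the two dataflows and their contents are placeholders; it uses timely's execute_from_args and worker.dataflow). Under the failure-domain model, each dataflow is reasoned about as if it ran on its own private cluster, so dropping one must not disturb the other as long as nothing is shared between them.

use timely::dataflow::operators::{Inspect, ToStream};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // Dataflow A: imagine it running on its own private cluster.
        worker.dataflow::<u64, _, _>(|scope| {
            (0..5).to_stream(scope).inspect(|x| println!("A: {x}"));
        });

        // Dataflow B: an independent failure domain. If A is dropped (or
        // "crashes"), B should be unaffected, provided nothing (e.g. an
        // exported arrangement) is shared between them.
        worker.dataflow::<u64, _, _>(|scope| {
            (0..5).to_stream(scope).inspect(|x| println!("B: {x}"));
        });
    }).unwrap();
}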

@frankmcsherry would love to hear your thoughts on this analysis