TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License

Messages could have multiple capabilities #47

Open frankmcsherry opened 7 years ago

frankmcsherry commented 7 years ago

Right now, messages in timely dataflow carry a single capability. This simplification is helpful in that for several programs the capability's timestamp is semantically meaningful: all data in the message are treated as "arriving" or "occurring" at that timestamp. However, we could enforce single-capability messages as a discipline rather than a requirement, if it were valuable to have multiple capabilities.

I'm currently writing the "high resolution" version of differential dataflow, in which each piece of data also carries its own timestamp. There have been several cases where it would be convenient for a message to support multiple capabilities.

The general situation is: an input frontier advances from one antichain to another, and we would like to commit and transmit all updates whose times lie between the two antichains. The elements of the first frontier are, collectively, sufficient capabilities to send the set of updates, but no single element is necessarily sufficient. Nor can we mint a capability at the meet of the capabilities we do hold. Instead, we determine for each capability which updates it is responsible for, and commit and send these separately. This results in:

  1. more and smaller messages sent around,
  2. for differential dataflow, more and smaller batches, each of which adds cost whenever we traverse a collection trace,
  3. artificial serialization when processing keys (as we must partition times by capability),
  4. increased code complexity as we need to handle all of this segmentation logic ourselves.
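To make the segmentation concrete, here is a minimal Rust sketch, assuming two-dimensional timestamps `(u64, u64)` under the product partial order. The names `Time`, `Batch`, `less_equal`, and `partition_by_capability` are hypothetical illustrations, not timely's API. Each update time is assigned to the first held capability that is less than or equal to it, yielding one batch (and so one message) per capability.

```rust
/// Hypothetical two-dimensional timestamp, ordered by the product partial order.
type Time = (u64, u64);

/// A batch of update times assigned to one capability.
type Batch = (Time, Vec<Time>);

/// Product partial order: `a <= b` iff both coordinates are `<=`.
fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Assigns each update time to the first capability in `caps` that is
/// less than or equal to it, producing one batch per capability that is
/// responsible for at least one time. Times no capability covers are
/// returned separately (sending them would be an error).
fn partition_by_capability(caps: &[Time], times: &[Time]) -> (Vec<Batch>, Vec<Time>) {
    let mut batches: Vec<Batch> = caps.iter().map(|cap| (*cap, Vec::new())).collect();
    let mut uncovered = Vec::new();
    for time in times {
        match batches.iter_mut().find(|batch| less_equal(&batch.0, time)) {
            Some(batch) => batch.1.push(*time),
            None => uncovered.push(*time),
        }
    }
    // Capabilities that ended up responsible for nothing send no message.
    batches.retain(|batch| !batch.1.is_empty());
    (batches, uncovered)
}
```

With capabilities { (0, 5), (2, 1) }, the time (3, 1) can only go with (2, 1) and (0, 7) can only go with (0, 5), so at least two messages are needed; the meet (0, 1) would cover both, but it is not a capability we hold. The "first match" rule is an arbitrary choice: (2, 6) is in advance of both capabilities, which is the kind of artificial partitioning item 3 describes.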

It seems reasonable to have messages carry an arbitrary number of capabilities, although doing so would likely cause serious breakage of existing code and potentially annoying ergonomics going forward. We could very easily layer a "single capability" abstraction on top to prevent the breakage and the ergonomic horror, but it would be very helpful to be able to send and receive messages with multiple capabilities.
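A minimal sketch of what a multi-capability message might look like, with the single-capability discipline layered on top as a constructor. `Message`, `new`, and `single` are hypothetical names; the capabilities are plain `(u64, u64)` timestamps rather than timely capability handles, and the only invariant checked is that every update's time is in advance of at least one capability the message carries.

```rust
/// Hypothetical two-dimensional timestamp, ordered by the product partial order.
type Time = (u64, u64);

fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Hypothetical message carrying any number of capabilities (plain timestamps
/// here, not timely capability handles) and updates with their own times.
#[derive(Debug)]
struct Message<D> {
    capabilities: Vec<Time>,
    updates: Vec<(D, Time)>,
}

impl<D> Message<D> {
    /// Builds a message, requiring every update's time to be in advance of
    /// at least one of the capabilities it carries.
    fn new(capabilities: Vec<Time>, updates: Vec<(D, Time)>) -> Result<Self, String> {
        for (_, time) in &updates {
            if !capabilities.iter().any(|cap| less_equal(cap, time)) {
                return Err(format!("time {:?} is not covered by any capability", time));
            }
        }
        Ok(Message { capabilities, updates })
    }

    /// The single-capability discipline, layered on top of `new`.
    fn single(capability: Time, updates: Vec<(D, Time)>) -> Result<Self, String> {
        Self::new(vec![capability], updates)
    }
}
```

Under this sketch, updates at (0, 7) and (3, 1) fit in one message holding both (0, 5) and (2, 1), whereas `single` with (0, 5) alone rejects (3, 1) and forces a second message.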

frankmcsherry commented 7 years ago

As a historical note: back in Naiad times we considered (and were maybe thinking of moving towards) a message format which was roughly Vec<(Time, Vec<Data>)>, where each time was bound to a range of data.

I think this is a little different, in that it is mostly an efficient way to boxcar messages together. It doesn't permit efficient interleaving of data with varying times (at least, not if each time can result in independent progress tracking traffic). Currently, some differential dataflow operators (arrange, group) produce (or would like to produce) output that is in the future of some held capability, but not necessarily grouped by those capabilities.
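The Naiad-era format can be sketched as a function that boxcars `(time, datum)` pairs into `Vec<(Time, Vec<Data>)>`, starting a new run whenever the time changes. `boxcar` is a hypothetical name, and this is an illustration under that assumption rather than Naiad or timely code. It shows why the format amortizes per-message overhead for data already grouped by time but does nothing for interleaved times: each switch starts another run, and each run is still bound to a single time.

```rust
/// Groups consecutive `(time, datum)` pairs that share a time into runs,
/// producing the `Vec<(Time, Vec<Data>)>` shape. A new run starts whenever
/// the time differs from the previous pair's time.
fn boxcar<T: PartialEq, D>(items: Vec<(T, D)>) -> Vec<(T, Vec<D>)> {
    let mut runs: Vec<(T, Vec<D>)> = Vec::new();
    for (time, datum) in items {
        // Extend the current run only if it is bound to the same time.
        let same_time = matches!(runs.last(), Some((last, _)) if *last == time);
        if same_time {
            runs.last_mut().unwrap().1.push(datum);
        } else {
            runs.push((time, vec![datum]));
        }
    }
    runs
}
```

For data whose times interleave as 1, 2, 1, 2 the result has four runs, while the same data sorted by time produces two.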

frankmcsherry commented 6 years ago

This issue came up again in conversation with @antiguru. We have an application where an operator wants to send a hunk of data that should take effect "at a frontier", and we want to send it in a way that lets the recipient know when it has received all of the associated data. Typically we see code where something is sent "at a time" and the recipient sits on the data until the frontier has passed that time, but this doesn't seem to apply here: while we can tell the recipient which frontier to await, nothing about the message holds open that frontier except the time we actually use as its capability.

Consider trying to send something that should take effect at the frontier { (0, 5), (2, 1) }, where we must choose some timestamp at which to send the data. If we choose timestamp (0, 5), our next frontier could be { (0, 5), (3, 2) }. This frontier has moved forward from the previous one, which would prove that we have received all of the data for the previous frontier, but we held neither (2, 1) nor (3, 2) for the recipient to see the shift in the frontier. Instead it learns only that the frontier is "still not past (0, 5)", which is indeed true. Had we sent at (2, 1) instead, we would have the same problem the other way around.
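The scenario can be checked with two small helpers, again assuming `(u64, u64)` timestamps under the product partial order. `frontier_covers` (true when some frontier element is less than or equal to a time, i.e. the frontier has not yet passed it) and `in_advance_of` (true when every element of the new frontier is in advance of some element of the old one) are hypothetical names for this sketch.

```rust
/// Hypothetical two-dimensional timestamp, ordered by the product partial order.
type Time = (u64, u64);

fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// True if some element of `frontier` is less than or equal to `time`,
/// i.e. the frontier has not yet passed `time`.
fn frontier_covers(frontier: &[Time], time: &Time) -> bool {
    frontier.iter().any(|f| less_equal(f, time))
}

/// True if every element of `new` is in advance of some element of `old`.
fn in_advance_of(new: &[Time], old: &[Time]) -> bool {
    new.iter().all(|t| frontier_covers(old, t))
}
```

For F1 = { (0, 5), (2, 1) } and F2 = { (0, 5), (3, 2) }, `in_advance_of(&f2, &f1)` holds and the reverse does not, so the frontier has strictly advanced; yet `frontier_covers` reports (0, 5) as not passed under both, which is all a recipient keyed on (0, 5) can observe. With F3 = { (1, 5), (2, 1) } the same is true for a recipient keyed on (2, 1).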

So in addition to providing multiple capabilities to downstream operators, which is useful but can be faked by sending multiple messages, we have the perhaps deeper problem that it can be hard to hold open multiple times and allow a recipient to notice progress along each of them. I suspect this could be faked as well: send a message at each of the times in the frontier, install a sequence number and count, and make the payload optional. The recipient could then reassemble the intended frontier, make sure it had received all elements of it, and check whether the input frontier had passed it. But what a mess.
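The workaround can be sketched as follows. `Part`, `Reassembler`, `receive`, and `ready` are hypothetical names, timestamps are again `(u64, u64)` under the product order, and the readiness rule rests on an assumption the comment does not spell out: the sender stops holding an element of the intended frontier only after it has sent everything for that frontier, so a transmission is complete once all of its parts have arrived and at least one of its times is no longer covered by the input frontier.

```rust
use std::collections::HashMap;

/// Hypothetical two-dimensional timestamp, ordered by the product partial order.
type Time = (u64, u64);

fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// One part of a hypothetical "send at a frontier" transmission. One part is
/// sent at each element of the intended frontier; the payload is optional.
struct Part<D> {
    seq: u64,           // identifies the transmission
    count: usize,       // number of elements in the intended frontier
    time: Time,         // the frontier element this part was sent at
    payload: Option<D>, // data, if this part carries any
}

/// Recipient-side state per sequence number: expected part count,
/// frontier elements received so far, and collected payloads.
struct Reassembler<D> {
    pending: HashMap<u64, (usize, Vec<Time>, Vec<D>)>,
}

impl<D> Reassembler<D> {
    fn new() -> Self {
        Reassembler { pending: HashMap::new() }
    }

    /// Records one part, reassembling the intended frontier as parts arrive.
    fn receive(&mut self, part: Part<D>) {
        let count = part.count;
        let entry = self
            .pending
            .entry(part.seq)
            .or_insert_with(|| (count, Vec::new(), Vec::new()));
        entry.1.push(part.time);
        if let Some(data) = part.payload {
            entry.2.push(data);
        }
    }

    /// Removes and returns the payloads of every transmission whose parts
    /// have all arrived and whose intended frontier has been passed, taken
    /// here to mean that at least one of its times is no longer covered by
    /// `input_frontier` (an assumption about the sender's discipline).
    fn ready(&mut self, input_frontier: &[Time]) -> Vec<(u64, Vec<D>)> {
        let finished: Vec<u64> = self
            .pending
            .iter()
            .filter(|(_, entry)| {
                entry.1.len() == entry.0
                    && entry
                        .1
                        .iter()
                        .any(|t| !input_frontier.iter().any(|f| less_equal(f, t)))
            })
            .map(|(seq, _)| *seq)
            .collect();
        let mut out: Vec<(u64, Vec<D>)> = finished
            .into_iter()
            .filter_map(|seq| self.pending.remove(&seq).map(|(_, _, data)| (seq, data)))
            .collect();
        out.sort_by_key(|(seq, _)| *seq);
        out
    }
}
```

Even in sketch form, the recipient has to track part counts, tolerate payload-free parts, and poll the input frontier itself, which is the bookkeeping the comment calls a mess.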