TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust

Local validation of progress updates #107

Open frankmcsherry opened 6 years ago

frankmcsherry commented 6 years ago

Ideally, the progress API exposed by the `Operate` trait, by which operators report input messages consumed, capabilities dropped and held, and output messages produced, would have some pleasant invariants for batches of updates, by which we could increase our confidence in their global properties.

One intended (but not satisfied) invariant is "each newly held capability must be greater than or equal to some consumed input message or held capability" and "each sent output message must have a time greater than or equal to some consumed input message or held capability".
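For concreteness, a minimal sketch of such a per-batch check might look like the following. The `ProgressReport` type and its fields are hypothetical stand-ins, not timely's actual `Operate` interface, and timestamps are only assumed to implement `PartialOrd`.

```rust
/// Hypothetical summary of one operator's reported progress batch;
/// this is illustrative, not timely's actual progress types.
struct ProgressReport<T> {
    consumed: Vec<T>,   // timestamps of input messages consumed
    held: Vec<T>,       // timestamps of capabilities already held
    new_holds: Vec<T>,  // timestamps of newly claimed capabilities
    produced: Vec<T>,   // timestamps of output messages produced
}

/// True when every newly claimed capability and every produced output
/// is greater than or equal to some consumed message or held capability.
fn is_justified<T: PartialOrd>(report: &ProgressReport<T>) -> bool {
    report
        .new_holds
        .iter()
        .chain(report.produced.iter())
        .all(|t| {
            report
                .consumed
                .iter()
                .chain(report.held.iter())
                .any(|j| j <= t)
        })
}
```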

This invariant is not currently satisfied because subgraphs eagerly report information about the future that is known to be true, but is not yet justifiable.

For example, consider a subgraph with two inputs, managing an operator that consumes input and may hold capabilities. Imagine this graph is executed across multiple workers. One subgraph instance can receive a report from its managed operator that it has consumed a message and now holds a capability. The subgraph currently chooses to report that information upwards (that it now holds a capability), but it is not yet in a position to indicate which of the two inputs the message came in through (that information is perhaps with the other worker, who performed the ingestion and sent a progress update reporting it, but that update has not yet been received).

Although this information is not incorrect, nor does it lead to errors in the protocol (known errors, at least), it is nonetheless confusing from the point of view of the invariants we would like to maintain. The subgraph appears to be claiming a capability without consuming any input messages. There is the intent to do so in the future, and in this case we know that the only way the report of a consumed message can arrive is through the subgraph.

Perhaps the subgraph should delay this information, even though it knows it will happen, until the progress update "makes sense". It could wait until it hears about the message that justified the capability, and only then report claiming the capability; as long as the system only needs to know about the frontier, this would be the first moment the frontier could advance anyway, because until this point the unacknowledged message blocked it.
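As a rough illustration of this "delay until justified" idea, here is a hedged sketch in which a subgraph buffers capability claims until a consumed message that could justify them has been heard about. The `DelayedReporter` type and its methods are hypothetical, not part of timely's API; timestamps are only assumed to be partially ordered.

```rust
/// Hypothetical buffer for capability claims that are not yet justified
/// by any consumed input message the subgraph has heard about.
struct DelayedReporter<T> {
    pending_holds: Vec<T>, // capability claims withheld from upward reports
    consumed: Vec<T>,      // consumed input messages heard about so far
}

impl<T: PartialOrd> DelayedReporter<T> {
    fn new() -> Self {
        DelayedReporter { pending_holds: Vec::new(), consumed: Vec::new() }
    }

    /// Record a capability claim, but do not report it upward yet.
    fn claim(&mut self, time: T) {
        self.pending_holds.push(time);
    }

    /// Record a consumed input message and return any pending claims that
    /// are now justified (greater than or equal to some consumed message),
    /// which the subgraph could then report upward.
    fn consume(&mut self, time: T) -> Vec<T> {
        self.consumed.push(time);
        let pending = std::mem::take(&mut self.pending_holds);
        let (ready, still_pending): (Vec<T>, Vec<T>) = pending
            .into_iter()
            .partition(|t| self.consumed.iter().any(|c| c <= t));
        self.pending_holds = still_pending;
        ready
    }
}
```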

This would allow us to get closer to imposing invariants on each batch of updates, which should increase confidence in the protocol, at the expense of complicating the responsibilities of the subgraph. On the positive side, this buffering would result in less movement of progress updates, communicating changes less frequently.

frankmcsherry commented 4 years ago

Recent work observed that our current progress validation was too relaxed to catch all errors (#327), but the fix ended up being too severe and caught non-errors (#331). One conclusion is that, yet again, we might want a more locally verifiable protocol.

One fix that I think makes some sense now (although it did not when this issue was filed) is simply to not propagate `SharedProgress` updates when they are not well-formed. The code should be structured so that there is no obligation to drain produced updates, and a simple local test could determine if the updates are locally supported ("upright", in the terms of the TLA proof). This would "slow down" progress updates, but only in cases where the results should not affect the frontier anyhow (correctness of the protocol would imply that there is a positive record yet to be subtracted that precedes non-upright updates). It might even improve performance if it increases batching of updates.

On the downside: I'm not sure of a great way to perform the validation that is not potentially quadratic in the number of updates. For each positive update we need to identify a negative update that precedes it, where the "precedes" relation need not be a total order. We have historically had issues with quadratic logic introducing instability (as batches get larger the code gets slower, which has spiraled into collapse).
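To make the concern concrete, here is a hedged sketch of the naive local check: for every positive update, scan the batch for a negative update at a time less than or equal to it. The `Update` type is an illustrative stand-in for the real progress update representation, and the nested scan is what makes the cost potentially quadratic.

```rust
/// Illustrative stand-in for a progress update: a timestamp and a count.
struct Update<T> {
    time: T,
    delta: i64, // positive: records added; negative: records retracted
}

/// True when every positive update is preceded (<=) by some negative
/// update in the same batch ("upright"). Worst case O(n^2) comparisons,
/// which is the instability concern described above.
fn is_upright<T: PartialOrd>(batch: &[Update<T>]) -> bool {
    batch
        .iter()
        .filter(|u| u.delta > 0)
        .all(|pos| {
            batch
                .iter()
                .filter(|u| u.delta < 0)
                .any(|neg| neg.time <= pos.time)
        })
}
```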

cc @utaal