TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow on Rust.
MIT License
2.54k stars 183 forks source link

join: count effort on inputs to the `results` closure #389

Closed teskje closed 1 year ago

teskje commented 1 year ago

Previously, the join operator's effort counting was based on the outputs of the passed results closure. This had the unfortunate effect that fueling would become ineffective in cases where the results closure always returns empty iterators. In such scenarios, the join operator would only yield once it has exhausted its inputs, negatively impacting concurrent operators and possibly application interactivity.

Having the results closure return empty iterators is useful when the caller does not care about the results of a join anymore (e.g. when the dataflow is shutting down). By returning nothing from results, the updates queued up before the join can be drained quickly, without feeding additional updates to downstream operators.

This commit attempts to improve the situation by changing the way the join operator counts its effort. Instead of counting the number of results outputs, we can count the number of inputs. That way, even if results decides to stop emitting updates, join fueling continues to work as expected.

The new behavior is consistent with the half join operator from dogsdogsdogs, which also uses the input, rather than the output, of the output_func for its work counting.

Motivation

The specific motivation for this PR is https://github.com/MaterializeInc/materialize/pull/18927, in which we attempt to speed up shutdown of join dataflows by stopping emission of updates from the join closure once the dataflow cancellation was observed. This strategy works nicely with delta joins but degrades interactivity with linear joins due to the different effort counting behavior.

Performance

I added performance measurements for this change to the "Linear Join" section of https://github.com/MaterializeInc/materialize/pull/18927#issuecomment-1521346755. In summary, we should expect ~1% of slowdown due to incrementing effort every time we invoke the join closure, rather than only once at the end.

I was wondering if we could instead estimate the effort upfront using:

effort += thinker.history1.edits.len() * thinker.history2.edits.len();

That would probably mitigate the performance impact, but might also be incorrect. It looks like the thinker spends quite some effort to avoid a quadratic output.

Relevant Slack discussion.

teskje commented 1 year ago

Closing in favor of #390.