crclark / fdb-streaming

trying out FoundationDB with a toy stream processing library
BSD 3-Clause "New" or "Revised" License

User-specified triggers #14

Open crclark opened 4 years ago

crclark commented 4 years ago

Once data flows into a table, it's at rest unless we specify some conditions under which it should be sent downstream again. Beam calls these conditions "triggers". They can be a function of time, the state of the aggregation in each row, etc. In cases where we are updating the table row with atomic ops, we must be careful not to pay the performance penalty of reading the table state in the transaction unless the user has specified a trigger that reads the table state.
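Roughly, the difference looks like this (a minimal sketch using the official Python fdb bindings purely for illustration -- the library itself is Haskell; `ROW` and the `fire` predicate are hypothetical stand-ins):

```python
import struct

import fdb

fdb.api_version(630)
db = fdb.open()

ROW = b'table/counts/user-42'   # hypothetical key for one table row

@fdb.transactional
def bump_blind(tr, delta):
    # Atomic add: the transaction never reads ROW, so concurrent
    # writers to the same row don't conflict with each other.
    tr.add(ROW, struct.pack('<q', delta))

@fdb.transactional
def bump_with_trigger(tr, delta, fire):
    # Reading ROW puts it in the transaction's read-conflict set, so
    # concurrent increments of this row can now abort each other. That
    # is the penalty we only want to pay when a trigger needs the row state.
    before = tr[ROW]
    tr.add(ROW, struct.pack('<q', delta))
    if before.present() and fire(before):
        pass  # emit the row downstream here
```

Here `fire` is a hypothetical user-supplied predicate over the row's previous value; the point is that merely evaluating such a trigger forces the read.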

crclark commented 4 years ago

> we must be careful not to pay the performance penalty of reading the table state in the transaction unless the user has specified a trigger that reads the table state.

Thought about this more and realized that in many basic use cases, this could cause crazy contention. It might make more sense to only allow periodic triggers and make the user filter downstream.

Couple semi-related thoughts:

  1. Think of an example where a user really wants to express a scan when the underlying fold is commutative and events arrive in arbitrary order -- does it ever make sense?
  2. In some use cases, we want to hide/censor the table contents until something has happened. For example, if we are grouping by user shopping sessions, aggregating total user spending, then sending that downstream to an average-revenue-by-store aggregation, we don't want to emit the user spending unless the user actually checks out -- abandoned carts should never be included in the total. This is kind of a silly example since we could just aggregate checkout events instead. Anyway, in this case we don't want to send the aggregation downstream until it is "finished" in some sense. Would these conditions usually be monotonic? Does that suggest some optimization?

crclark commented 4 years ago

For a periodic time-based trigger, we want to know whether key k has been written to since the last time the trigger flushed. If k was written to repeatedly, we shouldn't write multiple k/v pairs -- each one wastes space and creates superfluous work downstream.

The trouble is doing this in such a way that neither the table writer nor the trigger process experiences conflicts.

crclark commented 4 years ago

Rough idea:

Read from the same upstream source as the table writer. Maintain a set of distinct table keys seen in memory, and flush messages containing those keys periodically, based on time or the size of the set. The downstream reader has to follow one level of indirection to get the message from the table, but that seems unavoidable unless the downstream reader wants scan behavior on data that could be arbitrarily stale. The assumption is that the stream always wants to show the freshest possible data, so send just the key and have the reader look up the latest value in the table.
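A sketch of that loop in plain Python; `read_upstream` (yielding (versionstamp, tableKey) pairs from the same topic the table writer consumes) and `emit_downstream` are hypothetical stand-ins for the library's topic reader and writer:

```python
import time

FLUSH_INTERVAL_S = 5      # flush at least this often
MAX_KEYS = 10_000         # ...or when the in-memory set gets this big

def run_trigger(read_upstream, emit_downstream):
    pending = set()                      # distinct table keys since last flush
    last_flush = time.monotonic()
    for _versionstamp, table_key in read_upstream():
        pending.add(table_key)
        now = time.monotonic()
        if len(pending) >= MAX_KEYS or now - last_flush >= FLUSH_INTERVAL_S:
            # Emit only the keys; the downstream reader looks up the latest
            # value in the table, so it always sees the freshest state.
            emit_downstream(sorted(pending))
            pending.clear()
            last_flush = now
```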

I have a vague strategy that uses two pointers: a lead pointer keeping track of what was last read into our in-memory set, and a lagging pointer keeping track of the oldest message in our in-memory set that hasn't been flushed downstream yet. If recovering from a crash, move the lead pointer back to equal the lag pointer. To track when to advance the lag pointer, could we use a priority search queue? It could simultaneously track the minimum versionstamp in memory and the set of distinct keys. Not sure how to enforce the trigger-every-n-seconds policy with this solution, though -- flush at minimum every n seconds? Like any reader, we could have fallen arbitrarily far behind. This two-pointer strategy could be helpful for a lot of other operations, too.
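A rough sketch of that bookkeeping, approximating the priority search queue with a heap plus a dict (all names hypothetical; versionstamps are assumed to be opaque, comparable byte strings):

```python
import heapq

class TriggerState:
    def __init__(self):
        self.heap = []       # (versionstamp, key), pushed when a key is first seen
        self.pending = {}    # key -> versionstamp of its first unflushed write
        self.lead = None     # last versionstamp read from upstream

    def observe(self, versionstamp, key):
        self.lead = versionstamp
        if key not in self.pending:
            self.pending[key] = versionstamp
            heapq.heappush(self.heap, (versionstamp, key))

    def lag(self):
        # Oldest unflushed versionstamp; after a crash, resume reading
        # from here (i.e. move the lead pointer back to the lag pointer).
        while self.heap and self.pending.get(self.heap[0][1]) != self.heap[0][0]:
            heapq.heappop(self.heap)   # drop entries for already-flushed keys
        return self.heap[0][0] if self.heap else self.lead

    def flush(self):
        # Emit the distinct keys seen since the last flush and reset.
        keys = sorted(self.pending)
        self.pending.clear()
        return keys
```

The lag pointer is conservative here: stale heap entries are skipped lazily, and recovery just rewinds the lead pointer to lag(), so at worst some keys are re-observed and re-flushed.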

Problems:

  1. What if write throughput is high and all the keys are unique? We could stall.
  2. Two readers would be reading the same data and computing the same keys -- could the table writer do this itself to save work?

Another idea: be even lazier. The thing that consumes the trigger stream is responsible for reading the topic upstream of the table, collecting distinct tableKeys, using them to periodically look up table values, etc. It could even be one of the new data sources in https://github.com/crclark/fdb-streaming/issues/25.
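Sketched out, that consumer might look something like this (again Python purely for illustration; `read_topic`, `table_subspace`, and `handle_batch` are hypothetical stand-ins for the topic reader, the table's keyspace, and whatever consumes the looked-up values):

```python
import fdb

fdb.api_version(630)
db = fdb.open()

@fdb.transactional
def read_values(tr, table_subspace, keys):
    # Issue all point reads first so they run in parallel, then force them.
    # Keys that haven't been written to the table yet come back as None.
    futures = {k: tr[table_subspace.pack((k,))] for k in keys}
    return {k: (v if v.present() else None) for k, v in futures.items()}

def lazy_trigger(read_topic, table_subspace, handle_batch, batch_size=1000):
    seen = set()                             # distinct tableKeys from the topic
    for _versionstamp, table_key in read_topic():
        seen.add(table_key)
        if len(seen) >= batch_size:
            handle_batch(read_values(db, table_subspace, sorted(seen)))
            seen.clear()
```

`table_subspace` here would be something like `fdb.Subspace(('table', 'user-spend'))`, i.e. wherever the table writer keeps the aggregated rows.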