feldera / feldera

Feldera Continuous Analytics Platform
https://feldera.com

Input that arrives together can be split across multiple input steps #1980

Open blp opened 2 weeks ago

blp commented 2 weeks ago

ZSetHandle and IndexedZSetHandle use CollectionHandle::dyn_append() to add data as input to a circuit. That function in turn breaks a vector of data into one chunk per worker. It adds each chunk to its worker thread's input vector atomically, but the overall operation is not atomic. That means that, for example, with two workers W1 and W2 and hence two chunks C1 and C2, there are three possibilities:

1. C1 and C2 are both added in the same input step.
2. C1 is added in one step and C2 in a later step.
3. C2 is added in one step and C1 in a later step.

This means that if, for example, one chunk (C1 or C2) advances a waterline and the other chunk (C2 or C1) contains event data that is dropped by the waterline after advancement, then the behavior will differ based on an internal race that can't be avoided (except by using only a single worker).
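To make the effect of the race concrete, here is a toy model of how the same two chunks can produce different results depending on which step each lands in. This is a sketch under stated assumptions, not Feldera's implementation: `run_steps`, the use of bare `u64` timestamps as records, and the rule "waterline = largest timestamp seen in earlier steps minus lateness" are all hypothetical simplifications.

```rust
/// Toy model (hypothetical, not Feldera code): each inner Vec is the set of
/// record timestamps that landed in one input step.
/// Returns the timestamps that were accepted, in processing order.
fn run_steps(steps: &[Vec<u64>], lateness: u64) -> Vec<u64> {
    let mut waterline: u64 = 0;
    let mut accepted = Vec::new();
    for step in steps {
        // Every record in a step is checked against the waterline that was
        // established by *earlier* steps only.
        for &ts in step {
            if ts >= waterline {
                accepted.push(ts);
            }
        }
        // After the step, the waterline advances (it never moves backwards).
        if let Some(&max_ts) = step.iter().max() {
            waterline = waterline.max(max_ts.saturating_sub(lateness));
        }
    }
    accepted
}
```

With C1 = `[100]`, C2 = `[50]`, and a lateness of 10, the record with timestamp 50 survives when both chunks share a step or when C2 goes first, but is dropped when C1 lands in an earlier step than C2.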

blp commented 2 weeks ago

@ryzhyk This is what I arrived at after our discussion this morning. Threading Kafka will exacerbate this race but I believe it is there already.

ryzhyk commented 2 weeks ago

@blp, I think this is correct. This means that we can reorder data within a buffer, effectively increasing lateness up to the largest possible difference between two timestamps in the same buffer. So we need to either make dyn_append atomic or make sure that it feeds records to the pipeline in FIFO order (a weaker, but sufficient requirement).
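One way to picture the "make dyn_append atomic" option is a single lock that covers all per-worker buffers, so that a step either sees every chunk of a batch or none of them. The following is a minimal sketch, not the actual CollectionHandle code: the `InputBuffers` type, its methods, and the round-robin partitioning are assumptions made for illustration.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the per-worker input vectors.
/// One mutex guards all of them, so an append is all-or-nothing
/// from the point of view of a step.
struct InputBuffers<T> {
    per_worker: Mutex<Vec<Vec<T>>>,
}

impl<T> InputBuffers<T> {
    fn new(workers: usize) -> Self {
        assert!(workers > 0, "need at least one worker");
        Self {
            per_worker: Mutex::new((0..workers).map(|_| Vec::new()).collect()),
        }
    }

    /// Splits `data` into one chunk per worker (round-robin here, as an
    /// assumption) and stages all chunks while holding the single lock.
    fn append(&self, data: Vec<T>) {
        let mut bufs = self.per_worker.lock().unwrap();
        let n = bufs.len();
        for (i, item) in data.into_iter().enumerate() {
            bufs[i % n].push(item);
        }
    }

    /// Called at the start of a step: takes every worker's buffer under the
    /// same lock, so a concurrent append cannot be observed half-applied.
    fn take_step(&self) -> Vec<Vec<T>> {
        let mut bufs = self.per_worker.lock().unwrap();
        bufs.iter_mut().map(std::mem::take).collect()
    }
}
```

The trade-off in this sketch is contention: every appender and every step serialize on one lock, which is the price of atomicity. A FIFO-only guarantee could be cheaper, but it is not shown here.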

But neither of these will solve the problem with multithreaded connectors reordering data. For that we also need some way to make sure that we have a complete prefix of messages read from a Kafka partition without gaps.
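The "complete prefix without gaps" requirement can be sketched as a small reorder buffer that holds back messages finished out of order and releases them only once every earlier offset has arrived. This is an illustrative sketch, not connector code: `PrefixReleaser` and its methods are hypothetical names, and it assumes offsets within a partition are consecutive integers, which does not hold for every Kafka topic (for example, compacted ones).

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: buffers messages that worker threads finish out of
/// order and releases only the gap-free prefix, in offset order.
struct PrefixReleaser<T> {
    /// The lowest offset that has not been released yet.
    next_offset: u64,
    /// Messages that arrived ahead of `next_offset`, keyed by offset.
    pending: BTreeMap<u64, T>,
}

impl<T> PrefixReleaser<T> {
    fn new(first_offset: u64) -> Self {
        Self {
            next_offset: first_offset,
            pending: BTreeMap::new(),
        }
    }

    /// Records that `offset` is ready and returns every message that now
    /// forms a contiguous run starting at `next_offset` (possibly none).
    fn complete(&mut self, offset: u64, msg: T) -> Vec<T> {
        self.pending.insert(offset, msg);
        let mut ready = Vec::new();
        while let Some(m) = self.pending.remove(&self.next_offset) {
            ready.push(m);
            self.next_offset += 1;
        }
        ready
    }
}
```

In this sketch, a message at offset 1 that finishes before offset 0 is held back, and both are released together, in order, once offset 0 completes.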

ryzhyk commented 2 weeks ago

We may also want to think about higher-level abstractions, like transactions, but that still requires the connector to set transaction boundaries correctly.