lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

Behavior of View Sync #440

Open aditya-msd opened 1 year ago

aditya-msd commented 1 year ago

Recently I encountered the following scenario.

I have two processors.

Processor 1 processes a list of items and then stores metadata such as the list length, the number of items and other information in its group table.

Processor 2 reads messages from a topic and processes them. It also has a view of Processor 1's group table.

If Processor 1 has processed 10 items, its group table would contain metadata such as ListLength = 10. Processor 2 is also expected to receive those 10 items on its input topic.

What I have seen is the following:

  1. Processor 2 reads, say, the first 1-2 items and tries to get the metadata from the view of Processor 1's table. It does not find it.
  2. Processor 2 reads the next 8 items. This time it finds the metadata in the view and processes them.

Effectively, the batch of 10 items is stuck, because the first 2 items were not processed cleanly: the view did not have the metadata yet.

How do I solve this problem? Basically, I want to keep track of the 2 unprocessed items so that I can conclude the batch of 10 is fully processed. Is there a way to retry, or to push the message back to the processor's own topic, or something similar?

To me it looks like the view takes some time to get updated.
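One way to know when a batch is done is to record which item indices of a batch were processed and compare them against the expected count taken from Processor 1's metadata. The sketch below is plain Go with no goka dependency; `BatchTracker`, `MarkProcessed` and `Pending` are hypothetical names, and in a real processor this state would presumably live in the group table (for example via `ctx.SetValue`), keyed by a batch ID that is also an assumption here.

```go
package main

import "fmt"

// BatchTracker (hypothetical) records which items of one batch were processed.
// ExpectedCount would come from Processor 1's metadata, e.g. ListLength.
type BatchTracker struct {
	ExpectedCount int
	processed     map[int]bool
}

func NewBatchTracker(expected int) *BatchTracker {
	return &BatchTracker{ExpectedCount: expected, processed: make(map[int]bool)}
}

// MarkProcessed records item idx (0-based) as done. Marking twice is harmless,
// so retried or redelivered items are never counted twice.
func (b *BatchTracker) MarkProcessed(idx int) {
	if idx >= 0 && idx < b.ExpectedCount {
		b.processed[idx] = true
	}
}

// Pending returns the indices that have not been processed yet, in ascending order.
func (b *BatchTracker) Pending() []int {
	var pending []int
	for i := 0; i < b.ExpectedCount; i++ {
		if !b.processed[i] {
			pending = append(pending, i)
		}
	}
	return pending
}

// Complete reports whether every item of the batch has been processed.
func (b *BatchTracker) Complete() bool {
	return len(b.processed) == b.ExpectedCount
}

func main() {
	t := NewBatchTracker(10)
	// Items 0 and 1 failed their view lookup; items 2..9 succeeded.
	for i := 2; i < 10; i++ {
		t.MarkProcessed(i)
	}
	fmt.Println(t.Complete(), t.Pending()) // false [0 1]

	// Later, the two missing items are retried successfully.
	t.MarkProcessed(0)
	t.MarkProcessed(1)
	fmt.Println(t.Complete(), t.Pending()) // true []
}
```

Because `MarkProcessed` is idempotent, a retried item that succeeds later simply fills its gap, and `Pending` tells you exactly which items still need another attempt.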

frairon commented 11 months ago

Hey @aditya-msd, very sorry for the long response time - somehow I missed the issues here.

What you're describing is a series of processing steps, which require transactional semantics.

"To me this looks like there is some time taken for the View to get updated."

You're right. When a processor writes a value to its table using SetValue, it takes time until the message is sent to Kafka and then read back by the view, so we cannot rely on the update arriving in time.
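To make the race concrete, here is a minimal in-memory model of the behavior described above, assuming a view is simply a local copy that is filled by consuming the table's changelog at its own pace. It does not use goka; `Changelog`, `Table.Write`, `View.Poll` and `View.Get` are hypothetical stand-ins for the Kafka topic, the processor's write and the view's lookup.

```go
package main

import "fmt"

// entry is one record on the (simulated) changelog topic.
type entry struct {
	key, value string
}

// Changelog stands in for the Kafka topic that backs Processor 1's group table.
type Changelog struct {
	entries []entry
}

// Table models the writer side: a write only appends to the changelog.
type Table struct {
	log *Changelog
}

func (t *Table) Write(key, value string) {
	t.log.entries = append(t.log.entries, entry{key: key, value: value})
}

// View models a reader with its own local copy and its own consumer offset.
type View struct {
	log    *Changelog
	offset int
	state  map[string]string
}

func NewView(log *Changelog) *View {
	return &View{log: log, state: make(map[string]string)}
}

// Poll consumes at most n new changelog entries, mimicking a consumer that lags behind.
func (v *View) Poll(n int) {
	for i := 0; i < n && v.offset < len(v.log.entries); i++ {
		e := v.log.entries[v.offset]
		v.state[e.key] = e.value
		v.offset++
	}
}

// Get reads only the local copy; entries that have not been polled yet are invisible.
func (v *View) Get(key string) (string, bool) {
	val, ok := v.state[key]
	return val, ok
}

func main() {
	changelog := &Changelog{}
	table := &Table{log: changelog}
	view := NewView(changelog)

	table.Write("batch-1", "ListLength=10")
	_, found := view.Get("batch-1")
	fmt.Println("before poll:", found) // before poll: false

	view.Poll(1)
	val, found := view.Get("batch-1")
	fmt.Println("after poll:", found, val) // after poll: true ListLength=10
}
```

The write has "happened" as soon as `Write` returns, yet a lookup issued before the next `Poll` still misses, which matches the first 1-2 items in the scenario above.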

Right now, I see three ways to solve this. First, as you described, resend the input message to a "retry" topic. That could be achieved by sending it to the loop topic. Sometimes that's necessary if you want to avoid spoiling the input topic, as it may be consumed by other processors as well. The problem with this approach is that the processor needs to know when to reschedule the message, which is sometimes not possible. You also need to make sure the loop has an exit criterion; otherwise messages might end up circulating forever if the processing condition is never met.
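The loop-based retry with an exit criterion could look roughly like this. It is a stdlib-only sketch: the `loop` slice stands in for the processor's loop topic (in goka the message would be re-emitted with `ctx.Loopback`), `lookup` stands in for the view lookup, and `Message`, `handle` and `maxAttempts = 3` are assumptions chosen for illustration.

```go
package main

import "fmt"

// maxAttempts is the exit criterion: give up after this many tries (assumed value).
const maxAttempts = 3

// Message is a hypothetical input message that carries its own retry counter.
type Message struct {
	BatchID  string
	Item     int
	Attempts int
}

// Outcome describes what happened to one message.
type Outcome int

const (
	Processed Outcome = iota
	Retried
	DeadLettered
)

func (o Outcome) String() string {
	return [...]string{"processed", "retried", "dead-lettered"}[o]
}

// handle processes msg if lookup finds the batch metadata. Otherwise it re-queues
// msg on loop with Attempts+1, or moves it to dead once maxAttempts is reached.
func handle(msg Message, lookup func(batchID string) bool, loop, dead *[]Message) Outcome {
	if lookup(msg.BatchID) {
		return Processed
	}
	msg.Attempts++
	if msg.Attempts >= maxAttempts {
		*dead = append(*dead, msg)
		return DeadLettered
	}
	*loop = append(*loop, msg)
	return Retried
}

func main() {
	ready := map[string]bool{} // what the (simulated) view currently knows
	lookup := func(batchID string) bool { return ready[batchID] }

	queue := []Message{{BatchID: "b1", Item: 0}, {BatchID: "b1", Item: 1}}
	var dead []Message
	for round := 0; len(queue) > 0; round++ {
		if round == 1 {
			ready["b1"] = true // pretend the view has caught up by the second pass
		}
		var loop []Message
		for _, m := range queue {
			fmt.Printf("round %d item %d: %v\n", round, m.Item, handle(m, lookup, &loop, &dead))
		}
		queue = loop
	}
	fmt.Println("dead-lettered:", len(dead))
}
```

This sketch retries on the very next pass, so it does not solve the scheduling problem mentioned above; it only guarantees that a message whose metadata never appears ends up in the dead-letter list instead of looping forever.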

The second approach avoids views altogether. For this, we should consider the operations done by processors 1 and 2 as part of a single transaction. One of them starts, performs its operations and then sends all necessary information (i.e. the state of the transaction) to processor 2, which performs its operations, maybe sends the transaction back, etc. That may look complex at first, but it avoids any race conditions and is deterministic, so it gives you fewer headaches later :).
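A rough sketch of the second approach, assuming Processor 1 can attach the batch metadata to every item it emits: Processor 2 then learns `ListLength` from the message itself and never has to consult the view. Plain Go again; `ItemMsg`, `emitBatch` and `Processor2` are hypothetical, and the per-batch map stands in for Processor 2's own group table.

```go
package main

import "fmt"

// ItemMsg is a hypothetical message Processor 1 emits for each item. It carries
// the batch metadata (ListLength) itself, so Processor 2 needs no view lookup.
type ItemMsg struct {
	BatchID    string
	Index      int
	ListLength int
	Payload    string
}

// emitBatch plays the role of Processor 1: it attaches the metadata to every item.
func emitBatch(batchID string, items []string) []ItemMsg {
	out := make([]ItemMsg, len(items))
	for i, p := range items {
		out[i] = ItemMsg{BatchID: batchID, Index: i, ListLength: len(items), Payload: p}
	}
	return out
}

// batchState is what Processor 2 might keep per batch (e.g. in its own group table).
type batchState struct {
	seen map[int]bool
}

// Processor2 consumes ItemMsgs and reports when a batch is complete.
type Processor2 struct {
	batches map[string]*batchState
}

func NewProcessor2() *Processor2 {
	return &Processor2{batches: make(map[string]*batchState)}
}

// Handle records one item and returns true once all ListLength items of its batch were seen.
func (p *Processor2) Handle(m ItemMsg) bool {
	st, ok := p.batches[m.BatchID]
	if !ok {
		st = &batchState{seen: make(map[int]bool)}
		p.batches[m.BatchID] = st
	}
	st.seen[m.Index] = true
	return len(st.seen) == m.ListLength
}

func main() {
	p2 := NewProcessor2()
	msgs := emitBatch("b1", []string{"a", "b", "c"})
	for _, m := range msgs {
		fmt.Println(m.Index, p2.Handle(m))
	}
}
```

Since the completion check depends only on the messages Processor 2 has consumed, the result is the same no matter how far behind any view is, and items may arrive in any order.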

A third option that sometimes works: decouple the conflicting operations in time. For example:

Hope that helps! Let me know if you run into more problems, or how you solved it :)

Thanks!