flipkart-incubator / varadhi

Apache License 2.0

Add core consumer impl, capable of processing messages from a consumer client. #113

Open gauravAshok opened 8 months ago

gauravAshok commented 8 months ago

Consumer task? (ctx(event_loop))

TODO: explore whether post_process needs to be batched. Everything before post_process is batched to some extent; post_process is the only step that runs per message. Enqueueing from outside onto the ctx is also a candidate for contention. Should the event loop be responsible for HTTP/remote requests? Probably not, as one is highly IO-bound and the other is compute-bound, and IO threads need to stay responsive.
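The split described above can be sketched roughly as follows. This is a minimal illustration, not the Varadhi implementation: `EventLoopSketch`, `deliver`, `postProcessBatch`, and the fake `:200` result are all hypothetical names and values. Remote calls run on a separate pool, their results are parked in a concurrent queue, and the event-loop thread drains that queue in one task rather than receiving one enqueue per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from the Varadhi codebase.
class EventLoopSketch {
    // Single thread that owns all consumer state (the "ctx" / event loop).
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    // Separate pool standing in for the HTTP client's IO threads.
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);
    // Finished deliveries wait here until the event loop drains them.
    private final ConcurrentLinkedQueue<String> completed = new ConcurrentLinkedQueue<>();
    // Mutated only on the event-loop thread, so it needs no locking.
    private final List<String> processed = new ArrayList<>();

    // Stand-in for the remote call; runs on the IO pool, never on the event loop.
    private CompletableFuture<String> deliver(String msg) {
        return CompletableFuture.supplyAsync(() -> msg + ":200", ioPool);
    }

    // Batched post_process: one event-loop task drains every finished delivery.
    private void postProcessBatch() {
        String result;
        while ((result = completed.poll()) != null) {
            processed.add(result);
        }
    }

    List<String> run(List<String> msgs) throws Exception {
        try {
            List<CompletableFuture<Void>> inFlight = new ArrayList<>();
            for (String m : msgs) {
                // IO threads only append to the queue; they do not touch consumer state.
                inFlight.add(deliver(m).thenAccept(completed::add));
            }
            // Simplification: wait for all deliveries, then post-process them in one batch.
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0]))
                    .get(5, TimeUnit.SECONDS);
            eventLoop.submit(this::postProcessBatch).get(5, TimeUnit.SECONDS);
            return new ArrayList<>(processed);
        } finally {
            ioPool.shutdown();
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new EventLoopSketch().run(List.of("m0", "m1", "m2")).size()); // prints 3
    }
}
```

A real loop would drain the queue periodically or when it becomes non-empty instead of waiting for every delivery; the point here is only that the event loop sees one task per batch, which reduces contention on the enqueue path.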

aayustark007-fk commented 8 months ago

How do we handle consumer failure scenarios and prevent duplicate delivery during recovery? E.g., the consumer task fails after ConcurrencyControl::deliver for some of the messages, so the offset is not committed to the messaging stack. When the consumer comes back up, or the workload is assigned to another consumer, it will try to deliver those messages again. Are we fine with that?
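To make the scenario concrete, here is a small simulation of it. Everything in it is a hypothetical illustration (`RedeliverySketch`, the in-memory `LOG`, the `crashAfter` parameter); it does not reflect how Varadhi commits offsets. It shows that committing only after delivery gives at-least-once behaviour, and that a receiver keyed on message id could filter the repeats if duplicates are not acceptable.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical sketch; names are illustrative and not taken from Varadhi.
class RedeliverySketch {
    // In-memory stand-in for a topic partition.
    static final List<String> LOG = List.of("m0", "m1", "m2", "m3");

    // Delivers from committedOffset onward. If the consumer "crashes" after
    // crashAfter deliveries, nothing is committed and the old offset is returned.
    static int consume(int committedOffset, int crashAfter, List<String> deliveries) {
        int delivered = 0;
        for (int i = committedOffset; i < LOG.size(); i++) {
            if (delivered == crashAfter) {
                return committedOffset; // failure after deliver, before commit
            }
            deliveries.add(LOG.get(i));
            delivered++;
        }
        return LOG.size(); // commit only once everything was delivered
    }

    // One possible mitigation (an assumption, not a decision from this thread):
    // the receiver drops message ids it has already seen.
    static List<String> dedup(List<String> deliveries) {
        return new ArrayList<>(new LinkedHashSet<>(deliveries));
    }

    public static void main(String[] args) {
        List<String> deliveries = new ArrayList<>();
        int offset = consume(0, 2, deliveries);                  // delivers m0, m1, then fails
        offset = consume(offset, Integer.MAX_VALUE, deliveries); // restart from offset 0
        System.out.println(deliveries);        // [m0, m1, m0, m1, m2, m3]
        System.out.println(dedup(deliveries)); // [m0, m1, m2, m3]
    }
}
```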

gauravAshok commented 1 month ago

No progress on this yet. Expected to pick it up on 28th Oct.

anuj-flipkart commented 1 week ago

Pending:

  1. Stopping of consumer.
  2. Async HTTP Client implementation
  3. Testing

gaurav-ashok commented 6 days ago

Testing in progress. E2E consumption is happening.

2024-11-20 04:43:58 INFO  ProcessingLoop:108 - Delivered message. status: 200
2024-11-20 04:43:58 INFO  ProcessingLoop:108 - Delivered message. status: 200
2024-11-20 04:43:58 INFO  ProcessingLoop:108 - Delivered message. status: 200
2024-11-20 04:43:58 ERROR ProcessingLoop:75 - unexpected error in fetching messages from msgSelector
java.lang.IllegalStateException: Only one request is allowed at a time
    at com.flipkart.varadhi.consumer.MessageSrcSelector.nextMessages(MessageSrcSelector.java:48) ~[classes/:?]
    at com.flipkart.varadhi.consumer.processing.ProcessingLoop.run(ProcessingLoop.java:72) ~[classes/:?]
    at com.flipkart.varadhi.consumer.concurrent.EventExecutor.runSafely(EventExecutor.java:81) ~[classes/:?]
    at com.flipkart.varadhi.consumer.concurrent.EventExecutor.run(EventExecutor.java:68) ~[classes/:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

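But it looks like some invariants are being violated. The IllegalStateException above is thrown from MessageSrcSelector.nextMessages when called by ProcessingLoop.run, which suggests a second fetch was issued while an earlier one was still pending. Need to debug this.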
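For reference while debugging, this is roughly what a "one request at a time" invariant and a caller that respects it can look like. It is a hedged sketch only: `SingleRequestSelector` and `LoopSketch` are hypothetical stand-ins, not the actual MessageSrcSelector or ProcessingLoop code, and only the exception message is taken from the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical selector that allows a single in-flight request.
class SingleRequestSelector {
    private final AtomicReference<CompletableFuture<List<String>>> pending =
            new AtomicReference<>();

    CompletableFuture<List<String>> nextMessages() {
        CompletableFuture<List<String>> request = new CompletableFuture<>();
        // Fails if an earlier request has not been completed yet.
        if (!pending.compareAndSet(null, request)) {
            throw new IllegalStateException("Only one request is allowed at a time");
        }
        return request;
    }

    // Called by the message source when data is available.
    void complete(List<String> msgs) {
        // Clear the slot before completing, so callbacks may request again.
        CompletableFuture<List<String>> request = pending.getAndSet(null);
        if (request != null) {
            request.complete(msgs);
        }
    }
}

// Hypothetical loop that tracks its own outstanding fetch.
class LoopSketch {
    private final SingleRequestSelector selector;
    // Assumed to be touched only from the event-loop thread.
    private boolean fetchInFlight = false;
    final List<String> seen = new ArrayList<>();

    LoopSketch(SingleRequestSelector selector) {
        this.selector = selector;
    }

    void runOnce() {
        if (fetchInFlight) {
            return; // a fetch is already pending; do not ask again
        }
        fetchInFlight = true;
        // In a real loop this callback would be rescheduled onto the event loop.
        selector.nextMessages().thenAccept(msgs -> {
            fetchInFlight = false;
            seen.addAll(msgs);
        });
    }
}
```

If the loop can be re-run (for example after an error path or a re-schedule) without a guard like `fetchInFlight`, or if the flag is reset before the earlier future completes, the selector's check would fire in the way the stack trace shows.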