ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.
https://watermill.io
MIT License
7.73k stars 407 forks

[watermill-redisstream] Subscribers can pick up up to 2 tasks while busy with another task #484

Open wk8 opened 1 year ago

wk8 commented 1 year ago

The current implementation for subscribers essentially starts 3 different routines:

  1. one reads messages from the stream, and pushes them to a channel
  2. another one monitors pending messages, and claims them when relevant, pushing them to that same channel
  3. the 3rd one listens to that channel, and actually processes them

The only thing that prevents the first 2 from reading too fast is that writes to the channel block while messages are being processed by the 3rd routine. However, nothing prevents each of the first 2 routines from claiming a task while the 3rd routine is already busy processing an earlier message.
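The effect can be reproduced without Redis. The following is a minimal sketch that models the three routines with plain Go channels; the names (`prefetchedWhileBusy`, `stream`, `pending`, `feed`) are made up for illustration and are not taken from the watermill-redisstream code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// prefetchedWhileBusy models the layout described above: two feeder
// goroutines push into one unbuffered channel and a single handler drains it.
// It returns how many messages the feeders are holding while the handler is
// still busy with its first message.
func prefetchedWhileBusy() int64 {
	stream := make(chan string, 4)  // stands in for new stream entries
	pending := make(chan string, 4) // stands in for claimable pending entries
	for i := 0; i < 4; i++ {
		stream <- fmt.Sprintf("stream-%d", i)
		pending <- fmt.Sprintf("pending-%d", i)
	}
	close(stream)
	close(pending)

	out := make(chan string) // unbuffered: a send blocks until the handler receives
	var taken int64          // messages removed from their source so far

	var wg sync.WaitGroup
	feed := func(src <-chan string) {
		defer wg.Done()
		for m := range src {
			atomic.AddInt64(&taken, 1) // this worker now owns the message
			out <- m                   // blocks while the handler is busy
		}
	}
	wg.Add(2)
	go feed(stream)
	go feed(pending)
	go func() { wg.Wait(); close(out) }()

	busy := make(chan struct{})
	release := make(chan struct{})
	go func() {
		first := true
		for range out {
			if first {
				first = false
				close(busy)
				<-release // simulate one slow task
			}
		}
	}()

	<-busy
	// Each feeder ends up holding exactly one message; wait until both do.
	for atomic.LoadInt64(&taken) < 3 {
		time.Sleep(time.Millisecond)
	}
	held := atomic.LoadInt64(&taken) - 1 // exclude the message being handled
	close(release)
	return held
}

func main() {
	fmt.Println(prefetchedWhileBusy()) // prints 2
}
```

The unbuffered channel does bound the prefetch, but only to one message per feeder, which is where the "up to 2 tasks" in the title comes from.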

This might not be a problem for applications where at least one of the following is true:

  1. all tasks are processed very fast
  2. it doesn't really matter if some tasks wait a little longer than others to be picked up

However, if one has tasks that can be rather slow to be processed, what ends up happening is that workers that are already busy pick up more work even though some other workers are sitting idle.

It's not easy to solve this problem without also somewhat affecting performance for applications that have a lot of very quick tasks, since it would require workers to stop claiming tasks "ahead of time" while they are still processing an ongoing task. I have completely rewritten the subscribers in my fork to better fit my use case, but I'm not sure how to turn that into a patch that you'd be okay merging here. Ideas/feedback welcome.

(on the plus side, not having all the extra routines does make the new implementation quite a bit cleaner ;) fewer internal communication channels, everything is controlled simply with contexts)
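The fork itself is not shown in this thread, so the following is only a guess at the general shape of such a rewrite: a single loop that asks for the next message only after the previous handler call has returned, with a context for cancellation. `fetchFunc`, `consumeSequentially` and `demo` are hypothetical names, and `fetchFunc` stands in for whatever "read or claim at most one message" would look like against Redis.

```go
package main

import (
	"context"
	"fmt"
)

// fetchFunc is a hypothetical stand-in for "read or claim at most one
// message"; ok is false when nothing is available.
type fetchFunc func(ctx context.Context) (msg string, ok bool)

// consumeSequentially asks for the next message only after the previous
// handler call has returned, so the worker never holds more than one message.
func consumeSequentially(ctx context.Context, fetch fetchFunc, handle func(string)) int {
	handled := 0
	for ctx.Err() == nil {
		msg, ok := fetch(ctx)
		if !ok {
			break // a real subscriber would block or back off here instead
		}
		handle(msg)
		handled++
	}
	return handled
}

// demo runs the loop over an in-memory queue and returns the number of
// handled messages and the largest number held by the worker at any time.
func demo() (handled, maxHeld int) {
	queue := []string{"a", "b", "c"}
	fetched, finished := 0, 0
	fetch := func(ctx context.Context) (string, bool) {
		if fetched == len(queue) {
			return "", false
		}
		fetched++
		return queue[fetched-1], true
	}
	handle := func(string) {
		if held := fetched - finished; held > maxHeld {
			maxHeld = held
		}
		finished++
	}
	handled = consumeSequentially(context.Background(), fetch, handle)
	return handled, maxHeld
}

func main() {
	handled, maxHeld := demo()
	fmt.Println(handled, maxHeld) // prints "3 1"
}
```

The trade-off is the one described above: with no prefetch, every message pays the full fetch latency before its handler starts, which hurts workloads made of many very quick tasks.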

AlexCuse commented 1 year ago

This does sound like a general backpressure problem but I am not sure I am following ThreeDotsLabs/watermill-redisstream#2 here. Is this attempting to claim messages published in-process before they hit redis?

I don't know the codebase here well but happy to work with you on a PR - in general talking with @roblaszczak we've tried to favor simplicity and predictability over performance so maybe the current behavior could be reintroduced as an "opt-in" behavior on top of your patch.

The way we do this kind of thing in the NATS middleware is to start N subscriptions in the client, each bound to its own goroutine, with every goroutine trying to feed into the unbuffered subscription channel. So you tune your preference between performance and predictability by changing that number. You can see a little of how that works by looking at the benchmark readme - the benchmark is set up to use 1 or 3 subscribers per core in those 2 cases.
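As a rough illustration of that tuning knob, the sketch below starts n feeder goroutines in front of one unbuffered channel and counts how many messages sit in the feeders while the handler is busy. It uses plain channels only; `inFlightWhileBusy` is a made-up name and nothing here is taken from the NATS middleware.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// inFlightWhileBusy starts n feeder goroutines (hypothetical stand-ins for n
// subscriptions) that all push into one unbuffered channel, then reports how
// many messages are waiting in the feeders while the handler is busy.
func inFlightWhileBusy(n int) int64 {
	if n < 1 {
		return 0
	}
	out := make(chan int)
	stop := make(chan struct{})
	var taken int64

	for i := 0; i < n; i++ {
		go func() {
			for {
				msg := int(atomic.AddInt64(&taken, 1)) // "receive" the next message
				select {
				case out <- msg: // blocks until the handler is ready
				case <-stop:
					return
				}
			}
		}()
	}

	<-out // the handler accepts one message and is now busy with it
	// Every feeder ends up blocked with exactly one message in hand.
	for atomic.LoadInt64(&taken) < int64(n)+1 {
		time.Sleep(time.Millisecond)
	}
	waiting := atomic.LoadInt64(&taken) - 1 // exclude the message being handled
	close(stop)
	return waiting
}

func main() {
	for _, n := range []int{1, 3} {
		fmt.Printf("%d feeder(s): %d message(s) waiting\n", n, inFlightWhileBusy(n))
	}
}
```

In this model the number of messages a busy worker holds equals n, so a smaller n favors predictability and a larger n favors throughput.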