riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

extract Client subscriptions into service #379

Closed. bgentry closed this 1 month ago.

bgentry commented 1 month ago

This change extracts the Client subscriptions logic into a separate startstop.Service, which can be started and stopped along with the other services. The key change that enables this is switching from a callback to a channel for job events. The channel is passed to the completer during init, and the completer then owns it as the sole sender. When the completer is stopped, it must close the channel to indicate that there are no more job completion events to be processed.
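The ownership rule described above (the completer is the only sender and closes the channel when it stops) can be sketched as follows. This is a minimal, hypothetical illustration, not River's actual completer: `JobEvent`, `completer`, `Complete`, and `runExample` are invented names, and the completer is reduced to a single goroutine that forwards job IDs as events.

```go
package main

import (
	"fmt"
	"sync"
)

// JobEvent is a hypothetical stand-in for a job completion event.
type JobEvent struct {
	JobID int64
}

// completer is a hypothetical, simplified completer. It is the sole sender
// on subscribeCh and therefore the only party allowed to close it.
type completer struct {
	subscribeCh chan<- []JobEvent
	jobs        chan int64
	done        chan struct{}
}

func newCompleter(subscribeCh chan<- []JobEvent) *completer {
	return &completer{subscribeCh: subscribeCh, jobs: make(chan int64), done: make(chan struct{})}
}

func (c *completer) Start() {
	go func() {
		defer close(c.done)
		// Deferred calls run in reverse order, so subscribeCh is closed
		// before done: the consumer learns no more events will ever arrive.
		defer close(c.subscribeCh)
		for id := range c.jobs {
			c.subscribeCh <- []JobEvent{{JobID: id}}
		}
	}()
}

// Complete hands a finished job to the completer. It must not be called after Stop.
func (c *completer) Complete(id int64) { c.jobs <- id }

// Stop stops accepting work and waits until the completer has sent its
// last event and closed subscribeCh.
func (c *completer) Stop() {
	close(c.jobs)
	<-c.done
}

func runExample(n int) []int64 {
	subscribeCh := make(chan []JobEvent)
	var received []int64
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The range ends only when the completer closes the channel.
		for events := range subscribeCh {
			for _, e := range events {
				received = append(received, e.JobID)
			}
		}
	}()

	c := newCompleter(subscribeCh)
	c.Start()
	for i := 1; i <= n; i++ {
		c.Complete(int64(i))
	}
	c.Stop()
	wg.Wait() // received is safe to read once the consumer has exited
	return received
}

func main() {
	fmt.Println(runExample(3))
}
```

Making the sender responsible for `close` follows the usual Go convention: only the sending side knows when the stream is truly finished, so the receiving side can simply range until the channel is closed.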

This moves us closer to having all the key client services be able to be managed as a single pool of services, and they can all have their shutdown initiated in parallel (even though some must still wait for others to shut down first). Importantly, this paves the way for additional services to be added (even by external libraries) without needing to deal with more complex startup & shutdown ordering scenarios. I leveraged the startstop.StartStopBase type in all the completers to make this work cleanly.
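One way to picture "shutdown initiated in parallel, even though some must still wait for others" is the sketch below. It assumes a hypothetical two-method `service` interface (not River's `startstop.Service`) and invented `producer` and `consumer` types. Every `Stop()` is called concurrently, yet the consumer cannot finish until the producer closes the shared channel, so the ordering comes from the channel rather than from a sequenced shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical, minimal interface for illustration only; it is
// not River's startstop.Service API.
type service interface {
	Start()
	Stop()
}

// stopLog records the order in which services finish stopping.
type stopLog struct {
	mu    sync.Mutex
	order []string
}

func (l *stopLog) add(s string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.order = append(l.order, s)
}

// producer sends n values, then waits to be told to stop. On stop it closes
// out, signaling that no more values will ever arrive.
type producer struct {
	n    int
	out  chan int
	quit chan struct{}
	done chan struct{}
	log  *stopLog
}

func (p *producer) Start() {
	go func() {
		defer close(p.done)
		for i := 0; i < p.n; i++ {
			p.out <- i
		}
		<-p.quit
		p.log.add("producer")
		close(p.out)
	}()
}

func (p *producer) Stop() { close(p.quit); <-p.done }

// consumer drains in until it is closed. Its Stop may be initiated at any
// time, but it only returns after the producer has closed the channel.
type consumer struct {
	in    <-chan int
	count int
	done  chan struct{}
	log   *stopLog
}

func (c *consumer) Start() {
	go func() {
		defer close(c.done)
		for range c.in {
			c.count++
		}
		c.log.add("consumer")
	}()
}

func (c *consumer) Stop() { <-c.done }

// stopAllParallel initiates Stop on every service at once and returns when
// all of them have finished.
func stopAllParallel(services ...service) {
	var wg sync.WaitGroup
	for _, s := range services {
		wg.Add(1)
		go func(s service) {
			defer wg.Done()
			s.Stop()
		}(s)
	}
	wg.Wait()
}

func runShutdown(n int) ([]string, int) {
	sl := &stopLog{}
	ch := make(chan int)
	p := &producer{n: n, out: ch, quit: make(chan struct{}), done: make(chan struct{}), log: sl}
	c := &consumer{in: ch, done: make(chan struct{}), log: sl}
	c.Start()
	p.Start()
	stopAllParallel(c, p) // the argument order does not affect the stop order
	return sl.order, c.count
}

func main() {
	order, count := runShutdown(5)
	fmt.Println(order, count)
}
```

Because the dependency is expressed through channel closure, a new service can join the pool without anyone rewriting the shutdown order; it only needs a channel that tells it when its inputs are exhausted.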

Additionally, I updated the payload type for completer callbacks/notifications. Rather than sending a single job each time, they can now send slices of events. This is great for the batch completer, because it doesn't have to deal with mutexes or channel sends for every individual job it completed—instead it can send the whole batch through at once.
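A minimal sketch of sending a completed batch as one slice instead of one send per job. `batchCompleter`, `JobFinished`, `Flush`, and `runBatches` are hypothetical names. The sketch runs on a single goroutine for brevity, so it needs no mutex; a concurrent version would need its own synchronization around `pending`.

```go
package main

import "fmt"

// JobEvent is a hypothetical stand-in for a job completion event.
type JobEvent struct{ JobID int64 }

// batchCompleter is a hypothetical sketch: it buffers completed jobs and, on
// flush, sends the whole batch as a single []JobEvent.
type batchCompleter struct {
	pending     []JobEvent
	subscribeCh chan<- []JobEvent
}

func (b *batchCompleter) JobFinished(id int64) {
	b.pending = append(b.pending, JobEvent{JobID: id})
}

func (b *batchCompleter) Flush() {
	if len(b.pending) == 0 {
		return
	}
	batch := b.pending
	b.pending = nil        // start a fresh slice so the receiver owns batch exclusively
	b.subscribeCh <- batch // one channel send for the whole batch
}

func runBatches() (sends, events int) {
	ch := make(chan []JobEvent, 10)
	b := &batchCompleter{subscribeCh: ch}
	for id := int64(1); id <= 5; id++ {
		b.JobFinished(id)
	}
	b.Flush()
	for id := int64(6); id <= 7; id++ {
		b.JobFinished(id)
	}
	b.Flush()
	close(ch)
	for batch := range ch {
		sends++
		events += len(batch)
	}
	return sends, events
}

func main() {
	sends, events := runBatches()
	fmt.Printf("%d sends carrying %d events\n", sends, events)
}
```

Setting `pending` to `nil` after handing off the batch matters: otherwise later appends could write into the same backing array the receiver is reading.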

Currently based on #377.

bgentry commented 1 month ago

The test panic is almost certainly specific to how some particular tests are structured and not reflective of anything we’ll hit in real usage. I’ll see if I can pin it down tomorrow.

bgentry commented 1 month ago

Can you add subscription_manager_test.go and put in some basic test cases? It'll be nice because it lets us test the struct in relative isolation. Probably just need a basic check or two and the standard StartStopStress.

@brandur the issue with startstoptest.Stress() is that it expects to be able to repeatedly Start() and Stop() a service with no additional steps. This subscriptionManager type doesn't quite work that way, at least not unless the subscribeCh is closed before (or concurrently with) Stop() and the ResetSubscribeChan method is called between runs. This is because its design relies on iterating over the subscribeCh to know when it has finished receiving all job events and can therefore safely shut down. That was the best design I could think of for allowing these services to shut down in parallel while still waiting as necessary before shutting down.
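The lifecycle described above can be sketched like this. Only `subscribeCh` and `ResetSubscribeChan` are names from the discussion; their signatures and the rest of the code (`JobEvent`, `runCycles`, the `done` channel) are assumptions for illustration. The manager's loop ranges over the channel, so `Stop()` returns only after the sender has closed it and every event has been consumed, and a fresh channel must be installed before each subsequent run.

```go
package main

import "fmt"

// JobEvent is a hypothetical stand-in for a job completion event.
type JobEvent struct{ JobID int64 }

// subscriptionManager is a simplified, hypothetical sketch. Only the names
// subscribeCh and ResetSubscribeChan come from the discussion; the
// signatures and everything else here are assumptions.
type subscriptionManager struct {
	subscribeCh <-chan []JobEvent
	received    []int64 // only read after Stop returns
	done        chan struct{}
}

// ResetSubscribeChan installs a fresh channel before the next run, since a
// closed channel cannot be reopened.
func (m *subscriptionManager) ResetSubscribeChan(ch <-chan []JobEvent) {
	m.subscribeCh = ch
}

func (m *subscriptionManager) Start() {
	ch, done := m.subscribeCh, make(chan struct{})
	m.done = done
	go func() {
		defer close(done)
		// The loop exits only when the sender closes the channel, i.e. once
		// it is certain that no further job events can arrive.
		for events := range ch {
			for _, e := range events {
				m.received = append(m.received, e.JobID)
			}
		}
	}()
}

// Stop does not interrupt the loop; it waits for it to drain. If the sender
// never closes subscribeCh, Stop blocks forever, which is why a generic
// "call Start and Stop repeatedly" stress test does not fit this service.
func (m *subscriptionManager) Stop() { <-m.done }

// runCycles exercises the intended lifecycle several times:
// reset the channel, start, send, close, stop.
func runCycles(cycles, eventsPerCycle int) int {
	m := &subscriptionManager{}
	for c := 0; c < cycles; c++ {
		ch := make(chan []JobEvent)
		m.ResetSubscribeChan(ch)
		m.Start()
		for i := 0; i < eventsPerCycle; i++ {
			ch <- []JobEvent{{JobID: int64(i)}}
		}
		close(ch) // the sender signals that this run has no more events
		m.Stop()  // returns only after every sent event was consumed
	}
	return len(m.received)
}

func main() {
	fmt.Println(runCycles(3, 4))
}
```

The trade-off is visible in `Stop`: it cannot be used as a standalone "shut down now" signal, but in exchange no event that was sent before the channel closed can ever be dropped.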

I don't think there's an obvious way to extend the existing test to handle this different use case; however, I will add some standalone test coverage to verify the correct usage of this service.

bgentry commented 1 month ago

@brandur should be good to go; please take another look 🙏

bgentry commented 1 month ago

@brandur it's possible I'm misunderstanding it, but from what I'm seeing, your proposal doesn't guarantee that the subscription manager will wait for all events before shutting down. That was the whole purpose of the channel, and it was the reason the subscription manager's shutdown was previously a separate step after the services were shut down: it lets us know that no more events are coming, and therefore that if we start closing client subscriptions, they will not miss a single event (even after Stop() is called).

That particular property is one I think we need to preserve, as some of the upcoming stuff needs to be able to ensure that it has consumed and handled all incoming events for all processed jobs prior to stopping.

Might be easier to discuss offline?

bgentry commented 1 month ago

@brandur I definitely misread the patch on the first pass, and it does in fact look good 🙏 I've applied it here.