riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

Track started state of start/stop services + cleaner functions instead of channels #415

Closed brandur closed 1 week ago

brandur commented 2 weeks ago

This is one that I've had on the backburner for a while, but deprioritized since it'll take review bandwidth and we had more important things going on. I pulled it up to the top because we've been seeing a crazy rate of intermittent failures out of AddManyAfterStart in the periodic job enqueuer [1] [2]. I'm not entirely sure yet because I'm having trouble reproducing it locally, but from reading the code I believe the problem is a race condition: we start the client and then immediately start adding periodic jobs, but we're not guaranteed that the service has completed a loop before the periodic jobs are added.

startService(t, svc)

svc.Add(
        &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
)
svc.Add(
        &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
)

If that is the case, I believe this change can help with that.

Its main feature is that it adds a Started channel to the start/stop infrastructure that's closed when a service finishes starting up:

type Service interface {
    ...

    // Started returns a channel that's closed when a service finishes starting,
    // or if failed to start and is stopped instead. It can be used in
    // conjunction with WaitAllStarted to verify startup of a constellation of
    // services.
    Started() <-chan struct{}

We then provide a helper called WaitAllStarted that lets callers wait for a set of services to finish starting up:

// WaitAllStarted waits until all the given services are started (or stopped in
// a degenerate start scenario, like if context is cancelled while starting up).
//
// Unlike StopAllParallel, WaitAllStarted doesn't bother with parallelism
// because the services themselves have already backgrounded themselves, and we
// have to wait until the slowest service has started anyway.
func WaitAllStarted(services ...Service) {
    ...

This allows us to modify tests like the periodic job enqueuer above to wait on a service start, thereby hopefully fixing our intermittency problems:

startService := func(t *testing.T, svc *PeriodicJobEnqueuer) {
    t.Helper()

    require.NoError(t, svc.Start(ctx))
    t.Cleanup(svc.Stop)

    startstop.WaitAllStarted(svc)
}
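
As a rough illustration of the waiting behavior described above, here is a minimal, self-contained sketch. It is not the code from this PR: `fakeService`, `newFakeService`, and the lowercase `waitAllStarted` are hypothetical names invented for the example, and the `Service` interface is pared down to the single `Started` method. The point is only that waiting on each `Started` channel in sequence takes as long as the slowest service, so no extra parallelism is needed.

```go
package main

import (
	"fmt"
	"time"
)

// Service is a pared-down, hypothetical version of the interface shown above.
type Service interface {
	Started() <-chan struct{}
}

// fakeService is an illustrative stand-in that closes its started channel
// after a configurable startup delay.
type fakeService struct {
	started chan struct{}
}

func newFakeService(startupDelay time.Duration) *fakeService {
	s := &fakeService{started: make(chan struct{})}
	go func() {
		time.Sleep(startupDelay) // simulate startup work
		close(s.started)         // signal that startup has finished
	}()
	return s
}

func (s *fakeService) Started() <-chan struct{} { return s.started }

// waitAllStarted blocks until every service's Started channel is closed.
// The services start in the background on their own, so a sequential wait
// finishes once the slowest one has started.
func waitAllStarted(services ...Service) {
	for _, svc := range services {
		<-svc.Started()
	}
}

func main() {
	fast := newFakeService(10 * time.Millisecond)
	slow := newFakeService(50 * time.Millisecond)

	waitAllStarted(slow, fast)
	fmt.Println("all services started")
}
```

A receive from a closed channel returns immediately, so the order in which services are passed does not change the total wait.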

We also modify start/stop's return values slightly: a started function is added to enable this new feature, and stopped changes from a channel to a function, which looks a bit nicer and has better ergonomics than closing a channel:

func (m *QueueMaintainer) Start(ctx context.Context) error {
    ctx, shouldStart, started, stopped := m.StartInit(ctx)
    if !shouldStart {
        return nil
    }

    go func() {
        started()
        defer stopped() // this defer should come first so it's last out

        ...

[1] https://github.com/riverqueue/river/actions/runs/9770889034/job/26972789182?pr=413
[2] https://github.com/riverqueue/river/actions/runs/9770889034/job/26972789334?pr=413

brandur commented 2 weeks ago

@bgentry Okay, this turned out not to fix the intermittency problem (opened #416 for that instead), but I still think it's not a bad refactor. Mind taking a look?

brandur commented 1 week ago

Thanks!