riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

Dynamic queue handling #396

Open PumpkinSeed opened 3 weeks ago

PumpkinSeed commented 3 weeks ago

We have a use case where we need to add queues dynamically. The use case: we have multiple Kinds of jobs, and one in particular is manage_rewards. This Kind connects to a server where the rate limit is handled per route. It sometimes happens that one route blocks the other jobs because it isn't handled properly, so we figured we'd create a queue per route, since queues are processed in parallel. But for that we want to add queues at runtime.
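
To illustrate the idea (ManageRewardsArgs, enqueueReward, and the pgx.Tx driver here are assumptions made for the sketch, not our actual code), inserting a job into a per-route queue looks roughly like this:

import (
    "context"

    "github.com/jackc/pgx/v5"
    "github.com/riverqueue/river"
)

// ManageRewardsArgs is a hypothetical args type for the manage_rewards Kind.
type ManageRewardsArgs struct {
    Route string `json:"route"`
}

func (ManageRewardsArgs) Kind() string { return "manage_rewards" }

// enqueueReward inserts a job into a queue dedicated to a single route so
// that a route which hits its rate limit only stalls its own jobs. queue is
// assumed to be a valid queue name derived from route and already added to
// the client.
func enqueueReward(ctx context.Context, client *river.Client[pgx.Tx], route, queue string) error {
    _, err := client.Insert(ctx, ManageRewardsArgs{Route: route}, &river.InsertOpts{
        Queue: queue,
    })
    return err
}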

I tweaked the code and created the following method:

// AddQueue registers an additional queue on an already-constructed client by
// creating and tracking a producer for it, mirroring what the client does for
// the queues configured up front.
func (c *Client[TTx]) AddQueue(queue string, queueConfig QueueConfig) {
    archetype := &baseservice.Archetype{
        Logger:     c.config.Logger,
        Rand:       randutil.NewCryptoSeededConcurrentSafeRand(),
        TimeNowUTC: func() time.Time { return time.Now().UTC() },
    }

    // Construct a producer for the new queue from the client's existing
    // configuration, then register it with the monitor.
    c.producersByQueueName[queue] = newProducer(archetype, c.driver.GetExecutor(), &producerConfig{
        ClientID:          c.config.ID,
        Completer:         c.completer,
        ErrorHandler:      c.config.ErrorHandler,
        FetchCooldown:     c.config.FetchCooldown,
        FetchPollInterval: c.config.FetchPollInterval,
        JobTimeout:        c.config.JobTimeout,
        MaxWorkers:        queueConfig.MaxWorkers,
        Notifier:          c.notifier,
        Queue:             queue,
        RetryPolicy:       c.config.RetryPolicy,
        SchedulerInterval: c.config.schedulerInterval,
        StatusFunc:        c.monitor.SetProducerStatus,
        Workers:           c.config.Workers,
    })
    c.monitor.InitializeProducerStatus(queue)
}

After that I ran the following:

if err := riverClient.Stop(ctx); err != nil {
    t.Fatal(err)
}

riverClient.AddQueue("new_queue", river.QueueConfig{
    MaxWorkers: 10,
})

if err := riverClient.Start(ctx); err != nil {
    t.Fatal(err)
}

If you agree with these changes, or have any suggestions, let me know and I can send a PR. We tested with 1,800 queues for the same Kind and 10,000 jobs per queue, and that all worked fine.

Note: we are using River in production with millions of jobs and it works awesome.

brandur commented 3 weeks ago

Note: we are using River in production with millions of jobs and it works awesome.

That's awesome to hear! Thanks for checking in about it.

The code looks mostly right. My only notes are that, instead of initializing a new archetype, it's better to just reuse the one from client.baseService, and since the newProducer call is quite a large block that would now be repeated twice and could be prone to drift, we'd probably want to extract it into its own function. We also might need some sort of mutex on the producers map in case AddQueue is called simultaneously with start/stop.
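
Very roughly, and only as a sketch (producersByQueueNameMu, addProducer, and the exact way the archetype hangs off baseService are placeholders here rather than anything that exists today), I'm picturing something like:

func (c *Client[TTx]) AddQueue(queue string, queueConfig QueueConfig) {
    // Guard the producers map against AddQueue racing with Start/Stop.
    c.producersByQueueNameMu.Lock()
    defer c.producersByQueueNameMu.Unlock()

    c.producersByQueueName[queue] = c.addProducer(queue, queueConfig)
    c.monitor.InitializeProducerStatus(queue)
}

// addProducer centralizes producer construction so the two call sites can't
// drift apart, and reuses the client's existing archetype rather than
// building a fresh one ("c.baseService.Archetype" is shorthand for however
// that's actually exposed).
func (c *Client[TTx]) addProducer(queue string, queueConfig QueueConfig) *producer {
    return newProducer(c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
        ClientID:          c.config.ID,
        Completer:         c.completer,
        ErrorHandler:      c.config.ErrorHandler,
        FetchCooldown:     c.config.FetchCooldown,
        FetchPollInterval: c.config.FetchPollInterval,
        JobTimeout:        c.config.JobTimeout,
        MaxWorkers:        queueConfig.MaxWorkers,
        Notifier:          c.notifier,
        Queue:             queue,
        RetryPolicy:       c.config.RetryPolicy,
        SchedulerInterval: c.config.schedulerInterval,
        StatusFunc:        c.monitor.SetProducerStatus,
        Workers:           c.config.Workers,
    })
}

That keeps the big producerConfig literal in one place and makes the locking story around the map explicit.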

@bgentry Thoughts on this one? I'm not sure I'd generally recommend this as a pattern, but it doesn't seem too harmful to allow it.

bgentry commented 3 weeks ago

I'm not opposed to this idea, though as my comments in #399 show, it's more complex than it may seem at first glance, particularly for removal, because of the shutdown considerations involved. We can continue the discussion on specifics in that PR now that it's open.