contribsys / faktory

Language-agnostic persistent background job server
https://contribsys.com/faktory/
Other
5.66k stars 226 forks source link

Fail jobs immediately when worker crashes #468

Closed section1Q84 closed 5 months ago

section1Q84 commented 5 months ago

When worker crashes jobs sits in busy until it's expired then requeued. Is any way to fail and requeues job as soon as possible? Thanks in advance!

mperham commented 5 months ago

you can lower the reservation time for that job type using the “reserve_for” attribute. The default is 1800 seconds but lowering it to 60 seconds or 300 seconds will get you quicker retries. Also, keep in mind that if your job is crashing the worker process, you don’t want to retry too quickly or your app will spend a significant amount of time simply booting the worker process.

section1Q84 commented 5 months ago

@mperham Thanks for reply. I do a job that sending http request to check order status every 10 seconds in 10 mins. That means this job will sits in busy for 10 mins. But When I restart faktory for app upgrade in production, the job won't be retried until reach reserve_for. So I lost my order status in the remain time because of the every 10 seconds order query function won't be called.

I saw the doc says:

Upon seeing "terminate", the worker process should wait up to N seconds for any remaining jobs to finish. After 25 seconds (see below), the worker should send FAIL to Faktory for those lingering jobs (so they'll restart) and exit.

Then I test for this but what I found is that the worker seems don't send FAIL to Faktory for those lingering jobs (so they'll restart), because code is hangs in https://github.com/contribsys/faktory_worker_go/blob/main/manager.go#L122:

func (mgr *Manager) Terminate(reallydie bool) {
    mgr.mut.Lock()
    defer mgr.mut.Unlock()

    if mgr.state == "terminate" {
        return
    }

    mgr.Logger.Info("Shutting down...")
    mgr.state = "terminate"
    close(mgr.done)
    mgr.fireEvent(Shutdown)
->  mgr.shutdownWaiter.Wait()
    mgr.Pool.Close()
    mgr.Logger.Info("Goodbye")
    if reallydie {
        os.Exit(0) // nolint:gocritic
    }
}

Until the busy jobs finish, then mgr.Pool.Close().

I try to figure out a workaround, I write code as the following:

faktoryMgr.On(worker.Shutdown, func(manager *worker.Manager) error {
    manager.Pool.With(func(conn *faktory.Client) error {
        var job *faktory.Job
        var e error
        for {
            job, e = conn.Fetch("labors")
            if e != nil {
                return e
            }
            if job == nil {
                return nil
            }
            conn.Fail(job.Jid, errors.New("force labors jobs fail because of app restarting"), nil)
        }
    })
    return nil
})

However, the Fail method causes the task to fail directly, rather than pushing it into retries.

mperham commented 5 months ago

It sounds like your deploy is killing the worker process instead of giving it time to finish pending jobs and shut down cleanly. The Faktory worker library does attempt to FAIL unfinished jobs so it can immediately restart them after restart but it can’t do this if you don’t give it time to shutdown. It’s common to make this mistake with Kubernetes.

section1Q84 commented 5 months ago

I give it time to shutdown, but I found that mgr.shutdownWaiter.Wait() will wait up to 10 mins in my case. Because I use channel to force job running for 10 minutes. What I wonder is that if there is a method to force job to retry immediately?

mperham commented 5 months ago

Ah right, it looks like FWR does but FWG doesn't have a hard shutdown timer. It will wait minutes for jobs to finish. That's not really desired; I should implement a 30 sec hard shutdown.

mperham commented 5 months ago

Fixed.