taylorchu / work

gocraft/work v2 prototype
MIT License

Seeing jobs dequeued/run multiple times for long-running cron jobs, despite same jobId/start time #92

Open ashley-walsh opened 1 year ago

ashley-walsh commented 1 year ago

Hi there,

I am trying to set up a cron job that runs only once every X hours (or daily, etc.), even though there are multiple instances of my application. When the job is instantaneous or close to it (e.g. it prints a line and returns), it works as expected: we enqueue the job with a UniqueJobId, and our uniqueness constraint is triggered (see below).

...
foundJobs, err := c.BulkFindJobs(job.ID)
if err != nil {
    return err
}

if len(foundJobs) > 0 && foundJobs[0] != nil {
    logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
    return nil
}
...

But when the job takes longer (ours takes several minutes to complete), the uniqueness constraint is not enforced across instances: the job is dequeued several times and wrongly runs once per instance. We tried InvisibleSec, but then the other instances simply run the job once that period has passed. For example, if the job is scheduled for 5:00 and InvisibleSec is 60, one instance runs it (correctly) at 5:00 and another runs it at 5:01. We also experimented with EnqueueDelay, but that does not seem to work either.

Any help/insight would be greatly appreciated! See below for how we are setting up our cron service.

// called on application start-up
func main() {
    ...
    redisClient := application.BuildRedisClient()
    jobsClient := application.BuildJobsClient(redisClient)

    core := core.New(
        core.Config{
            ...
            JobsClient:            jobsClient,
            RedisClient:           redisClient,
            ...
        })

    ...

    group.Go(func() error {
        cron.CronHandler(jobsClient, context.Background())
        return nil
    })
}

// JobsHandler, also called on start-up
func JobsHandler(redisClient *redis.ClusterClient, handlerFunc work.ContextHandleFunc) {
    jobWorker := work.NewWorker(&work.WorkerOptions{
        Namespace: jobs.NAMESPACE,
        Queue:     work.NewRedisQueue(redisClient),
        ErrorFunc: func(err error) {
            log.Println(err)
        },
    })

    jobOpts := &work.JobOptions{
        MaxExecutionTime: time.Minute,
        IdleWait:         time.Second,
        NumGoroutines:    4,
        HandleMiddleware: []work.HandleMiddleware{
            logrus.HandleFuncLogger,
            catchPanic,
        },
    }

    for queueName := range jobs.JOB_QUEUES {
        jobWorker.RegisterWithContext(string(queueName), handlerFunc, jobOpts)
    }

    jobWorker.Start()
}
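// cron service
package cron

import (
    "context"
    "log"
    "main/entities/jobs"

    "github.com/robfig/cron/v3"
)

// CronHandler schedules enqueueOurJob at minute 50 of every hour. robfig/cron
// runs the scheduler in its own goroutine, so this function returns immediately.
func CronHandler(jobsClient jobs.Client, ctx context.Context) {
    c := cron.New()
    if _, err := c.AddFunc("50 * * * *", func() { enqueueOurJob(jobsClient, ctx) }); err != nil {
        log.Printf("cron: invalid schedule: %v", err)
        return
    }
    c.Start()
}

// enqueueOurJob runs on every application instance at each tick, so every
// instance tries to enqueue a job with the same ID.
func enqueueOurJob(jobsClient jobs.Client, ctx context.Context) {
    uniqueId := "uniqueId"
    enqueueJobParams, err := jobs.CreateEnqueueJobParams(jobs.CreateEnqueueJobParamsArgs{
        Name:        jobs.OurJob,
        UniqueJobId: &uniqueId,
    }, &jobs.OurJobPayload{})
    if err != nil {
        log.Printf("cron: building params for %v: %v", jobs.OurJob, err)
        return
    }

    if err := jobsClient.EnqueueJob(ctx, *enqueueJobParams); err != nil {
        log.Printf("cron: enqueueing %v: %v", jobs.OurJob, err)
    }
}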

func (c *client) EnqueueJob(ctx context.Context, jobParams EnqueueJobParams) error {
    job := work.NewJob()

    if jobParams.uniqueJobId != nil {
        job.ID = *jobParams.uniqueJobId
    }

    if jobParams.enqueueDelay != nil {
        job = job.Delay(*jobParams.enqueueDelay)
    }

    if err := job.MarshalJSONPayload(string(jobParams.jobPayload)); err != nil {
        return err
    }

    foundJobs, err := c.BulkFindJobs(job.ID)
    if err != nil {
        return err
    }

    // uniqueness constraint
    if len(foundJobs) > 0 && foundJobs[0] != nil {
        logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
        return nil
    }

    return c.enqueue(job, &work.EnqueueOptions{
        Namespace: NAMESPACE,
        QueueID:   string(jobParams.jobQueueName),
    })
}

taylorchu commented 1 year ago

I think you should use this middleware to wrap your enqueuer: https://pkg.go.dev/github.com/taylorchu/work@v0.2.9/middleware/unique.

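It maps each job to a unique key and a unique duration.

The way you did it has a race condition: BulkFindJobs and the enqueue are separate calls, so two instances can both find no existing job and then both enqueue it.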

ashley-walsh commented 1 year ago

Thank you so much for the fast response! The issue is that we already have a variety of jobs that will require different unique durations (e.g. the cron job above runs daily, but others run every 3 days). Is there a non-duration solution? For example, is it possible to base uniqueness only on the job ID, i.e. on whether a job with the same ID is currently in the queue?

Or if we are going about this incorrectly, what is the best way to proceed?

taylorchu commented 1 year ago

The uniq function can also switch on the enqueue options (https://pkg.go.dev/github.com/taylorchu/work@v0.2.9#EnqueueOptions), so it can return a different key and duration per queue.