reugn / go-quartz

Minimalist and zero-dependency scheduling library for Go
https://pkg.go.dev/github.com/reugn/go-quartz/quartz
MIT License

Jobs are taken out of the queue when being re-scheduled #119

Closed: 0xavi0 closed this issue 8 months ago

0xavi0 commented 8 months ago

Description: I found this while stress testing quartz. This code takes a job that is being triggered out of the job queue in order to apply the next run time and reschedule it.

While the scheduler is in that part of the code, any other goroutine that tries to get the job (to delete it, modify it, or anything else) receives a "job not found" error. The scheduler then pushes the job back onto the queue with its new next run time, but by that point the other goroutine may already have created a duplicate job because it could not find the current one.

The queue is just pushing jobs, so my understanding is that job duplication is possible.

For example, this simple code might duplicate the job if GetScheduledJob is called exactly while the scheduler is in the code mentioned above and the job is out of the queue for rescheduling.

jobKey := quartz.NewJobKey("test")
_, err := scheduler.GetScheduledJob(jobKey)
if err != nil {
  // The job was not found; it may only be out of the queue while being rescheduled.
  scheduler.ScheduleJob(quartz.NewJobDetail(NewTestJob("test"), jobKey), quartz.NewSimpleTrigger(time.Millisecond*JobCycle))
}

Here is an example that reproduces the issue (at least on my system):

package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/reugn/go-quartz/quartz"
)

const (
    JobCycle       = 10
    ChangeJobCycle = 30
    GetRetries     = 3
)

// --------------------- TestJob -----------------------------
var _ quartz.Job = &TestJob{}

type TestJob struct {
    Name string
}

func testJobDescription(name string) string {
    return fmt.Sprintf("testjob-%s", name)
}

func TestJobKey(name string) *quartz.JobKey {
    return quartz.NewJobKey(name)
}

func NewTestJob(name string) *TestJob {
    return &TestJob{
        Name: name,
    }
}

func (j *TestJob) Execute(ctx context.Context) error {

    j.doTheJob(ctx)

    return nil
}

func (j *TestJob) Description() string {
    return testJobDescription(j.Name)
}

func (j *TestJob) doTheJob(_ context.Context) {
    fmt.Printf("This is the super job, my name is: %s\n", j.Name)
}

// --------------------- END TestJob -----------------------------

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    scheduler := quartz.NewStdScheduler()

    scheduler.Start(ctx)

    name := "TEST-JOB"
    jobKey := quartz.NewJobKey(name)
    // Schedule the job.
    scheduler.ScheduleJob(quartz.NewJobDetail(NewTestJob(name), jobKey), quartz.NewSimpleTrigger(time.Millisecond*JobCycle))

    // This will change the job name every ChangeJobCycle milliseconds
    go changeJob(ChangeJobCycle, scheduler, name)

    scheduler.Wait(ctx)
}

func getScheduledJobWithRetries(scheduler quartz.Scheduler, key *quartz.JobKey, retries int) (quartz.Job, error) {
    var err error
    var jobDetail quartz.ScheduledJob
    for retries > 0 {
        jobDetail, err = scheduler.GetScheduledJob(key)
        if err == nil {
            return jobDetail.JobDetail().Job(), err
        }
        // This should not be executed, as the job is scheduled initially and is never intentionally deleted
        fmt.Printf("RETRYING because of error: [%v]. Number of retries left: [%d]\n", err, retries)
        time.Sleep(1 * time.Millisecond)
        retries--
    }
    return nil, err
}

func changeJob(waitTime time.Duration, scheduler quartz.Scheduler, name string) {
    for {
        time.Sleep(waitTime * time.Millisecond)
        jobKey := quartz.NewJobKey(name)
        job, err := getScheduledJobWithRetries(scheduler, jobKey, GetRetries)
        if err != nil {
            fmt.Printf("Could not get job after %d retries: %v\n", GetRetries, err)
            os.Exit(2)
        }
        myjob, ok := job.(*TestJob)
        if !ok {
            fmt.Printf("The job %s was not a TestJob\n", job.Description())
            return
        }
        fmt.Printf("\n*** I got the job with name: %s, changing the name ***\n\n", myjob.Name)
        myjob.Name = time.Now().String()
    }
}

If you run that code you will see it retries calling GetScheduledJob, which should never happen because the job is scheduled and never deleted. The example just gets the job every ChangeJobCycle milliseconds and changes the Name of the job.

In order to see the retries without the rest of the noise you can do:

$ go run main.go | grep "RETRYING"

I'm getting a few lines:

RETRYING because of error: [job not found: default::TEST-JOB]. Number of retries left: [3]
RETRYING because of error: [job not found: default::TEST-JOB]. Number of retries left: [3]
RETRYING because of error: [job not found: default::TEST-JOB]. Number of retries left: [3]
RETRYING because of error: [job not found: default::TEST-JOB]. Number of retries left: [3]
RETRYING because of error: [job not found: default::TEST-JOB]. Number of retries left: [3]

I was able to recreate this on master and also on 0.9.0 (I haven't checked other versions).

Expected behaviour: I would expect to get the job even if it's being executed (or rescheduled) at that moment.

Suggestion: Apply the next run time without taking the job out of the queue, perhaps by using a pointer for the priority, as is already done for job in scheduledJob, and changing only the priority value.

reugn commented 8 months ago

@0xavi0, you are right. This effect is likely to occur under load and can be mitigated using retries. A straightforward solution would be to place the code responsible for reading and rescheduling the job under the scheduler's lock, and to add that lock to other relevant methods. Let's keep this issue open to consider alternative options.

reugn commented 8 months ago

Pull request #121 ensures the atomicity of the fetchAndReschedule operation by acquiring the scheduler's mutex to lock the critical section. Please let me know if you are still able to reproduce the issue after applying the fix.

0xavi0 commented 8 months ago

I'm not able to reproduce with https://github.com/reugn/go-quartz/pull/121. Thanks for the fix! Any plan when this is going to be released?

reugn commented 8 months ago

The plan is to release next week.

reugn commented 8 months ago

v0.11.2 has been released.

0xavi0 commented 8 months ago

Thanks!