riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

Periodic Jobs question #357

Status: Closed (sheldondz closed 3 months ago)

sheldondz commented 3 months ago

We are trying the new dynamic periodic jobs feature in 0.6.0 and are facing the following issues.

Schedule a job to run every day at 8 AM, based on timezone:

schedule, err := cron.ParseStandard("CRON_TZ=Asia/Calcutta 0 8 * * *")
if err != nil {
    // Bail out here in real code: schedule is nil when parsing fails.
    r.logger.Error("failed to add calendar event", log.Error(err))
}
handle := r.jobManager.Client.PeriodicJobs().Add(
    river.NewPeriodicJob(
        schedule,
        func() (river.JobArgs, *river.InsertOpts) {
            return domain.CalendarEventJobArgs{
                EventType: "test event",
                EventId:   1,
            }, nil
        },
        nil,
    ),
)

After this code runs, no entry is added to the river_job table. If we set RunOnStart, we see an entry and the worker executes, but after that the periodic job does not repeat.

We also tried a simple test with:

periodicJobs := []*river.PeriodicJob{
        river.NewPeriodicJob(
            river.PeriodicInterval(15*time.Second),
            func() (river.JobArgs, *river.InsertOpts) {
                return domain.CalendarEventJobArgs{}, nil
            },
            &river.PeriodicJobOpts{RunOnStart: true},
        ),
    }

    riverClient, err := river.NewClient(riverpgxv5.New(s.storage.Pool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        Workers:      workers,
        PeriodicJobs: periodicJobs,
    })
    if err != nil {
        s.logger.Error("failed to create river client", log.Error(err))
        return nil, err
    }

    // Run the client inline. All executed jobs will inherit from ctx:
    if err := riverClient.Start(ctx); err != nil {
        // handle error
        s.logger.Error("failed to start river client", log.Error(err))
        return nil, err
    }

Here too, an entry is added to the DB and the worker runs immediately, but it does not run again after 15 seconds. Is there something I am missing here?

brandur commented 3 months ago

Here's a fully functional code sample based on your code above:

package main

import (
    "context"
    "fmt"
    "log/slog"
    "os"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type CalendarEventJobArgs struct{}

func (CalendarEventJobArgs) Kind() string { return "calendar_event" }

type CalendarEventWorker struct {
    // An embedded WorkerDefaults sets up default methods to fulfill the rest of
    // the Worker interface:
    river.WorkerDefaults[CalendarEventJobArgs]
}

func (w *CalendarEventWorker) Work(ctx context.Context, job *river.Job[CalendarEventJobArgs]) error {
    fmt.Printf("%s: CalendarEvented ran\n", time.Now().Format(time.DateTime))
    return nil
}

func doStart(ctx context.Context, logger *slog.Logger) (*struct{}, error) {
    dbPool, err := pgxpool.New(ctx, "postgres://localhost/river-periodic-job-test")
    if err != nil {
        return nil, err
    }

    periodicJobs := []*river.PeriodicJob{
        river.NewPeriodicJob(
            river.PeriodicInterval(15*time.Second),
            func() (river.JobArgs, *river.InsertOpts) {
                return CalendarEventJobArgs{}, nil
            },
            &river.PeriodicJobOpts{RunOnStart: true},
        ),
    }

    workers := river.NewWorkers()
    // AddWorker panics if the worker is already registered or invalid:
    river.AddWorker(workers, &CalendarEventWorker{})

    riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        Workers:      workers,
        PeriodicJobs: periodicJobs,
    })
    if err != nil {
        logger.Error("failed to create river client", "err", err)
        return nil, err
    }

    // Run the client inline. All executed jobs will inherit from ctx:
    if err := riverClient.Start(ctx); err != nil {
        // handle error
        logger.Error("failed to start river client", "err", err)
        return nil, err
    }

    return nil, nil
}

func main() {
    ctx := context.Background()
    logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

    if _, err := doStart(ctx, logger); err != nil {
        panic(err)
    }

    foreverChan := make(chan struct{})
    <-foreverChan
}

Run it, and you can see the job running every 15 seconds:

$ go run main.go
2024-05-17 11:03:22: CalendarEvented ran
2024-05-17 11:03:37: CalendarEvented ran
2024-05-17 11:03:52: CalendarEvented ran
2024-05-17 11:04:07: CalendarEvented ran
2024-05-17 11:04:22: CalendarEvented ran
2024-05-17 11:04:37: CalendarEvented ran
2024-05-17 11:04:52: CalendarEvented ran
2024-05-17 11:05:07: CalendarEvented ran
2024-05-17 11:05:22: CalendarEvented ran
2024-05-17 11:05:37: CalendarEvented ran
2024-05-17 11:05:52: CalendarEvented ran
2024-05-17 11:06:07: CalendarEvented ran
2024-05-17 11:06:22: CalendarEvented ran
2024-05-17 11:06:37: CalendarEvented ran

Here's the same thing in the database:

river-periodic-job-test=# select kind, state, finalized_at from river_job order by id;
      kind      |   state   |         finalized_at
----------------+-----------+-------------------------------
 calendar_event | completed | 2024-05-17 02:02:05.964429-07
 calendar_event | completed | 2024-05-17 02:02:20.959239-07
 calendar_event | completed | 2024-05-17 02:02:35.958579-07
 calendar_event | completed | 2024-05-17 02:02:50.956588-07
 calendar_event | completed | 2024-05-17 02:03:05.958406-07
 calendar_event | completed | 2024-05-17 02:03:22.724301-07
 calendar_event | completed | 2024-05-17 02:03:37.725189-07
 calendar_event | completed | 2024-05-17 02:03:52.723318-07
 calendar_event | completed | 2024-05-17 02:04:07.724498-07
 calendar_event | completed | 2024-05-17 02:04:22.72267-07
 calendar_event | completed | 2024-05-17 02:04:37.724326-07

Based on code not shown in your sample above:

  1. Is your job unique? If so, it may be excluded from periodic insert because a conforming job is already in the database.
  2. Does your program keep running after Client.Start's been invoked? If it ends too soon, obviously jobs won't be inserted.

sheldondz commented 3 months ago

@brandur Thanks for the detailed code sample. I think it's failing to run again because my job is unique. Will check this and update.

sheldondz commented 3 months ago

I can confirm that after removing the uniqueOpts, it works as expected. Thanks.

brandur commented 3 months ago

You got it!