riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0
3.59k stars 94 forks source link

Cron job not working. #640

Closed kashyap2108 closed 1 month ago

kashyap2108 commented 1 month ago

// dsn is the connection string handed to pgxpool.New.
// NOTE(review): "#" looks like a redacted placeholder — replace with a real
// PostgreSQL DSN before running.
const dsn = "#"

// SortArgs is the argument payload for a sort job.
type SortArgs struct {
    // Strings holds the values to be sorted; it is serialized under the
    // JSON key "strings".
    Strings []string `json:"strings"`
}

// SortWorker is the worker type that processes jobs carrying SortArgs.
type SortWorker struct {
    // Embedding WorkerDefaults supplies default implementations for the
    // Worker interface methods that SortWorker does not define itself.
    river.WorkerDefaults[SortArgs]
}

// Work handles a single sort job: it prints the received job, sorts the
// job's strings in place in ascending order, and prints the sorted slice.
// It always returns nil.
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    fmt.Printf("Received job: %+v\n", job)

    // strs aliases the job's own slice, so sorting it mutates job.Args.
    strs := job.Args.Strings
    sort.Strings(strs)
    fmt.Printf("Sorted strings: %+v\n", strs)

    return nil
}

// Kind returns the job kind string, "sort", for SortArgs.
func (SortArgs) Kind() string {
    return "sort"
}

// InitRiverClient opens a pgx connection pool for dsn, builds a River client
// with one periodic sort job (every minute, also run on start), starts the
// client, and returns it without blocking.
//
// The caller owns shutdown: it should wait for its own termination condition
// and then call Stop on the returned client. This function deliberately does
// not wait on ctx or install signal handlers; blocking here would prevent the
// caller from ever reaching its own shutdown logic.
//
// On any failure the connection pool is closed and a wrapped error is
// returned together with a nil client.
func InitRiverClient(ctx context.Context) (*river.Client[pgx.Tx], error) {
    dbPool, err := pgxpool.New(ctx, dsn)
    if err != nil {
        return nil, fmt.Errorf("failed to create database pool: %w", err)
    }

    periodicJobs := []*river.PeriodicJob{
        river.NewPeriodicJob(
            river.PeriodicInterval(1*time.Minute),
            func() (river.JobArgs, *river.InsertOpts) {
                return SortArgs{Strings: []string{"banana", "apple", "cherry"}}, &river.InsertOpts{
                    MaxAttempts: 5,
                    Priority:    0,
                }
            },
            &river.PeriodicJobOpts{RunOnStart: true},
        ),
    }

    riverClient, err := river.NewClient[pgx.Tx](riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        JobTimeout:   10 * time.Minute,
        MaxAttempts:  5,
        PeriodicJobs: periodicJobs,
        Workers:      RegisterWorkerTasks(),
        Logger:       slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})),
    })
    if err != nil {
        // The pool is not handed to anyone on failure, so release it here.
        dbPool.Close()
        return nil, fmt.Errorf("failed to create River client: %w", err)
    }

    // A client that failed to start is unusable; surface the error instead of
    // logging it and returning the client as if it were healthy.
    if err := riverClient.Start(ctx); err != nil {
        dbPool.Close()
        return nil, fmt.Errorf("failed to start River client: %w", err)
    }

    return riverClient, nil
}

// RegisterWorkerTasks creates a new River worker registry, adds a SortWorker
// for SortArgs jobs to it, and returns the registry.
func RegisterWorkerTasks() *river.Workers {
    registry := river.NewWorkers()

    river.AddWorker[SortArgs](registry, &SortWorker{})

    return registry
}

// main runs the River migrations, initializes the River client, then waits
// for an interrupt or SIGTERM before printing a shutdown message. The client
// is stopped via a deferred Stop call when main returns.
func main() {
    ctx := context.Background()

    runMigrations(ctx)

    riverClient, err := InitRiverClient(ctx)
    if err != nil {
        log.Fatalf("Failed to initialize River client: %v", err)
    }
    defer riverClient.Stop(ctx)

    // Block here until the process is asked to terminate.
    interrupts := make(chan os.Signal, 1)
    signal.Notify(interrupts, os.Interrupt, syscall.SIGTERM)
    <-interrupts

    fmt.Println("Shutting down...")
}

func runMigrations(ctx context.Context) {
    dbPool, err := pgxpool.New(ctx, dsn)
    migrator, _ := rivermigrate.New[pgx.Tx](riverpgxv5.New(dbPool), nil)
    res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) // Apply all available up migrations
    if err != nil {
        os.Exit(1)
    }
    for _, version := range res.Versions {
        fmt.Printf("Applied River migration version %d", version.Version)
    }
}```

I have attached demo code for a cron job; no tasks are being picked up.
bgentry commented 1 month ago

Are there any logs to go with this? What are you seeing at the database level in the river_job table? Are there rows for these jobs, do they have errors on them, etc?

bgentry commented 1 month ago

Without more info there's nothing I can do to help debug this. I can assure you that scheduled jobs do work in general—the River UI demo uses them exclusively, in addition to underlying test coverage. If there's a more specific issue going on, please share some of that extra info on what you're seeing in the database.