acaloiaro / neoq

Queue-agnostic background job library for Go, with a pleasant API and powerful features.
MIT License

No payload in job handler #135

Closed adamdecaf closed 5 hours ago

adamdecaf commented 6 hours ago

I've got the following code that's enqueuing jobs. I also had this as a JSON string, but neither option has worked.

    ctx := context.Background()
    jobID, err := w.queue.Enqueue(ctx, &jobs.Job{
        Queue: w.conf.Tracking.Queue.QueueName,
        Payload: map[string]interface{}{
            "type":   "repository",
            "source": r.Source,
            "owner":  r.Owner,
            "name":   r.Name,
        },
    })

Inside the job handler I'm not seeing the payload extracted from postgres.

    func (w *Worker) handleJobs() handler.Handler {
        queueName := w.conf.Tracking.Queue.QueueName

        return handler.NewPeriodic(func(ctx context.Context) error {
            job, err := jobs.FromContext(ctx)
            if err != nil {
                return err
            }

            jobType, ok := job.Payload["type"].(string)
            if !ok {
                return fmt.Errorf("job=%d unexpected type: %T", job.ID, job.Payload["type"])
            }

            w.logger.Info(fmt.Sprintf("handling job %d", job.ID),
                slog.String("job_type", jobType),
            )

            switch strings.ToLower(jobType) {
            case "repository":
                return w.handleRepositoryJob(job)
            }

            return nil
        },
            handler.Concurrency(1),
            handler.Queue(queueName),
        )
    }

Printing out the job gives a nil payload:

    &jobs.Job{ID:2, Fingerprint:"d7349e805971e0554e438b492f5fa755", Status:"new", Queue:"every_1_minutes_starting_at_1", Payload:map[string]interface {}(nil), Deadline:<nil>, RunAfter:time.Date(0, time.December, 31, 18, 9, 24, 0, time.Local), RanAt:null.Time{NullTime:sql.NullTime{Time:time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), Valid:false}}, Error:null.String{NullString:sql.NullString{String:"", Valid:false}}, Retries:0, MaxRetries:(*int)(nil), CreatedAt:time.Date(2024, time.October, 25, 16, 11, 0, 1309000, time.Local)}

adamdecaf commented 5 hours ago

I've seen this happen a lot because the map isn't initialized before the read. I don't know if pgx.RowToAddrOfStructByName will properly handle this or not. I didn't see a pgx method where we could init the map before scanning the row, so this may just be a limitation of pgx/neoq.

https://github.com/acaloiaro/neoq/blob/v0.70.0/backends/postgres/postgres_backend.go#L1025
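For context on the symptom itself, here is a minimal stdlib-only sketch of how Go treats a nil map. It does not touch pgx or neoq, and the helper names `jobType` and `decodePayload` are made up for illustration. It shows why the handler returns the "unexpected type" error rather than panicking, and that `encoding/json` allocates a nil map while decoding. Whether pgx's row scanning behaves the same way is not something this sketch establishes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobType mimics the handler's lookup. Indexing a nil map is safe in Go and
// yields the zero value, so the type assertion fails instead of panicking.
func jobType(payload map[string]any) (string, bool) {
	t, ok := payload["type"].(string)
	return t, ok
}

// decodePayload decodes a JSON object into a map that starts out nil.
// json.Unmarshal allocates the map when it is nil.
func decodePayload(raw []byte) (map[string]any, error) {
	var payload map[string]any // nil until Unmarshal allocates it
	err := json.Unmarshal(raw, &payload)
	return payload, err
}

func main() {
	// A nil payload, as in the printed job above.
	_, ok := jobType(nil)
	fmt.Println(ok) // false

	// A payload decoded from JSON into a nil map.
	p, err := decodePayload([]byte(`{"type":"repository"}`))
	if err != nil {
		panic(err)
	}
	t, ok := jobType(p)
	fmt.Println(t, ok) // repository true
}
```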

acaloiaro commented 5 hours ago

Hey @adamdecaf, I could be misunderstanding the situation here, but it looks like you're trying to retrieve the Payload of a periodic job, since the handler shown above is created with handler.NewPeriodic.

Periodic jobs with payloads are not a use case that's been implemented. See this example for the expected way to use periodic jobs.

Maybe you can share what you have in mind for periodic jobs with payloads, if you want to help spec out a new feature.

adamdecaf commented 5 hours ago

Oh interesting. I'm trying to crawl a bunch of GitHub repos (mostly their forks) and was trying to use periodic jobs to do the scheduling for me. It's just handy to enqueue a bunch of recurring jobs with metadata on each.

I can use regular jobs and just enqueue the next run at a future time, since payloads are supported on regular jobs.
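A rough sketch of that self-rescheduling idea, using only the standard library. The `Job` type here is a hypothetical stand-in that mirrors the Queue, Payload, and RunAfter fields visible in the printed job above; it is not neoq's type, and `nextRun` is an invented helper. In a real handler, the returned job would presumably be passed to Enqueue the same way as in the first snippet.

```go
package main

import (
	"fmt"
	"time"
)

// Job is a hypothetical stand-in with the same field names as the
// Queue, Payload, and RunAfter fields shown in the printed job.
type Job struct {
	Queue    string
	Payload  map[string]any
	RunAfter time.Time
}

// nextRun builds the follow-up job a handler would enqueue once it has
// finished: same queue, a copy of the payload, scheduled one interval later.
func nextRun(done Job, now time.Time, interval time.Duration) Job {
	payload := make(map[string]any, len(done.Payload))
	for k, v := range done.Payload {
		payload[k] = v
	}
	return Job{Queue: done.Queue, Payload: payload, RunAfter: now.Add(interval)}
}

func main() {
	now := time.Date(2024, time.October, 25, 16, 11, 0, 0, time.UTC)
	job := Job{
		Queue:   "repositories", // hypothetical queue name
		Payload: map[string]any{"type": "repository", "owner": "example", "name": "repo"},
	}
	next := nextRun(job, now, time.Minute)
	fmt.Println(next.RunAfter.Format(time.RFC3339), next.Payload["name"])
	// prints: 2024-10-25T16:12:00Z repo
}
```

Copying the payload keeps the follow-up job independent of any later changes the handler makes to the original map.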

acaloiaro commented 5 hours ago

You might want to consider putting the logic of determining which repos to fetch in a periodic worker. That's how periodic jobs are intended to be used, and why they don't take a payload.

You can also schedule non-periodic jobs from periodic jobs. That way you can parallelize the work over more workers, since periodic jobs are serialized. E.g. fetch the list of repos to be fetched from the periodic job and schedule each fetch onto a separate, non-periodic queue with a bunch of workers/high concurrency.
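The shape of that fan-out can be sketched with plain goroutines and a channel. This models the pattern only: the channel stands in for the non-periodic queue, and `FetchJob`, `runPeriodic`, `startWorkers`, and `crawlOnce` are all hypothetical names, not neoq API.

```go
package main

import (
	"fmt"
	"sync"
)

// FetchJob is a hypothetical stand-in for a non-periodic job's payload.
type FetchJob struct {
	Owner, Name string
}

// runPeriodic plays the role of the periodic handler: it takes no payload,
// decides what needs fetching, and hands each repository to the worker queue.
func runPeriodic(listRepos func() []FetchJob, queue chan<- FetchJob) {
	for _, j := range listRepos() {
		queue <- j
	}
}

// startWorkers plays the role of a high-concurrency, non-periodic queue:
// n goroutines drain the channel until it is closed.
func startWorkers(n int, queue <-chan FetchJob, fetch func(FetchJob)) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range queue {
				fetch(j)
			}
		}()
	}
	return &wg
}

// crawlOnce runs a single periodic tick, waits for the fan-out to drain,
// and returns how many jobs the workers handled.
func crawlOnce(listRepos func() []FetchJob, workers int) int {
	queue := make(chan FetchJob)
	var mu sync.Mutex
	fetched := 0
	wg := startWorkers(workers, queue, func(FetchJob) {
		mu.Lock()
		fetched++
		mu.Unlock()
	})
	runPeriodic(listRepos, queue)
	close(queue)
	wg.Wait()
	return fetched
}

func main() {
	repos := func() []FetchJob {
		return []FetchJob{{"example", "one"}, {"example", "two"}}
	}
	fmt.Println(crawlOnce(repos, 4)) // 2
}
```

The periodic step stays serialized and cheap, while the per-repo work is spread across however many workers the second queue is given.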

adamdecaf commented 5 hours ago

Gotcha, I'll look at doing that. I figured jobs can be scheduled from other jobs. If the docs don't already mention that periodic jobs don't carry a payload, adding a note would help the next person. Thanks for your help!