acaloiaro / neoq

Queue-agnostic background job library for Go, with a pleasant API and powerful features.
MIT License

Wrong queue handling job #56

Closed rcy closed 1 year ago

rcy commented 1 year ago

I have set up two queues and am adding jobs to only one of them, specifying a RunAfter to delay them. Some of the jobs are being handled by the other queue's handler.

Perhaps I am confused about how to use this API. Here's my program:

package main

import (
    "context"
    "log"
    "time"

    "github.com/acaloiaro/neoq"
    "github.com/acaloiaro/neoq/handler"
    "github.com/acaloiaro/neoq/jobs"
    "github.com/acaloiaro/neoq/types"
)

var be types.Backend

func main() {
    var err error
    ctx := context.Background()

    be, err = neoq.New(ctx)
    if err != nil {
        panic(err)
    }

    err = be.Start(ctx, "notify-delivery", handler.New(handleDelivery))
    if err != nil {
        panic(err)
    }
    err = be.Start(ctx, "notify-friend", handler.New(handleFriend))
    if err != nil {
        panic(err)
    }

    log.Print("started worker")

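    // Enqueue nine jobs, all on the "notify-delivery" queue.
    for i := 1; i < 10; i++ {
        if _, err := NotifyDelivery(int64(i)); err != nil {
            log.Printf("enqueue failed: %v", err)
        }
    }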

    time.Sleep(10 * time.Second)
}

func NotifyDelivery(deliveryID int64) (string, error) {
    log.Printf("NotifyDelivery %d", deliveryID)
    return be.Enqueue(context.Background(), &jobs.Job{
        Queue:    "notify-delivery",
        Payload:  map[string]any{"id": deliveryID},
        RunAfter: time.Now().Add(1 * time.Second),
    })
}

func NotifyFriend(friendID int64) (string, error) {
    log.Printf("Enqueue NotifyFriend %d", friendID)
    return be.Enqueue(context.Background(), &jobs.Job{
        Queue:   "notify-friend",
        Payload: map[string]any{"id": friendID},
    })
}

func handleDelivery(ctx context.Context) error {
    j, err := jobs.FromContext(ctx)
    if err != nil {
        return err
    }
    log.Printf("handleDelivery job id: %d, payload: %v", j.ID, j.Payload)
    return nil
}

func handleFriend(ctx context.Context) error {
    j, err := jobs.FromContext(ctx)
    if err != nil {
        return err
    }
    log.Printf("handleFriend job id: %d, payload: %v", j.ID, j.Payload)
    return nil
}

And the output:

go run .
2023/08/17 18:46:32 started worker
2023/08/17 18:46:32 NotifyDelivery 1
2023/08/17 18:46:32 NotifyDelivery 2
2023/08/17 18:46:32 NotifyDelivery 3
2023/08/17 18:46:32 NotifyDelivery 4
2023/08/17 18:46:32 NotifyDelivery 5
2023/08/17 18:46:32 NotifyDelivery 6
2023/08/17 18:46:32 NotifyDelivery 7
2023/08/17 18:46:32 NotifyDelivery 8
2023/08/17 18:46:32 NotifyDelivery 9
2023/08/17 18:46:33 handleDelivery job id: 5, payload: map[id:5]
2023/08/17 18:46:33 handleFriend job id: 1, payload: map[id:1]
2023/08/17 18:46:33 handleFriend job id: 2, payload: map[id:2]
2023/08/17 18:46:33 handleDelivery job id: 4, payload: map[id:4]
2023/08/17 18:46:33 handleDelivery job id: 2, payload: map[id:2]
2023/08/17 18:46:33 handleDelivery job id: 3, payload: map[id:3]
2023/08/17 18:46:37 handleFriend job id: 8, payload: map[id:8]
2023/08/17 18:46:37 handleFriend job id: 7, payload: map[id:7]
2023/08/17 18:46:37 handleFriend job id: 9, payload: map[id:9]
2023/08/17 18:46:37 handleFriend job id: 6, payload: map[id:6]

Notice that both handleDelivery and handleFriend are called. Since every job is enqueued on "notify-delivery", only handleDelivery should ever be called.

acaloiaro commented 1 year ago

Thanks a lot for the reproduction code along with the report. This looks like a clear and obvious bug. I'll do some testing of my own and try to get this worked out.

acaloiaro commented 1 year ago

If you want to test out the branch on the PR, it should be good to go. I'm on a new system and still need to set up Postgres and Redis so I can run tests locally.

Once tests are passing locally and in CI, I'll go ahead and merge it.

rcy commented 1 year ago

works here, thanks!

acaloiaro commented 1 year ago

Excellent!

As a side note, I'll look into making sure that PR merges don't auto-close issues before I get confirmation from the opener.

Cheers