contribsys / faktory

Language-agnostic persistent background job server
https://contribsys.com/faktory/
Other
5.71k stars 228 forks source link

Faktory Batch Inconsistencies #402

Closed sravi4048 closed 2 years ago

sravi4048 commented 2 years ago

Are you using an old version? Have you checked the changelogs to see if your issue has been fixed in a later version?

https://github.com/contribsys/faktory/blob/master/Changes.md https://github.com/contribsys/faktory/blob/master/Pro-Changes.md https://github.com/contribsys/faktory/blob/master/Ent-Changes.md

When trying this batch logic, some of our jobs are getting dropped. The uuids variable represents a list of uuids we get from querying our db. We think this issue is due to a race condition, as a different number of jobs is being executed each time the code is run, and the jobs that are executed are not the same each time.

client, _ := faktory.Open()
b := faktory.NewBatch(client)
b.Success = faktory.NewJob("Successful job")
b.Jobs(func() error {
uuids, err := source.FetchAllConversationUUIDs(c)
if err != nil {
    return fmt.Errorf("Couldn't retrieve Conversation Ids: %v\n", err)
}
for _, id := range uuids {
    fmt.Printf("Batch sees convo ID: %v\n", id)
    job := faktory.NewJob(UPDATE_SINGLE_CONVERSATION_ANALYTICS_JOB, types.ConversationId(id.String()))
    err = b.Push(job)
    if err != nil {
        return fmt.Errorf("Couldn't push faktory job: %v\n", err)
    }
}
time.Sleep(5 * time.Second)
return nil
})
mperham commented 2 years ago

Can you give me a self-contained piece of code which reproduces the issue?

MoreNaruto commented 2 years ago

So to give some more context, we're able to see all the UPDATE_SINGLE_CONVERSATION_ANALYTICS_JOB Jobs being processed by the faktory worker with the Batch Id attached to each job. So this piece of logic works as intended:

err: = faktoryManager.Pool.With(func(client * faktory.Client) error {
    b: = faktory.NewBatch(client)
    b.Success = faktory.NewJob("Successful job")
    fmt.Println("Created batch no issues")
    return b.Jobs(func() error {
        var ids []int
    for i := 0; i< 10000; i++ {
        ids = append(ids, i)
    }
        for _, id: = range ids {
            job: = faktory.NewJob(UPDATE_SINGLE_CONVERSATION_ANALYTICS_JOB, id)
            err = b.Push(job)
            if err != nil {
                return fmt.Errorf("Couldn't push faktory job: %v\n", err)
            }
        }
        fmt.Println("Pushed faktory jobs no issues")
        return nil
    })
})

The problem is that most of the jobs aren't actually received by our faktory manager. Here's how it currently looks:

mgr: = worker.NewManager()
mgr.Concurrency = 5

mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")
faktoryMgr.Register(UPDATE_SINGLE_CONVERSATION_ANALYTICS_JOB, func(ctx context.Context, args...interface {}) error {
    fmt.Printf("The id: %v has been received", args[0])
    return nil
})
go func() {
    err: = faktoryMgr.Run()
    if err != nil {
        faktoryMgr.Terminate(false)
    }
}()

From looking at my logs for the message: "The id: %v has been received", it shows up for some ids. Not sure if we might have the Faktory Manager incorrectly configured to handle batch jobs

mperham commented 2 years ago

Do you have other instances running, pulling jobs from that queue too? Possibly zombie worker processes?

mperham commented 2 years ago

Do you see any other processes on the Busy page?

MoreNaruto commented 2 years ago
Screen Shot 2022-05-06 at 11 29 42 AM

So we have two instances running at once. TLDR: We want to have our registered function process in on a different instance from our main app. Our main app sends a job and that's it. So does two concurrent processes causes problems? Right now, the app faktory manager has the same setup except the register logic is different

faktoryMgr.Register(UPDATE_SINGLE_CONVERSATION_ANALYTICS_JOB, func(ctx context.Context, args...interface {}) error {
    return nil
})
mperham commented 2 years ago

You target a specific subset of workers by using a different queue for that type of job. The worker process configuration controls which queues to fetch jobs from.

MoreNaruto commented 2 years ago

Awesome! That fixed the problem! Thanks Mike!