OptimalBits / bull

Premium Queue package for handling distributed jobs and messages in NodeJS.

Missing process handler in server-worker setup #1367

Open omairvaiyani opened 5 years ago

omairvaiyani commented 5 years ago

We have a setup wherein some servers only add jobs to the queue, and some are set as workers to register and process the queue.

However, the servers receive the following error when adding to the queue:

Error: Missing process handler for job type AssignmentsIssueUpdate
    at Queue.processJob (/var/app/current/node_modules/bull/lib/queue.js:1006:7)
    at process._tickCallback (internal/process/next_tick.js:68:7)

The servers do not define a process handler; we only want the workers to process the queue. Is this an expected error?

Bull version

Latest

omairvaiyani commented 5 years ago

Digging into the docs, I think I know what the issue is.

I'm using named jobs, and under your guides, it states the following:

### Named jobs
It is possible to give names to jobs. This *does not change any of the mechanics of the queue* but can be used for clearer code and better visualization in UI tools:

But then states:

Just keep in mind that every queue instance require to provide a processor for every named job or you will get an exception.

Does this not break the "mechanics of the queue"? The general documentation gives an example of having a Producer queue and a counterpart Consumer queue, presumably on different instances. How can we implement named jobs with this pattern?

stansv commented 5 years ago

As far as I know, once you call process() on a Queue instance, you have to support every job type that can be added to the corresponding Redis queue (i.e. you should call process() on that Queue instance once per type and supply a handler for each). So the producer should work as long as it does not call process():

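const Queue = require("bull");

// Producer only: add a named job, never call process() on this instance.
new Queue("my-first-queue").add("AssignmentsIssueUpdate", {
    foo: "bar"
});
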
omairvaiyani commented 5 years ago

Okay, got you. I had to re-read that a few times. What you're saying is that it's all-or-none: if an instance (in our case, the Producer server) tries to process even one job type, it must process ALL job types. That's helpful, thank you.
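
To make the all-or-none rule concrete, here is a minimal sketch of a worker that registers every named processor from a single table. The names `handlers`, `registerAllHandlers` and the `AssignmentsReminder` job type are hypothetical and only for illustration; the only Bull call assumed is `queue.process(name, handler)`.

```javascript
// Hypothetical handler table: one entry per named job type the queue can receive.
// "AssignmentsReminder" is an invented second job type for this sketch.
const handlers = {
  AssignmentsIssueUpdate: async (job) => ({ updated: job.data }),
  AssignmentsReminder: async (job) => ({ reminded: job.data }),
};

// Registers a processor for every entry in the table, so an instance that
// processes anything at all has a handler for every job type.
// `queue` is expected to be a Bull Queue (anything with process(name, fn)).
function registerAllHandlers(queue, table) {
  for (const [name, handler] of Object.entries(table)) {
    queue.process(name, handler);
  }
  return Object.keys(table);
}

// Worker only (assumes the bull package and a reachable Redis):
//   const Queue = require("bull");
//   registerAllHandlers(new Queue("my-first-queue"), handlers);
// Producer: create the queue and call add(), but never process().
```

Keeping the table in one module shared by all workers means a new job type cannot be added without also giving it a handler.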

stansv commented 5 years ago

@omairvaiyani correct. It's just an alternative to writing one huge job handler with a switch-case block inside, and if a job of an unsupported type arrives, you can't return it to the queue in the default case. I'd recommend using separate queues for each job type if you want workers to ignore jobs they can't handle.
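
The separate-queues suggestion could be sketched roughly as follows. `queueNameFor`, `makeRouter` and the "jobs-" prefix are hypothetical names chosen for this example, and the queue factory is injected so the sketch does not depend on a running Redis; with Bull it would be `(name) => new Queue(name)`. The queues are used unnamed, via `queue.add(data)` and `queue.process(handler)`.

```javascript
// Maps a job type to its own queue name, e.g. "jobA" -> "jobs-jobA".
// The "jobs-" prefix is an arbitrary choice for this sketch.
function queueNameFor(jobType) {
  return `jobs-${jobType}`;
}

// createQueue is injected by the caller. With Bull (an assumption of this
// sketch) it would be: (name) => new Queue(name)
function makeRouter(createQueue) {
  const queues = new Map();

  // Creates each queue once and reuses it afterwards.
  const queueFor = (jobType) => {
    const name = queueNameFor(jobType);
    if (!queues.has(name)) {
      queues.set(name, createQueue(name));
    }
    return queues.get(name);
  };

  return {
    // Producer side: add a job to the queue dedicated to its type.
    add: (jobType, data) => queueFor(jobType).add(data),
    // Worker side: process only the types this worker can handle.
    process: (jobType, handler) => queueFor(jobType).process(handler),
  };
}
```

A worker that calls `process("jobA", ...)` only ever subscribes to the jobA queue, so jobs of other types stay waiting in their own queues instead of failing.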

omairvaiyani commented 5 years ago

@stansv So I've tried this theory, but it doesn't appear to hold true.

I made a simplified version of our Producer server and tried the following on Server 1:

const queueA = new Queue("A");
queueA.process("jobA", () => null);
await queueA.add("jobB", {}); // this didn't throw an error

It's hard for me to reason properly as to why this snippet didn't fail.

stansv commented 5 years ago

I'm not sure why the add() call itself should even fail; the "Missing process handler" error is not related to adding a job. When I tried your example, the job itself failed with that error.

const Queue = require("bull");

const test = async () => {
    const queueA = new Queue("A");
    queueA.process("jobA", () => null);
    try {
        const job = await queueA.add("jobB", {}); // this didn't throw an error
        console.log(JSON.stringify(job));
        setTimeout(async () => { console.log(JSON.stringify(await queueA.getJob(job.id))); }, 1000);
    } catch (error) {
        console.log("Error: " + error);
    }
};

test();

output:

{"id":"3","name":"jobB","data":{},"opts":{"attempts":1,"delay":0,"timestamp":1561732792689},"progress":0,"delay":0,"timestamp":1561732792689,"attemptsMade":0,"stacktrace":[],"returnvalue":null,"finishedOn":null,"processedOn":null}

{"id":"3","name":"jobB","data":{},"opts":{"attempts":1,"delay":0,"timestamp":1561732792689},"progress":0,"delay":0,"timestamp":1561732792689,"attemptsMade":1,"failedReason":"Missing process handler for job type jobB","stacktrace":["Error: Missing process handler for job type jobB\n    at Queue.processJob (/.../node_modules/bull/lib/queue.js:1006:7)\n    at processTicksAndRejections (internal/process/task_queues.js:89:5)"],"returnvalue":null,"finishedOn":1561732792798,"processedOn":1561732792795}

omairvaiyani commented 5 years ago

That's interesting, so fetching a job for which no processor was registered resulted in an error?

stansv commented 5 years ago

When you define a processor on a queue instance (queueA.process()), Bull internally adds a subscriber to the Redis queue. That subscriber then takes a new job from the queue and finds that no processor is registered for the job's type. As a result, the job is marked as failed. (I suspect there can be a race condition when a new job is acquired before your queue setup code has completed.)

Fetching/adding a job should not produce such errors.
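
Since the failure happens on whichever instance picked the job up, one way to spot a misconfigured instance is to listen for Bull's "failed" event there. This is a minimal sketch, not something from the Bull docs: `watchMissingHandlers` and the `report` callback are hypothetical names, and the message prefix is taken from the error text shown in this thread.

```javascript
// Attaches a listener to the queue's "failed" event and reports jobs that
// failed because no processor was registered for their name.
// `queue` is expected to be a Bull Queue; any EventEmitter that emits
// "failed" with (job, err) works for this sketch.
function watchMissingHandlers(queue, report = console.error) {
  queue.on("failed", (job, err) => {
    const message = err ? String(err.message) : "";
    // Prefix taken from the error text quoted earlier in this thread.
    if (message.startsWith("Missing process handler")) {
      report(`Job ${job.id} (${job.name}) has no processor on this instance`);
    }
  });
  return queue;
}
```

If a producer-only server ever logs this message, some code path on that server is calling process() and has turned it into a partial worker.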