Closed: sebmih closed this issue 4 years ago
I'd have to look at your queue to be able to make a guess. startAfter is core to how the queue works: no job can ever be fetched from the queue unless startAfter is in the past. It doesn't matter what the specific value of startAfter is, as long as the startAfter < now() condition is satisfied in SQL.
If you want to look at the table to investigate, compare your queue with the job fetch query: https://github.com/timgit/pg-boss/blob/master/src/plans.js#L195-L215
Ok, thanks, that makes sense. Is there something that we did that might affect the way the job fetching mechanism works?
The only tinkering we did in this area was when we called the subscribe() method:
await jobQueue.subscribe(
  queueName,
  { newJobCheckIntervalSeconds, teamSize: 1, teamConcurrency: 1 },
  jobHandler,
)
Oh, and here's an item from our pgboss.job table:
https://i.imgur.com/6Pjq0fP.png
So the job you see in the picture will get executed once a new job is scheduled on that queue.
Try removing the options
Hmm, not quite an option. We added newJobCheckIntervalSeconds in order to increase the interval at which the system polls the database. And the teamSize and teamConcurrency options were added because we had a situation in which the job handler for one job was called back for another job :(
if you use async functions as your handlers, you shouldn't have any concurrency issues, as subscribe will await it before fetching another job
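Something like this, as a minimal sketch (the handler body is a placeholder):
await jobQueue.subscribe(queueName, async job => {
  // subscribe() awaits this promise before fetching the next job,
  // so with teamSize/teamConcurrency of 1, jobs run strictly one at a time
  await handleJob(job.data) // placeholder for your actual work
})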
Our handlers are always async, but we still had concurrency issues when teamSize and teamConcurrency were both set to 5.
Maybe what we're doing wrong is the way we use the queues. How specific should they be? Can we have a single queue for the entire application? A queue for each entity in our application?
How many queues you need is always bound to your use cases, but in general, I try to create the queues to represent as small an atomic action as I can, not the other way around. This is why features like retention, retries and throttling exist. Depending on the job, you may be willing to discard some via throttling or debouncing. Another example would be thousands of sensor devices needing to fetch only their assigned job, requiring thousands of different queues.
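For example, the sensor case could look roughly like this (the naming scheme, device id and handler are hypothetical):
// each device gets its own queue, so it only ever fetches jobs addressed to it
const queueFor = deviceId => `sensor-${deviceId}`

await boss.publish(queueFor('device-42'), { reading: 21.5 })
await boss.subscribe(queueFor('device-42'), async job => {
  await applyReading(job.data) // hypothetical per-device handler
})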
Hey there,
So I've removed all those options when subscribing to a queue and sadly the issue persists. Right now in my job table there are a lot of jobs with the startAfter column set in the past and nothing happens to them.
I think I know what's going on. So I scheduled a job for May 31st. Then I stopped the server, went into the job table and changed the startAfter date to a value in the past. Once the server restarted, the job was no longer executed.
If I had left the server untouched, the job would have been executed the moment startAfter was set in the past.
So apparently restarting the server somehow affects which queues are being listened to. Is there anything I can do to address this?
The problem with the issue you're raising is that I can't reproduce what you're describing. To restate my original reply, no job in the system can be processed unless startAfter < now(), so by definition, startAfter is always in the past for any job that runs.
> no job in the system can be processed unless startAfter < now()
I agree, and that is how it should work. Sadly, it does not work like that after I restart the server.
So even if startAfter < now(), the job is not executed if the server has restarted.
How is the server restarted? Any possible special circumstances? Is it possible that boss.start() isn't called?
I don't think there are any special circumstances. I simply close the server and then start it again using the pubsweet start command.
We are in deep trouble. Our production environment has tens of thousands of jobs that have not been executed :( they just hang there. It's like no one is subscribed to or listening on those queues anymore.
I'd enable query logging in your development env (https://stackoverflow.com/questions/722221/how-to-log-postgresql-queries), replicate the issue and then look for the fetchNextJob query (https://github.com/timgit/pg-boss/blob/master/src/plans.js#L195-L215) to see if it's still running. If it is, you at least have some ground to stand on to look for the issue.
Alternatively, you could try running the query yourself; perhaps the jobs that are hanging are locked for some erroneous reason (I can't see how that could happen while the rows keep the 'created' state, but it's worth verifying).
SELECT *
FROM pgboss.job
WHERE state < 'active'
AND startAfter < now()
ORDER BY priority desc, createdOn, id
LIMIT 1
FOR UPDATE SKIP LOCKED
If that returns 0 rows, yet your job table is full of jobs, it's because they are locked (the FOR UPDATE SKIP LOCKED is crucial). I suspect this is not the case, and it's more likely that the fetchNextJob query isn't running. But again, it's worth checking.
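If it helps, another way to rule out subscribe() entirely is to fetch a job manually; a rough sketch, assuming the fetch()/complete() API and a placeholder queue name:
const job = await boss.fetch('your-queue-name') // placeholder queue name

if (job) {
  console.log('fetched', job.id) // the fetch query does return work for this queue
  await boss.complete(job.id)
} else {
  console.log('no job returned') // the fetch condition or the queue name is what to look at
}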
Is there any reason why fetchNextJob wouldn't be running?
I ran the query you gave me and it returned 47000+ results.
The only way I could get some of the jobs to be executed was to change the queue name column to a queue that has been recently created. For example, I changed a queue name from email-d7dfcdbf-5bf3-4017-8549-03a8bb401d5c to journal-activation and that job was executed immediately.
Is there any connection between the queue name and the fetchNextJob query? I see the query does check the queue name:
WHERE state < '${states.active}'
AND name LIKE $1
AND startAfter < now()
but I'm not sure how it affects it.
Can you post your subscribe() handlers and how you call them during your app startup? The behavior you're describing doesn't sound right. For example, in the readme there is a queue some-queue used by the line await boss.subscribe(queue, someAsyncJobHandler). If you were to use this code in an app, you would start it up by calling await readme().
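Roughly, that example looks like this (the connection string and payload here are placeholders):
const PgBoss = require('pg-boss')

async function readme() {
  const boss = new PgBoss('postgres://user:pass@host/database') // placeholder connection string
  await boss.start()

  const queue = 'some-queue'

  await boss.subscribe(queue, someAsyncJobHandler)
  await boss.publish(queue, { arg1: 'hello' })
}

async function someAsyncJobHandler(job) {
  console.log(`job ${job.id} received with data ${JSON.stringify(job.data)}`)
}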
I'm not sure if this is useful, but I've seen behavior that sounds similar to this when editing already-executed jobs to look as if they haven't been dequeued, and only ever in development instances. Could you also share an example of how you're enqueuing new jobs?
@timgit I've finally figured it out, thanks to the most amazing @jure
You were on to something: I wasn't subscribing to my queues again on startup. So when the server restarted, all previous queues (and handlers) were gone, lost in the mists of time.
So the way I fixed this is by running await boss.subscribe(queue, someAsyncJobHandler) for all of our queues each time our server restarts.
> and how you call them during your app startup?
I didn't, and sadly that's not so obvious in the documentation.
Another thing that I figured out is that each queue needs to have a specific job handler to run. For example, the queue named emails-about-the-world-ending should have a specific handler called handleEmailsAboutTheWorldEnding. Not understanding the connection between queues and handlers caused us a lot of hassle which could have been prevented via better documentation.
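In code, the fix boils down to something like this on every startup (handleJournalActivation is a hypothetical name; the rest are taken from the examples above):
// subscriptions only exist in the running process, so every server start
// has to wire each queue back to its specific handler
async function subscribeAll(boss) {
  await boss.subscribe('journal-activation', handleJournalActivation) // hypothetical handler
  await boss.subscribe('emails-about-the-world-ending', handleEmailsAboutTheWorldEnding)
  // ...one subscribe() per queue the app publishes to
}

// called right after boss.start() on every server start
await subscribeAll(boss)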
I'll submit an MR to the docs in the near future which might help others avoid the disaster I'm in right now.
So, let's say that we have some jobs on queue1 and their startAfter property is in the past for all of them. If no other job is scheduled in queue1, then no job on that queue is completed, even though they should be. Does anyone have any idea what's going on?