Closed Ni55aN closed 5 years ago
I have implemented a wrapper for PgBoss that fails running jobs on shutdown. So far, I'm not completely sure about its stability. I ran some tests and they all passed without problems (that is, after the server crashes, all incomplete jobs are resumed).
const PgBoss = require('pg-boss');

class Queue {
  constructor(id, dbConnection, debug = false) {
    this.id = id;
    this.boss = new PgBoss(dbConnection);
    this.handler = null;
    this.activeJobsId = []; // ids of jobs currently being processed
    this.boss.on('error', error => console.error(error));
    this._stop = false;
    this._debug = debug;
  }

  setOptions(subscribeOptions, publishOptions) {
    this.subscribeOptions = subscribeOptions;
    this.publishOptions = publishOptions;
  }

  async start() {
    await this.boss.start();
  }

  // Fail all in-flight jobs so they are not left active, then stop pg-boss.
  async stop() {
    this._stop = true;
    if (this.activeJobsId.length)
      await this.boss.fail(this.activeJobsId);
    await this.boss.stop();
  }

  async subscribe(callback) {
    await this.boss.subscribe(this.id, this.subscribeOptions, async job => {
      this._debug && console.log(`started ${job.id}`);
      // Reject jobs fetched after shutdown has begun.
      if (this._stop) throw new Error('_stop');
      this.activeJobsId.push(job.id);
      try {
        await callback(job);
      } finally {
        // Untrack the job even if the callback throws.
        this.activeJobsId.splice(this.activeJobsId.indexOf(job.id), 1);
      }
      this._debug && console.log(`completed ${job.id}`);
    });
  }

  async publish(data) {
    return await this.boss.publish(this.id, data, this.publishOptions);
  }
}
How to use:
// delay() was not defined in the original post; a minimal promise-based version is assumed here.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

const queue = new Queue('some-queue', 'postgres://...', true);
queue.setOptions({ teamSize: 8, teamConcurrency: 5 }, { retryLimit: 3, retryDelay: 0 });

(async function() {
  await queue.start();
  await queue.subscribe(async (job) => {
    await delay(job.data.time);
    // throw 'fail';
  });
  await queue.publish({ time: 5000 });
  await queue.publish({ time: 1000 });
  await queue.publish({ time: 2000 });
})();

async function shutdown() {
  console.log('<\nshutdown');
  await queue.stop();
  console.log('shutdown\n>');
  process.exit(1);
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
Hey there! pg-boss monitors the state of all jobs and will automatically expire those which have not received a complete() or fail(). The default expiration is 15 minutes, so just adjust this down to your expected times.
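As a rough illustration of adjusting the expiration down, the sketch below builds publish options with a shorter expireIn. It assumes the pg-boss version discussed in this thread, where expireIn is a publish option that takes a PostgreSQL interval string. The helper name buildPublishOptions and the 30-second figure are hypothetical and not part of pg-boss.

```javascript
// Hypothetical helper: builds publish options with an explicit expiration.
// Assumption: `expireIn` accepts a PostgreSQL interval string such as '30 seconds'.
function buildPublishOptions(maxJobSeconds, retryLimit = 3) {
  if (!Number.isInteger(maxJobSeconds) || maxJobSeconds <= 0) {
    throw new RangeError('maxJobSeconds must be a positive integer');
  }
  return {
    expireIn: `${maxJobSeconds} seconds`,
    retryLimit,
    retryDelay: 0,
  };
}

const publishOptions = buildPublishOptions(30);
console.log(publishOptions);
// With the Queue wrapper from the first post, this could be passed as the second argument:
// queue.setOptions({ teamSize: 8, teamConcurrency: 5 }, publishOptions);
```

Keeping the value in one helper makes it easier to revisit the expiration when expected job times change.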
@timgit what happens if a job runs longer than the expiration time (several processes/servers are working on tasks)? Will the job be started by another server?
How the job is handled on failure depends on the retry configuration on the job. If you have retryLimit set to 1, for example, it will be picked up again after expiration.
I have retryLimit = 3. Will it be picked up by server 2 even though server 1 hasn't completed or failed the job?
Yes, it's first come, first served for jobs if you have multiple instances/subscribers
But how will server 2 know that it should not process the expired job?
If you have retries enabled, you want the job to be tried again. I’m not sure I’m understanding your question.
I have the following options:
Steps:
As a result, a job with the retry status cannot be completed.
The issue in your configuration is that your job expired after 10s and is considered failed. If your job takes 25s, you should plan for that and not set it to 10s.
You said that the 2nd server picked up the retry after the first job expired. This is exactly what you want to happen. I don’t see why you think this is a problem. This use case is also not something specific to pg-boss, but rather how any distributed system with job retries would work. You need to determine “how long is long enough before I consider this server died while processing my job?” Expiration is a fail-safe, and usually indicates something unexpected happened.
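The scenario above can be sketched as a toy model. This is not pg-boss code and does not reflect its internals; it only encodes the rule described in this thread: an attempt that runs past its expiration is treated as expired, and it is retried while retries remain. The function name describeAttempt and its parameters are hypothetical.

```javascript
// Toy model of the scenario in this thread; it does not call pg-boss.
function describeAttempt({ jobSeconds, expireInSeconds, retryLimit, retryCount = 0 }) {
  // An attempt that outlives its expiration is considered failed (expired).
  const expired = jobSeconds > expireInSeconds;
  return {
    state: expired ? 'expired' : 'completed',
    // An expired attempt is retried only while retries remain.
    retried: expired && retryCount < retryLimit,
  };
}

// A 25s job with a 10s expiration: expired, so another server may pick up the retry.
console.log(describeAttempt({ jobSeconds: 25, expireInSeconds: 10, retryLimit: 3 }));

// The same job with a 60s expiration: completes on the first attempt.
console.log(describeAttempt({ jobSeconds: 25, expireInSeconds: 60, retryLimit: 3 }));
```

The point of the model is that the expiration, not the worker, decides when an attempt counts as dead, so it should be set above the longest run time you consider healthy.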
@timgit can the expireIn value be dynamic? For example, when a job may take longer than usual, but I don't want it to expire.
expireIn is a config on the publish side. You could add an onComplete subscription, check for failed=true and state=expired, then republish with a longer expiration if that fits your use case.
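A minimal sketch of that suggestion follows. It takes the boss object as a parameter so it can be exercised without a database. The payload shape it reads (job.data.failed, job.data.state, job.data.request.data) is an assumption about the completion job and should be checked against the pg-boss version in use; republishExpired and the '1 hour' value are hypothetical.

```javascript
// Sketch only: `boss` is any object exposing onComplete() and publish().
// Assumption: the completion job carries `failed`, `state`, and the original
// `request` under job.data.
async function republishExpired(boss, queueName, longerExpireIn) {
  await boss.onComplete(queueName, async job => {
    const { failed, state, request } = job.data;
    // Only jobs that failed because they expired are republished.
    if (failed && state === 'expired') {
      await boss.publish(queueName, request.data, { expireIn: longerExpireIn });
    }
  });
}

// Usage, assuming a started PgBoss instance named `boss`:
// await republishExpired(boss, 'some-queue', '1 hour');
```

Note that this republishes the work as a new job after the original has expired; it does not extend the expiration of a job that is still running.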
@timgit I don't want to cancel the current job, since it is running successfully.
I don't know the specifics of your use case, so I can only comment on generalities here. In general, just set the expiration to "the longest this job should ever take". If that means that sometimes the job will take 24 hours, for example, just set the job expiration to 24 hours. I imagine that if these are important enough, you would build another set of jobs which wake up and monitor such a long-running process.
I call await boss.stop(), but the active jobs remain in the database and do not resume after the server restart. Should I call boss.fail() for each active task manually, or does pg-boss have a built-in method?