timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License
2.05k stars · 157 forks

Suggestion: Concurrent jobs limit for subscribe api #63

Closed robgraeber closed 6 years ago

robgraeber commented 6 years ago

Hey there, I've been using pg-boss for some simple worker server stuff and it seems to work pretty well, good job!

So I have a worker server where I only want to run a single job at a time, and I'm kind of surprised there's no config option for that. I'm going to try doing it with fetch(), but wanted to make a suggestion for the future.

timgit commented 6 years ago

Seems like a good fit for fetch() and complete() in my opinion. It's rather difficult to accommodate this requirement with a polling interval, especially if your interval is shorter than the time it takes to complete the job. What kind of configuration did you have in mind?

robgraeber commented 6 years ago

Something like boss.subscribe('job_name', {concurrentJobs: 1}, jobHandler), where calling job.done() would be mandatory and would decrement the concurrent job count and trigger the next poll.

This is the helper function I made for my project:

import PgBoss from 'pg-boss';

/**
 * Substitute subscribe function that only allows 1 concurrent job.
 * @param boss PgBoss instance.
 * @param jobName Job name in the queue.
 * @param jobHandler Function that accepts a job parameter.
 * @param pollingFrequency The polling frequency in ms.
 */
export function subscribe<ReqData, ResData>(
  boss: PgBoss,
  jobName: string,
  jobHandler: PgBoss.SubscribeHandler<ReqData, ResData>,
  pollingFrequency = 1000
) {
  // Poll for a single job; on fetch errors, retry after the polling interval.
  function fetchJob(): void {
    boss
      .fetch(jobName)
      .then(jobHandlerWrapper)
      .catch(() => setTimeout(fetchJob, pollingFrequency));
  }

  function jobHandlerWrapper(job: PgBoss.JobWithDoneCallback<ReqData, ResData> | null): void {
    if (!job) {
      setTimeout(fetchJob, pollingFrequency);
    } else {
      // Completing the job frees our single slot and immediately polls again.
      job.done = function() {
        fetchJob();
        return boss.complete(job.id);
      };
      jobHandler(job, job.done);
    }
  }

  fetchJob();
}
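
For reference, the serial fetch/complete pattern in that helper can be exercised against a tiny in-memory stub (no real pg-boss or database involved; StubQueue and drainSerially are hypothetical names for this sketch):

```typescript
// Hypothetical in-memory stand-in for a pg-boss queue, just to show the
// fetch -> handle -> complete -> fetch-again loop with one job in flight.
type Job = { id: number; data: string };

class StubQueue {
  private jobs: Job[] = [{ id: 1, data: "a" }, { id: 2, data: "b" }];
  completed: number[] = [];
  async fetch(): Promise<Job | null> { return this.jobs.shift() ?? null; }
  async complete(id: number): Promise<void> { this.completed.push(id); }
}

// Process jobs strictly one at a time; a real worker would setTimeout and
// poll again when fetch returns null instead of returning.
async function drainSerially(q: StubQueue, handle: (j: Job) => Promise<void>) {
  for (;;) {
    const job = await q.fetch();
    if (!job) return;
    await handle(job);        // only one job in flight at any moment
    await q.complete(job.id);
  }
}

const q = new StubQueue();
const seen: string[] = [];
const drained = drainSerially(q, async (j) => { seen.push(j.data); });
drained.then(() => console.log(seen.join(","), q.completed.join(",")));
```

Because each job is awaited before the next fetch, the jobs are handled and completed strictly in order, which is exactly the concurrency-of-one behavior the helper above aims for.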

timgit commented 6 years ago

I have similar use cases where I want to limit concurrency, and I've been using fetch, complete and fail to do that. I think your approach should work fine for what you need. I see subscribe as a convenience helper over those api functions. You do have me thinking about how I could build something like this, though; it sounds like a handler returning a promise for the poller to wait on. If that were passed in, it could be used instead of the current callback technique.

timgit commented 6 years ago

In 3.0, you'll need to set teamConcurrency to 1 and make sure your handler returns a promise. This will make sure subscribe only runs 1 job at a time.

JulesAU commented 6 years ago

I'm currently testing this feature out in v3, but it doesn't seem to work the way I'd expect.

In my mind, the use case for limiting concurrency is "ensure that a maximum of [x] tasks are running concurrently at any point in time" to avoid exhausting the available resources on the host.

But what appears to happen when I use teamSize & teamConcurrency is that teamSize jobs are started when subscribe is called, and no further jobs are started until all of the jobs from the previous batch have completed.

So if there is one long-running job, all other jobs are queued until that one job completes. Which means that, most of the time, the effective concurrency is 1, regardless of what you've set teamConcurrency to.

timgit commented 6 years ago

Thanks for the feedback, and the explanation of how this feature works. You're right: if you opt into setting teamConcurrency and return a promise, pg-boss will simply wait for a Promise.all() to resolve, then fetch the next batch. There are several ways to mitigate this, from increasing the number of subscribers to splitting queues based on how long their jobs typically take to finish.
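
The batch behavior being described can be seen with a small stub, no pg-boss involved (teamFetchLoop and runJob are hypothetical names for this sketch): the next fetch waits on the slowest job in the batch.

```typescript
// Simulate teamSize batches awaited via Promise.all: one 50ms job holds up
// its whole batch, so the later short jobs can't start until it finishes.
const delays = [50, 5, 5, 5];
let inFlight = 0;
let maxInFlight = 0;

async function runJob(ms: number): Promise<void> {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((resolve) => setTimeout(resolve, ms));
  inFlight--;
}

async function teamFetchLoop(teamSize: number): Promise<void> {
  for (let i = 0; i < delays.length; i += teamSize) {
    const batch = delays.slice(i, i + teamSize);
    await Promise.all(batch.map(runJob)); // next "fetch" waits on the slowest
  }
}

const t0 = Date.now();
const loop = teamFetchLoop(2);
loop.then(() => console.log(`max in flight: ${maxInFlight}`));
```

With teamSize = 2 the loop never has more than two jobs running, but the second batch only starts after the 50 ms job finishes, so total wall time is roughly 50 + 5 ms even though the three short jobs together only need 5 ms of overlap.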

The intended use case for teamConcurrency is "you have a large volume of jobs, and you don't want to exhaust the event loop in cases when normal throughput may be affected". If you don't use this, you have to be more conservative with setting batch sizes and fetch intervals to plan for periods of high volume.

You can always fall back to fetch() and complete() with advanced use cases as well. Hope this helps!

timgit commented 6 years ago

I'm interested in enhancing this, btw. 3.0 will go out as-is here in a week or 2. https://github.com/sindresorhus/p-queue is a candidate dep for how this might work in a future release. If that's the case, I may push bluebird back to a devDependency.
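
The slot-refill behavior a limiter like p-queue provides, as opposed to batch waiting, can be sketched in a few lines (makeLimiter below is a hypothetical stand-in, not p-queue's actual API):

```typescript
// Minimal concurrency limiter sketch: refill each slot as soon as its job
// finishes, instead of waiting for the whole batch. Not production-hardened
// (e.g. no protection against slot races from concurrent outside callers).
function makeLimiter(concurrency: number) {
  let active = 0;
  const waiting: Array<() => void> = [];
  return async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= concurrency) {
      await new Promise<void>((resolve) => waiting.push(resolve));
    }
    active++;
    try {
      return await task();
    } finally {
      active--;
      waiting.shift()?.(); // hand the freed slot to the next waiter
    }
  };
}

const run = makeLimiter(2);
let active = 0;
let maxActive = 0;
const finished: number[] = [];

const tasks = [30, 5, 5, 5].map((ms, i) =>
  run(async () => {
    active++;
    maxActive = Math.max(maxActive, active);
    await new Promise((resolve) => setTimeout(resolve, ms));
    active--;
    finished.push(i);
  })
);

const all = Promise.all(tasks);
all.then(() => console.log(`max active: ${maxActive}`, finished.join(",")));
```

Here the 30 ms job occupies one slot while the three short jobs rotate through the other, so short jobs are never stuck behind the long one, which is the improvement over the Promise.all batching discussed earlier in the thread.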