camunda-community-hub / zeebe-client-node-js

Node.js client library for Zeebe Microservices Orchestration Engine
https://camunda-community-hub.github.io/zeebe-client-node-js/
Apache License 2.0
152 stars, 38 forks

Would like to handle jobs in batch #134

Closed: jbeaudoin11 closed this issue 4 years ago

jbeaudoin11 commented 4 years ago

Hi!

Something I was wondering about: could we add batch support for workers? Let's say I have 500 jobs, each of which fetches data from an external API or database. Batching them into a single request would help reduce the load on these external resources and also reduce latency in general. Example:

const zbBatchWorker = zbc.createBatchWorker(
    'test-worker',
    'demo-service',
    {
        batch: 50, // # of jobs per batch
        timeout: 1000, // or 1 sec, whichever comes first
    },
    async (jobs, complete) => {
        const ids = jobs.map((j) => j.id);
        // One request for the whole batch instead of one per job
        const response = await fetch(`someUrl/user?ids=${ids.join(',')}`);
        const users = await response.json();

        ids.forEach((id, index) => {
            const user = users[index];

            try {
                // ...Do some work with `user`...

                if(...) {
                    complete.success(index, ...)
                } else {
                    complete.failure(index, ...)
                }
            } catch(error) {
                complete.error(index, ...)
            }
        });

        /*
            When the batch is done,
            this commits each job and
            calls the right procedure (gRPC)
            based on 'complete'
        */
        await complete.done();
    }
);

I can pass the number of jobs per batch, or a timeout after which the batch is handled anyway. That said, it could be linked to the maxActiveJobs property.

jwulf commented 4 years ago

maxActiveJobs is 32 by default. See this section of the README: https://github.com/creditsenseau/zeebe-client-node-js#create-a-task-worker

The job handler is already the predicate function on an async map function (forEach with a callback - the complete methods), so it is doing what you are thinking of already.

See here: https://github.com/creditsenseau/zeebe-client-node-js/blob/master/src/zb/ZBWorker.ts#L275

Maybe I'm not understanding your use case...

How is your task not something that should be done as an array property in a single task?

Each of those jobs is a different instance of a task in the BPM model. Is there a reason why you don't batch it in your model?

If they can be batched in the worker, both logically and temporally, why are they not correlated that way in the model?

jbeaudoin11 commented 4 years ago

The job handler is already the predicate function on an async map function (forEach with a callback - the complete methods), so it is doing what you are thinking of already.

Not exactly. Yes, the worker processes multiple jobs concurrently, but each job goes through its own call to the same handler definition. What I'm trying to do is let the handler manage the concurrency.

Maybe I'm not understanding your use case...

It may be worth mentioning that I'm evaluating Zeebe to replace an old workflow engine implementation. We are currently processing hundreds of jobs per second, which means hundreds to thousands of simultaneous instances per workflow, and 0-10 workflows per user. It adds up. A workflow can be super simple, like 1 step, or more complex, like 20 steps. We have 16 job types to build these steps (8 actions and 8 conditions), plus some timer steps.

In our implementation, a worker has control over the whole batch it's trying to process. This gives us more flexibility: for example, external API calls can be grouped into a single call, which greatly reduces the traffic on that external API.

How is your task not something that should be done as an array property in a single task?

Like I said, my goal is to reduce the number of external calls made by the job worker. Tasks of the same job type can share part of the code to optimize processing, e.g. fetching data from an external API: 1 request instead of 500.

Each of those jobs is a different instance of a task in the BPM model. Is there a reason why you don't batch it in your model?

Because it doesn't make sense to do it like that. I'm not trying to fetch multiple items related to a single job; generally 1 job = 1 fetch. What I'm trying to do is batch at the worker level, since we have lots of concurrent instances.

Part of the job can be shared in a batch, but we still want to keep control of individual instances.

Does that make more sense?

jwulf commented 4 years ago

You can actually do it now, like this:

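import { ZBClient, ActivateJobsRequest } from 'zeebe-node'

const zbc = new ZBClient()

const req: ActivateJobsRequest = {
    maxJobsToActivate: 2, // max number of jobs returned per activation
    requestTimeout: 2000, // long-poll duration of this request, in ms
    timeout: 2000, // how long the activated jobs stay locked to this worker, in ms
    type: 'my-task-type',
    worker: 'test-worker',
}

// gRPCClient is private, so erase the typing to reach it, and keep the reference in one place
const grpcClient = (zbc as any).gRPCClient
// activateJobsStream resolves to a stream that emits the activated jobs
const stream = grpcClient.activateJobsStream(req)
stream.then(res => res.on('data', console.log))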

If you are using TypeScript, you'll need to erase the typing of zbc, because gRPCClient is marked private. (Heads-up: it is being renamed to grpc in 0.23.0-alpha1, so assign it to a const in one place in your code to keep the impact on you minimal.)

This is the branch that is coming in: https://github.com/jwulf/zeebe-client-node-js/tree/grpc-refactor

You would use the ZBClient.completeJob() and ZBClient.failJob() methods, as the ZBWorker lib does.

You can look through the source code for further information.

I am not going to add support for this use case any time soon. It's a whole different state management situation. You can certainly use the Node client as a low-level library to build the kind of custom-coded solutions that are typical with the other clients.

(Having said that - I'm easily excited about cool new things, and the next time I take a shower or go to sleep I may wake up possessed by a vision for this).

My focus with the library has been to provide an opinionated solution that lifts the logistics out of the frame and lets you focus just on the logic and the model. Dan Shapir's Zeebe Node NestJS integration (which is what I use in production) takes that even further.

But yeah, you can totally use it to do it in any way that you like.

The public API is stable, and if we stay in communication, I can make a stable API for your use case - even if the library doesn't immediately support it as a first-class pattern. The changes in 0.23.0 will affect the naming of the gRPC client component, but you wouldn't be using most of the other state management, and both the underlying gRPC protocol methods and the Zeebe Node public APIs are stable.

jbeaudoin11 commented 4 years ago

Alright, tbh I was already thinking about forking and doing it myself :P.

Might have a PR at some point for that.

jwulf commented 4 years ago

Yeah, there is retry logic for operations and things like that, which would be useful to reuse in this scenario.

The lightest weight solution would be to give the ZBClient an ActivateJobs method that allows you to specify the request, and returns a batch.

Then you are responsible for managing the completion of each job using the decoupled completion pattern.

Yeah, I can see a low-orbit solution. I'll add the public API for activateJobs on ZBClient and it will roll out this week with the 0.23.0 release.

It actually just takes that decoupled completion pattern another step forward.
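The decoupled completion pattern can be sketched without a broker: the handler only parks the job and acknowledges that completion has been handed off, and another part of the program later settles it by key. `JobCompleter`, `PendingJobs`, and the `FORWARDED` marker are hypothetical stand-ins, not zeebe-node APIs; in the library the corresponding pieces would be `complete.forwarded()` in the handler and `ZBClient.completeJob()` / `ZBClient.failJob()` afterwards.

```typescript
// Hypothetical stand-in for the client's completion calls.
interface JobCompleter {
  complete(jobKey: string, variables: Record<string, unknown>): void;
  fail(jobKey: string, errorMessage: string): void;
}

interface Job {
  key: string;
  variables: Record<string, unknown>;
}

type Outcome =
  | { ok: true; variables: Record<string, unknown> }
  | { ok: false; error: string };

// Marker returned by the handler: "someone else will complete this job".
const FORWARDED = "FORWARDED" as const;

class PendingJobs {
  private pending = new Map<string, Job>();

  constructor(private readonly completer: JobCompleter) {}

  // Task handler: park the job and acknowledge that completion is decoupled.
  handle(job: Job): typeof FORWARDED {
    this.pending.set(job.key, job);
    return FORWARDED;
  }

  // Called later, from anywhere in the program, with the outcome for one job.
  settle(jobKey: string, outcome: Outcome): boolean {
    if (!this.pending.has(jobKey)) return false; // unknown or already settled
    this.pending.delete(jobKey);
    if (outcome.ok) {
      this.completer.complete(jobKey, outcome.variables);
    } else {
      this.completer.fail(jobKey, outcome.error);
    }
    return true;
  }

  size(): number {
    return this.pending.size;
  }
}
```

The batch case is then just a loop over `settle` once a shared external call returns.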

jbeaudoin11 commented 4 years ago

I was thinking of creating a new ZBBatchWorker inspired by the ZBWorker.

Maybe they could share some logic... We'll see.

jwulf commented 4 years ago

That will be cool.

jwulf commented 4 years ago

About this:

const zbBatchWorker = zbc.createBatchWorker(
    'test-worker',
    'demo-service',
    {
        batch: 50, // # of jobs per batch
        timeout: 1000, // or 1 sec
    },

The timer resolution in the broker is a 30-second sweep, so you can't guarantee anything less than that for job timeouts.

It was a mistake I made putting times in ms. See here: https://github.com/zeebe-io/zeebe/issues/3944

jbeaudoin11 commented 4 years ago

I'm not sure we are talking about the same thing. Maybe my naming is wrong; in Google Pub/Sub it would be:

{
    maxMessages: 50,
    maxMilliseconds: 1000,
}

It's either 50 jobs or a 1s wait before processing the next batch, whichever comes first, so a batch can have fewer than 50 jobs.
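The "first of size or time" policy described here can be sketched as a small batcher with an injected clock, which keeps its behaviour deterministic and testable. `CountOrTimeBatcher` is a hypothetical name, and `maxMessages` / `maxMilliseconds` are simply borrowed from the Pub/Sub-style options above; this is not a zeebe-node API.

```typescript
// Releases buffered items as a batch when either maxMessages items have
// accumulated or maxMilliseconds have passed since the first buffered item.
class CountOrTimeBatcher<T> {
  private items: T[] = [];
  private firstAt: number | null = null;

  constructor(
    private readonly maxMessages: number,
    private readonly maxMilliseconds: number,
  ) {}

  // Add an item at time `now` (ms); returns a full batch if the size limit is hit.
  add(item: T, now: number): T[] | null {
    if (this.items.length === 0) this.firstAt = now;
    this.items.push(item);
    return this.items.length >= this.maxMessages ? this.flush() : null;
  }

  // Call periodically; returns a (possibly short) batch once the time limit is reached.
  tick(now: number): T[] | null {
    if (this.firstAt !== null && now - this.firstAt >= this.maxMilliseconds) {
      return this.flush();
    }
    return null;
  }

  private flush(): T[] {
    const batch = this.items;
    this.items = [];
    this.firstAt = null;
    return batch;
  }
}
```

In this sketch the time window starts at the first buffered item, so an empty batcher never emits an empty batch; a fixed-period timer is the other common choice.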

jwulf commented 4 years ago

Ah, in Zeebe it is: 50 jobs, and give me n seconds to complete them before passing them out to another worker request.

jbeaudoin11 commented 4 years ago

Yes, I'm describing a different behavior. Isn't that ActivateJobsRequest.requestTimeout?

jwulf commented 4 years ago

What do you think about extending the job by appending success, failure, and error properties to it?

Then the API would be:

batch.map(job => didItWork? job.success() : job.failure())

You don't have to manage references or state then in your business logic.

You wouldn't be limited to using that - you can still destructure the data for the job and complete it with the job key as an argument to a method. It would be more like a convenience method.
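Appending outcome methods to each job could look like the sketch below. `RawJob`, `Outcomes`, `BatchedJobSketch`, and `decorate` are hypothetical names; the point is that each method closes over its own job key, so business logic never has to carry keys around, while the raw job data stays available.

```typescript
interface RawJob {
  key: string;
  variables: Record<string, unknown>;
}

// Hypothetical low-level calls that the decorated methods delegate to.
interface Outcomes {
  complete(key: string, variables: Record<string, unknown>): void;
  fail(key: string, message: string): void;
}

type BatchedJobSketch = RawJob & {
  success(variables?: Record<string, unknown>): void;
  failure(message: string): void;
};

// Attach per-job convenience methods; each closure captures its own job key.
function decorate(jobs: RawJob[], outcomes: Outcomes): BatchedJobSketch[] {
  return jobs.map((job): BatchedJobSketch => ({
    ...job,
    success: (variables: Record<string, unknown> = {}) =>
      outcomes.complete(job.key, variables),
    failure: (message: string) => outcomes.fail(job.key, message),
  }));
}
```

With that in place, the one-liner from the comment becomes `batch.forEach(job => (job.variables.ok ? job.success() : job.failure("...")))`.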

Also, the error completion is not for exceptions in code, it is for business errors - like "User's payment was declined", that are modeled as exceptional flows. BPMN Error Event.

Code exceptions are failure - they communicate: "I blew up, give the job to another worker to retry or raise an operational incident if retries are exhausted".
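The distinction can be sketched as follows: a modeled business outcome becomes an error with a code that a BPMN Error Event can catch, while an unexpected exception becomes a failure so the broker can retry or raise an incident. `JobActions`, `ChargeResult`, `runPaymentJob`, and the `PAYMENT_DECLINED` code are illustrative assumptions, and the charge is synchronous only to keep the sketch short.

```typescript
// Hypothetical per-job outcome methods.
interface JobActions {
  success(variables: Record<string, unknown>): void;
  failure(message: string): void; // technical problem: retry or incident
  error(errorCode: string, message: string): void; // business error: BPMN Error Event
}

// Business outcomes are values, not exceptions.
type ChargeResult =
  | { status: "approved"; receiptId: string }
  | { status: "declined"; reason: string };

function runPaymentJob(job: JobActions, charge: () => ChargeResult): void {
  try {
    const result = charge();
    if (result.status === "approved") {
      job.success({ receiptId: result.receiptId });
    } else {
      // Modeled as an exceptional flow in the BPMN diagram.
      job.error("PAYMENT_DECLINED", result.reason);
    }
  } catch (e) {
    // "I blew up": let the broker hand the job out again or raise an incident.
    job.failure(e instanceof Error ? e.message : String(e));
  }
}
```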

jwulf commented 4 years ago

There are two timeouts.

One is requestTimeout - how long your request should be pending before the broker releases the blocking long poll of your pending request. That's not going to be an issue for your use case, by the sound of it. By default the broker terminates requests at 15s, unless you request more.

The other is timeout: the amount of time the broker gives the worker exclusive responsibility for carrying out the work and reporting its outcome. When that time expires, if the jobs have not been completed, the broker will hand them out in response to requests from other workers.

After that time, this worker can still complete them, up to the moment that another worker reports completion first; but this worker can no longer fail them. They are already being "retried".

Zeebe guarantees at-least-once delivery of jobs.
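The lock behaviour described above can be modeled with a toy in-memory job store to make the at-least-once consequence visible: once a lock expires, a second worker can activate the same job, so the work may run twice. `LockingJobStore` is hypothetical and encodes only the rules stated in this comment; it ignores the broker's periodic timeout sweep and every other broker detail.

```typescript
interface LockedJob {
  key: string;
  lockedBy: string | null;
  lockExpiresAt: number; // ms timestamp, meaningless while lockedBy is null
}

// Toy model of job activation with a lock timeout (not the real broker).
class LockingJobStore {
  private jobs = new Map<string, LockedJob>();

  add(key: string): void {
    this.jobs.set(key, { key, lockedBy: null, lockExpiresAt: 0 });
  }

  // Hand out jobs that are unlocked or whose lock has expired.
  activate(worker: string, now: number, timeout: number): string[] {
    const activated: string[] = [];
    this.jobs.forEach((job) => {
      if (job.lockedBy === null || now >= job.lockExpiresAt) {
        job.lockedBy = worker;
        job.lockExpiresAt = now + timeout;
        activated.push(job.key);
      }
    });
    return activated;
  }

  // Any worker may complete a job that still exists; the first completion wins.
  complete(key: string): boolean {
    return this.jobs.delete(key);
  }

  // Only the current lock holder, before its timeout, may fail the job.
  fail(key: string, worker: string, now: number): boolean {
    const job = this.jobs.get(key);
    if (!job || job.lockedBy !== worker || now >= job.lockExpiresAt) return false;
    job.lockedBy = null; // available again for a retry
    return true;
  }
}
```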

jwulf commented 4 years ago

One last thought, about the method signature. 0.23.0 adds overloads to createWorker that remove the worker id (it turns out not to be so useful that it should be required) and add a single-parameter object signature.

See here for more details: https://www.joshwulf.com/blog/2020/02/refining-method-signature/
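Supporting both a positional signature and a single parameter object can be sketched with TypeScript overloads. `Handler`, `WorkerConfig`, and `createWorkerSketch` are hypothetical and only illustrate the signature handling, not zeebe-node's implementation; here the worker id is required in the positional form and optional (auto-generated) in the object form.

```typescript
type Handler = (job: { key: string }) => void;

interface WorkerConfig {
  id?: string; // optional in the object form
  taskType: string;
  taskHandler: Handler;
}

interface WorkerSketch {
  id: string;
  taskType: string;
  taskHandler: Handler;
}

let generatedIds = 0;

// Overload 1: positional arguments, worker id required.
function createWorkerSketch(id: string, taskType: string, taskHandler: Handler): WorkerSketch;
// Overload 2: single parameter object, worker id optional.
function createWorkerSketch(config: WorkerConfig): WorkerSketch;
// Implementation: normalise both call shapes into one config object.
function createWorkerSketch(
  idOrConfig: string | WorkerConfig,
  taskType?: string,
  taskHandler?: Handler,
): WorkerSketch {
  const config: WorkerConfig =
    typeof idOrConfig === "string"
      ? { id: idOrConfig, taskType: taskType as string, taskHandler: taskHandler as Handler }
      : idOrConfig;
  return {
    id: config.id ?? `worker-${++generatedIds}`,
    taskType: config.taskType,
    taskHandler: config.taskHandler,
  };
}
```

Callers only ever see the two overloads; the loosely typed implementation signature is not callable directly.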

jbeaudoin11 commented 4 years ago

Yes, I think it would be better to receive a job object with callbacks, variables, and headers. Since we need to make individual gRPC calls anyway, there is no real benefit to having a complete.done method for the batch.

requestTimeout is what I was trying to describe, then. Is there a minimum value? In our case, some job types are rarely used to build workflows, so it would be nice not to poll for new tasks every few milliseconds when the "queue" is empty; that way workers would use minimal resources during those downtimes.

jwulf commented 4 years ago

https://zeebe.io/blog/2019/08/long-polling/

You probably want to read this one too:

https://zeebe.io/blog/2019/12/zeebe-performance-profiling/

jwulf commented 4 years ago

I can see how to build this, re-using the machinery in place to get the ergonomics of retries.

It might be best to lift the existing ZBWorker to a ZBWorkerBase class, then extend it to get the specialisations of the poll request and the handler shape for batch and single job workers. I'll take a look at it today.

jwulf commented 4 years ago

OK, I've done it. I had the thought at the beginning:

Favour composition over inheritance

But I did it as a base class and two classes that extend it. Getting the Generics to work was a bit of work.

I think that it can be refactored to a single class, because the only difference between them is the shape of the taskhandler function, and then how the worker calls that taskhandler (mapping it over the jobs, or passing in the array).
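The single-class idea can be sketched by making the handler shape a discriminated union, so the only branch point is whether the worker maps the handler over the jobs or passes it the whole array. `JobLike`, `WorkerMode`, and `UnifiedWorker` are hypothetical names for illustration, not the classes in the library.

```typescript
interface JobLike {
  key: string;
}

type WorkerMode =
  | { kind: "single"; handler: (job: JobLike) => void }
  | { kind: "batch"; handler: (jobs: JobLike[]) => void };

class UnifiedWorker {
  constructor(private readonly mode: WorkerMode) {}

  // Polling, retries, and logging would be shared; only dispatch differs.
  dispatch(jobs: JobLike[]): void {
    const mode = this.mode;
    if (mode.kind === "single") {
      jobs.forEach((job) => mode.handler(job)); // one call per job
    } else {
      mode.handler(jobs); // one call for the whole batch
    }
  }
}
```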


There is an issue, though, I think, with this approach.

The ZBWorkers now have a minJobBatchSize property.

It is not a statement about the broker or the state of the jobs on the broker, but rather a statement about the capacity of the worker - which is the only thing the worker has knowledge of.

However, there is no way to guarantee that this is the minimum number of jobs that the worker will receive.

Rather, this is the minimum capacity that the worker must have before it will request more jobs.

For example:

There is no way to guarantee any kind of batching in the worker, unless I add a "poll period parameter", which will make configuration complex (as well as the code). Even then, nothing is guaranteed. You are really waiting for jobs to buffer on the broker.
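The capacity rule described earlier (the worker asks for more jobs only once its free capacity reaches minJobBatchSize) is plain arithmetic, which also shows why it says nothing about how many jobs actually arrive. `jobsToRequest` is a hypothetical helper, and the assumption that a worker requests exactly its free capacity is mine; `maxActiveJobs` and `minJobBatchSize` follow the names used in this thread.

```typescript
// How many jobs a worker would request, or 0 if it should not poll yet.
// The broker can still return anywhere from 0 up to that number.
function jobsToRequest(
  maxActiveJobs: number,
  activeJobs: number,
  minJobBatchSize: number,
): number {
  const capacity = maxActiveJobs - activeJobs;
  return capacity >= minJobBatchSize ? capacity : 0;
}
```

An idle worker with the default maxActiveJobs of 32 asks for 32 jobs, but if the broker only has 3 activatable jobs it gets 3, so minJobBatchSize bounds the request, not the batch.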

My initial intuition is confirmed. If your workloads can be correlated in the worker, they need to be correlated in the model you build.

Or, you have to implement buffering in the worker, using an array and a timer, and using the complete.forwarded() method, like this:

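// State machine to execute a handler on multiple jobs every ${timeout} seconds or every ${batchSize} jobs, whichever comes first

const JobBuffer = ({ handler, timeout, batchSize }) => {
   let jobs = []
   let t
   // Hand the buffered jobs to the handler, empty the buffer, and restart the timer
   const execute = () => {
      clearTimeout(t)
      handler([...jobs])
      jobs = []
      t = loop()
   }
   const loop = () => setTimeout(execute, timeout * 1000)
   // Start the first timer only after execute is defined (earlier would be a ReferenceError)
   t = loop()
   return {
      buffer: job => {
        jobs.push(job)
        // Flush early when the batch is full
        if (jobs.length >= batchSize) {
           execute()
        }
      },
      count: () => jobs.length,
    }
}

// Every 60s or 10 jobs
const jobBuffer = JobBuffer({
  timeout: 60,
  handler: jobs => jobs.forEach(job => zbc.completeJob(job.key, {})),
  batchSize: 10
})

zbc.createWorker({
  taskType: 'jobs-to-batch',
  timeout: 70, // 70 seconds: longer than the 60s buffer window, so the lock outlives buffering
  taskHandler: (job, complete) => {
    jobBuffer.buffer(job)
    // The job is completed later, by the buffer's handler
    return complete.forwarded()
  },
})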

OK, so I put this state machine in the ZBBatchWorker:

02:41:34.277 | zeebe |  [generic-test (batch)] Executing batched handler with 5 jobs
02:41:34.277 | zeebe |  [generic-test (batch)] Got 5 jobs...
CREATING
CREATING
CREATING
CREATING
CREATING
02:41:36.779 | zeebe |  [generic-test (batch)] Executing batched handler with 5 jobs
02:41:36.779 | zeebe |  [generic-test (batch)] Got 5 jobs...
CREATING
CREATING
CREATING
CREATING
CREATING
02:41:39.312 | zeebe |  [generic-test (batch)] Executing batched handler with 5 jobs
02:41:39.312 | zeebe |  [generic-test (batch)] Got 5 jobs...
CREATING
CREATING
STOPPING
jwulf commented 4 years ago

Will be out this week in the 0.23.0-alpha.1 release.

Here are the docs from the README:

The ZBBatchWorker Job Worker

The ZBBatchWorker Job Worker batches jobs before calling the job handler. Its fundamental differences from the ZBWorker are:

You can use the batch worker if you have tasks that benefit from processing together, but are not related in the BPMN model.

An example would be a high volume of jobs that require calls to an external system, where you have to pay per call to that system. In that case, you may want to batch up jobs, make one call to the external system, then update all the jobs and send them on their way.

The batch worker works on a first-of batch size or batch timeout basis.

You must configure both jobBatchMinSize and jobBatchMaxTime. Whichever condition is met first (jobBatchMinSize jobs buffered, or jobBatchMaxTime seconds elapsed) will trigger the processing of the jobs.

Be sure to specify a timeout for your worker of at least jobBatchMaxTime plus the expected latency of the external call, plus your processing time and network latency. This prevents the broker from timing out your batch worker's lock and making the jobs available to another worker, which would defeat the whole purpose.
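The sizing rule above is plain addition. The helper below uses hypothetical names, with all values in seconds; the latency and processing figures are made-up placeholders, not measurements.

```typescript
interface TimeoutBudget {
  jobBatchMaxTime: number; // longest a job may wait in the buffer
  externalCallLatency: number; // expected duration of the batched external call
  processingTime: number; // time to handle results and complete the jobs
  networkLatency: number; // round trips to the broker
}

// Minimum worker timeout so the lock outlives buffering plus processing.
function minimumWorkerTimeout(b: TimeoutBudget): number {
  return b.jobBatchMaxTime + b.externalCallLatency + b.processingTime + b.networkLatency;
}

const suggested = minimumWorkerTimeout({
  jobBatchMaxTime: 60,
  externalCallLatency: 10,
  processingTime: 5,
  networkLatency: 2,
});
// suggested is 77 here, so a worker timeout of 80 leaves a few seconds of slack
```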

Here is an example of using the ZBBatchWorker:

import { API } from './lib/my-awesome-external-api'
import { ZBClient, BatchedJob, ZBBatchWorker } from 'zeebe-node'

const zbc = new ZBClient()

// Helper function to find a job by its key (undefined if there is no match)
const findJobByKey = (jobs: BatchedJob[]) => (key: string) =>
    jobs.find(job => job.jobKey === key)

const handler = async (jobs: BatchedJob[], worker: ZBBatchWorker) => {
    worker.log("Let's do this!")
    // Construct some hypothetical payload with correlation ids and requests
    const req = jobs.map(job => ({ id: job.jobKey, data: job.variables.request }))
    // An uncaught exception will not be managed by the library
    try {
        // Our API wrapper turns that into a request, and returns
        // an array of results with ids
        const outcomes = await API.post(req)
        // Construct a find function for these jobs
        const getJob = findJobByKey(jobs)
        // Iterate over the results and call the success method on the corresponding job,
        // passing in the correlated outcome of the API call.
        // Jobs without a matching result are not completed here; their lock will time out.
        outcomes.forEach(res => getJob(res.id)?.success(res.data))
    } catch (e) {
        // The batched call failed, so fail every job and let the broker retry them
        jobs.forEach(job => job.failure((e as Error).message))
    }
}

const batchWorker = zbc.createBatchWorker({
    taskType: 'get-data-from-external-api',
    taskHandler: handler,
    jobBatchMinSize: 10, // at least 10 at a time
    jobBatchMaxTime: 60, // or every 60 seconds, whichever comes first
    timeout: 80, // 80-second timeout leaves at least 20 seconds to process after jobBatchMaxTime
})
jbeaudoin11 commented 4 years ago

I think it's exactly what I want in terms of functionality. Thanks!