This project aims to provide a neat Kotlin Coroutines API for the Zeebe Gateway. Right now, it offers only a worker coroutine API.
I created it to get as much performance as possible out of the Kotlin Coroutines stack. If you replace blocking calls with coroutine suspension, a worker can process many jobs in parallel instead of one at a time (the behavior of Zeebe Client Java with default settings).
You can run the performance comparison test yourself; on my machine, the results are as follows:
For Zeebe Client it took 41.186149961s to process 40
...
For Coworker it took 1.522231647s to process 40
So, Zeebe Client Java duration / Coworker duration = 27.056427346106805
So the same worker with a delay, but on the reactive stack, takes about 27 times less time to complete the process instances.
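To illustrate where this difference comes from, here is a minimal, self-contained sketch. It is not the project's benchmark and does not use Zeebe at all: `blockingJob` and `suspendingJob` are hypothetical stand-ins for a handler that waits on I/O, and the sketch assumes only that `kotlinx-coroutines-core` is on the classpath.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-ins for a job handler that waits on I/O; not part of Coworker.
fun blockingJob(waitMillis: Long) = Thread.sleep(waitMillis)     // holds its thread while waiting
suspend fun suspendingJob(waitMillis: Long) = delay(waitMillis)  // releases its thread while waiting

// Processes the jobs one after another on the calling thread.
fun timeBlocking(jobs: Int, waitMillis: Long): Long = measureTimeMillis {
    repeat(jobs) { blockingJob(waitMillis) }
}

// Starts every job as a coroutine on the same single thread and waits for all of them.
fun timeSuspending(jobs: Int, waitMillis: Long): Long = measureTimeMillis {
    runBlocking {
        (1..jobs).map { async { suspendingJob(waitMillis) } }.awaitAll()
    }
}

fun main() {
    println("blocking:   ${timeBlocking(20, 50)} ms")
    println("suspending: ${timeSuspending(20, 50)} ms")
}
```

The blocking variant needs roughly `jobs * waitMillis`, while the suspending variant needs roughly one `waitMillis`, because the waits overlap on a single thread.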
<dependency>
<groupId>org.camunda.community.extension.kotlin.coworker</groupId>
<artifactId>coworker-core</artifactId>
<version>x.y.z</version>
</dependency>
Obtain a ZeebeClient instance, for example, ZeebeClient.newClient()
Obtain a Cozeebe instance: zeebeClient.toCozeebe()
Create a new Coworker instance:
val coworker = cozeebe.newCoWorker(jobType, object : JobHandler {
override suspend fun handle(client: JobClient, job: ActivatedJob) {
val variables = job.variablesAsMap
val aVar = variables["a"] as Int
val bVar = variables["b"] as Int
variables["c"] = aVar + bVar
client.newCompleteCommand(job).variables(variables).send().await()
}
})
coworker.open()
JobHandler methods)
It requires:
First, add the dependency:
<dependency>
<groupId>org.camunda.community.extension.kotlin.coworker</groupId>
<artifactId>coworker-spring-boot-starter</artifactId>
<version>x.y.z</version>
</dependency>
Then define a Zeebe worker with coroutines, like this:
@Coworker(type = "test")
suspend fun testWorker(jobClient: JobClient, job: ActivatedJob) {
someService.callSomeSuspendMethod(job.variables)
jobClient.newCompleteCommand(job.key).send().await()
}
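The worker above finishes its command with `.await()` rather than `.join()`. The following sketch shows the difference on a plain `CompletableFuture`. `sendCommand` is a hypothetical stand-in for a command's `send()`, not a Zeebe call, and the sketch assumes the `await()` extension from `kotlinx.coroutines.future` (shipped in `kotlinx-coroutines-jdk8`, or in `kotlinx-coroutines-core` in newer versions) and Java 9+ for `delayedExecutor`.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a command's send(): completes with a value after 50 ms.
fun sendCommand(result: String): CompletableFuture<String> =
    CompletableFuture.supplyAsync(
        { result },
        CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS)
    )

// Blocking style: join() parks the calling thread until the future completes.
fun completeBlocking(): String = sendCommand("done").join()

// Suspending style: await() suspends only the coroutine, so the thread can run other work.
suspend fun completeSuspending(): String = sendCommand("done").await()

fun main() = runBlocking {
    println(completeBlocking())
    println(completeSuspending())
}
```

Both calls return the same value; the difference is only in what happens to the thread while the future is pending.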
Note: the method must be a suspend function annotated with @Coworker, and it should call .await() instead of .join(), as in the example above.
Sometimes you need to provide some data in a coroutine context (an MDC map, for example) based on the job.
To do so, override additionalCoroutineContextProvider from JobCoworkerBuilder, like this:
client.toCozeebe().newCoWorker(jobType) { client, job ->
// your worker logic
client.newCompleteCommand(job).send().await()
}
// override additionalCoroutineContextProvider
.also { it.additionalCoroutineContextProvider = JobCoroutineContextProvider { testCoroutineContext } }
// open worker
.open().use {
// logic to keep the worker running
}
If you are using the Spring Boot Starter, you just need to create a bean of type JobCoroutineContextProvider in your Spring context, like this:
@Bean
fun loggingJobCoroutineContextProvider(): JobCoroutineContextProvider {
return JobCoroutineContextProvider {
MDCContext()
}
}
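For readers unfamiliar with coroutine context elements, the sketch below shows the general mechanism such a provider relies on: an element is added to the context, and any suspend function running in that context can read it back. `JobKeyElement`, `currentJobKey` and `runWithJobContext` are hypothetical names invented for this illustration and are not part of Coworker; `MDCContext` from `kotlinx-coroutines-slf4j` is a ready-made element of the same kind.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

// Hypothetical context element carrying per-job data; not part of Coworker.
class JobKeyElement(val jobKey: Long) : AbstractCoroutineContextElement(JobKeyElement) {
    companion object Key : CoroutineContext.Key<JobKeyElement>
}

// Any suspend function running inside the handler can read the element back.
suspend fun currentJobKey(): Long? = coroutineContext[JobKeyElement]?.jobKey

// Mimics the idea of a context provider: build extra context from the job,
// then run the handler inside it.
suspend fun <T> runWithJobContext(jobKey: Long, handler: suspend () -> T): T =
    withContext(JobKeyElement(jobKey)) { handler() }

fun main() = runBlocking {
    println(runWithJobContext(42L) { currentJobKey() }) // prints 42
    println(currentJobKey())                            // prints null
}
```

Outside `runWithJobContext` the element is absent, which is why the second call yields `null`.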
Sometimes you want to override the default error handling mechanism. To do so, customize your worker like this:
client.toCozeebe().newCoWorker(jobType) { jobClient: JobClient, job: ActivatedJob ->
// worker's logic
}
.also {
// override job error handler
it.jobErrorHandler = JobErrorHandler { e, activatedJob, jobClient ->
if (e is IgnorableException) {
jobClient.newCompleteCommand(activatedJob).variables(mapOf("ignored" to true)).send().await()
} else {
jobClient.newFailCommand(activatedJob).retries(activatedJob.retries - 1).send().await()
}
}
}
If you are using the Spring Boot Starter, you need to define a JobErrorHandler bean in your context:
@Bean
open fun customErrorHandler(): JobErrorHandler {
val defaultErrorHandler = DefaultSpringZeebeErrorHandler()
return JobErrorHandler { e, activatedJob, jobClient ->
logger.error(e) { "Got error: ${e.message}, on job: $activatedJob" }
defaultErrorHandler.handleError(e, activatedJob, jobClient)
}
}
Warning: It is highly recommended to wrap your error handling logic around DefaultSpringZeebeErrorHandler, as shown above. More info: https://github.com/camunda-community-hub/kotlin-coworker/issues/54
This works basically the same as in the Spring Zeebe project.
So, you can override values in the @Coworker annotation with type foo like this:
zeebe.client.worker.override.foo.enabled=false
Note: you can't use SpEL or property placeholders in this value. The value should have the same type as the corresponding parameter in the @Coworker annotation.
The exception is Duration: for it, you should provide Long values in milliseconds.
If you want to redefine org.camunda.community.extension.coworker.spring.annotation.Coworker parameters, you should use SpEL to define the annotation values.
See the property's JavaDoc for the type that should be resolved.
You may also use property placeholders (${}) in the annotation parameters to replace them with configuration properties if needed.
As an example, you may refer to the test.
If you want to observe your coworkers, there is a port of some metrics from the Spring Zeebe project in the Coworker's Spring Boot Starter:
camunda.job.invocations
  action - what happens to a job
    activated - The job was activated and started to process an item
    failed - The processing failed with some exception
  type - job's type
JobClient