hassan-alnator opened this issue 4 months ago
Yes, this is a "known characteristic" of Camunda 8's distributed, eventually-consistent architecture.
Some great thoughts and suggested approaches.
I like Option 2 for the SDK.
Option 1 involves changes to the core system, which we can't do here.
Option 3 is from a hypothetical future.
Option 2 looks good: it's a realistic ergonomic improvement for this scenario. In my integration tests, I use awaited timeouts to introduce delays, and I sometimes have to tune them depending on load and location. Having a subscribable callback would be more ergonomic.
Cleaning up resources and dealing with timeouts are two aspects that I can see.
What do you think about an API like this:
```typescript
const instance = await zbc.createProcessInstance({
	bpmnProcessId: `c8-sdk-demo`,
	variables: {
		humanTaskStatus: 'Needs doing',
	},
})
console.log(`[Zeebe] Finished Process Instance ${instance.processInstanceKey}`)
const flow = await camunda8.getProcessInstanceSequenceFlows({
	processInstanceKey: instance.processInstanceKey,
	timeout: 10000,
	pollInterval: 500,
}).catch(() => console.log(`Didn't get it within ten seconds`))
```
`pollInterval` is optional, with some sensible default.
Under the hood, the call is executed every `pollInterval` until we get back a result, or we throw the 404 if we hit the timeout.
Users can "subscribe" to the Promise outcome with a callback function in `then`, or `await` it for synchronous-looking code.
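The retry-until-timeout behaviour described above can be sketched as a standalone helper. This is a minimal sketch, not SDK code: `pollUntilFound`, `NotFoundError`, and `PollOptions` are hypothetical names, and it assumes the wrapped call signals "not available yet" by throwing `NotFoundError` (in the SDK that would be the 404 response).

```typescript
// Hypothetical error type standing in for an HTTP 404 from the API.
class NotFoundError extends Error {
	constructor(message = 'Not found') {
		super(message)
		this.name = 'NotFoundError'
		Object.setPrototypeOf(this, NotFoundError.prototype) // keeps instanceof working on ES5 targets
	}
}

interface PollOptions {
	timeout?: number // total time to keep retrying, in ms
	pollInterval?: number // delay between attempts, in ms
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

/**
 * Calls `fn` every `pollInterval` ms until it resolves. A NotFoundError means
 * "not available yet" and is retried; once `timeout` ms have elapsed the last
 * NotFoundError is rethrown. Any other error is rethrown immediately.
 */
async function pollUntilFound<T>(
	fn: () => Promise<T>,
	{ timeout = 10000, pollInterval = 500 }: PollOptions = {}
): Promise<T> {
	const start = Date.now()
	for (;;) {
		try {
			return await fn()
		} catch (err) {
			if (!(err instanceof NotFoundError)) throw err
			if (Date.now() - start >= timeout) throw err
		}
		await sleep(pollInterval)
	}
}
```

Because the helper returns a plain Promise, callers can either `await` it or attach `.then()`/`.catch()` handlers, which is the "subscribe to the Promise outcome" ergonomics from the proposal.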
What are your thoughts on that?
I like that; I think it will enhance the experience a lot. So, basically something like this:
```typescript
import { HTTPError } from 'got';

// ..... class code ....

public async getProcessInstance(
	processInstanceKey: number | string,
	timeout = 10000, // total time to keep retrying, in ms
	pollInterval = 500 // delay between attempts, in ms
): Promise<ProcessInstance> {
	const headers = await this.getHeaders();
	const rest = await this.rest;
	// can also be used later for a benchmarking utility
	const requestStartTime = Date.now();
	while (true) {
		try {
			// Await inside the try so that a 404 HTTPError is caught below
			return await rest(`process-instances/${processInstanceKey}`, {
				headers,
				parseJson: (text) => losslessParse(text, ProcessInstance),
			}).json<ProcessInstance>();
		} catch (error) {
			if (error instanceof HTTPError && error.response.statusCode === 404) {
				// A 404 means the process instance key is not found or not available yet,
				// so we retry until we reach the timeout
				if (Date.now() - requestStartTime >= timeout) {
					throw new Error('Timeout reached while waiting for process instance');
				}
			} else {
				// Any other error is not an indexing delay: log it and rethrow
				console.error('Error fetching process instance:', error);
				throw error;
			}
		}
		// Wait for the specified poll interval before retrying
		await new Promise((resolve) => setTimeout(resolve, pollInterval));
	}
}
```
The other concept I had would change the way the SDK is used, so maybe it's not a good idea for now:
```typescript
// Define a generator function that fetches data at regular intervals.
// fetchData() is a placeholder for whichever API call is being polled.
async function* dataGenerator(interval: number): AsyncGenerator<unknown, void, unknown> {
	while (true) {
		try {
			const data = await fetchData();
			if (data) { // or: not a 404
				// If data is available, yield it...
				yield data;
				// ...then stop polling
				break;
			}
		} catch (error) {
			console.error('Error fetching data:', error);
		}
		// Wait for the specified interval before trying again
		await new Promise((resolve) => setTimeout(resolve, interval));
	}
}

// Usage example
(async () => {
	const generator = dataGenerator(5000); // Fetch data every 5 seconds
	for await (const data of generator) {
		console.log('Data:', data);
		// Use the fetched data here
	}
})();
```
A related issue is the search API calls: on Operate and Tasklist you can search for processes.
```typescript
const processes = await operate
	.searchProcessInstances({
		filter: {
			processDefinitionKey,
			state: 'ACTIVE',
		},
	})
```
That is not going to return a 404 while waiting for eventual consistency - it is going to return a 200 with zero results.
At some point it will return a 200 with one or more results.
So these APIs could benefit from a generator API.
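A minimal sketch of such a generator, assuming the search call is wrapped as a function that resolves to an array of matching items. The names `pollSearch` and its defaults are hypothetical, not part of the SDK. An empty array is treated as "not indexed yet" rather than as a final answer.

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

/**
 * Re-runs `search` every `pollInterval` ms. An empty array (200 with zero
 * results) is treated as "not indexed yet" and is not yielded; each
 * non-empty result is yielded. The generator returns once `timeout` ms pass
 * since the last non-empty result (or since the start), or when the
 * consumer stops iterating (e.g. `break` inside for-await).
 */
async function* pollSearch<T>(
	search: () => Promise<T[]>,
	pollInterval = 500,
	timeout = 10000
): AsyncGenerator<T[], void, unknown> {
	let lastHit = Date.now()
	for (;;) {
		const results = await search()
		if (results.length > 0) {
			lastHit = Date.now()
			yield results
		} else if (Date.now() - lastHit >= timeout) {
			return // no non-empty result within the timeout window
		}
		await sleep(pollInterval)
	}
}
```

A consumer that only needs the first match can `break` out of `for await (const page of pollSearch(...))`, which also stops the polling.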
What do you think about a search subscription that is an event emitter, which wraps the `dataGenerator` and emits a `data` event?
That might be a more familiar API for most developers. So something like:
```typescript
const processSubscription = operate
	.subscribeProcessInstances({
		filter: {
			processDefinitionKey,
			state: 'ACTIVE',
		},
	})
processSubscription.on('data', process => {
	console.log(process)
	processSubscription.close()
	// effectively processSubscription.once() with cleanup
})
```
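A minimal sketch of that idea using Node's built-in `EventEmitter`. `SearchSubscription` and its constructor arguments are hypothetical, and the polling loop is inlined so the block is self-contained. `close()` stops the loop, so calling it inside the first `data` handler gives the `once()`-with-cleanup behaviour described above.

```typescript
import { EventEmitter } from 'node:events'

/**
 * Hypothetical subscription: polls `search` every `pollInterval` ms and
 * emits one 'data' event per item found. Empty results are ignored.
 */
class SearchSubscription<T> extends EventEmitter {
	private closed = false
	private timer?: ReturnType<typeof setTimeout>

	constructor(private readonly search: () => Promise<T[]>, private readonly pollInterval = 500) {
		super()
		// Results are awaited, so listeners attached right after construction are in place
		void this.poll()
	}

	private async poll(): Promise<void> {
		if (this.closed) return
		try {
			const items = await this.search()
			for (const item of items) {
				if (this.closed) return // a 'data' handler may have closed us mid-batch
				this.emit('data', item)
			}
		} catch (err) {
			// Only emit when someone listens: EventEmitter throws on an unhandled 'error'
			if (this.listenerCount('error') > 0) this.emit('error', err)
		}
		if (!this.closed) this.timer = setTimeout(() => void this.poll(), this.pollInterval)
	}

	close(): void {
		if (this.closed) return
		this.closed = true
		if (this.timer) clearTimeout(this.timer)
		this.emit('close')
	}
}
```

An SDK method like `subscribeProcessInstances` could return such an object with `search` bound to the underlying search call and filter.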
@jwulf if we can fix multiple issues at once, I think that would be great. The only concern I see is: what if the search is for something that does not exist? The subscription needs a timeout, so maybe we should merge both solutions into one and add it to all our APIs.
```typescript
const processSubscription = operate
	.subscribeProcessInstances({
		timeout: 10000, // Optional, with a default value of 5000
		pollInterval: 500, // Optional, with a default value of 300
		filter: {
			processDefinitionKey,
			state: 'ACTIVE',
		},
	})
processSubscription.on('data', process => {
	console.log(process)
	processSubscription.close()
	// effectively processSubscription.once() with cleanup
})
```
Should the timeout default to something or should it default to infinite?
Think about this use case: refreshing tasks from tasklist as they appear, adding them to a tasklist in a frontend. That is a subscription that you would want to be persistent.
When a timeout for the subscription is specified, it should emit an event when it times out, and maybe only if no data was received.
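One way to express that rule, as a sketch: the timeout is optional (omitting it gives a persistent subscription), and the timer is disarmed as soon as the first item arrives, so `timeout` is only emitted when nothing was received. `TimedSubscription` and `SubscriptionOptions` are hypothetical names that mirror the proposal; search errors are not handled here.

```typescript
import { EventEmitter } from 'node:events'

interface SubscriptionOptions {
	pollInterval?: number // ms between searches
	timeout?: number // ms to wait for the first result; omit for a persistent subscription
}

class TimedSubscription<T> extends EventEmitter {
	private closed = false
	private received = false
	private pollTimer?: ReturnType<typeof setTimeout>
	private timeoutTimer?: ReturnType<typeof setTimeout>

	constructor(private readonly search: () => Promise<T[]>, { pollInterval = 500, timeout }: SubscriptionOptions = {}) {
		super()
		if (timeout !== undefined) {
			this.timeoutTimer = setTimeout(() => {
				// Fires only if no item arrived before the deadline
				if (!this.received) {
					this.emit('timeout')
					this.close()
				}
			}, timeout)
		}
		void this.poll(pollInterval)
	}

	private async poll(pollInterval: number): Promise<void> {
		if (this.closed) return
		const items = await this.search() // errors are not handled in this sketch
		for (const item of items) {
			if (this.closed) return
			if (!this.received) {
				this.received = true
				clearTimeout(this.timeoutTimer) // data arrived: disarm the timeout
			}
			this.emit('data', item)
		}
		if (!this.closed) this.pollTimer = setTimeout(() => void this.poll(pollInterval), pollInterval)
	}

	close(): void {
		if (this.closed) return
		this.closed = true
		clearTimeout(this.pollTimer)
		clearTimeout(this.timeoutTimer)
		this.emit('close')
	}
}
```

With no `timeout`, the subscription keeps polling until `close()` is called, which fits the "refresh tasks as they appear" use case.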
The timeout needs a default value to avoid memory leaks: since the subscription has a filter, it is "looking for something". The default can be something a little high, like 50000 ms, and the interval will keep refreshing as new data is received.
For the use case you mentioned (no process- or task-based filters), I would design it around Server-Sent Events or WebSockets, with something like fetch-event-source, as it lets the browser handle the interval logic on the front end, while on the back end it is exactly like emitting an event.
```typescript
operate
	.subscribeProcessInstances({
		filter: {
			processDefinitionKey,
			state: 'ACTIVE',
		},
		async onopen(response) {
		},
		onmessage(msg) {
		},
		onclose() {
		},
		onerror(err) {
		}
	});
```
We can add events to manage the full life cycle of the subscription, like the methods above. For any ID-based filter, the timeout will trigger the `onerror` callback; filters on state, assignee, and similar fields will keep going.
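A sketch of that lifecycle, with hypothetical names throughout (`subscribe`, `SubscribeOptions`, `idBased`). The handler shape copies the `onopen`/`onmessage`/`onclose`/`onerror` callbacks above; `idBased` is an assumed flag the SDK would have to derive from the filter. When `idBased` is true and nothing is found before the timeout, `onerror` fires and the subscription closes; otherwise polling continues until the returned `close()` is called.

```typescript
interface SubscribeOptions<T> {
	search: () => Promise<T[]>
	idBased: boolean // hypothetical: true when the filter targets a specific key
	pollInterval?: number // ms between searches
	timeout?: number // ms; only enforced for ID-based filters
	onopen?: () => void | Promise<void>
	onmessage?: (item: T) => void
	onclose?: () => void
	onerror?: (err: Error) => void
}

/** Starts polling and returns a close() function that stops it and fires onclose once. */
function subscribe<T>(opts: SubscribeOptions<T>): () => void {
	const { search, idBased, pollInterval = 500, timeout = 10000 } = opts
	const start = Date.now()
	let closed = false
	let found = false
	let timer: ReturnType<typeof setTimeout> | undefined

	const close = () => {
		if (closed) return
		closed = true
		clearTimeout(timer)
		opts.onclose?.()
	}

	const tick = async () => {
		if (closed) return
		try {
			const items = await search()
			if (items.length > 0) found = true
			for (const item of items) if (!closed) opts.onmessage?.(item)
		} catch (err) {
			opts.onerror?.(err instanceof Error ? err : new Error(String(err)))
		}
		// ID-based lookups give up after the timeout; state/assignee filters keep polling
		if (idBased && !found && !closed && Date.now() - start >= timeout) {
			opts.onerror?.(new Error(`No result within ${timeout} ms`))
			close()
			return
		}
		if (!closed) timer = setTimeout(() => void tick(), pollInterval)
	}

	void (async () => {
		await opts.onopen?.()
		await tick()
	})()
	return close
}
```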
SDK Component
Using the Operate API client with the following code:
Expected Behavior
Accessing data related to the process while these objects are being created on the fly returns 404, because it takes time for these objects to become available. This is confusing, results in unpredictable application behavior, and makes it hard to unit test apps built with the SDK.
Current Behavior
Error for the Sequence Flow
Result of opening the same link in the browser immediately after the error:
Error for the Process Instance
Result of opening the same link in the browser immediately after the error:
Possible Solution
I think the main issue is related to how Elasticsearch works: it needs time to index the data unless the index is manually refreshed.
Option 1:
I suggest we introduce a method that lets the developer refresh the affected index when needed (as an option, considering the performance impact on ELK), or wait for data to come back, knowing that the create function returned 200 OK.
Option 2:
Another option is to get data as a stream: check the result on an interval, return when it is ready, then clear the interval. Alternatively, use something like generator functions or Observables to subscribe to changes.
Option 3 (after browser support is resolved - ISSUE #79):
To improve the experience of using these APIs when building front ends, use EventSource to receive changes when they become available.
Steps to Reproduce
Context (Environment)
I am building a wizard application in my self-hosted environment where each step is represented as a userTask, so that data can be evaluated and the next step decided using DMNs and proper logic. This requires real-time access to data and process variables to drive the UI.