paed01 / bpmn-engine

BPMN 2.0 execution engine. Open source javascript workflow engine.
MIT License

Asynchronous/delayed signal: How? #143

Closed pmullot closed 3 years ago

pmullot commented 3 years ago

Congrats for a super cool library! It is awesome!

My problem is the following:

In an AWS Lambda function (ephemeral) scenario, with this simple process:

[...]
  <process id="theProcess" isExecutable="true">
    <startEvent id="start" />
    <userTask id="task" />
    <endEvent id="end" />
    <sequenceFlow id="flow1" sourceRef="start" targetRef="task" />
    <sequenceFlow id="flow2" sourceRef="task" targetRef="end" />
  </process>

[...]
listener.once('wait', elementApi => {
   // we've reached the user task.
   // we save the state to some DB
   // we terminate the engine and exit the lambda function
});

How should I implement the case where the user action is resolved well after the lambda function has shut down (and the state of the engine has been saved to the DB)? Let's imagine that when the user completes the task, a new HTTP call is made to a different lambda function, which loads the previously saved state. Now I suppose we have to "signal" the engine that the task is done. How do I do that?

paed01 commented 3 years ago

Thank you. Have you explored the getPostponed function? Or you may even use the engine signal function after the execution is resumed.
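The two options mentioned above could look roughly like the sketch below. It is not taken from the library docs verbatim: the helper names `resumeAndSignal` and `resumeAndSignalPostponed` are made up for illustration, `engine` is assumed to be an engine already recovered from saved state, and `resume` is assumed to return a promise of the execution when no callback is passed.

```javascript
// Sketch only. `engine` is assumed to be a bpmn-engine Engine that has already
// been recovered from saved state, e.g. Engine().recover(state).

// Option 1: resume, then signal the waiting task by id through the execution.
async function resumeAndSignal(engine, listener, taskId, payload) {
  // Assumption: resume() resolves to the execution when no callback is passed.
  const execution = await engine.resume({ listener });
  // `id` addresses the task; the remaining keys are the message payload.
  execution.signal({ id: taskId, ...payload });
  return execution;
}

// Option 2: resume, then pick the waiting task out of the postponed activities
// and signal it directly.
async function resumeAndSignalPostponed(engine, listener, taskId, payload) {
  const execution = await engine.resume({ listener });
  const waiting = execution.getPostponed().find((api) => api.id === taskId);
  if (!waiting) throw new Error(`No postponed activity with id "${taskId}"`);
  waiting.signal(payload);
  return execution;
}
```

In the Lambda scenario, the second function would be called from the handler that receives the "task completed" HTTP request, after loading the state from the DB and calling `recover`.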

pmullot commented 3 years ago

Thanks for the hint. I'm trying to get the getPostponed function to work... This is my code:

[...]
listener.once('wait', async (elementApi, engineApi) => {
  const state = await engine.getState();
  fs.writeFileSync('./some-random-id.json', JSON.stringify(state));
  console.log(JSON.stringify(state, null, 2));
});

[...]

if (fs.existsSync('./some-random-id.json')) {
  let state = fs.readFileSync('./some-random-id.json');
  state = JSON.parse(state);
  engine = Engine().recover(state);
  const postponed1 = engine.execution.getPostponed();
  engine.resume(
    {
      listener,
    },
    (err, execution) => {
      if (err) throw err;
      console.log(`User sirname is ${execution.environment.output.task.sirname}`);
    }
  );
  const postponed2 = engine.execution.getPostponed();
}

The problem I have is:

paed01 commented 3 years ago

There should be an isResumed or similar on elementApi.content, or perhaps on elementApi.fields.redeliveried. I'm trying to resolve this on my phone, so sorry for the brief answers.

paed01 commented 3 years ago

Actually it should be content.isRecovered for reasons I don't remember.

listener.once('wait', async (elementApi, engineApi) => {
  if (elementApi.content.isRecovered) return;
  const state = await engine.getState();
  fs.writeFileSync('./some-random-id.json', JSON.stringify(state));
  console.log(JSON.stringify(state, null, 2));
});

pmullot commented 3 years ago

It is indeed. I wrote the same code as you. Thanks a lot for taking the time to answer my questions.

I’ve also tried to use the signal function from the execution object, which works great but might have a sort of a bug:

As stated in the docs, the signal function takes an id parameter to indicate which task you want to signal, an executionId, and any other key/value pairs as the message payload. However, when the message is received by the listener and you check elementApi.content.output, you find both id and executionId there. In my opinion, and to be consistent with signaling directly from the elementApi, those two fields should be stripped out of the message, since they aren't really part of the payload.
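Until that is changed in the library, one possible workaround is to drop the two addressing fields on the consumer side. This is a minimal sketch; `stripSignalRouting` is a hypothetical helper, not part of bpmn-engine, and the sample values are invented.

```javascript
// Hypothetical helper: remove the fields that execution.signal() uses to
// address a task, leaving only the user-supplied payload.
function stripSignalRouting(output) {
  const { id, executionId, ...payload } = output || {};
  return payload;
}

// Example with an output shaped like the one described above.
console.log(stripSignalRouting({ id: 'task', executionId: 'task_1', sirname: 'von Rosen' }));
// prints { sirname: 'von Rosen' }
```

Inside a listener this would be applied to elementApi.content.output before the payload is used.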

pmullot commented 3 years ago

Maybe I should open a new thread for that and close this one, as it is solved.