paed01 / bpmn-engine

BPMN 2.0 execution engine. Open source javascript workflow engine.
MIT License
860 stars 166 forks source link

bpmn-engine is not waiting for async calls #172

Closed ermarkar closed 1 year ago

ermarkar commented 1 year ago

I have integrated bpmn-engine with ding-dong agi server.

So in flow i mentioned voice files to play

const listener = new EventEmitter();
listener.on('activity.start', async (elementApi, engineApi) => {
   console.log(elementApi.name);
   await streamFile(voiceFilePath)
});

listener.on('activity.wait', (elemntApi, instance) => {
  console.log(`${elemntApi.type} <${elemntApi.id}> of ${instance.name} is waiting for input`);
  elemntApi.signal('don´t wait for me');
});

await engine.execute({
  listener
});

it is logging all the activity names but it is not playing any file using streamFile, even i am using await.

how can i solve this?

paed01 commented 1 year ago

An eventhandler is immediate and will not be awaited. If you want to postpone the execution on start you have to use an extension.

ermarkar commented 1 year ago

Thanks for the great library and quick response, but still bit confused.

so instead of listener for activity.start, i should use extensions? like this

const engine = new Engine({
  name: 'My IVR listener',
  source,
  extensions: {
    myExtentension
  }
});

const listener = new EventEmitter();

listener.on('wait', (elementApi) => {

  // async tasks here

  elementApi.signal();
});

engine.execute({
  listener
});

function myExtentension(activity) {
  activity.on('start', async () => {
    // await -- async task 1 -> getting data from server
    // await -> asyync task2 -> streaming file 
    // await -> async task 3 

    // do i need to publish this????
    activity.broker.publish('format', 'run.form', {form});
  });
}

and for wait listener would work? or for that i need to listen the same wait insside myExtension?

paed01 commented 1 year ago

Have a look at this test but you will replace the getForm with your agi-api-call.

The activity execution will wait until the endRoutingKey is published. You decide what to pass when publishing the format end routing key. The content will be added to the message content on the next step in the execution - i.e. the lifecycle of an activity.

ermarkar commented 1 year ago
bpmnEngine = Engine({
        name: 'IVR calls Listener',
        extensions: {
          saveToResultVariable,
          myExt(activity) {

            const endRoutingKey = 'run.form.end';

            activity.on('enter', async (elementApi, engineApi) => {
              console.log("myext:", activity.name);
              await FlowActivityHandler.onStart(elementApi, engineApi);
              activity.broker.publish('format', endRoutingKey, { "helloKey": "hello" });
            });
          }
        }
      });

but still it is not waiting and not playing the files, am i still missing something?

ermarkar commented 1 year ago

i added

activity.on('enter', async (elementApi, engineApi) => {
              console.log("myext:", activity.name);
              activity.broker.publish('format', 'run.form.start', { endRoutingKey });

              try {
                await FlowActivityHandler.onStart(elementApi, engineApi);
              } catch (err) {
                console.log("file: agi-server.js:117 ~ activity.on ~ err:", err);
              }
              console.log("firing activing end ->>>>>>>>> ")
              activity.broker.publish('format', endRoutingKey, { "helloKey": "hello" });
            });

now this is behaving in sync manner, but it got stuck on bpmn:IntermediateThrowEvent

fileToPlay -> welcome firing activing end ->>>>>>>>> element api ------ undefined bpmn:IntermediateThrowEvent element api ------ undefined bpmn:IntermediateThrowEvent firing activing end ->>>>>>>>>

its not moving further

paed01 commented 1 year ago

Perhaps the referenced test can shed some light on the issue.

ermarkar commented 1 year ago

@paed01 thanks for the help, but still I am facing same, I have used this now

function bpmnActivityHandler(bpmnActivity) {
  if (bpmnActivity.type === 'bpmn:Process') return;
  return {
    activate() {
      bpmnActivity.on('enter', async (elementApi, engineApi) => {
        bpmnActivity.broker.publish('format', 'run.format.start', { endRoutingKey: 'run.format.complete' });
        try {
          await FlowActivityHandler.onStart(elementApi, engineApi);
        } catch (err) {
          // end call
        }
        bpmnActivity.broker.publish('format', 'run.format.complete', { msg: "testmm" });
      }, { consumerTag: 'format-on-enter' });
    },
    deactivate() {
      bpmnActivity.broker.cancel('format-on-enter');
    },
  };
}

still it got stuck on bpmn:IntermediateThrowEvent. though there are other bpmn:IntermediateThrowEvent before the first streamFile that is working fine means it proceeds

this is under subprocess like this.. it is not moving to other subprocess

image

ebfore greetings there are service subprocess that too containes bpmn:IntermediateThrowEvent but it works for that, greeting is the first subprocess having files to play

Am i still missing something?

ermarkar commented 1 year ago

i added this and it is proceeding all the elements now

function bpmnActivityHandler(bpmnActivity) {
  const eventsToIgnore = ['bpmn:Process', 'bpmn:SubProcess', 'bpmn:IntermediateThrowEvent'];
  if (eventsToIgnore.includes(bpmnActivity.type)) {
    return;
  }
  return {
    activate() {
      bpmnActivity.on('enter', async (elementApi, engineApi) => {

and the bpmn:UserTask that needs user input I am handling using listener like this

// waiting for user input
    activityListener.on("wait", onWait);

    await bpmnEngine.execute({
      listener: activityListener,

is this correct?

paed01 commented 1 year ago

If it works I guess so :)

Nice work.

ermarkar commented 1 year ago

Thanks for the help and clues!!! is there any document for this..

bpmnActivity.broker.publish('format', 'run.format.start', { endRoutingKey: 'run.format.complete' });

like all the elements in this are still confusing, what are other event that can be published etc

thanks again :)