taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

Branching during Flow execution #2436

Open vicpara opened 9 months ago

vicpara commented 9 months ago

I am interested in knowing if this type of flow execution is possible. This is not necessarily a feature request. I'm using BullMQ with NodeJS.

In a given flow, the parent-children dependency tree is given at flow creation time.

Let's say that during the execution of a child task there is a boolean result 'answer'. Two chained strands of tasks depend on this child task's result 'answer'. Is it possible to run one branch and ignore the other depending on the value of 'answer'? Say, for 'true' run one branch and for 'false' run the other?

Currently I'm designing a mechanism that relies on failing the first child of the non-executable branch and using 'ignoreDependencyOnFailure' when the branch results merge back together. I was wondering if there are other primitives or if there's a more idiomatic way of achieving this.
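For reference, the failing-branch approach described above can be sketched roughly as a plain flow-tree structure (the queue names, job names, and the `pickBranch` helper are made up for illustration, and the exact opts placement is an assumption about this mechanism). Leaves run first in a BullMQ flow, so each branch is a chain of nested children; `failParentOnFailure` propagates a failure up the skipped branch, and `ignoreDependencyOnFailure` on the branch root lets the merge job run anyway:

```javascript
// Hypothetical helper: which branch should actually execute?
function pickBranch(answer) {
  return answer ? 'branch-true' : 'branch-false';
}

// One branch as a chain: root depends on step, step depends on head.
// The deepest job ('head') runs first; its worker throws if the branch
// is the one being skipped.
function branchChain(branch, queueName) {
  return {
    name: `${branch}-root`,
    queueName,
    // The merge job still completes even if this whole branch failed:
    opts: { ignoreDependencyOnFailure: true },
    children: [{
      name: `${branch}-step`,
      queueName,
      opts: { failParentOnFailure: true },
      children: [{
        name: `${branch}-head`,
        queueName,
        opts: { failParentOnFailure: true },
      }],
    }],
  };
}

// Both branches merge back into a single parent job.
const mergeFlow = {
  name: 'merge-results',
  queueName: 'merge-queue',
  children: [
    branchChain('branch-true', 'work-queue'),
    branchChain('branch-false', 'work-queue'),
  ],
};

// In a real setup:
// await new FlowProducer({ connection }).add(mergeFlow);
```

With this shape, both branches are always enqueued, but the skipped branch fails at its head and the failure propagates up to the branch root, which the merge job ignores.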

shaunakv1 commented 7 months ago

@vicpara We have a similar use case and we use what we are calling an Orchestrator Worker pattern.

You have an Orchestrator Queue. The workers in this queue can spawn a child flow tree whose parent is the main orchestrator worker.

When the child flow tree finishes, the parent orchestrator worker executes. There you can decide whether to spawn another flow tree based on the previous tree's result.

You can design quite complex flow patterns this way.

Here's a code sample (not complete, just an example):

const { FlowProducer, Worker } = require('bullmq');

class FlowOrchestrator {
    static async startProcess(jobDefinition) {
        const orchestratorFlow = new FlowProducer({ connection });

        const childJobs = [{
            name: 'step1-child-a',
            opts: {},
            queueName: CHILD_QUEUE1,
            data: jobDefinition,
        },
        {
            name: 'step1-child-b',
            opts: {},
            queueName: CHILD_QUEUE1,
            data: jobDefinition,
        }];

        const orchestratorJob = {
            name: `${ORCHESTRATOR_QUEUE}_child1`,
            opts: {},
            queueName: ORCHESTRATOR_QUEUE,
            data: {},
            children: childJobs,
        };

        // Add the orchestrator job (with its children) as a single flow.
        await orchestratorFlow.add(orchestratorJob);
    }

    static async startStep2(jobDefinition) {
        const orchestratorFlow = new FlowProducer({ connection });

        const childJobs = [{
            name: 'step2-child-a',
            opts: {},
            queueName: CHILD_QUEUE2,
            data: jobDefinition,
        },
        {
            name: 'step2-child-b',
            opts: {},
            queueName: CHILD_QUEUE2,
            data: jobDefinition,
        }];

        const orchestratorJob = {
            name: `${ORCHESTRATOR_QUEUE}_child2`,
            opts: {},
            queueName: ORCHESTRATOR_QUEUE,
            data: {},
            children: childJobs,
        };

        await orchestratorFlow.add(orchestratorJob);
    }

    /**
     * BullMQ Orchestrator Worker Function
     */
    async start() {
        this.orchestratorWorker = new Worker(ORCHESTRATOR_QUEUE, async job => {
            // In these steps we can now check the result of the previous flow,
            // then decide whether to start the next step,
            // or select some other step based on your logic.
            if (job.name.includes('_child1')) {
                await FlowOrchestrator.startStep2(job.data);
            }
            if (job.name.includes('_child2')) {
                await FlowOrchestrator.startStep3(job.data); // startStep3 omitted here
            }
            return true;

        }, {
            connection,
            concurrency: 1,
        });
    }
}

Note that in the steps, you can spawn more nested flow producers and chain them so that they always return to your main orchestrator. You can design very complicated workflows and decision trees this way. Another benefit is that you can maintain the data flow between your steps through the main orchestrator.
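For completeness, the decision inside the orchestrator worker is usually driven by the children's return values, which BullMQ exposes via `job.getChildrenValues()`. A rough sketch of that decision step (the `decideNextStep` helper and the `answer` field are illustrative, not part of the example above):

```javascript
// Pure decision helper: getChildrenValues() resolves to a map of child
// job keys to their returned values, e.g. { 'bull:q:1': { answer: true } }.
function decideNextStep(childrenValues) {
  const answer = Object.values(childrenValues)
    .some(v => v && v.answer === true);
  return answer ? 'step2' : 'step3';
}

// Inside the orchestrator worker's processor:
// const values = await job.getChildrenValues();
// const next = decideNextStep(values);
// if (next === 'step2') await FlowOrchestrator.startStep2(job.data);
```

Keeping the decision logic in a pure function like this also makes the branching testable without a Redis connection.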

Hope it helps.