taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

Parent job does not execute when sharing dependency with another job #1021

Open bilalshaikh42 opened 2 years ago

bilalshaikh42 commented 2 years ago

We have a series of processing steps linked to a particular ID. Some of these steps depend on others, and some dependencies are shared. We have defined a flow as follows:


const runId = SomeSharedID;

const JobA = {
  name: 'A',
  queueName: QueueA,
  opts: {
    jobId: `A-${runId}`,
  },
  data: {
    runId: runId,
  },
};

const JobB = {
  name: 'B',
  queueName: QueueB,
  opts: {
    jobId: `B-${runId}`,
  },
  data: {
    runId: runId,
  },
};

const JobC = {
  name: 'C',
  queueName: QueueC,
  data: {
    runId: runId,
  },
  opts: {
    jobId: `C-${runId}`,
  },
  // Needs A and B
  children: [JobA, JobB],
};

const JobD = {
  name: 'D',
  queueName: QueueD,
  data: {
    runId: runId,
  },
  opts: {
    jobId: `D-${runId}`,
  },
  // Also needs A and B
  children: [JobA, JobB],
};

const FlowC = await flowProducer.add(JobC);
const FlowD = await flowProducer.add(JobD);

JobA and JobB are manually given ids since they should not be repeated for multiple dependent parents.

In this situation, since both JobC and JobD share two jobs that have the same id as children, only the first added one will be triggered. Is JobD not being triggered an intentional behavior? Or is this a bug?

roggervalf commented 2 years ago

Hi @bilalshaikh42, thanks for submitting this issue. Currently a job can have only one parent.

ximus commented 2 years ago

multiple parents would be great!

I have an app flow where I need to generate a PDF to s3, then notify the client via an SMS and an email.

All those steps should ideally remain loosely coupled pieces of code that do not reference each other directly.

Ideally I would have:

+--> Generate PDF (child job)
      +--> SMS (parent job)
      +--> Email PDF (parent job)

I guess I can link them in series with the current abilities of bullmq:

+--> Generate PDF (child job)
      +--> SMS (child and parent job)
            +--> Email PDF (parent job)

The downsides here are (a) some lost efficiency, since the latter two jobs could run concurrently, and (b) reduced resiliency: if the SMS job fails, the email job doesn't run, which is too bad.

Or for now I'll couple the email and SMS jobs into one job, losing some granularity and code separation. Or maybe there's a solution using events: listen for PDF completion and queue the two latter jobs then. Will investigate ...

Thanks for BullMQ, a solid contribution to the node world

ShadowStrikerJ commented 2 years ago

Is there any update regarding this?

From the testing I did, it seems like if you put the full flow in at the same time one of those parent jobs just won't run.

However, if you do it a second time ( after the child jobs have completed ), the parent jobs recognize their dependencies are completed just fine and both parent jobs run.

Is this something that should be happening? It's also worth noting that in the first case it just fails silently, which probably isn't intended behavior.
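
Because the first case fails silently, one option is to inspect the flow trees before handing them to flowProducer.add. The following is a minimal TypeScript sketch, not part of BullMQ: `FlowNode` is a hypothetical type that mirrors only the fields used in this thread, and `findSharedChildren` is a made-up helper name. It assumes, as stated earlier in the thread, that a job (identified here by queue name plus jobId) can have only one parent.

```typescript
// Hypothetical minimal shape of a flow node, mirroring the fields used in this thread.
interface FlowNode {
  name: string;
  queueName: string;
  opts?: { jobId?: string };
  children?: FlowNode[];
}

// Walk every flow tree and record, for each child with an explicit jobId,
// which parents reference it. Only children claimed by more than one parent are returned.
function findSharedChildren(flows: FlowNode[]): Map<string, string[]> {
  const parentsByChild = new Map<string, Set<string>>();

  const label = (node: FlowNode): string =>
    `${node.queueName}:${node.opts?.jobId ?? node.name}`;

  const visit = (parent: FlowNode): void => {
    for (const child of parent.children ?? []) {
      const jobId = child.opts?.jobId;
      // Children without an explicit jobId are assumed to get a generated id,
      // so they are not treated as potential collisions.
      if (jobId !== undefined) {
        const key = `${child.queueName}:${jobId}`;
        const parents = parentsByChild.get(key) ?? new Set<string>();
        parents.add(label(parent));
        parentsByChild.set(key, parents);
      }
      visit(child);
    }
  };
  flows.forEach(visit);

  const shared = new Map<string, string[]>();
  parentsByChild.forEach((parents, child) => {
    if (parents.size > 1) shared.set(child, Array.from(parents).sort());
  });
  return shared;
}

// Example with the structure from the original post (queue names are placeholders).
const jobA: FlowNode = { name: 'A', queueName: 'QueueA', opts: { jobId: 'A-42' } };
const jobB: FlowNode = { name: 'B', queueName: 'QueueB', opts: { jobId: 'B-42' } };
const jobC: FlowNode = { name: 'C', queueName: 'QueueC', opts: { jobId: 'C-42' }, children: [jobA, jobB] };
const jobD: FlowNode = { name: 'D', queueName: 'QueueD', opts: { jobId: 'D-42' }, children: [jobA, jobB] };

const shared = findSharedChildren([jobC, jobD]);
shared.forEach((parents, child) => {
  console.warn(`${child} is referenced by several parents: ${parents.join(', ')}`);
});
```

A check like this does not make shared children work; it only turns the silent case into a visible warning before anything is added.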

manast commented 2 years ago

@ShadowStrikerJ Unfortunately flows do not support multiple parents, and I do not know if and/or when they will be supported.

imperfect-circuits commented 1 year ago

The alternative (which we use) is to create the two new jobs within one parent job; they can then be processed independently. E.g.

parent: jobParent
children: jobA, jobB

jobParent() {
   create jobC
   create jobD
}
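
A rough TypeScript sketch of this workaround follows. Everything in it is illustrative rather than taken from the thread: the `FollowUp` type, the `planFollowUps` helper, and the queue names are hypothetical. It only builds the descriptions of the jobs that jobParent would create. The deterministic `jobId` values are an assumption, intended to keep a retried jobParent from enqueueing duplicates of jobs that already exist.

```typescript
// Hypothetical description of a job to enqueue once the shared children are done.
interface FollowUp {
  queueName: string;
  name: string;
  data: { runId: string };
  opts: { jobId: string };
}

// Meant to be called as the last step of jobParent's processor,
// i.e. after its children jobA and jobB have completed.
function planFollowUps(runId: string): FollowUp[] {
  return ['C', 'D'].map((name) => ({
    queueName: `Queue${name}`, // assumed naming, mirrors QueueC / QueueD
    name,
    data: { runId },
    // Deterministic id per run, so the same follow-up is not described twice under different ids.
    opts: { jobId: `${name}-${runId}` },
  }));
}

// In a real worker each entry would then be handed to the matching queue, e.g.
//   await queues[f.queueName].add(f.name, f.data, f.opts);
// where `queues` is a hypothetical lookup of BullMQ Queue instances.
const followUps = planFollowUps('run-1');
console.log(followUps.map((f) => `${f.queueName}/${f.opts.jobId}`));
```

Keeping the planning step free of side effects makes it easy to unit-test; the jobs it describes are still outside the flow tree once they are added.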

pbell23 commented 1 year ago

@imperfect-circuits This solution looks interesting, but it adds complexity. If jobC succeeds but jobD does not, jobParent has to be made to fail, and extra code is then needed to re-execute jobD without re-running jobC. It also assumes that jobC and jobD have no children, since they are no longer part of the flow tree once jobParent creates them manually.

matpen commented 1 year ago

> @ShadowStrikerJ Unfortunately flows do not support multiple parents, and I do not know if and/or when they will be supported.

@manast could you expand on whether this limitation is due to specific technical reasons, or simply not implemented yet? Maybe someone in the community might want to pitch in!

manast commented 1 year ago

It increases complexity and the number of edge cases by maybe an order of magnitude. Which use case is so important that it requires this functionality?

matpen commented 1 year ago

> which use case is so important that will require this functionality?

Every workflow with multiple dependencies will need this functionality. In addition to the example in the OP, I will expand the example from the documentation.

Currently the example looks like this:

const flow = await flowProducer.add({
  name: 'renovate-interior',
  queueName: 'renovate',
  children: [
    { name: 'paint', data: { place: 'ceiling' }, queueName: 'steps' },
    { name: 'paint', data: { place: 'walls' }, queueName: 'steps' },
    { name: 'fix', data: { place: 'floor' }, queueName: 'steps' },
  ],
});

But suppose that, more realistically, painting the ceiling and the walls can only be approached after the floor is fixed. In this case, both paint jobs will depend on the floor fix, which in turn should have two parents.

The dependency graph now looks as follows: [image]

I see no way to implement this situation, apart from:

manast commented 1 year ago

Yeah, I am aware of the theoretical cases, I mean a real-world case where this is needed and no simple workaround can be found. For instance, in your example above you could just have one queue for fixing the floor, and in the worker for said queue you can add the rest of the flow as the last step before returning.

matpen commented 1 year ago

@manast The case I described was a simple one just as a basis for discussion. Of course, simple workarounds can be found for this and other cases: the workaround you suggest is basically based on the flow pattern, as mentioned above.

It does have, however, the problem that it does not scale well: with a more involved dependency graph, where there are multiple nodes at every level, one must resort to topological sorting. And this will prevent the flow from maximizing concurrency. To visualize this, please consider the following image: [image]

By using the flow pattern here, it is evident that we are losing efficiency: paint_stairs could run as soon as fix_stairs is finished, but has to wait for fix_floor so that it can be run together with the other two paint tasks.
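
To make the lost concurrency concrete, here is a small TypeScript sketch that groups tasks into levels using Kahn's algorithm. The dependency map is an assumption reconstructed from the task names mentioned above (the original image is not available), and `topologicalLevels` is an illustrative helper, not a BullMQ API.

```typescript
// Maps each task to the tasks it depends on (assumed graph, see the note above).
type Dependencies = Record<string, string[]>;

// Group tasks into levels: every task in a level depends only on tasks in earlier levels.
function topologicalLevels(deps: Dependencies): string[][] {
  const remaining = new Map<string, Set<string>>();
  Object.keys(deps).forEach((task) => remaining.set(task, new Set(deps[task])));

  const result: string[][] = [];
  while (remaining.size > 0) {
    // Tasks whose dependencies have all been placed in earlier levels.
    const ready = Array.from(remaining.keys())
      .filter((task) => remaining.get(task)!.size === 0)
      .sort();
    if (ready.length === 0) throw new Error('cycle or unknown dependency');

    // Remove the ready tasks and mark them as satisfied for everything still waiting.
    ready.forEach((task) => remaining.delete(task));
    remaining.forEach((pending) => ready.forEach((task) => pending.delete(task)));
    result.push(ready);
  }
  return result;
}

const plan = topologicalLevels({
  fix_floor: [],
  fix_stairs: [],
  paint_ceiling: ['fix_floor'],
  paint_walls: ['fix_floor'],
  paint_stairs: ['fix_stairs'],
});
console.log(plan);
```

When the plan is executed level by level, paint_stairs lands in the second level together with the other paint tasks, so it waits for fix_floor even though it only depends on fix_stairs.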

Please understand that here I am just trying to make the point for a genuine use case, not a "theoretical scenario". Whether such a case emerges often enough to justify the implementation effort, I would prefer to leave to the judgement of the community. I just thought to mention this, because I found many requests for this feature:

That said, thank you for working on a great library!

zaquas77 commented 9 months ago

Hi @manast and @matpen, I have the same problem.

In my situation I have a tool where users define jobs and their dependencies. I would like to run jobs in parallel whenever possible.

Thanks again for the great work!

zaquas77 commented 8 months ago

Hi @manast, sorry for my insistence, but do you have any thoughts on this request? Could I help you in some way?

Thanks to @hhopkins95 for his discussion

thanks again.

x-etienne commented 6 months ago

+1

chingiz19 commented 4 months ago

+1

lucard17 commented 1 month ago

+1

roggervalf commented 1 month ago

Hi guys, we are aware of this topic, but we don't have an expected time to revisit it. Still, this is a good issue to discuss your expectations.

jacob-israel-turner commented 1 week ago

@manast I have a real-world use-case for you.

I have 3 jobs I need to run against a document, and 1 job that will parse and cache the document contents. The document parsing is a really expensive step, so I will not be doing it in all 3 jobs - it will be done as a pre-processing step.

The 3 jobs require the contents of the document in order to run. I need to create a flow like this:

Job A parses the document and caches it.
Job B reads the document contents and pulls out information X (only runs after Job A completes).
Job C reads the document contents and pulls out information Y (only runs after Job A completes).
Job D reads the document contents and pulls out information Z (only runs after Job A completes).

To be clear, I have not installed BullMQ in this project yet. I've used it in projects in the past (when it was still called Bull), so it was top of mind when I needed a solution for this project. However, the inflexibility of flows may be a showstopper for usage in this project. I anticipate having many more flows like this in the future, and I need a tool that will support my real-world use-cases.

jacob-israel-turner commented 1 week ago

> Yeah, I am aware of the theoretical cases, I mean a real-world case where this is needed and no simple workaround can be found. For instance, in your example above you could just have one queue for fixing the floor, and in the worker for said queue you can add the rest of the flow as the last step before returning.

This is a possible workaround, but it's not very reasonable. When I'm creating the complex flow at the start, I have all of the data (including sensitive credentials) that are required for the entire flow. If I have to break the flow into multiple flows, then I have to shim this data through every flow, muddying the business logic and making the flows brittle.

As it stands, the flow I'm planning would need to be broken up into at least 3 flows, unless the child-parent relationship is made more flexible.

EDIT:

To shed some more light on what I've planned out considering the BullMQ constraints:

flowchart TD
    Start["Job 1: Initialize Processing<br/><small>Inputs:<br/>- Document Links[]<br/>- API Token<br/>- API Base URL</small>"]

    subgraph VerifyFlow["Document Verification Flow"]
        Check["Job 2:<br/>Validate Document Type"]
        CreateNext["Job 3:<br/>Initialize Processing"]
    end

    Abort["Abort:<br/>Invalid Document Type"]

    subgraph ParseFlow["Document Processing Flow"]
        Parse1["Job 4:<br/>Primary Parser"]
        Parse2["Job 5:<br/>Secondary Parser"]
        Merge["Job 6:<br/>Combine Results & Cache"]
    end

    subgraph ExtractFlow["Data Extraction Flow"]
        Extract1["Job 7:<br/>Extract Primary Data"]
        Extract2["Job 8:<br/>Extract Secondary Data"]
        Extract3["Job 9:<br/>Extract Tertiary Data"]
        Save["Job 10:<br/>Persist to Database"]
    end

    Start -->|"For each Document..."| Check
    Check -->|"❌"| Abort
    Check -->|"✅"| CreateNext
    CreateNext -->|"Document Link"| Parse1
    CreateNext -->|"Document Link"| Parse2
    Parse1 -->|"Primary Results"| Merge
    Parse2 -->|"Secondary Results"| Merge
    Merge --> Extract1
    Merge --> Extract2
    Merge --> Extract3
    Extract1 -->|"Dataset 1"| Save
    Extract2 -->|"Dataset 2"| Save
    Extract3 -->|"Dataset 3"| Save

    style Start fill:#ffd,stroke:#333,color:#000
    style Check fill:#f9f,stroke:#333,color:#000
    style CreateNext fill:#f9f,stroke:#333,color:#000
    style Parse1 fill:#f9f,stroke:#333,color:#000
    style Parse2 fill:#f9f,stroke:#333,color:#000
    style Merge fill:#bbf,stroke:#333,color:#000
    style Extract1 fill:#bfb,stroke:#333,color:#000
    style Extract2 fill:#bfb,stroke:#333,color:#000
    style Extract3 fill:#bfb,stroke:#333,color:#000
    style Save fill:#fbb,stroke:#333,color:#000
    style Abort fill:#fee,stroke:#f66,color:#000

    style VerifyFlow fill:#fff8,stroke:#00f,stroke-width:2px,stroke-dasharray: 5 5
    style ParseFlow fill:#fff8,stroke:#00f,stroke-width:2px,stroke-dasharray: 5 5
    style ExtractFlow fill:#fff8,stroke:#00f,stroke-width:2px,stroke-dasharray: 5 5

Here, Job 10 needs to make a final request to an API to persist the results. If I do this in 3 flows, I have to shim the API token and API base URL through each flow.

If a child could have multiple parents, then I could do all of this in a single flow, and provide the credentials to the final job from the first job when the flow is initialized.

Let me know if you have any questions or a more reasonable workaround @manast.

jacob-israel-turner commented 1 week ago

I decided to implement this using temporal.io instead. The inflexibility of creating the workflow graph was the primary deciding factor.