deepset-ai / haystack

:mag: AI orchestration framework to build customizable, production-ready LLM applications. Connect components (models, vector DBs, file converters) to pipelines or agents that can interact with your data. With advanced retrieval methods, it's best suited for building RAG, question answering, semantic search or conversational agent chatbots.
https://haystack.deepset.ai
Apache License 2.0
17.18k stars 1.88k forks

components should run concurrently when not explicitly waiting on inputs #8453

Open alex-stoica opened 4 days ago

alex-stoica commented 4 days ago

If a component is not blocked waiting on explicit inputs from another node, it should run concurrently with the other components. In practice, independent components run sequentially, and this unnecessary waiting reduces pipeline performance.

For example, in a pipeline like

    A -> ch1
    B -> ch1
    ch1 -> C
    C -> D
    C -> E

A and B should run concurrently, as they have no dependencies on each other. D and E should also run concurrently, since neither depends on the other.
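The expected grouping can be sketched with a small dependency-level computation (a hypothetical illustration, not Haystack's actual scheduler): nodes at the same level have no path between them, so they could in principle run concurrently.

```python
from collections import defaultdict

# The example graph from above, as (source, destination) edges.
edges = [("A", "ch1"), ("B", "ch1"), ("ch1", "C"), ("C", "D"), ("C", "E")]

deps = defaultdict(set)
nodes = set()
for src, dst in edges:
    deps[dst].add(src)
    nodes.update((src, dst))

levels = {}

def level(node):
    # A node's level is one more than the deepest of its dependencies;
    # sources (no dependencies) sit at level 0.
    if node not in levels:
        levels[node] = 1 + max((level(d) for d in deps[node]), default=-1)
    return levels[node]

by_level = defaultdict(list)
for n in sorted(nodes):
    by_level[level(n)].append(n)

print(dict(by_level))
```

This yields A and B at level 0 and D and E at level 3, i.e. two pairs with no mutual dependency that an ideal scheduler could overlap.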

However, in practice, the components run sequentially even when they are independent.

You can replicate this behavior using any components. For my tests, I used the following:

import random
import time
from datetime import datetime

from haystack import component


@component
class TaskSimulator:
    @component.output_types(started_at=str, ended_at=str, task_id=int, pipeline_id=str, next_task_id=int)
    def run(self, id: int, pipeline_name: str):
        started_at = datetime.now()
        time.sleep(random.uniform(4, 6))
        ended_at = datetime.now()
        return {
            "started_at": started_at.strftime('%Y-%m-%d %H:%M:%S'),
            "ended_at": ended_at.strftime('%Y-%m-%d %H:%M:%S'),
            "task_id": id,
            "pipeline_id": pipeline_name,
            "next_task_id": id + 1
        }

@component
class ColliderHelper:
    @component.output_types(started_at=str, ended_at=str, task_id=int, pipeline_id=str, next_task_id=int)
    def run(self, left_id: int, right_id: int, pipeline_name: str):
        started_at = datetime.now()
        time.sleep(random.uniform(4, 6))
        ended_at = datetime.now()
        return {
            "started_at": started_at.strftime('%Y-%m-%d %H:%M:%S'),
            "ended_at": ended_at.strftime('%Y-%m-%d %H:%M:%S'),
            "task_id": max(left_id, right_id),
            "pipeline_id": pipeline_name,
            "next_task_id": max(left_id, right_id) + 1
        }
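The scheduling this issue asks for can be sketched with stdlib threads (a hypothetical harness, not Haystack code): two independent tasks submitted to a `ThreadPoolExecutor` overlap, so the total wall time is roughly the maximum of the two durations rather than their sum.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def task(task_id: int, duration: float) -> int:
    # Stand-in for TaskSimulator.run: sleeps instead of doing real work.
    time.sleep(duration)
    return task_id + 1

start = time.perf_counter()
with ThreadPoolExecutor() as pool:
    # A and B have no edge between them, so they can be submitted together.
    futures = [pool.submit(task, 1, 0.2), pool.submit(task, 2, 0.2)]
    results = [f.result() for f in futures]
elapsed = time.perf_counter() - start

# When overlapped, elapsed is ~0.2s; a sequential run would take ~0.4s.
```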
Quang-elec44 commented 1 day ago

It seems that Haystack does not support parallel execution. I spent time reading the documentation, but currently there is no solution.

btw, @alex-stoica, could you tell me how to visualize the pipeline after execution?

alex-stoica commented 5 hours ago

@Quang-elec44, regarding the visualization: the connections between the components should stay the same. What should change is the timing, i.e. the start times of independent components at the same level should be much closer together.

For now, Haystack has a Cookbook (not native support) with a workaround: https://haystack.deepset.ai/cookbook/concurrent_tasks. The tutorial shows how to group together components that we expect to execute concurrently. Several issues might arise from this approach:

  1. you always have to know which nodes / components are on the same level
  2. you have to build custom wrappers for (1)
  3. you have to explicitly move the components from (1) into the wrappers from (2)
  4. the visualization will probably become linear, without any branching, since all the branches from the same level end up inside a single group
  5. subsequent execution of nodes downstream of a grouped component matters. For example, A->B, A->C means wrapping (B, C) according to (1), (2). However, what if C->E->F->G ... ? That chain should start executing immediately after C finishes, which I am not sure happens with the fix from the cookbook
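A minimal sketch of the grouping idea (a hypothetical `ConcurrentGroup` wrapper, not the cookbook's actual code) also shows why point (5) bites: the group only returns once every member finishes, so a chain hanging off the fast member still waits for the slowest sibling.

```python
import time
from concurrent.futures import ThreadPoolExecutor

class ConcurrentGroup:
    """Hypothetical wrapper: runs its members in parallel, joins on all of them."""
    def __init__(self, *members):
        self.members = members

    def run(self):
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(m) for m in self.members]
            # The group returns only when every member is done, so anything
            # wired after the group waits for the slowest member too.
            return [f.result() for f in futures]

def fast_b():
    time.sleep(0.05)  # component B, finishes quickly
    return "B"

def slow_c():
    time.sleep(0.2)   # component C, the slow sibling
    return "C"

start = time.perf_counter()
outputs = ConcurrentGroup(fast_b, slow_c).run()
elapsed = time.perf_counter() - start
# A chain downstream of B could have started at ~0.05s, but with this
# grouping it is stuck until the full ~0.2s has elapsed.
```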
Quang-elec44 commented 5 hours ago

@alex-stoica Yeah, I read the tutorial but didn't find it useful. I think Haystack lacks dynamic/parallel graph execution, so the team needs to work more on this. For now I've switched to langgraph, since it supports concurrent tasks very well.

alex-stoica commented 4 hours ago

I see your point. While it's not a major issue for me, I was surprised to see this happen. It underscores why graph-based execution is often preferred: if the graph (or pipeline) runs synchronously, the benefits over traditional single-threaded, top-down execution are minimal. I understand that pipelines built with Haystack aid in visualizing and tracking I/O for each component, but execution-wise there's no real advantage.