deepset-ai / haystack

:mag: LLM orchestration framework to build customizable, production-ready LLM applications. Connect components (models, vector DBs, file converters) to pipelines or agents that can interact with your data. With advanced retrieval methods, it's best suited for building RAG, question answering, semantic search or conversational agent chatbots.
https://haystack.deepset.ai
Apache License 2.0
14.5k stars 1.71k forks

Allow Pipelines to be run/reused in "SuperPipelines" #7638

Closed mikebellerU closed 1 week ago

mikebellerU commented 2 months ago

Hi -- I have been using haystack to build out some complicated RAG pipelines. They are too complicated to build in a single Pipeline. I would like to be able to "compose" sub-pipelines together. This will allow for building complex pipelines from smaller ones, and would also allow for reuse of smaller pipelines in various ways.

Here is a trivial example of what I'd like to be able to do. In a real use case the subpipelines p1 and p2 would of course be larger and more complicated, and do something useful!

from haystack import Pipeline
from haystack.components.others.pipeline import PipelineComponent
from haystack.components.converters import OutputAdapter

p1 = Pipeline()
p1.add_component("adap", OutputAdapter(
    template="Hello {{inp}}", output_type=str))
p2 = Pipeline()
p2.add_component("adap", OutputAdapter(
    template="Goodbye {{inp}}", output_type=str))
p = Pipeline()
p.add_component("pipeline1", PipelineComponent(p1))
p.add_component("pipeline2", PipelineComponent(p2))
p.connect("pipeline1.adap:output", "pipeline2.adap:inp")
print(p.run(data={"pipeline1": {"adap:inp": "Paris"}}))

Notes:

Alternatives Considered:

Additional context

Some amazing things this could enable: what if we had a ParallelPipelineComponent that could run multiple copies of the same Pipeline in parallel, using a ThreadPoolExecutor or Dask/Ray/something? I think it would be fairly easy to do once we had PipelineComponent.

masci commented 2 months ago

@vblagoje for visibility

Redna commented 1 month ago

I did an implementation for the ParallelPipelineComponent some time ago. Maybe you can reuse something here.

https://github.com/Redna/haystack-extensions/blob/main/src/haystack_extensions/components/concurrent_runner/runner.py

from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from haystack import component

# NamedPipeline (a name plus a Pipeline) is defined in the linked repository.


@component
class ConcurrentPipelineRunner:
    """
    This component allows you to run multiple pipelines concurrently in a thread pool.
    """

    def __init__(self, named_pipelines: List[NamedPipeline], executor: Optional[ThreadPoolExecutor] = None):
        if not isinstance(named_pipelines, list) or not all(
            isinstance(named_pipeline, NamedPipeline) for named_pipeline in named_pipelines
        ):
            raise ValueError("named_pipelines must be a list of NamedPipeline instances")

        names = [named_pipeline.name for named_pipeline in named_pipelines]
        if len(names) != len(set(names)):
            raise ValueError("All components must have unique names")

        # Declare one input socket and one output socket per wrapped pipeline, keyed by its name.
        for name in names:
            component.set_input_type(self, name, Dict[str, Any])
        component.set_output_types(self, **{name: Dict[str, Any] for name in names})

        self.pipelines = named_pipelines
        self.executor = executor

    def run(self, **inputs):
        if self.executor is None:
            # No executor supplied: create a temporary pool for this call only.
            with ThreadPoolExecutor() as executor:
                final_results = self._run_in_executor(executor, inputs)
        else:
            final_results = self._run_in_executor(self.executor, inputs)

        return {named_pipeline.name: result for named_pipeline, result in zip(self.pipelines, final_results)}

    def _run_in_executor(self, executor: ThreadPoolExecutor, inputs: Dict[str, Any]):
        # Look up each pipeline's input by name rather than relying on keyword order.
        # executor.map yields results in submission order, so they line up with self.pipelines.
        results = executor.map(lambda p: p.pipeline.run(data=inputs[p.name]), self.pipelines)
        return list(results)
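
The runner zips its results back onto self.pipelines, which is only safe because executor.map returns results in the order the work was submitted, not the order it finished. Here is a minimal stdlib-only sketch of that property. The task function and the delays are made up for illustration and stand in for pipeline.run calls:

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []  # names in the order the tasks actually completed


def task(spec):
    """Hypothetical stand-in for pipeline.run: sleep, record completion, return the name."""
    name, delay = spec
    time.sleep(delay)
    finished.append(name)
    return name


# The first task is deliberately much slower than the second.
specs = [("pipeline1", 0.2), ("pipeline2", 0.01)]

with ThreadPoolExecutor(max_workers=2) as executor:
    # map() yields results in submission order, whatever the completion order was.
    results = list(executor.map(task, specs))

print("results :", results)
print("finished:", finished)
```

The results list follows the submission order even though the faster task completes first, which is the same pattern the test further down checks with its callback stack.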

https://github.com/Redna/haystack-extensions/blob/main/tests/test_runner.py

    def test_concurrent_pipeline_runner(self):
        component_call_stack = []

        def callback(component):
            component_call_stack.append(component)

        simple_component_1 = SimpleComponent(wait_time=0.09, callback=callback)
        pipeline1 = Pipeline()
        pipeline1.add_component("simple_component", simple_component_1)

        simple_component_2 = SimpleComponent(wait_time=0.02, callback=callback)
        pipeline2 = Pipeline()
        pipeline2.add_component("simple_component", simple_component_2)

        concurrent_pipeline_runner = ConcurrentPipelineRunner(
            [NamedPipeline("pipeline1", pipeline1), NamedPipeline("pipeline2", pipeline2)]
        )

        overall_pipeline = Pipeline()

        overall_pipeline.add_component("concurrent_pipeline_runner", concurrent_pipeline_runner)

        results = overall_pipeline.run(
            data={
                "concurrent_pipeline_runner": {
                    "pipeline1": {"simple_component": {"increment": 1}},
                    "pipeline2": {"simple_component": {"increment": 2, "number": 10}},
                }
            }
        )

        assert results == {
            'concurrent_pipeline_runner': {
                'pipeline1': {'simple_component': {'number': 6}},
                'pipeline2': {'simple_component': {'number': 12}},
            }
        }
        assert len(component_call_stack) == 2
        assert component_call_stack[0] == simple_component_2
        assert component_call_stack[1] == simple_component_1

mikebellerU commented 1 month ago

Thanks @Redna -- it's an interesting solution. It addresses one of the problems I was struggling with: setting the input and output types. Another issue I still struggle with is that the data management part is getting quite complicated. Consider that the output looks like this:

assert results == {
    'concurrent_pipeline_runner': {
        'pipeline1': {'simple_component': {'number': 6}},
        'pipeline2': {'simple_component': {'number': 12}},
    }
}

Someone trying to get results out of these nested pipelines needs a lot of knowledge about their internals. Do you know of any guidance on a "good" way to build reusable pipelines with Haystack? Reusable in the sense that they are flexible, but the user of the reusable pipeline doesn't have to know all the details of its internal components in order to run it.

vblagoje commented 1 month ago

Hey @mikebellerU and @Redna, before jumping onto some of these great ideas about pipeline executors, let's focus on making https://github.com/deepset-ai/haystack/compare/massi/pipeline-component actually work. The main remaining "issue" is (de)serialization of these super components, so they can be saved, loaded, reused, and shared, not only by yourself but within the community. What do you think about that? I'm working on some other items right now but would love to contribute in the coming weeks.

mikebellerU commented 1 month ago

I tried playing around with @masci's component (I tweaked it so it would work at least for my case). Here is what I learned: managing the levels of data input and output quickly gets too hard. To invoke the "parent" pipeline, you may have to understand the detailed 'run' signature of the "child" pipeline.

Right now, to invoke a typical Haystack 2.0 RAG pipeline, I have to write something like response = pipeline.run(data={'retriever':... , 'embedder': .... , 'llm':....}), and then when it returns I have to pick out response['answer_builder']['answers'][0].data to get the result I'm interested in. Wouldn't it be better if there was a way I could encapsulate the knowledge about running this pipeline into a runner method (or some other name), with a signature like: answer = rag_pipeline.runner(query=..,docstore=...) ? This method could live alongside run potentially as a wrapper for it, but would allow for a type-and-parameter-checked reusable pipeline, that abstracts its internal details.

TLDR: Solving composability of pipelines I think needs some thought about how to abstract away the underlying details, and that should factor into the design of PipelineRunner.
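
To make the idea concrete, here is a rough sketch of such a wrapper. Everything in it is an assumption for illustration: RagRunner is not a Haystack class, FakeRagPipeline is a stub so the snippet runs without haystack installed, and the component names and input sockets ('embedder', 'retriever', 'prompt_builder', 'answer_builder') describe one imagined pipeline rather than any fixed API:

```python
from types import SimpleNamespace
from typing import Any, Dict


class RagRunner:
    """Hypothetical wrapper that hides a pipeline's internal wiring behind a typed method."""

    def __init__(self, pipeline: Any):
        # Anything exposing run(data=...) -> nested dict works here.
        self._pipeline = pipeline

    def run(self, query: str, top_k: int = 3) -> str:
        # Knowledge of the component names lives only in this method (assumed names).
        response = self._pipeline.run(
            data={
                "embedder": {"text": query},
                "retriever": {"top_k": top_k},
                "prompt_builder": {"question": query},
            }
        )
        # Callers get a plain string instead of digging through the nested result.
        return response["answer_builder"]["answers"][0].data


class FakeRagPipeline:
    """Stub standing in for a real RAG pipeline; it only echoes the query."""

    def run(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        query = data["embedder"]["text"]
        answer = SimpleNamespace(data=f"stub answer for: {query}")
        return {"answer_builder": {"answers": [answer]}}


rag = RagRunner(FakeRagPipeline())
answer = rag.run(query="What is the capital of France?")
print(answer)
```

The caller only sees run(query=..., top_k=...) and a string result, so the internal component names can change without breaking the code that uses the pipeline.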

masci commented 1 month ago

We're rolling out this feature as part of the "experimental" package, you can follow this PR https://github.com/deepset-ai/haystack-experimental/pull/9

mikebellerU commented 1 month ago

> We're rolling out this feature as part of the "experimental" package, you can follow this PR deepset-ai/haystack-experimental#9

Hi @masci. Thanks, I will check it out. Is haystack-experimental intended to be a package that I install separately from haystack so that I can access additional components?

masci commented 1 month ago

> Hi @masci. Thanks, I will check it out. Is haystack-experimental intended to be a package that I install separately from haystack so that I can access additional components?

It is a separate package, but the idea is to make it a dependency of haystack-ai, so users can access experimental features by just changing import paths.

masci commented 2 weeks ago

After an internal sync we decided not to pursue this component further. The solution doesn't feel quite right and the use cases that would benefit the most (like agentic pipelines) still have unclear requirements.

Currently the problem can be addressed by breaking pipelines down into smaller ones where possible, or by using bigger components.
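
As a rough illustration of the first option, two small pipelines can be chained in plain Python, passing the first result into the second. This is only a sketch: TemplatePipeline is a made-up stand-in that mimics the nested {component: {socket: value}} shape of a pipeline result so the snippet runs without haystack, and run_chained is a hypothetical helper, not a Haystack API:

```python
from typing import Any, Dict


class TemplatePipeline:
    """Made-up stand-in for a one-component pipeline (not a real Haystack class)."""

    def __init__(self, template: str):
        self.template = template

    def run(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        # Mimic the nested result shape: {component_name: {output_name: value}}.
        text = self.template.format(inp=data["adap"]["inp"])
        return {"adap": {"output": text}}


def run_chained(first: TemplatePipeline, second: TemplatePipeline, inp: str) -> str:
    """Run two pipelines back to back, adapting the first output into the second input."""
    first_result = first.run(data={"adap": {"inp": inp}})
    second_result = second.run(data={"adap": {"inp": first_result["adap"]["output"]}})
    return second_result["adap"]["output"]


p1 = TemplatePipeline("Hello {inp}")
p2 = TemplatePipeline("Goodbye {inp}")
result = run_chained(p1, p2, "Paris")
print(result)  # Goodbye Hello Paris
```

The glue between the two pipelines is ordinary Python, so no special component is needed, at the cost of losing a single serializable pipeline definition.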

If anyone from the community wants to take this on, this could make a good integration.

shadeMe commented 1 week ago

Closing this issue for now.