deepset-ai / canals

A component orchestration engine
https://deepset-ai.github.io/canals/
Apache License 2.0

re-introduce variadics to support Joiner node #122

Closed: masci closed this 11 months ago

masci commented 11 months ago

This is an attempt to make progress on connecting multiple outputs to the same downstream input without incurring the complexity required by #116.

This solution is based on two requirements:

  1. We will still need a "joiner" node to fan in multiple outputs to one input
  2. We need the concept of "variadic" input to be re-introduced so we can simplify the joiner node

Compared to previous attempts at adding this feature to Canals, the behaviour of a variadic socket has been deliberately simplified: any value arriving at the socket is appended to a list, and it is the component's responsibility to decide what to do with the values.
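To make these semantics concrete, here is a minimal sketch of a socket that behaves as described: a regular socket accepts a single value, while a variadic socket appends every value it receives and leaves the interpretation to the component. The `InputSocket` class, its `receive()` method and the `join_strings` helper are hypothetical illustrations, not Canals' actual API.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class InputSocket:
    """Hypothetical input socket, for illustration only (not the Canals class)."""

    name: str
    is_variadic: bool = False
    values: List[Any] = field(default_factory=list)

    def receive(self, value: Any) -> None:
        if self.is_variadic:
            # Variadic: every incoming value is appended, nothing is merged or checked.
            self.values.append(value)
        elif self.values:
            # Regular socket: a second value is an error.
            raise ValueError(f"Socket '{self.name}' already has a value")
        else:
            self.values.append(value)


def join_strings(socket: InputSocket) -> str:
    # The component, not the socket, decides what to do with the collected values.
    return " ".join(socket.values)


strings = InputSocket(name="strings", is_variadic=True)
for upstream_output in ["hello", "from", "three senders"]:
    strings.receive(upstream_output)

print(join_strings(strings))  # hello from three senders
```

Keeping the socket "dumb" means ordering, deduplication or merging policies live in the joiner component rather than in the engine.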

Notes on specific implementation details are in the PR comments; please have a look at them.

The code in this PR was tested with the following use case: [image]

julian-risch commented 11 months ago

Would an arbitrary number of List[Document] inputs work the same way as in your StringJoiner? I tried this PR with the DocumentJoiner I am working on and ran into a problem connecting a List[Document] output of a retriever to a Variadic[List[Document]] input of a DocumentJoiner:

PipelineConnectError: Cannot connect 'bm25_retriever.documents' with 'joiner.documents': their declared input and output types do not match.
'bm25_retriever':
 - documents (List[Document])
'joiner':
 - documents ((typing.List), available

Here is a simplified example showing the error. The full example is in my draft PR.

ZanSara commented 11 months ago

@julian-risch I think the example is doing something different from what the PR covers (although it's also a valid use case).

The PR makes it possible to connect N components that each output a Document to a component with a single Variadic[Document] input, whereas in your case it seems you're trying to connect a List[Document] to a Variadic[Document].

Basically, the idea is to read a Variadic type by just removing the Variadic keyword: Variadic[Document] reads as Document, so you'd be connecting a List[Document] socket to a Document socket, which doesn't work.

Did I interpret the example correctly or did I miss something?


EDIT: OK, after reading it again I think this example should work. It's a bit unusual (you're using the joiner as a no-op), but the typing is right: List[Document] into Variadic[List[Document]].
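A minimal sketch of the "just remove the Variadic keyword" reading, assuming `Variadic[T]` is modelled as `Annotated[List[T], marker]`. That model, the marker string, the `Document` stand-in and the helper names are all assumptions made for illustration. Unwrapping `Variadic[Document]` yields `Document`, which a `List[Document]` sender doesn't match, while `Variadic[List[Document]]` unwraps to `List[Document]`, which does:

```python
from dataclasses import dataclass
from typing import Annotated, Any, List, TypeVar, get_args, get_origin

T = TypeVar("T")
VARIADIC_MARKER = "__variadic__"  # hypothetical marker, not the Canals one
# Hypothetical model: the component receives a List of everything sent to it.
Variadic = Annotated[List[T], VARIADIC_MARKER]


@dataclass
class Document:  # stand-in for haystack.preview.Document
    text: str


def is_variadic(tp: Any) -> bool:
    return get_origin(tp) is Annotated and VARIADIC_MARKER in tp.__metadata__


def unwrap_variadic(tp: Any) -> Any:
    """Annotated[List[X], marker] -> X, i.e. "remove the Variadic keyword"."""
    inner_list = get_args(tp)[0]  # List[X]
    return get_args(inner_list)[0]  # X


print(unwrap_variadic(Variadic[Document]) == List[Document])        # False
print(unwrap_variadic(Variadic[List[Document]]) == List[Document])  # True
```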

Let's debug this before merging!

ZanSara commented 11 months ago

@masci FYI, there is a rendering issue with the Variadic type. Error messages are much harder to parse right now:

[image: screenshot of the current error message]

This should read:

Cannot connect 'bm25_retriever.documents' with 'joiner.documents': their declared input and output types do not match.
'bm25_retriever':
 - documents (List[Document])
'joiner':
 - documents (List[Document], available)
masci commented 11 months ago

@julian-risch Good catch: I don't inspect the type recursively, that's a bug!
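A sketch of what rendering types recursively could look like, so that a `Variadic[List[Document]]` socket prints as `List[Document]` instead of `(typing.List)`. It assumes `Variadic[T]` is modelled as `Annotated[List[T], marker]`; the marker, `type_name` and the `_GENERIC_NAMES` table are hypothetical, not Canals code:

```python
from dataclasses import dataclass
from typing import Annotated, Any, Dict, List, TypeVar, get_args, get_origin

T = TypeVar("T")
VARIADIC_MARKER = "__variadic__"  # hypothetical marker
Variadic = Annotated[List[T], VARIADIC_MARKER]

# Display names for common generic origins (get_origin(List[int]) is list).
_GENERIC_NAMES = {list: "List", dict: "Dict", set: "Set", tuple: "Tuple"}


@dataclass
class Document:  # stand-in for haystack.preview.Document
    text: str


def type_name(tp: Any) -> str:
    """Render a type for error messages, descending into its arguments."""
    if tp is Any:
        return "Any"
    origin = get_origin(tp)
    if origin is Annotated and VARIADIC_MARKER in tp.__metadata__:
        # Show what a single sender must provide: Variadic[X] renders as X.
        return type_name(get_args(get_args(tp)[0])[0])
    if origin is None:
        # Plain class such as str or Document.
        return getattr(tp, "__name__", repr(tp))
    name = _GENERIC_NAMES.get(origin, getattr(origin, "__name__", repr(origin)))
    # Recurse into the type arguments instead of printing only the origin.
    return f"{name}[{', '.join(type_name(arg) for arg in get_args(tp))}]"


print(type_name(Variadic[List[Document]]))  # List[Document]
print(type_name(Dict[str, List[int]]))      # Dict[str, List[int]]
```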

ZanSara commented 11 months ago

Taken with a minor change from @julian-risch's notebook:

from canals import component
from canals.component.types import Variadic
from haystack.preview import Document
from typing import List

@component
class DocumentTextJoiner:

    @component.output_types(output=str)
    def run(self, *documents: Variadic[List[Document]]):
        """
        Take List[Document] from multiple input nodes and join their texts
        into a single text returned in output. Since `documents`
        is Variadic, we know we'll receive a List[List[Document]].
        """
        return {"output": " ".join([doc.text for docs in documents for doc in docs])}

DocumentTextJoiner().run(
    [Document(text="a"), Document(text="b")], 
    [Document(text="c")]
)
# returns {'output': 'a b c'} as expected

But then:

from haystack.preview import Pipeline
from haystack.preview.components.embedders import SentenceTransformersTextEmbedder
from haystack.preview.document_stores import InMemoryDocumentStore
from haystack.preview.components.retrievers import InMemoryBM25Retriever, InMemoryEmbeddingRetriever

document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(instance=DocumentTextJoiner(), name="joiner")
#...
p.connect("bm25_retriever", "joiner")
# raises:
PipelineConnectError: Cannot connect 'bm25_retriever.documents' with 'joiner.documents': their declared input and output types do not match.
'bm25_retriever':
 - documents (List[Document])
'joiner':
 - documents ((typing.List), available
ZanSara commented 11 months ago

My intuition is that this comes from our type-matching function types_are_compatible: https://github.com/deepset-ai/canals/blob/429c3475c8ca007fd3db60d40917fe205e7bcf27/canals/type_utils.py#L19C22-L19C22

We should make sure it works properly with the Variadic type, which is different from all the others because one end needs to be unwrapped but not the other. Checking for Variadic and unwrapping it before checking type compatibility should fix the bug.
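A simplified sketch of the proposed fix: if the receiving socket is variadic, unwrap it first, then run the usual check on the sender's type. The body of `types_are_compatible` below is a deliberately reduced stand-in (exact match or `Any`), not the real function linked above, and modelling `Variadic[T]` as `Annotated[List[T], marker]` is an assumption for illustration:

```python
from dataclasses import dataclass
from typing import Annotated, Any, List, TypeVar, get_args, get_origin

T = TypeVar("T")
VARIADIC_MARKER = "__variadic__"  # hypothetical marker
Variadic = Annotated[List[T], VARIADIC_MARKER]


@dataclass
class Document:  # stand-in for haystack.preview.Document
    text: str


def is_variadic(tp: Any) -> bool:
    return get_origin(tp) is Annotated and VARIADIC_MARKER in tp.__metadata__


def types_are_compatible(sender: Any, receiver: Any) -> bool:
    # Only the receiving end is variadic: unwrap it, leave the sender alone.
    if is_variadic(receiver):
        receiver = get_args(get_args(receiver)[0])[0]  # Variadic[X] -> X
    # Reduced rule for this sketch: Any accepts everything,
    # otherwise the types must match exactly.
    return receiver is Any or sender == receiver


print(types_are_compatible(List[Document], Variadic[List[Document]]))  # True
print(types_are_compatible(List[Document], Variadic[Document]))        # False
```

Unwrapping only the receiver matters: unwrapping both ends would wrongly strip the outer `List` from a `List[Document]` sender.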

ZanSara commented 11 months ago

Uh, hold off on merging a little longer: I might have found another small issue similar to Julian's.

ZanSara commented 11 months ago

Here it is: https://colab.research.google.com/drive/1En-X7E4gL4_5PyX1pfLNCDaLWkMm1H01?usp=sharing

Basically, variadic inputs are not compatible with our Pipeline.run() validation. This code should work:

document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="retriever")
p.add_component(instance=DocumentTextJoiner(), name="joiner")

p.connect("retriever", "joiner")

p.run({"retriever" : {"query": "why?"}, "joiner": {"documents": [Document(text="a")]}})

However, it currently fails:

ValueError: The input documents of joiner is already sent by: ['retriever']

This happens even though the documents input of joiner is variadic.

ZanSara commented 11 months ago

Here is the culprit: https://github.com/deepset-ai/canals/blob/main/canals/pipeline/validation.py#L93-L95

We should check for variadics there as well, and then it should be fine.
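A sketch of the extra check, assuming the validation step knows, for each input socket, the list of connected senders and whether the socket is variadic. The function name `validate_input_data` and both dictionaries are hypothetical, not the code at the linked lines; the point is only that a variadic socket may receive a value from `Pipeline.run()` in addition to its connections:

```python
from typing import Any, Dict, List, Tuple

Socket = Tuple[str, str]  # (component name, input name)


def validate_input_data(
    data: Dict[str, Dict[str, Any]],
    senders: Dict[Socket, List[str]],
    variadic: Dict[Socket, bool],
) -> None:
    for component, inputs in data.items():
        for input_name in inputs:
            socket = (component, input_name)
            connected = senders.get(socket, [])
            # Connected non-variadic inputs can't also be fed by the user;
            # variadic inputs simply collect one more value.
            if connected and not variadic.get(socket, False):
                raise ValueError(
                    f"The input {input_name} of {component} is already sent by: {connected}"
                )


senders = {("joiner", "documents"): ["retriever"]}
data = {"retriever": {"query": "why?"}, "joiner": {"documents": ["a"]}}

validate_input_data(data, senders, variadic={("joiner", "documents"): True})  # passes
try:
    validate_input_data(data, senders, variadic={("joiner", "documents"): False})
except ValueError as err:
    print(err)  # The input documents of joiner is already sent by: ['retriever']
```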


I also found another slight oddity with variadics in the pipeline's input node, but I'll test again once the issue above is fixed.