AI orchestration framework to build customizable, production-ready LLM applications. Connect components (models, vector DBs, file converters) to pipelines or agents that can interact with your data. With advanced retrieval methods, it's best suited for building RAG, question answering, semantic search or conversational agent chatbots.
Is your feature request related to a problem? Please describe.
Currently, if a step in a pipeline fails, an exception is raised and any information from the previous steps is lost. This is problematic in multi-step pipelines where the output of one component may be the cause of a later component failing, and where the time or cost of rerunning an earlier step is high. For example, suppose a pipeline is created with the following steps:
Input: Local audio filepath
Step 1: Transcribe audio content to text using Whisper
Step 2: Classify text with LLM, returning a (hopefully) valid JSON string
Step 3: Validate LLM JSON string
Output: Return JSON string
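For concreteness, here is a minimal, self-contained sketch of such a pipeline in the Haystack 2.x custom-component style. The `Transcriber` and `Classifier` bodies are stand-ins (no real Whisper or LLM calls), and the classifier deliberately returns malformed JSON so the validator raises:

```python
import json

from haystack import Pipeline, component


@component
class Transcriber:
    """Stand-in for Step 1 (e.g. a Whisper transcriber)."""

    @component.output_types(text=str)
    def run(self, audio_path: str):
        return {"text": f"transcript of {audio_path}"}  # a real impl would call Whisper


@component
class Classifier:
    """Stand-in for Step 2 (an LLM returning a JSON string)."""

    @component.output_types(reply=str)
    def run(self, text: str):
        return {"reply": '{"category": "phone call"'}  # deliberately malformed JSON


@component
class JsonValidator:
    """Step 3: raises if the LLM reply is not valid JSON."""

    @component.output_types(valid_json=str)
    def run(self, reply: str):
        json.loads(reply)  # raises json.JSONDecodeError on bad input
        return {"valid_json": reply}


pipe = Pipeline()
pipe.add_component("transcriber", Transcriber())
pipe.add_component("classifier", Classifier())
pipe.add_component("validator", JsonValidator())
pipe.connect("transcriber.text", "classifier.text")
pipe.connect("classifier.reply", "validator.reply")

# Raises inside the validator; the transcriber and classifier outputs are lost.
result = pipe.run({"transcriber": {"audio_path": "call.mp3"}})
```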
If Step 3 fails, the outputs of Steps 1 & 2 are lost and neither can be returned as a partial result. Having that data available might avoid the need to reprocess it, or would make it possible to return a partial result to the requester, which can be very useful for debugging. And while you could implement a Document store and cache checker for when a pipeline is retried, this will not work for stateless applications or in distributed systems, and it is only compatible with Documents.
As it stands, without the ability to keep hold of intermediate outputs, you cannot use the Pipeline component and instead need to run each component separately, manually passing the outputs of each component to the next.
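That manual workaround looks something like the following (reusing the toy components from the sketch above), and it is exactly the boilerplate the Pipeline abstraction is meant to remove:

```python
# Manual orchestration: each intermediate output survives a later failure.
partial = {}
try:
    partial["transcriber"] = Transcriber().run(audio_path="call.mp3")
    partial["classifier"] = Classifier().run(text=partial["transcriber"]["text"])
    partial["validator"] = JsonValidator().run(reply=partial["classifier"]["reply"])
except Exception as exc:
    # The transcript and LLM reply are still in `partial` for debugging or retry.
    print(f"Pipeline failed after {len(partial)} step(s): {exc}")
```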
Describe the solution you'd like
Ideally the pipeline should take an additional init argument defining the desired behaviour when a pipeline run fails. For example:
errors='raise' - If a pipeline error occurs, the exception is raised and nothing is returned.
errors='return' - If the pipeline fails, the final result is still returned (with only some of the keys populated), along with a pipeline_outcome key within the result. This would likely require a change to the schema of the pipeline output to prevent name clashes - e.g. {"pipeline_outcome": "success", "outputs": {"llm": "Category: phone call"}}
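Purely as an illustration (neither `errors` nor `pipeline_outcome` exists in Haystack today), usage might look like:

```python
# Hypothetical API sketch: `errors` is the proposed init argument.
pipe = Pipeline(errors="return")
result = pipe.run({"transcriber": {"audio_path": "call.mp3"}})

if result["pipeline_outcome"] != "success":
    # Only the components that actually ran appear under "outputs".
    transcript = result["outputs"].get("transcriber", {}).get("text")
```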
Furthermore, if a pipeline contains a fork where the outputs from step 1 are passed to two parallel components (A and B), it would be nice to allow component B to continue running even if component A has already failed. This is a stretch feature but would be useful in instances where every partial step is still valuable to the output. For example, suppose I have a business process where I need to process a customer's email and generate a summary, a sentiment score, and a list of all names that appear in the email. This process helps augment an existing manual process, and all outputs from the pipeline will be reviewed by a human.
In this situation, even if the sentiment scoring step fails, there is still a lot of value in returning the outputs of the other two steps, as this avoids the need for a human to write them from scratch. With two of the three outputs filled in for them, they now only need to spend time generating the sentiment score, instead of also having to write a long summary and extract the names.
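As a sketch of that fork (again using the hypothetical errors argument, and with EmailReader, Summarizer, SentimentScorer and NameExtractor as illustrative component names, not real Haystack components):

```python
# Hypothetical fork: one upstream component feeding three parallel branches.
pipe = Pipeline(errors="return")  # proposed argument, see above
pipe.add_component("reader", EmailReader())
pipe.add_component("summarizer", Summarizer())
pipe.add_component("sentiment", SentimentScorer())
pipe.add_component("names", NameExtractor())
pipe.connect("reader.text", "summarizer.text")
pipe.connect("reader.text", "sentiment.text")
pipe.connect("reader.text", "names.text")

result = pipe.run({"reader": {"email": "Hi team, quick update from Alice..."}})
# Even if "sentiment" fails, the "summarizer" and "names" outputs would still
# be present under result["outputs"] for the human reviewer.
```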
Describe alternatives you've considered
A Document Store is one alternative, but this is unsuitable for ByteStream content (without additional converters) and adds needless complexity. Another alternative would be the ability to supply the final_outputs dictionary to the Pipeline.run method, so that if the pipeline fails, your caller still has a reference to the dictionary and can inspect the results. This would allow for very minimal changes to the existing Pipeline component and allow the user to implement a try/catch block to handle cases where the pipeline fails.
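That alternative would be very small on the caller's side; a sketch, assuming a hypothetical final_outputs parameter that the pipeline writes each component's output into as it runs:

```python
# `final_outputs` is the hypothetical caller-supplied dictionary.
outputs: dict = {}
try:
    pipe.run({"transcriber": {"audio_path": "call.mp3"}}, final_outputs=outputs)
except Exception:
    # The run failed, but `outputs` still holds everything produced so far.
    for name, value in outputs.items():
        print(name, "->", value)
```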
These are some really good ideas. Thank you for writing this up, @michaeltremeer. We'll talk internally about this and update you accordingly. cc @silvanocerza