langchain-ai / langchainjs

🦜🔗 Build context-aware reasoning applications 🦜🔗
https://js.langchain.com/docs/
MIT License

How to return source documents from a RunnableSequence RAG chain #4189

Closed Gr33nLight closed 8 months ago

Gr33nLight commented 8 months ago

Given the following code, how do I return the source documents together with the stream object?

import { DocumentInterface } from '@langchain/core/documents';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { RunnablePassthrough, RunnableSequence } from '@langchain/core/runnables';
import { OpenAIEmbeddings } from '@langchain/openai';
import { Message } from 'ai';
import { ContextualCompressionRetriever } from 'langchain/retrievers/contextual_compression';
import { EmbeddingsFilter } from 'langchain/retrievers/document_compressors/embeddings_filter';
import { formatDocumentsAsString } from 'langchain/util/document';

import { nonStreamingModel, streamingModel } from './llm.js';
import { getPineconeClient } from './pinecone-client.js';
import { ANSWER_PROMPT_TEMPLATE, STANDALONE_QUESTION_TEMPLATE } from './prompt-templates.js';
import { trimStrToTokenLimit } from './token.js';
import { getVectorStore } from './vector-store.js';

type ConversationalRetrievalQAChainInput = {
  question: string;
  chat_history: Message[];
};

type DocumentType = {
  pageContent: string;
  metadata: {
    'loc.pageNumber'?: number;
  };
};

const formatMessageHistory = (messages: Message[]) => {
  const messagesStr = messages
    .map((message) => `${message.role === 'user' ? 'Human' : 'Assistant'}: ${message.content}`)
    .join('\n');
  return trimStrToTokenLimit(messagesStr);
};

const formatSourceDocs = (docs: DocumentInterface<Record<string, unknown>>[]) => {
  const firstTwoDocuments = docs.slice(0, 2);
  const pageContents = firstTwoDocuments.map(({ pageContent, metadata }: DocumentType) => ({
    page: metadata['loc.pageNumber'],
    body: pageContent,
  }));
  return pageContents;
};

export async function callChain({
  question,
  chat_history,
  namespace,
}: ConversationalRetrievalQAChainInput & { namespace: string }) {
  try {
    const sanitizedQuestion = question.trim().replaceAll('\n', ' ');
    const pineconeClient = await getPineconeClient();
    const vectorStore = await getVectorStore(pineconeClient, namespace);

    //Consider reranking embeddings with Cohere api in the future
    const baseCompressor = new EmbeddingsFilter({
      embeddings: new OpenAIEmbeddings({
        timeout: 10000,
      }),
      similarityThreshold: 0.7,
    });

    const retriever = new ContextualCompressionRetriever({
      baseCompressor,
      baseRetriever: vectorStore.asRetriever(),
    });

    const standaloneQuestionChain = RunnableSequence.from([
      {
        chat_history: (input: ConversationalRetrievalQAChainInput) =>
          formatMessageHistory(input.chat_history),
        question: (input: ConversationalRetrievalQAChainInput) => input.question,
      },
      STANDALONE_QUESTION_TEMPLATE,
      nonStreamingModel,
      new StringOutputParser(),
    ]);

    const answerChain = RunnableSequence.from([
      {
        context: retriever.pipe(formatDocumentsAsString).pipe(trimStrToTokenLimit),
        question: new RunnablePassthrough(),
      },
      ANSWER_PROMPT_TEMPLATE,
      streamingModel,
    ]);

    const conversationalRetrievalQAChain = standaloneQuestionChain.pipe(answerChain);

    const stream = await conversationalRetrievalQAChain.stream({
      question: sanitizedQuestion,
      chat_history: chat_history,
    });
    return { stream };
  } catch (e) {
    console.error(e);
    throw new Error('Call chain method failed to execute!!');
  }
}

dosubot[bot] commented 8 months ago

🤖

Hey there, @Gr33nLight! Good to see you again in the langchainjs repo. Hope you're coding away to glory!

In the provided code, the source documents are retrieved and processed within the ContextualCompressionRetriever and EmbeddingsFilter classes, but they are not returned from the callChain function. To return the source documents along with the stream object, you would need to modify the callChain function to include the source documents in its return value.

However, the RunnableSequence class does not currently support returning additional data along with the stream object. The stream method of a RunnableSequence instance returns a stream of the outputs of the sequence of operations, and there is no built-in mechanism to include additional data in this stream.

One possible solution would be to modify the RunnableSequence class to support returning additional data. This would involve changing the stream method to return an object that includes both the stream and any additional data, instead of just the stream. However, this would be a significant change that could affect other parts of the code that use the RunnableSequence class.

Another solution would be to store the source documents in a variable that is accessible from outside the callChain function, and then retrieve this variable after calling callChain. This would not require any changes to the RunnableSequence class, but it would require careful management of the variable to ensure that it is updated correctly and not accessed before it is ready.

Here is an example of how you might implement the second solution:

let sourceDocs;

export async function callChain({
  question,
  chat_history,
  namespace,
}: ConversationalRetrievalQAChainInput & { namespace: string }) {
  try {
    // ... existing code ...

    const retriever = new ContextualCompressionRetriever({
      baseCompressor,
      baseRetriever: vectorStore.asRetriever(),
      // Handler invoked with the retrieved documents once retrieval finishes
      callbacks: [
        {
          handleRetrieverEnd(docs) {
            sourceDocs = formatSourceDocs(docs);
          },
        },
      ],
    });

    // ... existing code ...

    return { stream, sourceDocs };
  } catch (e) {
    console.error(e);
    throw new Error('Call chain method failed to execute!!');
  }
}

In this code, a callback handler is passed to the ContextualCompressionRetriever constructor through the callbacks option. Its handleRetrieverEnd method is called with the retrieved documents once retrieval finishes. The handler formats the documents and stores them in the sourceDocs variable. The callChain function then returns this variable along with the stream.

Please note that this is a simplified example and may not work as expected in a real-world application. You would need to ensure that the handler has run before sourceDocs is read, and that the shared sourceDocs variable is not overwritten by concurrent calls to callChain.
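
The timing concern can be illustrated without LangChain. The following is a minimal sketch, assuming a hypothetical fakeRetrieve function in place of the real retriever and a hard-coded token list in place of the model. Instead of a shared variable, the function returns a promise for the source documents, which resolves only once retrieval has actually run.

```typescript
type SourceDoc = { page?: number; body: string };

// Hypothetical stand-in for a retriever call.
async function fakeRetrieve(question: string): Promise<SourceDoc[]> {
  return [{ page: 1, body: `Passage relevant to: ${question}` }];
}

// A small "deferred": a promise plus the function that resolves it.
function createDeferred<T>() {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>((res) => {
    resolve = res;
  });
  return { promise, resolve };
}

function callChainSketch(question: string) {
  const docs = createDeferred<SourceDoc[]>();

  // Async generators are lazy: nothing in here runs until the first read.
  async function* answerStream(): AsyncGenerator<string> {
    const retrieved = await fakeRetrieve(question);
    docs.resolve(retrieved); // plays the role of the retrieval callback
    for (const token of ["The", " answer", "."]) {
      yield token;
    }
  }

  // Returned immediately, before any retrieval has happened.
  return { stream: answerStream(), sourceDocs: docs.promise };
}
```

A caller would read the stream first (or concurrently) and then await sourceDocs. Awaiting sourceDocs before reading anything from the stream would hang in this sketch, because the generator body, and with it the retrieval, only starts on the first read.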

Sources


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

Gr33nLight commented 8 months ago

@dosu-bot unfortunately, sourceDocs is undefined when the value is returned. The callback gets triggered, but it's too late: since it's a streaming response, the return happens immediately, whereas the retriever callback fires afterwards.

dosubot[bot] commented 8 months ago

🤖

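chain.stream() resolves to the stream itself, so the source documents cannot be destructured from its return value. Instead, you can have the chain carry the retrieved documents through as part of its output, so that they arrive as a chunk of the stream. Assuming a version of @langchain/core that provides RunnablePassthrough.assign, here's how you can do it:

type ChainInput = { question: string; chatHistory?: string };
type WithDocs = ChainInput & { sourceDocs: DocumentInterface[] };

// Builds the answer from the documents that the outer chain already retrieved
const answerChain = RunnableSequence.from([
  {
    question: (input: WithDocs) => input.question,
    chatHistory: (input: WithDocs) => input.chatHistory ?? "",
    context: (input: WithDocs) => formatDocumentsAsString(input.sourceDocs),
  },
  questionPrompt,
  model,
  new StringOutputParser(),
]);

const chain = RunnableSequence.from([
  {
    question: (input: ChainInput) => input.question,
    chatHistory: (input: ChainInput) => input.chatHistory ?? "",
    // Keep the raw documents in the output instead of only the serialized text
    sourceDocs: (input: ChainInput) => retriever.getRelevantDocuments(input.question),
  },
  // Adds an "answer" key while passing question, chatHistory and sourceDocs through
  RunnablePassthrough.assign({ answer: answerChain }),
]);

const stream = await chain.stream({
  question: "What did the president say about Justice Breyer?",
});

let streamedResult = "";
let sourceDocs;
for await (const chunk of stream) {
  // Each chunk is a partial object: some carry sourceDocs, others a piece of the answer
  if (chunk.sourceDocs) sourceDocs = chunk.sourceDocs;
  if (chunk.answer) {
    streamedResult += chunk.answer;
    console.log(streamedResult);
  }
}

console.log(sourceDocs);

In this modification, the first step keeps the retrieved documents under sourceDocs, and RunnablePassthrough.assign adds the generated answer next to them. The stream then yields partial objects: the passed-through keys, including sourceDocs, and answer chunks as the model produces tokens. The loop captures the source documents when they appear and accumulates the answer text. How the keys are split across chunks may differ between versions, which is why the loop checks each key separately.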
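
One way to deliver both documents and answer text over a single stream is to make every chunk a partial object. The sketch below shows only the consuming side and has no LangChain dependency. The fakeChainStream generator, the collectAnswer helper, and the chunk shape are all assumptions for illustration, so check what your installed version actually emits before relying on this shape.

```typescript
type Doc = { pageContent: string; metadata: Record<string, unknown> };
type Chunk = { sourceDocs?: Doc[]; answer?: string };

// Hypothetical stream: one chunk with the documents, then answer tokens.
async function* fakeChainStream(question: string): AsyncGenerator<Chunk> {
  yield {
    sourceDocs: [
      { pageContent: `Context for: ${question}`, metadata: { "loc.pageNumber": 3 } },
    ],
  };
  for (const token of ["Justice", " Breyer", " was", " thanked."]) {
    yield { answer: token };
  }
}

// Drains the stream, forwarding answer tokens as they arrive and
// keeping the latest sourceDocs value seen.
async function collectAnswer(
  stream: AsyncIterable<Chunk>,
  onToken: (token: string) => void = () => {},
): Promise<{ answer: string; sourceDocs: Doc[] }> {
  let answer = "";
  let sourceDocs: Doc[] = [];
  for await (const chunk of stream) {
    if (chunk.sourceDocs !== undefined) {
      sourceDocs = chunk.sourceDocs;
    }
    if (chunk.answer !== undefined) {
      answer += chunk.answer;
      onToken(chunk.answer);
    }
  }
  return { answer, sourceDocs };
}
```

In a server handler, onToken would typically write each token to the response, and the documents could be sent to the client as soon as the chunk carrying them arrives rather than after the loop.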

The RunnableSequence class in langchainjs handles asynchronous operations through the use of Promises and async/await syntax. Specifically, for streaming responses, it uses the stream method which returns a Promise<IterableReadableStream<RunOutput>>.

The stream method works by creating an AsyncGeneratorWithSetup instance from the _streamIterator method, which is an asynchronous generator function. This generator function yields the result of the invoke method, which is a Promise. The stream method then waits for the setup of the AsyncGeneratorWithSetup instance to complete using await wrappedGenerator.setup, and finally returns an IterableReadableStream from the AsyncGeneratorWithSetup instance.

In the RunnableLambda class, the stream method works similarly, but it uses the transform method instead of the invoke method. The transform method returns an AsyncGenerator<RunOutput>, which is then used to create the AsyncGeneratorWithSetup instance.

Please note that this is a high-level explanation and the actual implementation may vary depending on the specifics of your use case and the version of langchainjs you are using. For more detailed information, you can refer to the langchain-core/src/runnables/base.ts file in the langchainjs repository.
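
The idea of starting a generator's setup before the caller begins iterating can be sketched in isolation. This is a simplified illustration and not the code in langchain-core: withEagerStart is a hypothetical helper that requests the first chunk immediately, exposes a setup promise that settles once that chunk is available, and then replays the chunk to the consumer.

```typescript
function withEagerStart<T>(source: AsyncGenerator<T>) {
  // Requesting the first chunk now makes the generator body start running,
  // so any work placed before its first `yield` begins immediately.
  const firstResult = source.next();
  const setup: Promise<void> = firstResult.then(() => undefined);

  // Replays the chunk that was already pulled, then continues with the rest.
  async function* replay(): AsyncGenerator<T> {
    const first = await firstResult;
    if (first.done) {
      return;
    }
    yield first.value;
    yield* source;
  }

  return { setup, stream: replay() };
}

// Hypothetical producer: records when its body starts, then yields two chunks.
async function* tokenSource(log: string[]): AsyncGenerator<string> {
  log.push("started");
  yield "a";
  yield "b";
}
```

With this wrapper, awaiting setup guarantees that the producer has reached its first yield even if nobody has read from stream yet. Without it, the body of an async generator does not run until the first read, which is the behavior behind the undefined sourceDocs reported earlier in this thread.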
