langchain-ai / langgraphjs

Build resilient language agents as graphs.
https://langchain-ai.github.io/langgraphjs/
MIT License
667 stars 105 forks source link

Subgraph Issues: Invalid Update For Channel query #474

Open justinlevi opened 2 months ago

justinlevi commented 2 months ago

I'm trying to build a RAG graph with a few extra nodes to experiment with a few ideas and I thought it would be interesting to try and create a subgraph for the rag pipeline. Unfortunately I can't figure out where this error is coming from and how to solve it. Any direction is greatly appreciated.

I've tried everything I can think of to return the state from the subgraph to match the parent graph state as well as even just return an empty state in the last node in the graph, but nothing seems to work. I feel like the issue is with the handoff back to the parent graph, but I'm not sure I fully understand how to return properly.

I'm seeing the following error:

[Nest] 51582  - 09/10/2024, 8:37:25 PM   ERROR [ExceptionsHandler] Invalid update for channel query. Values: How can I create a managed cluster?,What are the step-by-step instructions to create a managed cluster?,What files are needed to create a managed cluster?,What code is required to create a managed cluster?,How can I deploy a managed cluster to a specific bare metal host?,What are the step-by-step instructions to deploy to a specific bare metal host?,What files are needed to deploy to a specific bare metal host?,What code is required to deploy to a specific bare metal host?

Error:
InvalidUpdateError: Invalid update for channel query. Values: How can I create a managed cluster?,What are the step-by-step instructions to create a managed cluster?,What files are needed to create a managed cluster?,What code is required to create a managed cluster?,How can I deploy a managed cluster to a specific bare metal host?,What are the step-by-step instructions to deploy to a specific bare metal host?,What files are needed to deploy to a specific bare metal host?,What code is required to deploy to a specific bare metal host?

Error:
    at _applyWrites (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/langgraph/dist/pregel/index.cjs:652:27)
    at CompiledStateGraph._transform (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/langgraph/dist/pregel/index.cjs:475:17)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async CompiledStateGraph._transformStreamWithConfig (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/core/dist/runnables/base.cjs:308:30)
    at async CompiledStateGraph.transform (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/langgraph/dist/pregel/index.cjs:536:26)
    at async Object.pull (/Users/justinwinter/projects/docbot/backend/node_modules/@langchain/core/dist/utils/stream.cjs:98:41)

Here is my state and graph definitions

import { Injectable, Logger } from '@nestjs/common';
import { StateGraph, END, START, Send } from '@langchain/langgraph';
import { PrismaCheckpointer } from './checkpointer';
import { PrismaService } from '../prisma.service';
import {
  RetrieveDocumentsNode,
  curateComplexResponsesNode,
  generateRagResponseNode,
  gradeDocumentsNode,
  queryDestructuringNode,
} from './nodes';
import { queryKeywordsNode } from './nodes/queryKeywordsNode';
import { TavilySearchNode } from './nodes/tavilySearchNode';
import { querySimpleComplexRouterChain } from './chains/querySimpleComplexRouterChain';
import { BaseMessage } from '@langchain/core/messages';

import { messagesStateReducer, Annotation } from '@langchain/langgraph';
import { Document } from '@langchain/core/documents';

export type IMainState = typeof MainState.State;

export const MainState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: messagesStateReducer,
  }),
  query: Annotation<string>(),
  queryType: Annotation<'SIMPLE' | 'COMPLEX' | 'INTERACTION'>(),
  queries: Annotation<string[]>,
  responses: Annotation<string[]>({
    reducer: (prev: string[], curr: string[]) => [...prev, ...curr],
  }),
  finalResponse: Annotation<string>,
});

export type IRAGState = typeof RAGState.State;

export const RAGState = Annotation.Root({
  messages: Annotation<BaseMessage[]>,
  query: Annotation<string>,
  queryRewrite: Annotation<string>,
  keywords: Annotation<string[]>,
  documents: Annotation<Document[]>,
  response: Annotation<string>,
});

@Injectable()
export class AgentGraph {
  private readonly logger = new Logger(AgentGraph.name);

  constructor(
    private readonly prismaService: PrismaService,
    private readonly retrieverNode: RetrieveDocumentsNode,
    private readonly tavilySearchNode: TavilySearchNode,
  ) {}

  private checkpointer: PrismaCheckpointer;

  createGraph() {
    const ragSubgraph = new StateGraph(RAGState)
      // Add nodes
      .addNode('GENERATE_KEYWORDS', queryKeywordsNode.bind(this))
      .addNode('PARALLEL_FETCH', async (state, config: any) => {
        const [fetchResult, webResult] = await Promise.all([
          this.retrieverNode.retrieve(state, config),
          this.tavilySearchNode.search(state),
        ]);

        const combinedDocuments = [
          ...fetchResult.documents,
          ...webResult.documents,
        ];
        return { documents: combinedDocuments };
      })
      .addNode('GRADE_DOCUMENTS', gradeDocumentsNode.bind(this))
      .addNode('GENERATE_RESPONSE', generateRagResponseNode.bind(this))

      // Add edges
      .addEdge(START, 'GENERATE_KEYWORDS')
      .addEdge('GENERATE_KEYWORDS', 'PARALLEL_FETCH')
      .addEdge('PARALLEL_FETCH', 'GRADE_DOCUMENTS')
      .addEdge('GRADE_DOCUMENTS', 'GENERATE_RESPONSE')
      .addEdge('GENERATE_RESPONSE', END);

    const graph = new StateGraph(MainState)
      // Add nodes
      .addNode('ROUTE_QUERY', this.routeQueryNode.bind(this))
      .addNode('DESTRUCTURE_QUERY', queryDestructuringNode.bind(this))
      .addNode('RAG_PIPELINE', ragSubgraph.compile())
      .addNode('CURATE_RESPONSES', curateComplexResponsesNode.bind(this))

      // Add edges
      .addEdge(START, 'ROUTE_QUERY')
      .addConditionalEdges('ROUTE_QUERY', ({ queryType }) => queryType, {
        SIMPLE: 'RAG_PIPELINE',
        COMPLEX: 'DESTRUCTURE_QUERY',
        INTERACTION: END,
      })
      .addConditionalEdges(
        'DESTRUCTURE_QUERY',
        this.continueToRagPipeline.bind(this),
      )
      .addEdge('RAG_PIPELINE', 'CURATE_RESPONSES')
      .addEdge('CURATE_RESPONSES', END);

    this.checkpointer = new PrismaCheckpointer(this.prismaService);

    const compiled = graph.compile({ checkpointer: this.checkpointer });

    const representation = compiled.getGraph();
    console.log(representation.drawMermaid());

    return compiled;
  }

  private continueToRagPipeline = (state: typeof MainState.State) => {
    // We will return a list of `Send` objects
    // Each `Send` object consists of the name of a node in the graph
    // as well as the state to send to that node
    return state.queries.map(
      (query) => new Send('RAG_PIPELINE', { messages: state.messages, query }),
    );
  };

  private async routeQueryNode(
    state: IMainState,
  ): Promise<Partial<IMainState>> {
    this.logger.log('Routing question');
    const question = state.query;
    const questionRouter = await querySimpleComplexRouterChain();

    const source = await questionRouter.invoke({ question });
    this.logger.log(`Route question to ${source.queryType}`);
    return {
      queryType: source.queryType,
    };
  }

  getCheckpointer() {
    return this.checkpointer;
  }
}
justinlevi commented 2 months ago

I'm also pretty sure I'm not doing the parallel branching correctly either but I'm not sure how to handle that either

Masstronaut commented 1 month ago

We're currently working on support for subgraphs in langgraphjs, but it's not finished yet.