langchain-ai / langgraphjs

âš¡ Build language agents as graphs âš¡
https://langchain-ai.github.io/langgraphjs/
MIT License
619 stars 95 forks source link

Abort signal doesn't work #319

Open Pckool opened 2 months ago

Pckool commented 2 months ago

When I try using an abort signal with my graph, it doesn't work; It calls the graph but then never het's past the start node. 🥲 I try adding the signal to the config after compiling my graph; I'm not sure if that makes a difference here.

jacoblee93 commented 2 months ago

Hey @Pckool, that shouldn't make a difference!

What version of @langchain/langgraph and @langchain/core are you using? Signal was added to core fairly recently

Perhaps this is helpful?

https://langchain-ai.github.io/langgraphjs/how-tos/manage-ecosystem-dependencies/

jacoblee93 commented 2 months ago

If you've done the above, can you also share some code so we can try to repro?

Pckool commented 2 months ago

Hey so it also happens when we only have timeout so I think there's a problem with the logic on the executeTasks fn.

Here are my versions and we already have "0.2.24" on resolutions 🫠

"@langchain/core": "0.2.24",
"@langchain/langgraph": "0.0.34",

A bit difficult to share code as it's has a bunch of internal stuff mixed in but lemme see if I can whip something up

Pckool commented 2 months ago

Here's a gutted version where the issue still persists:

import { BaseMessage, AIMessage } from '@langchain/core/messages';
import { START, END, StateGraph } from '@langchain/langgraph';
import { BookingManagerState } from '../__defs__';
import { invokeBookingManagerAgent } from '../agents/internal/booking-manager';

const stateChannels = {
  messages: {
    reducer: (existing: BaseMessage[] = [], incoming: BaseMessage[]) => existing.concat(incoming) as BaseMessage[],
    default: () => [] as BaseMessage[],
  },
  booking_status: {
    reducer: (_, input) => input,
    default: () => '',
  },
};

export const createBookingManagerAgent = async (payload: BookingManagerProps) => {
  const { aclContext, wanderIds, wanderContext } = payload;

  const graph = new StateGraph<BookingManagerState>({
    channels: stateChannels,
  })
    .addNode('booking_manager', async ({ messages }) => {
      const result = await invokeBookingManagerAgent({
        messages,
        aclContext,
        wanderIds,
        wanderContext,
      });
      return {
        messages: result.messages.map(m => 
          m._getType() === 'ai' ? new AIMessage({ ...m, name: 'booking_manager' }) : m
        ),
        booking_status: result.booking_status || 'CONTINUE',
      };
    })
    .addEdge(START, 'booking_manager')
    .addConditionalEdges(
      'booking_manager',
      (state) => state.booking_status === 'CONTINUE' ? 'booking_manager' : END
    );

  return graph;
};

// Usage example with abort signal
export async function streamBookingManager(input: BookingManagerInput) {
  const graph = await createBookingManagerAgent(input);
  const abortController = new AbortController();

  setupAbortListener(input.conversationId, abortController);

  const stream = await graph.stream({
    messages: input.messages,
  }, {
    signal: abortController.signal,
  });

  return stream;
}

function setupAbortListener(conversationId: string, abortController: AbortController) {
  logic.stream.listenToAbortChannel(conversationId, () => {
    abortController.abort('New stream started for this conversation. Aborting this one.');
    console.warn('Aborted stream due to new stream starting', { conversationId });
  });
}
Pckool commented 2 months ago

I think part of the issue is the stream method? It would probably work with invoke but I need the all messages generated from the model called in the node

Pckool commented 2 months ago

Hey! Any movement on this? It still is not working 😅

jacoblee93 commented 2 months ago

Hey, really sorry will dig in as soon as I can!