langchain-ai / langgraphjs

Build resilient language agents as graphs.
https://langchain-ai.github.io/langgraphjs/
MIT License
692 stars 111 forks source link

Using langgraph with Sockets.io #293

Open Anand-Keshavan opened 3 months ago

Anand-Keshavan commented 3 months ago

Hi,

I need to run a multi-agent system as a server that communicates (sends all the messages and gets human input) with a client. How does one do this? Are there any examples?

objectiveSee commented 2 months ago

You should read the documentation for socket.io and get comfortable with sockets before connecting that to a graph. You have a few options such as doing a human in the loop method.

Anand-Keshavan commented 2 months ago

Hi Danny,

I have been using socket.io with autogen without any issues. Regarding human in the loop, most of the examples are about taking input from a terminal. Real world apps don't work like that — I need to get input from a web page or a remote app, and I need the ability not only to get the input, but also to execute functions (tools) in the client. If you have any examples of how such things can be done, please let me know.

Cheers

A

On Thu, Sep 12, 2024 at 10:21 AM Danny Ricciotti @.***> wrote:

You should read the documentation for socket.io and get comfortable with sockets before connecting that to a graph. You have a few options such as doing a human in the loop method.

— Reply to this email directly, view it on GitHub https://github.com/langchain-ai/langgraphjs/issues/293#issuecomment-2345267499, or unsubscribe https://github.com/notifications/unsubscribe-auth/A4S3B26O5ICEIPS5FS3MRBDZWEMUVAVCNFSM6AAAAABL7QHSF2VHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGNBVGI3DONBZHE . You are receiving this because you authored the thread.Message ID: @.***>

TranThienTrong commented 1 month ago

Hi, I'm also implementing socket.io with langgraph. I will post my strategy and implementation after I finish :)

Anand-Keshavan commented 1 month ago

Thanks a lot

On Tue, Oct 1, 2024 at 10:47 AM Trần Thiên Trọng @.***> wrote:

Hi, I'm also implementing socket.io with langgraph, I will post my strategy and implementation after finish :)

— Reply to this email directly, view it on GitHub https://github.com/langchain-ai/langgraphjs/issues/293#issuecomment-2384808592, or unsubscribe https://github.com/notifications/unsubscribe-auth/A4S3B27GBHF4SYWO3TT7YE3ZZIV4ZAVCNFSM6AAAAABL7QHSF2VHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGOBUHAYDQNJZGI . You are receiving this because you authored the thread.Message ID: @.***>

l4b4r4b4b4 commented 1 month ago

Would be awesome if you keep the community posted on your progress ;)

TranThienTrong commented 1 month ago

Hi all, hope this answers your question. Sorry for not having more line-by-line detail, but you guys are smart and can figure it out :)

import os
from contextlib import asynccontextmanager
from typing import TypedDict
from dotenv import load_dotenv
from langchain_core.tools import tool
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
import socketio
import uvicorn
from fastapi import FastAPI

class MemoryManager:
    """Owns the lifecycle of a PostgresSaver checkpointer.

    ``PostgresSaver.from_conn_string`` returns a context manager; this class
    enters and exits it by hand so the saver can outlive any single ``with``
    block (it must stay open for the whole server lifespan).
    """

    def __init__(self, db_uri):
        # Connection string for the Postgres checkpoint database.
        self.db_uri = db_uri
        # Live saver instance; None until open() is called.
        self.memory = None
        # Underlying context manager, retained so close() can exit it.
        self.memory_context = None

    def open(self):
        """Return the saver, creating it (and running setup()) on first call."""
        if self.memory is not None:
            return self.memory
        self.memory_context = PostgresSaver.from_conn_string(self.db_uri)
        # Enter the context manually to obtain the actual saver object.
        self.memory = self.memory_context.__enter__()
        self.memory.setup()
        return self.memory

    def close(self):
        """Tear the saver down; a no-op when nothing is open."""
        if self.memory is None:
            return
        # Exit the context manually, mirroring open().
        self.memory_context.__exit__(None, None, None)
        self.memory = None
        self.memory_context = None

class CustomState(TypedDict):
    """Graph state: the run's initial input plus feedback from a human."""

    # Raw input that kicks off the graph run.
    input: str
    # Text supplied by the human while the graph is interrupted.
    user_feedback: str

def step_1(state: CustomState):
    """Graph node 1: trace-only step — prints a marker, leaves state untouched."""
    print("---- STEP 1 ----")
    return None

def human_feedback(state: CustomState):
    """Placeholder node the graph interrupts around; the real feedback is
    injected externally via graph.update_state(..., as_node="human_feedback")."""
    print("---- HUMAN FEEDBACK ----")
    return None

def step_3(state: CustomState):
    """Graph node 3: trace-only step — prints a marker, leaves state untouched."""
    print("---- STEP 3 ----")
    return None

def step_4(state: CustomState):
    """Graph node 4: trace-only step — prints a marker, leaves state untouched."""
    print("---- STEP 4 ----")
    return None

# Assemble the demo graph: START -> step_1 -> human_feedback -> step_3 -> step_4 -> END.
builder = StateGraph(CustomState)

for node_name, node_fn in (
    ("step_1", step_1),
    ("human_feedback", human_feedback),
    ("step_3", step_3),
    ("step_4", step_4),
):
    builder.add_node(node_name, node_fn)

# Wire the single linear path through every node.
for edge_src, edge_dst in (
    (START, "step_1"),
    ("step_1", "human_feedback"),
    ("human_feedback", "step_3"),
    ("step_3", "step_4"),
    ("step_4", END),
):
    builder.add_edge(edge_src, edge_dst)

# Postgres connection string used by the checkpointer.
DB_URI = "postgresql://trong:@localhost:5432/langgraph_memory?sslmode=disable"

class SocketServer:
    """Bridges Socket.IO events to a compiled LangGraph.

    Mounts an ASGI Socket.IO server onto the given FastAPI app and resumes
    the interrupted graph whenever a client sends a ``user_feedback`` event.
    """

    def __init__(self, app, graph, memory):
        self.graph = graph
        self.memory = memory
        self.app = app
        # ASGI-mode Socket.IO server; CORS is wide open for local development.
        self.sio = socketio.AsyncServer(cors_allowed_origins='*', async_mode='asgi', logger=True, engineio_logger=True)
        self.socket_app = socketio.ASGIApp(self.sio)
        # Mount at the root so the default /socket.io path is reachable.
        self.app.mount("/", self.socket_app)

        self.register_events()

    def register_events(self):
        """Attach the connect / disconnect / user_feedback handlers."""

        @self.sio.on("connect")
        async def connect(sid, env):
            print("New Client Connected to this Socket Server: " + str(sid))
            # Fix: greet only the client that just connected. The original
            # emit had no target and broadcast to every connected client.
            await self.sio.emit("send_msg", "Hello from Server", to=sid)

        @self.sio.on("disconnect")
        async def disconnect(sid):
            print("Client Disconnected: " + str(sid))

        @self.sio.on('user_feedback')
        async def user_feedback(sid, data):
            # Inject the human's reply into the paused graph and resume it.
            # `data` is expected to carry 'thread_id' and 'feedback' keys —
            # a malformed payload raises KeyError (surfaced by sio's logger).
            print("EVENT: User Feedback")
            thread_id = data['thread_id']
            thread = {"configurable": {"thread_id": thread_id}}
            user_input = data['feedback']

            # Record the feedback as if the human_feedback node produced it.
            self.graph.update_state(thread, {"user_feedback": user_input}, as_node="human_feedback")

            saved_state = self.graph.get_state(thread)
            print("Current State")
            print(saved_state.next if saved_state.values else "No saved state")
            # NOTE(review): update_state/get_state/stream are synchronous and
            # block the event loop; consider asyncio.to_thread for production.
            for event in self.graph.stream(None, thread, stream_mode="values"):
                print(event)

def create_socket_server(app, memory_manager, reload=False):
    """Compile the graph, start or resume thread "88911010", and attach a
    SocketServer to *app*.

    Returns the SocketServer instance, or None when called from a reloader
    process so the server is only initialized once.
    """
    if reload:
        print("Skipping server initialization on reload")
        return None  # Skip initialization during reload

    load_dotenv()
    thread = {"configurable": {"thread_id": "88911010"}}
    initial_input = {"input": "hello world"}

    memory = memory_manager.open()

    # Compile with interrupts around human_feedback so the graph pauses
    # there and waits for the socket client's reply.
    graph = builder.compile(checkpointer=memory, interrupt_before=["human_feedback"],
                            interrupt_after=["human_feedback"])

    saved_state = graph.get_state(thread)
    print("Current State")
    print(saved_state.values if saved_state.values else "No saved state")

    # Fix: the original tested bool(saved_state.values) twice with two
    # independent ifs and duplicated the stream loop; use one if/else.
    if saved_state.values:
        print("Resuming from saved state:")
        stream_input = saved_state.values
    else:
        print("No saved state found. Starting from the beginning.")
        stream_input = initial_input
    for event in graph.stream(stream_input, thread, stream_mode="values"):
        print(event)

    current_state = graph.get_state(thread)
    print("Current node after streaming:", current_state.next)

    # Pass the compiled graph and memory to the socket server class
    socket_server = SocketServer(app, graph, memory)
    return socket_server

# Module-level MemoryManager singleton: shared by lifespan() and
# create_socket_server() so both use the same Postgres checkpointer.
memory_manager = MemoryManager(DB_URI)

# Create the socket server object inside lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: open the checkpointer and attach the socket server
    on startup; close the checkpointer on shutdown."""
    memory_manager.open()

    # Only create the socket server once; a reloader process is detected via
    # the SERVER_RELOADER environment variable and skips initialization.
    reload = "SERVER_RELOADER" in os.environ
    create_socket_server(app, memory_manager, reload=reload)
    # Fix: the original bound socket_server.app to a LOCAL variable named
    # `application`, which had no effect (it did not rebind the module-level
    # `application`); the dead assignment was removed.

    yield
    # Release the Postgres checkpointer on shutdown.
    memory_manager.close()

app = FastAPI(lifespan=lifespan)
application = app  # Use the FastAPI app directly for uvicorn

if __name__ == "__main__":
    # NOTE(review): the "main:application" import string assumes this file is
    # named main.py — confirm, otherwise uvicorn's reloader cannot import it.
    uvicorn.run("main:application", host="0.0.0.0", port=3500, lifespan="on", reload=True)

# Handle FastAPI startup and shutdown

Socket IO Client

// Connect to the Socket.IO server exposed by the Python app on port 3500.
const default_socket = io("http://127.0.0.1:3500");

// Send human feedback for the interrupted thread.
// Fix: the feedback value was 'I'm feedback' — an unescaped apostrophe
// inside a single-quoted string, i.e. a syntax error. Double-quote it.
default_socket.emit('user_feedback', {
    'thread_id': '88911010',
    'feedback': "I'm feedback"
});
l4b4r4b4b4 commented 1 month ago

thx, however I was looking for something that works in JS ;)