langchain-ai / langgraph

Time between nodes semi-randomly very long on Databricks cloud server #1266

Open vdhelm opened 1 month ago

vdhelm commented 1 month ago

Example Code

import time
from typing import TypedDict, Optional

from functools import wraps

from langgraph.graph import StateGraph, END

class GraphState(TypedDict):
    # TypedDict fields cannot take default values; declare them as Optional instead.
    query: Optional[str]
    temp_response: Optional[str]
    final_response: Optional[str]

start_time = time.time()

def node_timing(f):
    """Print wall-clock entry and exit times for each node, relative to script start."""
    @wraps(f)
    def wrap(*args, **kw):
        ts = time.time()

        result = f(*args, **kw)

        te = time.time()

        print(f"{f.__name__}: in {ts - start_time:.3f} out {te - start_time:.3f}")

        return result

    return wrap

class Flow:

    @node_timing
    def start_node(self, state):
        time.sleep(1)
        return {"temp_response": "yes"}

    @node_timing
    def node_1(self, state):
        time.sleep(1)
        return {"temp_response": "no"}

    @node_timing
    def node_2(self, state):
        time.sleep(1)
        return {"temp_response": "no"}

    @node_timing
    def node_3(self, state):
        time.sleep(1)
        return {"temp_response": "no"}

    @node_timing
    def node_4(self, state):
        time.sleep(1)
        return {"temp_response": "no"}

    @node_timing
    def end_node(self, state):
        time.sleep(1)
        return {"final_response": "the graph is now finished"}

    @node_timing
    def get_workflow(self):
        workflow = StateGraph(GraphState)

        workflow.add_node("start_node", self.start_node)
        workflow.add_node("node_1", self.start_node)
        workflow.add_node("node_2", self.start_node)
        workflow.add_node("node_3", self.start_node)
        workflow.add_node("node_4", self.start_node)
        workflow.add_node("end_node", self.start_node)

        workflow.set_entry_point("start_node")
        workflow.add_edge("start_node", "node_1")
        workflow.add_edge("node_1", "node_2")
        workflow.add_edge("node_2", "node_3")
        workflow.add_edge("node_3", "node_4")
        workflow.add_edge("node_4", "end_node")
        workflow.add_edge("end_node", END)

        self.app_workflow = workflow.compile()

    @node_timing
    def get_answer(self, query):

        output = self.app_workflow.invoke({"query": query})

        return output

f = Flow()
f.get_workflow()

f.get_answer("why")

Error Message and Stack Trace (if applicable)

No response

Description

This issue is tricky to reproduce because it only happens on certain systems and depends heavily on how the Python process is started.

In the example provided, I look at the time between one node finishing and the next node starting. On my local machine, and inside a Databricks notebook, this gap is between 5 and 50 milliseconds, but when I run the script on our Databricks server directly, it is between 400 and 700 milliseconds. Within a single run the gap is very consistent, but it varies between runs.
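
For reference, this is how the gap can be read off programmatically rather than by eye; a minimal sketch, assuming the decorator records (name, enter, exit) tuples into a list instead of printing (the timings list and report_gaps helper are illustrative, not part of the script above):

import time
from functools import wraps

timings = []

def node_timing(f):
    # Same decorator as in the example, but it records timestamps instead of printing.
    @wraps(f)
    def wrap(*args, **kw):
        ts = time.time()
        result = f(*args, **kw)
        timings.append((f.__name__, ts, time.time()))
        return result
    return wrap

def report_gaps():
    # The gap is the time between one node returning and the next node being entered.
    for (name_a, _, out_a), (name_b, in_b, _) in zip(timings, timings[1:]):
        print(f"{name_a} -> {name_b}: {(in_b - out_a) * 1000:.1f} ms")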

This delay occurs between every pair of nodes, so for a flow with a realistic number of nodes it adds up to a delay of around 10 seconds, which is prohibitively slow in a chat application.

I have not been able to root-cause the issue because I do not understand what LangGraph does between nodes, especially for a flow as simple as the one in this example. I do see a lot of asynchronous functions, so I suspect some kind of race condition or wait that takes this long to resolve.
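
One way to narrow this down would be to profile a single invocation and see which calls dominate the time between nodes; a minimal sketch using the standard-library cProfile, which I have not run on the affected cluster myself:

import cProfile
import pstats

flow = Flow()
flow.get_workflow()

# Profile one graph invocation and show the 25 most expensive calls by
# cumulative time; any wait or lock between nodes should show up here.
cProfile.run('flow.get_answer("why")', "invoke.prof")
pstats.Stats("invoke.prof").sort_stats("cumulative").print_stats(25)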

All timing experiments were done on a dedicated cloud compute cluster with no other processes running.

System Info

Databricks Standard_DS3_v2 compute cluster (14 GB memory, 4 cores)
Databricks runtime version: 14.3 LTS ML (Apache Spark 3.5.0, Scala 2.12)
OS: Linux #76-20-01-1-Ubuntu SMP
Python: 3.10.12
langchain_core: 0.2.28
langchain: 0.0.348
langsmith: 0.1.98
langgraph: 0.2.0

wangm23456 commented 3 weeks ago

+1