langchain-ai / langgraph

Build resilient language agents as graphs.
https://langchain-ai.github.io/langgraph/
MIT License
6.31k stars 997 forks source link

Larger outputs cause streaming errors: peer closed connection without sending complete message body #2055

Closed antoremin closed 1 day ago

antoremin commented 2 weeks ago

Checked other resources

Example Code

Run notebook testing.ipynb from the repo below. 

To reproduce this on the demo langgraph, I 10x'd search volume in a single search iteration, and then this cell kicks off 10 threads, half of which end up breaking with the issue I'm describing, pretty much every time. 

from langgraph_sdk import get_client
import asyncio

# Initialize the client
# (module-level: run_search_agent_for_company below reads this global)
client = get_client(url="http://localhost:8123")  # Update this URL as needed

async def run_search_agent_for_company(company):
    """Run the search agent for one company on a fresh thread.

    Picks the first assistant whose ``config`` is empty, creates a thread,
    streams a run in ``updates`` mode while printing every non-metadata
    chunk, and finally fetches the thread state. Relies on the module-level
    ``client``, ``topic`` and ``schema`` names.

    Args:
        company: Company name interpolated into the topic and prompt strings.

    Returns:
        The value returned by ``client.threads.get_state`` for the thread,
        or ``None`` if any step raised (the error is printed, not re-raised).
    """
    try:
        # Get default assistant: the first one without a custom config.
        assistants = await client.assistants.search()
        assistant = next((a for a in assistants if not a["config"]), None)
        if assistant is None:
            # Fail with a readable message instead of "list index out of range".
            raise RuntimeError("no assistant with an empty config was found")
        assistant_id = assistant["assistant_id"]
        print(f"Using assistant with ID: {assistant_id} for {company}")

        # Create a thread
        thread = await client.threads.create()
        thread_id = thread["thread_id"]
        print(f"Created thread with ID: {thread_id} for {company}")

        input_data = {
            "topic": topic.format(company=company),
            "extraction_schema": schema,
            "configurable": {
                "model_name": "anthropic/claude-3-5-sonnet-20240620",
                "max_loops": 50,
                "max_info_tool_calls": 10,
                "max_search_results": 200,
            },
            "messages": [
                {
                    "role": "user",
                    "content": f"Make a plan to complete the request for {company}",
                }
            ],
            "current_plan": f"Initial plan: Analyze the request for {company}",
            "iteration_number": 0,
        }

        # Execute a run on the thread
        async for chunk in client.runs.stream(
            thread_id,
            assistant_id,
            input=input_data,
            stream_mode="updates",
            config={"recursion_limit": 50},
        ):
            data = chunk.data
            if not data or chunk.event == "metadata":
                continue
            print(f"{company}: {data}")
            # Only treat dict payloads as error reports; indexing a non-dict
            # payload by key would raise and skip the final-state fetch.
            if isinstance(data, dict) and "error" in data:
                print(f"Error encountered for {company}: {data['error']}")
                # 'message' may be absent from the error payload.
                print(f"Error message: {data.get('message', '<no message>')}")
                break

        # Get final state (also reached after an error chunk stops the stream)
        final_state = await client.threads.get_state(thread_id)
        print(f"Final results for {company}:", final_state)
        return final_state

    except Exception as e:
        # Best-effort: one failing company must not abort the whole batch.
        print(f"An error occurred for {company}: {str(e)}")
        return None

async def run_all_companies():
    """Launch one search-agent task per company and collect results by name.

    Every task is created before any is awaited, so the runs proceed
    concurrently; the tasks are then awaited one by one in list order.

    Returns:
        A dict mapping each company name to whatever
        ``run_search_agent_for_company`` returned for it.
    """
    companies = [
        "Mars", "Hershey's", "Nestlé", "Ferrero", "Mondelez International",
        "Lindt & Sprüngli", "Perfetti Van Melle", "Haribo", "Meiji", "Tootsie Roll Industries",
        "Ghirardelli Chocolate Company", "Godiva Chocolatier", "Cadbury", "Jelly Belly Candy Company",
        "Russell Stover Chocolates", "Storck", "Pladis", "Arcor", "Lotte Confectionery", "Ezaki Glico",
        "Fazer", "Cloetta", "Ritter Sport", "Tony's Chocolonely", "Guylian", "Cemoi", "Leaf Brands",
        "Ferrara Candy Company", "Just Born", "Bahlsen"
    ]

    # Schedule everything first so the tasks overlap.
    scheduled = [
        (name, asyncio.create_task(run_search_agent_for_company(name)))
        for name in companies
    ]

    # Await in the original list order, keyed by company name.
    return {name: await job for name, job in scheduled}

# Run the async function for all companies and get the results
# NOTE: a bare top-level `await` needs a notebook/IPython-style session with a
# running event loop; a plain script would use asyncio.run(run_all_companies()).
search_results = await run_all_companies()

### Error Message and Stack Trace (if applicable)

```shell
Client side error: 

peer closed connection without sending complete message body (incomplete chunked read)

Langsmith error:

CancelledError()Traceback (most recent call last):

  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1502, in astream
    async for _ in runner.atick(

  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/runner.py", line 130, in atick
    await arun_with_retry(t, retry_policy, stream=self.use_astream)

  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/retry.py", line 102, in arun_with_retry
    await task.proc.ainvoke(task.input, config)

  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 452, in ainvoke
    input = await asyncio.create_task(coro, context=context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langgraph/prebuilt/tool_node.py", line 148, in ainvoke
    return await super().ainvoke(input, config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 235, in ainvoke
    ret = await asyncio.create_task(coro, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langgraph/prebuilt/tool_node.py", line 162, in _afunc
    outputs = await asyncio.gather(
              ^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langgraph/prebuilt/tool_node.py", line 192, in _arun_one
    tool_message: ToolMessage = await self.tools_by_name[call["name"]].ainvoke(
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/structured.py", line 58, in ainvoke
    return await super().ainvoke(input, config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 490, in ainvoke
    return await self.arun(tool_input, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 767, in arun
    response = await asyncio.create_task(coro, context=context)  # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/structured.py", line 96, in _arun
    return await self.coroutine(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/deps/data-enrichment/src/enrichment_agent/tools.py", line 55, in extensive_search
    result = await search(query, config=config)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/deps/data-enrichment/src/enrichment_agent/tools.py", line 33, in search
    result = await wrapped.ainvoke({"query": query})
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 490, in ainvoke
    return await self.arun(tool_input, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 767, in arun
    response = await asyncio.create_task(coro, context=context)  # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_community/tools/tavily_search/tool.py", line 178, in _arun
    raw_results = await self.api_wrapper.raw_results_async(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_community/utilities/tavily_search.py", line 149, in raw_results_async
    results_json_str = await fetch()
                       ^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_community/utilities/tavily_search.py", line 142, in fetch
    async with session.post(f"{TAVILY_API_URL}/search", json=params) as res:

  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 1357, in __aenter__
    self._resp: _RetType = await self._coro
                           ^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 688, in _request
    await resp.start(conn)

  File "/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 1058, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 643, in read
    await self._waiter

asyncio.exceptions.CancelledError
```


### Description

When I kick off several langgraph threads at once, each of which processes larger volumes, I get "peer closed connection .." errors for some of the threads pretty much every time. 

I was able to reproduce this with a sample langgraph by adding some more search calls on the search step and kicking off several threads at once (which I do with my langgraph as well). 

Here's the repo: https://github.com/antoremin/data-enrichment

To reproduce, run `langgraph up` to start the graph, then run testing.ipynb locally.

https://github.com/antoremin/data-enrichment

### System Info

System Information
------------------
> OS:  Darwin
> OS Version:  Darwin Kernel Version 23.6.0: Mon Jul 29 21:14:46 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6031
> Python Version:  3.9.11 (main, Dec 13 2023, 15:51:08) 
[Clang 14.0.3 (clang-1403.0.22.14.1)]

Langgraph's pyproject.toml: 

[project]
name = "enrichment-agent"
version = "0.0.1"
description = "An agent that populates and enriches custom schemas"
authors = [
    { name = "William Fu-Hinthorn", email = "13333726+hinthornw@users.noreply.github.com" },
]
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.9"
dependencies = [
    "langgraph>=0.2.19",
    "langchain-openai>=0.1.22",
    "langchain-anthropic>=0.1.23",
    "langchain>=0.2.14",
    "langchain-fireworks>=0.1.7",
    "python-dotenv>=1.0.1",
    "langchain-community>=0.2.13",
    "transformers",
]

[project.optional-dependencies]
dev = ["mypy>=1.11.1", "ruff>=0.6.1", "pytest-asyncio"]

[build-system]
requires = ["setuptools>=73.0.0", "wheel"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
packages = ["enrichment_agent"]
[tool.setuptools.package-dir]
"enrichment_agent" = "src/enrichment_agent"
"langgraph.templates.enrichment_agent" = "src/enrichment_agent"

[tool.setuptools.package-data]
"*" = ["py.typed"]

[tool.ruff]
lint.select = [
    "E",    # pycodestyle
    "F",    # pyflakes
    "I",    # isort
    "D",    # pydocstyle
    "D401", # First line should be in imperative mood
    "T201",
    "UP",
]
include = ["*.py", "*.pyi", "*.ipynb"]
lint.ignore = ["UP006", "UP007", "UP035", "D417", "E501"]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["D", "UP"]
"ntbk/*" = ["D", "UP", "T201"]
[tool.ruff.lint.pydocstyle]
convention = "google"
antoremin commented 2 weeks ago

Changing Tavily to Google search didn't solve the issue

Getting errors like this:

CancelledError()Traceback (most recent call last):

  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1502, in astream
    async for _ in runner.atick(

  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/runner.py", line 130, in atick
    await arun_with_retry(t, retry_policy, stream=self.use_astream)

  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/retry.py", line 102, in arun_with_retry
    await task.proc.ainvoke(task.input, config)

  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 452, in ainvoke
    input = await asyncio.create_task(coro, context=context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 235, in ainvoke
    ret = await asyncio.create_task(coro, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/deps/data-enrichment/src/enrichment_agent/graph.py", line 62, in call_agent_model
    response = cast(AIMessage, await model.ainvoke(messages))
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 5349, in ainvoke
    return await self.bound.ainvoke(
           ^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 305, in ainvoke
    llm_result = await self.agenerate_prompt(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 794, in agenerate_prompt
    return await self.agenerate(
           ^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 720, in agenerate
    results = await asyncio.gather(
              ^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 924, in _agenerate_with_cache
    result = await self._agenerate(
             ^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/langchain_anthropic/chat_models.py", line 805, in _agenerate
    data = await self._async_client.messages.create(**payload)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/anthropic/resources/messages.py", line 1811, in create
    return await self._post(
           ^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/anthropic/_base_client.py", line 1838, in post
    return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/anthropic/_base_client.py", line 1532, in request
    return await self._request(
           ^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/anthropic/_base_client.py", line 1552, in _request
    self._platform = await asyncify(get_platform)()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/anthropic/_utils/_sync.py", line 69, in wrapper
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2356, in run_sync_in_worker_thread
    await cls.checkpoint()

  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2264, in checkpoint
    await sleep(0)

  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 640, in sleep
    await __sleep0()

  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 634, in __sleep0
    yield

asyncio.exceptions.CancelledError
khushiDesai commented 1 week ago

Hi @antoremin, I am Khushi, a 4th year student at UofT CS. I’m working with my teammates @anushak18, @ashvini8, and @ssumaiyaahmed, who are also 4th year students at UofT CS. We would like to take the initiative to work on this issue and contribute to LangChain. We’re eager to help resolve the larger outputs causing streaming errors and share our findings.

vbarda commented 1 day ago

Closing since the issue is now resolved. Feel free to reopen if it reappears!