langfuse / langfuse

🪢 Open source LLM engineering platform: LLM Observability, metrics, evals, prompt management, playground, datasets. Integrates with LlamaIndex, Langchain, OpenAI SDK, LiteLLM, and more. 🍊YC W23
https://langfuse.com/docs

bug: Python SDK 100% CPU usage when running with ThreadPoolExecutor and processing 100+ tasks #3061

Closed: BrikerMan closed this issue 2 months ago

BrikerMan commented 2 months ago

Describe the bug

Thanks for building this amazing tool. I've been loving it.

While using LangChain to process lots of webpages in bulk, I run into a high-CPU issue after the job has been running for a few minutes. So far I've found that runs with fewer than 100 tasks work fine, but with long contexts and 100+ tasks per run, CPU usage reaches 100% even on my M2 Max MacBook.

PS: I see the same issue whether I use ThreadPoolExecutor with LangChain's sync invoke or limit concurrency with async and LangChain's ainvoke.

To reproduce

Here is my code to reproduce this issue. Please be aware that you need to change the data to something larger.

import concurrent
import os
from concurrent.futures import ThreadPoolExecutor

from langchain_openai import ChatOpenAI
from langfuse.callback import CallbackHandler
from loguru import logger

class SuperDuperAgent:
    def __init__(self, concurrency_limit: int = 5):
        self.concurrency_limit = concurrency_limit
        self.llm = ChatOpenAI(
            model="qwen/qwen-2-72b-instruct",
            openai_api_key=os.environ.get("OPENROUTER_API_KEY"),  # type: ignore
            base_url="https://openrouter.ai/api/v1/",
        )

    def summerize(self, title: str, content: str, debug: bool = False) -> str:
        callback = CallbackHandler(
            user_id="SuperDuperAgent",
            trace_name="super-duper-summarizer",
            metadata={"title": title},
            host=os.environ.get("LANGFUSE_HOST"),
            public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
            secret_key=os.environ.get("LANGFUSE_SECRET_KEY")
        )

        r = self.llm.invoke(
            [
                ("system", "You are the best summerizer in the world, get the bullet points from the article"),
                ("human", content),
            ],
            config={"callbacks": [callback]},
        )

        if debug:
            logger.info(f"Result: {r}")
        # invoke returns an AIMessage, not a str; return its text content
        return r.content

    def bulk_run(self):
        # Placeholder data: the real run uses 2000+ webpages with 2000+ tokens each
        webpages = [
            ("Example title", "A long article body with 2000+ tokens ..."),
        ]

        worker = ThreadPoolExecutor(max_workers=self.concurrency_limit)
        futures = []
        for title, content in webpages:
            # The original snippet submitted self.crawl_agent.crawl here, which is
            # not defined on this class; summerize matches the rest of the report
            future = worker.submit(self.summerize, title=title, content=content)
            futures.append(future)

        for future in concurrent.futures.as_completed(futures):
            try:
                res = future.result()
                if res:
                    logger.info(res)
            except Exception as e:
                logger.exception(e)
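
A quick way to verify a background-thread buildup (a hypothetical diagnostic, not part of the original report) is to log the interpreter's thread count as futures complete; if each summerize call leaves a thread behind, the number keeps climbing:

import threading

from loguru import logger

def log_thread_count() -> None:
    # Every live background batching thread counts here, so a steadily
    # growing number as tasks complete points at per-call handler creation.
    logger.info(f"Active threads: {threading.active_count()}")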

SDK and container versions

Current Version

langfuse 2.44.0
langchain 0.2.14
langchain-core 0.2.34
langchain-openai 0.1.22
langchain-text-splitters 0.2.2

Langfuse server (Docker): 2.71.0

I have had this issue for several past versions; I kept updating, but so far that has not fixed it.

Additional information

Host info

OS: macOS 14.5 23F79 arm64
Host: Mac14,6
Kernel: 23.5.0
CPU: Apple M2 Max
GPU: Apple M2 Max
Memory: 8621MiB / 98304MiB

Are you interested to contribute a fix for this bug?

Yes

marcklingen commented 2 months ago

Thanks for sharing! If you create a new CallbackHandler for every execution of summerize, this currently creates a new Langfuse SDK instance per invocation, which in turn spawns a new background thread to batch events to the Langfuse API. I'd recommend migrating to the Langfuse decorator + langfuse_context.get_langchain_handler, since that creates only a single Langfuse client instance and is therefore far more scalable. This is a current limitation of the LangChain integration, which would ideally create only one client instance; however, changing that would be a breaking change for some users who rely on the current behavior.

Docs on this are here; happy to help with the change in case anything is unclear: https://langfuse.com/docs/integrations/langchain/tracing#interoperability
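
For reference, a minimal sketch of that migration for the summerize method above, assuming the v2 decorators API from the linked docs (the client reads the same LANGFUSE_HOST / LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY environment variables the handler was given explicitly before):

from langfuse.decorators import langfuse_context, observe

class SuperDuperAgent:
    # __init__ with self.llm as in the original snippet

    @observe()
    def summerize(self, title: str, content: str) -> str:
        # @observe() creates (and reuses) one shared Langfuse client instead of
        # one per call; set the attributes CallbackHandler previously took as kwargs
        langfuse_context.update_current_trace(
            name="super-duper-summarizer",
            user_id="SuperDuperAgent",
            metadata={"title": title},
        )
        # Handler scoped to the current trace, backed by the shared client
        handler = langfuse_context.get_current_langchain_handler()
        r = self.llm.invoke(
            [
                ("system", "You are the best summarizer in the world, get the bullet points from the article"),
                ("human", content),
            ],
            config={"callbacks": [handler]},
        )
        return r.content

In a short-lived script it is also worth calling langfuse_context.flush() after the thread pool drains, so batched events are sent before the process exits.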

BrikerMan commented 2 months ago

Thanks, I updated my code according to your suggestion. I will test for about 1-2 days and let you know the result.

marcklingen commented 2 months ago

> Thanks, I updated my code according to your suggestion. I will test for about 1-2 days and let you know the result.

Sounds great, closing the issue for now. Please let me know in case this does not resolve it for you.

BrikerMan commented 2 months ago

It is fixed, thanks for the help.

marcklingen commented 2 months ago

awesome, thanks for confirming!