run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
36.8k stars 5.28k forks source link

[Bug]: Tracing With Langfuse #14591

Closed anantgupta129 closed 1 month ago

anantgupta129 commented 4 months ago

Bug Description

When making APi calls langfuse tracing callback in only working when engine / index are initialized again with api call

working in this case:

def stream_generator(generator: Generator) -> Generator[str, Any, None]:
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)

@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)

    client = qdrant_client.QdrantClient(...)
    vector_store = QdrantVectorStore(client=client, ...)
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

    engine = index.as_chat_engine(
        chat_mode="condense_plus_context",
        streaming=True,
        system_prompt=SYSTEM_PROMPT,
    )
    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")

Version

0.10.47

Steps to Reproduce

not working


client = qdrant_client.QdrantClient(...)
vector_store = QdrantVectorStore(client=client, ...)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

engine = index.as_chat_engine(
    chat_mode="condense_plus_context",
    streaming=True,
    system_prompt=SYSTEM_PROMPT,
)

def stream_generator(generator: Generator) -> Generator[str, Any, None]:
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)

@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)

    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")

Relevant Logs/Tracbacks

No response

dosubot[bot] commented 4 months ago

The Langfuse tracing callback only works when the engine and index are re-initialized with each API call because the Langfuse SDKs queue and batch events in the background to reduce the number of network requests and improve overall performance. To ensure that all queued events are captured and sent to Langfuse servers, the callback handler needs to be flushed. Re-initializing the engine and index with each API call ensures that the callback handler is properly set up and flushed, capturing all necessary traces and metrics.

To address this issue without re-initializing the engine and index with each API call, you can manually flush the callback handler at the end of each request. Here is an example of how you can modify your code to include the flush operation:

def stream_generator(generator: Generator) -> Generator[str, Any, None]:
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)

client = qdrant_client.QdrantClient(...)
vector_store = QdrantVectorStore(client=client, ...)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

engine = index.as_chat_engine(
    chat_mode="condense_plus_context",
    streaming=True,
    system_prompt=SYSTEM_PROMPT,
)

@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)

    response_stream = engine.stream_chat(request.message, chat_history=request.history)

    # Ensure all events are flushed to Langfuse
    langfuse_callback_handler.flush()

    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")

By adding langfuse_callback_handler.flush() at the end of the request, you ensure that all queued events are sent to Langfuse servers without needing to re-initialize the engine and index with each API call [1].

To continue talking to Dosu, mention @dosu.

anantgupta129 commented 4 months ago

@dosu i have tried langfuse_callback_handler.flush() its not working

dosubot[bot] commented 4 months ago

To resolve the issue with the Langfuse tracing callback not functioning as expected, even after using langfuse_callback_handler.flush(), ensure that you have correctly set up and registered the LangfuseCallbackHandler. Here is a checklist based on the provided documentation:

  1. Install Required Packages:

    %pip install llama-index llama-index-callbacks-langfuse
  2. Configure Environment Variables:

    import os
    
    # Langfuse
    os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
    os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
    os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"  # 🇪🇺 EU region, 🇺🇸 US region: "https://us.cloud.langfuse.com"
    
    # OpenAI
    os.environ["OPENAI_API_KEY"] = "sk-..."
  3. Register the Langfuse Callback Handler:

    • Option 1: Set global LlamaIndex handler

      from llama_index.core import global_handler, set_global_handler
      
      set_global_handler("langfuse")
      langfuse_callback_handler = global_handler
    • Option 2: Use Langfuse callback directly

      from llama_index.core import Settings
      from llama_index.core.callbacks import CallbackManager
      from langfuse.llama_index import LlamaIndexCallbackHandler
      
      langfuse_callback_handler = LlamaIndexCallbackHandler()
      Settings.callback_manager = CallbackManager([langfuse_callback_handler])
  4. Flush Events to Langfuse:

    # ... your LlamaIndex calls here ...
    
    langfuse_callback_handler.flush()

Ensure that all these steps are correctly followed. If the issue persists, double-check your API keys and network connectivity to Langfuse servers [1].

To continue talking to Dosu, mention @dosu.

logan-markewich commented 4 months ago

langfuse_callback_handler.set_trace_params(tags=tags) -- this cannot change the callback that is already attached to existing objects in memory

The reason why the second flow doesn't work is you have objects already in memory, and we cannot change the callback attached to them with updated params

anantgupta129 commented 4 months ago

@logan-markewich so for user cases like serving a chatbot for multiple users, and also different API utilizing the LLM in a project. How to manage the tracing of langfuse to filter them say by user id. What would be the best way

Secondly in another project i am using langfuse this for OpenAiAgnets where i can see the traces on the bases of different userid


def stream_generator(
    response: StreamingAgentChatResponse,
    profile_id: str,
    current_history: list[dict],
    chat_id: str | ObjectId,
    title: str | None = None,
    chat_history: list[dict] | None = None,
    exists: bool | None = False,
) -> Generator[str, None, None]:

    yield from (
        json.dumps({"type": "content_block", "text": text}) for text in response.response_gen
    )

    current_history.append({"role": "ASSISTANT", "content": response.response})
    chat_id = mongodb.persist_chat_store(
        profile_id, chat_id=chat_id, history=current_history, title=title, exists=exists
    )

    chat_history = chat_history + current_history if chat_history else current_history
    chat_history = [d.model_dump() if isinstance(d, Message) else d for d in chat_history]

    yield json.dumps({"type": "message_stop", "id": str(chat_id), "history": chat_history})

@router.post(
    "/chat",
    responses={
        "200": {
            "description": "Successful response will return a stream of JSON objects if stream true, else json response"
        },
        "500": {"model": ErrorResponse},
    },
)
async def chat(body: ChatRequest):
    try:
        exists = bool(body.chatId)
        title = ""
        if not body.chatId:
            response = llm.complete(
                f"For the following query: {body.query}, Generate a title for the chat heading, without any additional text"
            )
            title = response.text

        body.chatId = body.chatId or str(ObjectId())
        langfuse_callback_handler.set_trace_params(
            user_id=body.profileId, tags=[LANGFUSE_ENV_TAG], session_id=body.chatId
        )

        chat_history = load_history(body.history.copy()) if body.history else None
        current_history = [{"role": "USER", "content": body.query}]

        if body.stream:
            response = agent.stream_chat(body.query, chat_history=chat_history)
            return StreamingResponse(
                stream_generator(
                    response,
                    body.profileId,
                    chat_id=body.chatId,
                    current_history=current_history,
                    chat_history=body.history,
                    title=title,
                    exists=exists,
                ),
                media_type="application/x-ndjson",
            )
        else:
            response = agent.chat(body.query, chat_history=chat_history)

            current_history.append({"role": "ASSISTANT", "content": response.response})
            chat_id = mongodb.persist_chat_store(
                body.profileId,
                chat_id=body.chatId,
                history=current_history,
                title=title,
                exists=exists,
            )
            chat_history = chat_history + current_history if chat_history else current_history
            return {"response": response.response, "id": str(chat_id), "history": chat_history}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

although here also i cannot see the function calling logs which i feel now is due to i have not redefining the agents with each api call

logan-markewich commented 4 months ago

we are working on the ux, but setting for multiple users isn't really possible unless you set everything up each time

Not sure about the second thing