traceloop / openllmetry

Open-source observability for your LLM application, based on OpenTelemetry
https://www.traceloop.com/openllmetry
Apache License 2.0
1.83k stars 151 forks source link

🐛 Bug Report: Spans being detached from Langchain parent traces #1541

Open maciejwie opened 2 months ago

maciejwie commented 2 months ago

Which component is this bug for?

Langchain Instrumentation

📜 Description

Calls to functions instrumented by other libraries get detached from the parent trace. This happens for example in Langchain chains which call a retriever which is instrumented by opentelemetry.instrumentation.sqlalchemy. The nominally child spans are created but are independent of each other and of the actual parent.

Occurs for both sync and async calls.

👟 Reproduction steps

This code reproduces the issue given a local postgres server:

import os

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_postgres import PGVector
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from traceloop.sdk import Traceloop

# Initialise the Traceloop SDK and export every finished span to the console.
Traceloop.init(
    exporter=ConsoleSpanExporter(),
)

# Prompt with two placeholders: the user's question and the retrieved context.
PROMPT_TEMPLATE = """
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question.

Question: {question} 

Context: {context} 

Answer:
"""

qa_prompt = PromptTemplate.from_template(PROMPT_TEMPLATE)

# Use ChatOpenAI, the chat model this script imports from langchain_openai.
# AzureChatOpenAI is never imported here, so calling it raises NameError
# before the chain is even built.
llm = ChatOpenAI()

embeddings = OpenAIEmbeddings()

# Vector store on the local Postgres server, reached through the psycopg driver.
vectorstore = PGVector(
    connection=PGVector.connection_string_from_db_params(
        driver="psycopg",
        host="localhost",
        port=5432,
        database="vectors",
        user="postgres",
        password="localpassword",
    ),
    embeddings=embeddings,
)

def format_docs(docs):
    """Return the ``page_content`` of every document, joined by blank lines."""
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)

# Retrieval branch: fetch documents from the vector store, then flatten them
# into a single string with format_docs.
context_branch = vectorstore.as_retriever() | format_docs

# Both prompt variables are derived from the same chain input; the question
# is passed through as-is.
chain_inputs = {
    "context": context_branch,
    "question": RunnablePassthrough(),
}

qa_chain = chain_inputs | qa_prompt | llm | StrOutputParser()

qa_chain.invoke("What are autonomous agents?")

👍 Expected behavior

All spans in the chain should have the same trace_id and appropriate parent_id.

👎 Actual Behavior with Screenshots

Example:

{
    "name": "RunnableAssign<filter>.langchain.task",
    "context": {
        "trace_id": "**0x2b1a7979f5725daf1a624b67a66c0d65**",
        "span_id": "0xff0ed485230f3387",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": "**0x8e9d707a6ced4a3e**",
    "start_time": "2024-07-11T16:54:46.031626Z",
    "end_time": "2024-07-11T16:54:46.146668Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "traceloop.span.kind": "task",
        "traceloop.entity.name": "RunnableAssign<filter>.langchain.task"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "dev"
        },
        "schema_url": ""
    }
}
{
    "name": "connect",
    "context": {
        "trace_id": "**0x8e897f889032d34c89fab768696dda50**",
        "span_id": "0x16564f54d9e75f4a",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CLIENT",
    "parent_id": **null**,
    "start_time": "2024-07-11T16:54:46.158278Z",
    "end_time": "2024-07-11T16:54:46.178849Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "net.peer.name": "localhost",
        "net.peer.port": 5432,
        "db.name": "postgres",
        "db.user": "postgres",
        "db.system": "postgresql"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "dev"
        },
        "schema_url": ""
    }
}
{
    "name": "connect",
    "context": {
        "trace_id": "**0x81bd9194f61397cbe0bd534c37f0ea35**",
        "span_id": "0x5b67b8e9ffad9bf1",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CLIENT",
    "parent_id": **null**,
    "start_time": "2024-07-11T16:54:46.182237Z",
    "end_time": "2024-07-11T16:54:46.182272Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "net.peer.name": "localhost",
        "net.peer.port": 5432,
        "db.name": "postgres",
        "db.user": "postgres",
        "db.system": "postgresql"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "dev"
        },
        "schema_url": ""
    }
}
{
    "name": "connect",
    "context": {
        "trace_id": "**0xee2ee041c775319c24a37d968f4afbf2**",
        "span_id": "0xdbc3250f693b6ec2",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CLIENT",
    "parent_id": **null**,
    "start_time": "2024-07-11T16:54:46.189804Z",
    "end_time": "2024-07-11T16:54:46.189848Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "net.peer.name": "localhost",
        "net.peer.port": 5432,
        "db.name": "postgres",
        "db.user": "postgres",
        "db.system": "postgresql"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "dev"
        },
        "schema_url": ""
    }
}
{
    "name": "connect",
    "context": {
        "trace_id": "**0x175f1f1a4b4a24a093615e2d6dc9be0c**",
        "span_id": "0xc96e709b49c7a085",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CLIENT",
    "parent_id": **null**,
    "start_time": "2024-07-11T16:54:47.339822Z",
    "end_time": "2024-07-11T16:54:47.340088Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "net.peer.name": "localhost",
        "net.peer.port": 5432,
        "db.name": "postgres",
        "db.user": "postgres",
        "db.system": "postgresql"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "dev"
        },
        "schema_url": ""
    }
}
{
    "name": "RunnableSequence.langchain.task",
    "context": {
        "trace_id": "**0x2b1a7979f5725daf1a624b67a66c0d65**",
        "span_id": "0x8e9d707a6ced4a3e",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": "**0x73b5335f38792980**",
    "start_time": "2024-07-11T16:54:46.027813Z",
    "end_time": "2024-07-11T16:54:47.468602Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "traceloop.span.kind": "task",
        "traceloop.entity.name": "RunnableSequence.langchain.task"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "dev"
        },
        "schema_url": ""
    }
}

🤖 Python Version

No response

📃 Provide any additional context for the Bug.

Present in 0.25.3.

👀 Have you spent some time to check if this bug has been raised before?

Are you willing to submit PR?

None

nirga commented 2 months ago

Thanks @maciejwie! Unfortunately, this is happening because we're instrumenting spans in callbacks (since 0.25.0) instead of patching LangChain methods (which was highly unstable, since LangChain kept changing their method signatures). The downside of using callbacks is that this doesn't work well with OTel's context, since the actual calls within LangChain run in a different context. We're looking for a solution for this and will update the issue here.

nirga commented 1 month ago

Hey @maciejwie, I tried reproducing this (see #1617), but it seems all connect spans are in the same trace as the langchain ones. Can you verify which instrumentation version you are using?

maciejwie commented 2 weeks ago

Hi @nirga, sorry to re-open this but this issue still occurs with 0.28.1 since I think there was an error in #1617. I've left a comment there.

I'm able to reproduce the issue with the following:

(somewhat abridged) requirements.txt:

-i https://pypi.org/simple
langchain==0.2.14; python_version < '4.0' and python_full_version >= '3.8.1'
langchain-community==0.2.12; python_version < '4.0' and python_full_version >= '3.8.1'
langchain-core==0.2.35; python_version < '4.0' and python_full_version >= '3.8.1'
langchain-openai==0.1.22; python_version < '4.0' and python_full_version >= '3.8.1'
langchain-postgres==0.0.9; python_full_version >= '3.8.1' and python_full_version < '4.0.0'
langchain-text-splitters==0.2.2; python_version < '4.0' and python_full_version >= '3.8.1'
langsmith==0.1.104; python_version < '4.0' and python_full_version >= '3.8.1'
openai==1.42.0; python_full_version >= '3.7.1'
opentelemetry-api==1.26.0; python_version >= '3.8'
opentelemetry-exporter-otlp==1.26.0; python_version >= '3.8'
opentelemetry-exporter-otlp-proto-common==1.26.0; python_version >= '3.8'
opentelemetry-exporter-otlp-proto-grpc==1.26.0; python_version >= '3.8'
opentelemetry-exporter-otlp-proto-http==1.26.0; python_version >= '3.8'
opentelemetry-instrumentation==0.47b0; python_version >= '3.8'
opentelemetry-instrumentation-alephalpha==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-anthropic==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-bedrock==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-chromadb==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-cohere==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-google-generativeai==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-haystack==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-lancedb==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-langchain==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-llamaindex==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-marqo==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-milvus==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-mistralai==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-ollama==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-openai==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-pinecone==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-qdrant==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-replicate==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-requests==0.47b0; python_version >= '3.8'
opentelemetry-instrumentation-sqlalchemy==0.47b0; python_version >= '3.8'
opentelemetry-instrumentation-threading==0.47b0; python_version >= '3.8'
opentelemetry-instrumentation-together==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-transformers==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-urllib3==0.47b0; python_version >= '3.8'
opentelemetry-instrumentation-vertexai==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-watsonx==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-instrumentation-weaviate==0.28.1; python_version >= '3.9' and python_version < '4'
opentelemetry-proto==1.26.0; python_version >= '3.8'
opentelemetry-sdk==1.26.0; python_version >= '3.8'
opentelemetry-semantic-conventions==0.47b0; python_version >= '3.8'
opentelemetry-semantic-conventions-ai==0.4.1; python_version >= '3.9' and python_version < '4'
opentelemetry-util-http==0.47b0; python_version >= '3.8'
pgvector==0.2.5; python_version >= '3.8'
psycopg[binary]==3.1.18; python_version >= '3.7'
psycopg-binary==3.1.18
psycopg-pool==3.2.2; python_version >= '3.8'
pyarrow==17.0.0; python_version >= '3.8'
sqlalchemy==2.0.32; python_version >= '3.7'
traceloop-sdk==0.28.1; python_version >= '3.9' and python_version < '4'

I don't have an OpenAI key, so I can't confirm whether the behavior differs from AzureChatOpenAI, but I don't expect an issue there. My test code was largely the same as yours:

def main():
    """Build a retrieval QA chain, run it once, and check the exported spans.

    The chain retrieves context from a PGVector store, fills a prompt, calls
    an Azure OpenAI chat model and parses the reply to a string. After the
    run, the finished spans are read from ``exporter`` and two things are
    asserted: the exact sequence of span names, and that every ``connect``
    span carries the same trace ID as the workflow span and the
    ``RunnableParallel`` task span.

    This snippet is abridged: ``api_key``, ``token_provider``, the
    ``azure_*`` and ``pgvector_*`` settings, ``collection_name`` and
    ``exporter`` are not defined here and must be supplied by the
    surrounding module.

    Raises:
        AssertionError: if the span names differ from the expected sequence
            or a ``connect`` span has a different trace ID.
    """
    # Prompt with two placeholders: the user's question and the retrieved context.
    PROMPT_TEMPLATE = """
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the
question.
Question: {question}
Context: {context}
Answer:
    """

    prompt = PromptTemplate.from_template(PROMPT_TEMPLATE)

    llm = AzureChatOpenAI(
        api_key=api_key,
        azure_ad_token_provider=token_provider,
        azure_endpoint=azure_llm_endpoint,
        azure_deployment=azure_llm_deployment,
    )

    embeddings = AzureOpenAIEmbeddings(
        api_key=api_key,
        azure_ad_token_provider=token_provider,
        azure_endpoint=azure_emb_endpoint,
        azure_deployment=azure_emb_deployment,
        dimensions=azure_emb_dims,
    )

    # Vector store reached through the psycopg driver; connection settings
    # come from the externally defined pgvector_* values.
    vectorstore = PGVector(
        collection_name=collection_name,
        connection=PGVector.connection_string_from_db_params(
            driver="psycopg",
            host=pgvector_host,
            port=int(pgvector_port),
            database=pgvector_database,
            user=pgvector_user,
            password=pgvector_password,
        ),
        embedding_length=azure_emb_dims,
        embeddings=embeddings,
    )

    def format_docs(docs):
        # Join the text of all retrieved documents, separated by blank lines.
        return "\n\n".join(doc.page_content for doc in docs)

    # "context" is produced by the retriever piped into format_docs;
    # "question" is a RunnablePassthrough over the same chain input.
    runnable = (
        {"context": vectorstore.as_retriever() | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    print(runnable.invoke("What are autonomous agents?"))

    # NOTE(review): exporter is presumably an in-memory span exporter set up
    # by the test harness — confirm against the surrounding module.
    spans = exporter.get_finished_spans()

    # Expected span names, in the order the exporter returns them.
    assert [
        "connect",
        "connect",
        "connect",
        "RunnablePassthrough.task",
        "connect",
        "format_docs.task",
        "RunnableSequence.task",
        "RunnableParallel<context,question>.task",
        "PromptTemplate.task",
        "AzureChatOpenAI.chat",
        "StrOutputParser.task",
        "RunnableSequence.workflow",
    ] == [span.name for span in spans]

    # next() picks the first span with the given name; it raises
    # StopIteration if no such span was exported.
    workflow_span = next(
        span for span in spans if span.name == "RunnableSequence.workflow"
    )
    runnable_parallel = next(
        span for span in spans if span.name == "RunnableParallel<context,question>.task"
    )
    connect_spans = [span for span in spans if span.name == "connect"]

    # Each "connect" span must be in the same trace as the LangChain spans.
    # The print below only runs for a span once both assertions have passed.
    for span in connect_spans:
        assert workflow_span.context.trace_id == span.context.trace_id, f"Trace ID mismatch: {span.context.trace_id} != {workflow_span.context.trace_id}"
        assert runnable_parallel.context.trace_id == span.context.trace_id, f"Trace ID mismatch: {span.context.trace_id} != {runnable_parallel.context.trace_id}"
        print(f"{span.name}: {span.context.trace_id} {workflow_span.context.trace_id} {runnable_parallel.context.trace_id}")
nirga commented 2 weeks ago

Thanks @maciejwie for this detailed repro! I will take a look and add this to our tests! Sorry about that