Arize-ai / openinference

OpenTelemetry Instrumentation for AI Observability
https://arize-ai.github.io/openinference/
Apache License 2.0
208 stars 34 forks source link

[bug] llama-index callback not able to retain multiple concurrent traces #248

Closed RogerHYang closed 7 months ago

RogerHYang commented 7 months ago

There are a few issues that are intertwined (one by us and two by llama-index).

  1. Our callback implementation can't handle multiple concurrent traces.
    • To do this we need to rely on trace_map to differentiate concurrent spans from separate traces.
  2. The trace_map from llama-index can't handle multiple concurrent traces.
    • It seems to be able to retain only one of traces when there are supposed to be multiple.
  3. The trace_map from llama-index is sometimes empty when end_trace is called.
    • This is a problem because we need to rely on trace_map to evict spans by the traces they belong to.

No. 2 can be shown by llama-index's debug handler:

llama_index.core.global_handler = LlamaDebugHandler(print_trace_on_end=True)

The output from the debug handler shows that all concurrent traces have identical timing. This is due to the fact that the trace_map variable in llama-index can only retain a single set of traces. This is a problem for us regarding issue No. 1 above because we have no way to differentiate concurrent spans other than relying on llama-index's trace_map input.

Screenshot 2024-03-01 at 11 24 33 AM

Below is code for testing async concurrent tracing.

import asyncio

import chromadb
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.vector_stores.chroma import ChromaVectorStore
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

endpoint = "http://127.0.0.1:6006/v1/traces"
tracer_provider = trace_sdk.TracerProvider()
trace_api.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint)))

LlamaIndexInstrumentor().instrument()
# llama_index.core.global_handler = LlamaDebugHandler(print_trace_on_end=True)

chroma_client = chromadb.EphemeralClient()
chroma_collection = chroma_client.create_collection("essays")
documents = SimpleDirectoryReader("./data/paul_graham/").load_data()
vector_store = ChromaVectorStore(chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)

async def main():
    tasks = [
        query_engine.aquery("What did the author do growing up?"),
        query_engine.aquery("Who is the author?"),
        query_engine.aquery("Where did the author go to college?"),
        query_engine.aquery("Where is the author's hometown?"),
    ]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    query_engine = index.as_query_engine()
    asyncio.run(main())
dosubot[bot] commented 7 months ago

Based on the information you've provided, it seems like the issue lies in the inability of the trace_map and the llama-index callback to handle multiple concurrent traces. This is causing all concurrent traces to have identical timing, which is problematic for differentiating between concurrent spans.

To address this, you could modify the llama-index callback and the trace_map to handle multiple concurrent traces. Here's a general approach:

  1. Ensure Unique Trace Identifiers: Each trace should have a unique identifier (trace_id) that distinguishes it from other concurrent traces. This can be achieved by generating a unique ID at the start of each trace or by leveraging any inherent unique attributes of the trace (e.g., a combination of event_id and start_time).

  2. Accurate Timing: Modify the _EventData class to capture the precise start time for each event. This seems to be already in place with start_time and end_time, but ensure that these times are captured as close to the event's actual start and end as possible to avoid timing inaccuracies.

  3. Concurrent Trace Management: Use a data structure (e.g., a dictionary) keyed by trace_id to manage concurrent traces. This structure should store information about each trace, including its events, start times, end times, and any other relevant data. This approach allows for efficient retrieval and update of trace information based on the unique trace_id.

  4. Trace Completion: Modify the _finish_tracing function to correctly handle the conclusion of each trace based on its unique identifier. Ensure that when a trace is concluded, its end time is accurately captured and associated with the correct trace in the management structure.

  5. Update Callback Logic: In the callback logic where traces are processed (e.g., in the processing of payloads and setting of attributes), ensure that the correct trace is identified and updated based on its unique trace_id. This may involve passing the trace_id as an additional parameter to functions that need to access or modify trace information.

Here's a conceptual code snippet to illustrate handling multiple concurrent traces with unique identifiers and accurate timing:

from collections import defaultdict
import time

# Assuming _EventData is already defined as per the provided context

# A dictionary to manage concurrent traces, keyed by a unique trace identifier
concurrent_traces = defaultdict(lambda: {"events": [], "start_time": None, "end_time": None})

def start_trace(trace_id):
    """Start a new trace with a unique identifier."""
    concurrent_traces[trace_id]["start_time"] = time.time()  # Capture the precise start time

def end_trace(trace_id):
    """End a trace and capture its end time."""
    concurrent_traces[trace_id]["end_time"] = time.time()  # Capture the precise end time

def add_event_to_trace(trace_id, event):
    """Add an event to a specific trace."""
    concurrent_traces[trace_id]["events"].append(event)

# Example usage
trace_id = "unique_trace_id_123"  # This should be generated or determined based on the trace's unique attributes
start_trace(trace_id)
# Perform trace operations...
add_event_to_trace(trace_id, event_data)
end_trace(trace_id)

This is a simplified example to illustrate the concept. In your implementation, you'll need to integrate these concepts with the existing _EventData class and callback logic, ensuring that each trace is correctly managed and concluded with accurate timing information.

Regarding the generation of trace identifiers, a common approach is to use a combination of high-resolution timestamps, unique identifiers (such as UUIDs), and possibly other contextual information (like service names or instance identifiers) to generate trace IDs. This method ensures that each trace ID is unique, even across distributed systems and concurrent executions.

Here's a conceptual example of how you might modify or extend the tracing functionality to include unique trace identifier generation, using UUIDs for simplicity:

import uuid
from opentelemetry import trace as trace_api

def generate_unique_trace_id() -> str:
    """
    Generates a unique trace identifier using UUID.
    """
    return str(uuid.uuid4())

def start_trace_with_unique_id() -> trace_api.Span:
    """
    Starts a new trace with a unique trace identifier.
    """
    trace_id = generate_unique_trace_id()
    # Assuming there's a way to set the trace ID in the OpenInference tracing context.
    # This is a conceptual example; actual implementation may vary.
    span = trace_api.get_current_span()
    span.set_attribute("trace_id", trace_id)
    return span

This example uses the uuid4() function from the uuid module to generate a unique identifier for each trace. This ID is then set as an attribute of the current span, assuming there's a mechanism to do so in the actual tracing context used by OpenInference. Note that the actual implementation details would depend on how OpenInference integrates with the OpenTelemetry SDK or any other tracing framework it uses.

Unfortunately, without more information about the trace_map and its implementation, it's difficult to provide a more specific solution. If you could provide more details about how trace_map is currently implemented and used, it would be easier to suggest a more targeted solution.

Sources

#### About Dosu This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant. If you want to continue the conversation, start your reply with @dosu-bot.