langflow-ai / langflow

Langflow is a low-code app builder for RAG and multi-agent AI applications. It’s Python-based and agnostic to any model, API, or database.
http://www.langflow.org
MIT License

Issue with tracing service reinitializing the tracers for nested workflows #3831

Open rogeriochaves opened 2 days ago

rogeriochaves commented 2 days ago

Bug Description

The tracing/service.py service is a singleton responsible for starting the tracers (langwatch, langsmith, langfuse) whenever a run happens. The problem is that when using a nested workflow, for example with "Flow as a Tool", this service gets reinitialized with a different run id, overriding the previous one. As a result, the parent run stops being tracked (except for langchain callbacks, which keep going), and each child run becomes a new trace with no run id connection to its parent.

Moreover, when two flows run in parallel, the last one to start sets the runId, so components executed in the first flow may end up being tracked under the second one, as visible in the screenshot on this PR: https://github.com/langflow-ai/langflow/pull/3830
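The overwrite can be illustrated with a minimal sketch of the singleton behavior described above (class and method names here are illustrative, not Langflow's actual implementation):

```python
# Hypothetical minimal model of the problem: one shared instance whose
# run_id is overwritten unconditionally by every new run.
class TracingServiceSingleton:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.run_id = None
        return cls._instance

    def initialize_tracers(self, run_id):
        # Each run overwrites the previous run_id, nested or not.
        self.run_id = run_id


parent = TracingServiceSingleton()
parent.initialize_tracers("parent-run")

# A nested "Flow as a Tool" run gets the very same singleton...
child = TracingServiceSingleton()
child.initialize_tracers("child-run")

# ...so any further traces of the parent are attributed to the child.
print(parent.run_id)  # "child-run", not "parent-run"
```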

As a solution, the tracing service should probably either stop being a singleton, or at least not a global one (it could be per run thread), and keep the parent reference. Alternatively, it could stay a singleton, with components informing their run id on every tracing service call; the singleton would then keep the parent/child relation between runs and pass it to the tracing backends, so they can correctly assemble the tree of calls, either on the same trace or as separate traces per child.

Reproduction

  1. Create a simple flow, just input/output, maybe an LLM call in the middle
  2. Create another flow and use Flow as a Tool for the first one, tell the tool calling agent to call the tool twice, exactly and only twice
  3. Add print statements for self.run_id in initialize_tracers, _start_traces, _end_traces, and _end_all_traces to see that it always changes to the last run executed
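Step 3 can be mimicked outside Langflow with a stub that prints self.run_id the same way (the method bodies here are placeholders; in Langflow the print lines would go into the real methods in tracing/service.py):

```python
# Stub reproducing the observable symptom: the shared run_id always
# reflects the most recently initialized run.
class TracingService:
    def __init__(self):
        self.run_id = None

    def initialize_tracers(self, run_id):
        self.run_id = run_id
        print(f"initialize_tracers: run_id={self.run_id}")

    def _start_traces(self, trace_name):
        print(f"_start_traces({trace_name}): run_id={self.run_id}")


service = TracingService()
service.initialize_tracers("parent-run")  # run_id=parent-run
service._start_traces("agent")            # run_id=parent-run
service.initialize_tracers("child-run")   # nested flow overwrites it
service._start_traces("llm")              # run_id=child-run; parent lost
```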

Expected behavior

_start_traces should know which run_id it is starting a trace for, so traces from different runs don't get tangled up together
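One way to sketch that expectation: if _start_traces took the run_id as a parameter and the service kept one bucket per run, concurrent and nested runs would stay separate (names and structure are illustrative only, not Langflow's actual API):

```python
# Sketch: traces are grouped per run_id passed in by the caller,
# instead of read from a single shared attribute.
from dataclasses import dataclass, field


@dataclass
class RunTraces:
    run_id: str
    traces: list = field(default_factory=list)


class TracingService:
    def __init__(self):
        # One bucket per run instead of a single self.run_id.
        self._runs: dict[str, RunTraces] = {}

    def initialize_tracers(self, run_id: str):
        self._runs.setdefault(run_id, RunTraces(run_id))

    def _start_traces(self, run_id: str, trace_name: str):
        # The caller says which run this trace belongs to.
        self._runs[run_id].traces.append(trace_name)


service = TracingService()
service.initialize_tracers("parent-run")
service.initialize_tracers("child-run")
service._start_traces("parent-run", "agent")
service._start_traces("child-run", "llm")
print(service._runs["parent-run"].traces)  # ['agent']
print(service._runs["child-run"].traces)   # ['llm']
```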

Who can help?

No response

Operating System

Mac OS

Langflow Version

1.0.18

Python Version

3.11

Screenshot

No response

Flow File

20240917120814097_langflow_flows.zip

dosubot[bot] commented 2 days ago

Hey @rogeriochaves! I'm here to help you with any bugs, questions, or contributions you have for the Langflow framework. Let's tackle this issue together and get it resolved!

To resolve the issue with the tracing/service.py singleton reinitializing tracers for nested workflows, causing loss of tracking for parent runs and incorrect associations between parent and child runs, you need to ensure that tracers are not reinitialized for nested workflows. Instead, maintain the existing tracer context for nested workflows.

Here is a possible solution:

  1. Modify the initialize_tracers method to check if tracers are already initialized.
  2. Ensure that nested workflows use the existing tracer context.
import asyncio
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional

# Service, Component, and logger come from the module's existing imports.

class TracingService(Service):
    # ... existing code ...

    async def initialize_tracers(self):
        if self._tracers:
            logger.debug("Tracers already initialized, skipping reinitialization.")
            return
        try:
            await self.start()
            self._initialize_langsmith_tracer()
            self._initialize_langwatch_tracer()
            self._initialize_langfuse_tracer()
        except Exception as e:
            logger.debug(f"Error initializing tracers: {e}")

    # ... existing code ...

    @asynccontextmanager
    async def trace_context(
        self,
        component: "Component",
        trace_name: str,
        inputs: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
    ):
        trace_id = trace_name
        if component._vertex:
            trace_id = component._vertex.id
        trace_type = component.trace_type
        if trace_name in self._children:
            logger.debug(f"Using existing tracer context for {trace_name}")
        else:
            self._start_traces(
                trace_id,
                trace_name,
                trace_type,
                self._cleanup_inputs(inputs),
                metadata,
                component._vertex,
            )
        error: Optional[Exception] = None
        try:
            yield self
        except Exception as e:
            error = e
            raise
        finally:
            # End the traces exactly once, passing along any error.
            await self._end_and_reset(trace_id, trace_name, error)

    async def _end_and_reset(self, trace_id: str, trace_name: str, error: Exception | None = None):
        self._end_traces(trace_id, trace_name, error)
        self._reset_io()

This modification ensures that tracers are not reinitialized for nested workflows, preserving the tracking for parent runs and maintaining correct associations between parent and child runs [1][2].

To continue talking to Dosu, mention @dosu.