langfuse / langfuse

🪢 Open source LLM engineering platform: Observability, metrics, evals, prompt management, playground, datasets. Integrates with LlamaIndex, Langchain, OpenAI SDK, LiteLLM, and more. 🍊YC W23
https://langfuse.com/docs

bug: Using lf.flush() with decorators doesn't flush in-progress observations #2495

Open yaniv-aknin opened 4 weeks ago

yaniv-aknin commented 4 weeks ago

Describe the bug

When langfuse.flush() is called inside an observe()-ed function, it doesn't flush the updates made to this function's observations.

This means the UI doesn't update until the observation is done (especially bad for very long-running observations), and if the process crashes, updates may be lost with no way to flush them.

To reproduce

If you run this program, the trace will only show the name COOLNAME after the 10 seconds (when the program exits). If you kill the program (with SIGKILL, i.e., it doesn't get to clean up), then COOLNAME is lost.

#!/usr/bin/env python3
from langfuse.decorators import observe, langfuse_context
import time

@observe()
def main():
    langfuse_context.update_current_observation(name="COOLNAME")
    langfuse_context.flush()  # expected to send the COOLNAME update now
    time.sleep(10)            # actual: nothing shows up until main() returns

if __name__ == "__main__":
    main()

SDK and container versions

>>> langfuse.__version__
'2.36.2'
>>>

Additional information

No response

Are you interested in contributing a fix for this bug?

No

hassiebp commented 4 weeks ago

Hi @yaniv-aknin - thanks for your report! You are trying to flush an observation that has not been finalized yet, i.e. the decorated function has not returned yet. Decorators in Python 'wrap' the decorated function and thus rely on it to return / raise before processing the result.

Please move your langfuse_context.flush() call to after calling the main() function to flush observations correctly.
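
Applied to the reproduction script above, that looks like:

#!/usr/bin/env python3
from langfuse.decorators import observe, langfuse_context
import time

@observe()
def main():
    langfuse_context.update_current_observation(name="COOLNAME")
    time.sleep(10)

if __name__ == "__main__":
    main()
    # flush after the decorated function has returned, so the
    # finalized observation is included in the outgoing batch
    langfuse_context.flush()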

yaniv-aknin commented 4 weeks ago

Thanks Hassieb!

The problem is that I want to flush an unfinished observation. I have observations that can last for a long time, and I want the UI to show them (with updated metadata) as they are being processed.

Another issue is that my process might terminate uncleanly (before the observation finishes), and then I get orphaned observations which aren't associated with the right parent - even though I tried to associate them, the association was never flushed.

I could give up on the decorator UI and manually craft flushing (see here for an example), but I'd much prefer that langfuse gave me a way to flush mid-observation.
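
For illustration, here's a rough sketch of what the manual, non-decorator flow can look like with the low-level client (names here are illustrative, not the linked example):

from langfuse import Langfuse

langfuse = Langfuse()
trace = langfuse.trace(name="COOLNAME")        # create the trace up front
span = trace.span(name="long-running-work")    # illustrative span name
span.update(metadata={"progress": "started"})
langfuse.flush()  # low-level updates can be flushed while the span is open
# ... long-running work ...
span.end()
langfuse.flush()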

yaniv-aknin commented 4 weeks ago

One more thought -- from an API design standpoint, I still think that x.update(); x.flush() should flush the update; otherwise the API is surprisingly non-Pythonic (and this is why I think it's a bug, not a feature request).

I don't think anything in Python's behaviour around decorators precludes you from flushing the update; it's more of an implementation choice on langfuse's part. I believe you have all the necessary information to flush even inside the decorated observation.

4t8dd commented 3 weeks ago

By in-progress observations, do you mean a live data stream presented in the UI? There are default settings that control flushing:

      flush_at: Max batch size that's sent to the API.
      flush_interval: Max delay until a new batch is sent to the API.

For observe, it just uses the default settings: LANGFUSE_FLUSH_AT = 15 and LANGFUSE_FLUSH_INTERVAL = 0.5.

These configs cannot be passed to the observe decorator itself. So one way to flush the trace sooner in your example is to customize these settings, e.g. set flush_at = 1; then every event is flushed as soon as it is recorded. But that is still not live data.
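
For reference, the low-level client takes these as constructor arguments (a sketch, assuming you construct the client yourself rather than relying on the decorator's internal instance):

from langfuse import Langfuse

# send each event immediately instead of batching up to 15 events
# per 0.5s interval (the defaults quoted above)
langfuse = Langfuse(flush_at=1, flush_interval=0.5)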

marcklingen commented 3 weeks ago

Discord post related to this issue by @Ricardonacif -> flushing of in-flight updates to an observation is relevant, and changing this behavior makes sense

hey guys! I'm integrating langfuse into our python app and I was able to make it work with the observers. The issue I'm having is, when I call my instance method that does the call to openai directly as a python script, everything works fine. But when I call my instance inside my background job (I'm using python-rq), I don't get any of the metadata (nor the response from openai). I am calling langfuse_context.flush() at the end of my function but still. Any thoughts?

I enabled langfuse debug and apparently it is uploading all the metadata correctly, but it isn't showing up in the UI

the information gets there, as I can see in the JSON, but the UI doesn't populate the name, user ID, and tags

  class GPTCompletionModel(CompletionModel):
      def __init__(self, model_type: ModelType, model_config: CompletionModelConfig):
          super().__init__(model_type, model_config)
          self._gpt_model_name = str(self._model_type)
          self._completer = openai_client.chat.completions
          self._encoding = tiktoken.encoding_for_model(self._gpt_model_name)

      @observe()
      def _generate(
          self,
          prompt: ChatMessage,
          system_message: ChatMessage,
          json_format: bool = False,
          stop_sequences: list[str] | None = None,
          completion_kwargs: CompletionModelKwargs | None = None,
          enrichments: dict[str, Any] | None = None,
      ) -> ChatMessage:

          if enrichments is not None:
              tags = [get_config().ENVIRONMENT]
              if "tags" in enrichments:
                  tags += cast(list[str], enrichments["tags"])
              langfuse_context.update_current_trace(
                  name=enrichments.get("name", None),
                  session_id=enrichments.get("session_id", None),
                  user_id=enrichments.get("user_id", None),
                  tags=tags,
              )
          ...
          completion = self._completer.create(
              model=self._gpt_model_name,
              messages=messages,
              response_format=response_format,
              stop=stop_sequences,
              temperature=0,
          )

          usage = dict(completion).get("usage")

          _logger.info(
              f"Generated response for model={completion.model}: used {usage.prompt_tokens} to generate"
              f" {usage.completion_tokens} ({usage.total_tokens} total)"
          )

          self._flush_langfuse_safely()
          return self._as_roadway_message(completion.choices[0].message)

      def _flush_langfuse_safely(self):
          try:
              langfuse_context.flush()
          except Exception as e:
              print("something went wrong here")
              print(e)
              _logger.exception(e)


Ricardonacif commented 3 weeks ago

Calling flush outside of that function definitely solved my issue. I believe this use case is going to be somewhat frequent, so I suggest adding a parameter to observe that flushes right after the function returns:

@observe(flush_right_after=True)
def main():
    langfuse_context.update_current_observation(name="COOLNAME")
    time.sleep(10)

For now I was able to wrap the decorator and it's working:


import functools
import logging

from langfuse.decorators import langfuse_context

_logger = logging.getLogger(__name__)

def flusher_observe(decorator):
    """Wrap a decorator (e.g. observe()) so langfuse is flushed after each call."""
    def decorator_wrapper(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = decorator(func)(*args, **kwargs)
            try:
                langfuse_context.flush()
            except Exception as e:  # never let flushing break the call itself
                _logger.exception(e)
            return result
        return wrapper
    return decorator_wrapper
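
Usage then looks like this (observe() still does the tracing; the flush fires as soon as the call returns, so this still doesn't flush mid-observation):

@flusher_observe(observe())
def main():
    langfuse_context.update_current_observation(name="COOLNAME")
    time.sleep(10)
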
hassiebp commented 3 weeks ago

Hey @Ricardonacif - my recommendation would be for you to call langfuse_context.flush() prior to the end of the background task to ensure all events are sent correctly, as I assume the function execution is short-lived.
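
A sketch of that recommendation (the job and function names here are hypothetical):

from langfuse.decorators import observe, langfuse_context

@observe()
def generate_completion(payload):
    ...  # the traced work, e.g. the OpenAI call above

def job_entrypoint(payload):
    # enqueue this wrapper with python-rq; it flushes once the
    # decorated function has returned, before the worker moves on
    try:
        return generate_completion(payload)
    finally:
        langfuse_context.flush()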

What @yaniv-aknin is trying to achieve appears slightly different: reporting updates on a longer-running function execution to langfuse, and avoiding losing those events if the process is killed / shut down ungracefully. I'll look into reworking the observe-decorator internals to allow for this 👍🏾