PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.5k stars, 1.52k forks

Missing logs in the web UI when task_runner=RayTaskRunner #14494

Open fyrestone opened 3 weeks ago

fyrestone commented 3 weeks ago

Bug summary

Tasks run with RayTaskRunner have empty logs in the web UI, whether log_prints is True or not.

Reproduction

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task(log_prints=True)
def bar():
    print("bar")

# @flow(log_prints=True)  # Works as expected with the default task runner.
@flow(task_runner=RayTaskRunner("auto"), log_prints=True)  # Logs from bar are missing.
def foo():
    print("foo")
    return bar.submit()

foo()

Error

No response

Versions (prefect version output)

(venv) ➜  prefect git:(main) ✗ prefect version
Version:             3.0.0rc10
API version:         0.8.4
Python version:      3.9.6
Git commit:          200cf212
Built:               Wed, Jul 3, 2024 1:39 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         server
Pydantic version:    2.7.4

Additional context

No response

fyrestone commented 3 weeks ago

After investigating, I found that the logs need to be flushed when the Prefect Ray task finishes. The flush could be done inside prefect_ray, or Prefect could offer a task option such as @task(flush_logs=True). With a manual flush added to the task, the logs show up:

from prefect import flow, task, get_run_logger
from prefect_ray import RayTaskRunner

@task(log_prints=True)
def bar():
    print("bar")
    logger = get_run_logger()
    # Workaround: flush the first handler so buffered records are sent
    # before the Ray task finishes.
    logger.logger.handlers[0].flush()

# @flow(log_prints=True)  # Works as expected with the default task runner.
@flow(task_runner=RayTaskRunner("auto"), log_prints=True)  # Logs from bar are missing unless flushed.
def foo():
    print("foo")
    return bar.submit()

foo()
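
The manual flush above only touches the first handler and is skipped if the task raises. A minimal sketch of a more general version follows, using only the standard library: `flush_logs_on_exit`, `ListHandler`, and the `flush_demo` logger are hypothetical names for illustration and not part of Prefect or prefect_ray. `MemoryHandler` stands in for any handler that buffers records until flushed. In a real task, the logger passed in would presumably be `get_run_logger().logger`, as in the workaround.

```python
import functools
import logging
import logging.handlers


def flush_logs_on_exit(logger: logging.Logger):
    """Hypothetical decorator: flush every handler on `logger` when the call ends."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                # Runs on success and on exception, so buffered records
                # are pushed out either way.
                for handler in logger.handlers:
                    handler.flush()
        return wrapper
    return decorator


class ListHandler(logging.Handler):
    """Collects formatted messages in a list (stand-in for a remote sink)."""
    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record.getMessage())


# Buffering setup: records reach `sink` only when the MemoryHandler is flushed.
sink = ListHandler()
buffered = logging.handlers.MemoryHandler(capacity=1000, target=sink)
demo_logger = logging.getLogger("flush_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(buffered)


@flush_logs_on_exit(demo_logger)
def bar():
    demo_logger.info("bar")
    return "done"
```

Calling `bar()` leaves the message in `sink.records` as soon as the function returns, without the function body having to know which handlers are attached.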

zhen0 commented 2 weeks ago

Thanks for the issue, @fyrestone. I can reproduce this and will add it to our backlog. Thanks also for the workaround and the example of how you expect this to work.