PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Send all terminal output to Prefect logger #9797

Open · j-tr opened this issue 1 year ago

j-tr commented 1 year ago

Prefect Version

2.x

Describe the current behavior

When using third-party modules as black boxes in Prefect tasks, certain log messages from these modules might only be displayed on the terminal of the machine running the flow and never sent to Prefect Cloud. This is because such modules may not only use Python print statements (which can be captured via log_prints=True) but also write directly to stdout, or even run external subprocesses that are not written in Python at all.
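For example, redirecting sys.stdout at the Python level, which is all a logging shim can do, does not capture writes that go straight to file descriptor 1. A minimal standard-library demonstration:

import contextlib
import io
import os

buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    print("captured")               # goes through sys.stdout and lands in buf
    os.write(1, b"not captured\n")  # writes to fd 1 directly, bypassing sys.stdout

print(repr(buf.getvalue()))  # 'captured\n' -- the os.write line went to the terminal

A non-Python subprocess inherits file descriptor 1 and behaves like the os.write call, so neither log_prints nor a sys.stdout swap ever sees its output.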

Currently, there is no convenient way to forward these log messages to Prefect Cloud short of writing custom wrapper functions or instrumenting the third-party libraries with Prefect logging.

Describe the proposed behavior

1) A general switch analogous to log_prints (e.g. log_console_out) that would send everything visible on the machine's terminal to Prefect Cloud.

or

2) A utility wrapper function that can take any Python function and its arguments, capture its output, and send it to the cloud.

Example Use

The following script contains a task that executes a function which doesn't use conventional print statements but instead calls a .sh script that prints directly to the terminal. I came up with two wrapper functions that attempt to catch this output and send it to the logger.

redirect_stdout replaces sys.stdout and sys.stderr with a class that forwards each write() to the Prefect logger. This solves the problem for functions that write to sys.stdout, but not for output that bypasses it.

redirect_fd redirects the stdout file descriptor to a tempfile and reads the content back afterward. This works, but it is not concurrency-safe, since file descriptors are shared with subprocesses.

from prefect import flow, task, get_run_logger
from subprocess import run
from multiprocessing import Process, Queue
import sys
from tempfile import NamedTemporaryFile
import os
import io

def redirect_stdout(function, *args, **kwargs):

    def wrapper(function, return_value, *args, **kwargs):
        logger = get_run_logger()

        class StreamToLogger:
            def __init__(self, log_function):
                self.log_function = log_function

            def write(self, buf):
                for line in buf.rstrip().splitlines():
                    self.log_function(line.rstrip())

            def flush(self):
                pass

        sys.stdout = StreamToLogger(logger.info)
        sys.stderr = StreamToLogger(logger.warning)

        return_value.put(function(*args, **kwargs))

    return_value = Queue()
    process = Process(target=wrapper, args=(function, return_value, *args), kwargs=kwargs)
    process.start()
    process.join()
    return return_value.get()

def redirect_fd(function, *args, **kwargs):

    def wrapper(function, return_value, outfile, *args, **kwargs):
        original_stdout_fd = sys.stdout.fileno()
        sys.stdout.close()
        os.dup2(outfile.fileno(), original_stdout_fd)
        sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, 'wb'))

        return_value.put(function(*args, **kwargs))

    return_value = Queue()
    with NamedTemporaryFile() as outfile:
        process = Process(target=wrapper, args=(function, return_value, outfile, *args), kwargs=kwargs)
        process.start()
        process.join()

        logger = get_run_logger()
        with open(outfile.name) as f:
            for line in f:
                logger.info(line.strip())
    return return_value.get()

def some_external_function_that_calls_a_subprocess():
    # in a real-world scenario, this function would be imported from a third-party module and is hence considered a black box
    print("hey hey i end up in the logger")
    # but the output from this script doesn't
    run(["./test.sh"])
    return "X"

@task
def test_output():
    # output from the subprocess call doesn't end up in the logger
    print(some_external_function_that_calls_a_subprocess())

    # redirecting sys.stdout also doesn't work because the subprocess is not Python
    print(redirect_stdout(some_external_function_that_calls_a_subprocess))

    # changing the file descriptor for stdout to a tempfile and printing the content afterwards works.
    # However, this doesn't fully solve the problem, because logs are only available after the function returns,
    # which is no solution for long-running commands.
    # Reading from the temp file concurrently in another thread and logging it could be a solution
    # (see the sketch after the output below).
    # In general, we don't want to manipulate the file descriptor for all tasks but only for one specific function;
    # this approach could have unwanted side effects on the output of other functions that run concurrently.
    print(redirect_fd(some_external_function_that_calls_a_subprocess))

@flow(log_prints=True)
def main() -> None:
    print(test_output.submit())

if __name__ == "__main__":
    main()

test.sh:

#!/bin/bash
echo "I'm from the subprocess"
echo "I'm the second line from subprocess"

Output:

15:55:14.398 | INFO    | Flow run 'liberal-crayfish' - Submitted task run 'test_output-0' for execution.
15:55:15.169 | INFO    | Task run 'test_output-0' - hey hey i end up in the logger
I'm from the subprocess
I'm the second line from subprocess
15:55:15.180 | INFO    | Task run 'test_output-0' - X
15:55:15.191 | INFO    | Task run 'test_output-0' - hey hey i end up in the logger
I'm from the subprocess
I'm the second line from subprocess
15:55:15.203 | INFO    | Task run 'test_output-0' - X
15:55:15.213 | INFO    | Task run 'test_output-0' - hey hey i end up in the logger
15:55:15.228 | INFO    | Task run 'test_output-0' - I'm from the subprocess
15:55:15.229 | INFO    | Task run 'test_output-0' - I'm the second line from subprocess
15:55:15.230 | INFO    | Task run 'test_output-0' - X
15:55:15.489 | INFO    | Task run 'test_output-0' - Finished in state Completed()

These lines from test.sh only end up in the Prefect logger when redirecting the stdout file descriptor. Otherwise, they are just printed to the terminal.

I'm from the subprocess
I'm the second line from subprocess
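The concurrent-read approach mentioned in the comments of test_output could look roughly like this. This is only a sketch, assuming the fork start method on a Unix-like system; redirect_fd_streaming is a made-up name, and the same file-descriptor-sharing caveats apply:

import os
import sys
import time
import threading
from multiprocessing import Process, Queue
from tempfile import NamedTemporaryFile

from prefect import get_run_logger

def redirect_fd_streaming(function, *args, **kwargs):

    def wrapper(function, return_value, outfile, *args, **kwargs):
        # same fd swap as redirect_fd: point fd 1 at the tempfile
        original_stdout_fd = sys.stdout.fileno()
        sys.stdout.close()
        os.dup2(outfile.fileno(), original_stdout_fd)
        # line-buffered so Python-side prints show up promptly
        sys.stdout = os.fdopen(original_stdout_fd, "w", buffering=1)
        return_value.put(function(*args, **kwargs))

    def tail(path, stop_event, log):
        # poll the tempfile and forward new lines while the child runs,
        # then drain whatever is left after it exits
        with open(path) as f:
            while True:
                line = f.readline()
                if line:
                    log(line.rstrip())
                elif stop_event.is_set():
                    break
                else:
                    time.sleep(0.1)

    return_value = Queue()
    logger = get_run_logger()
    with NamedTemporaryFile() as outfile:
        stop = threading.Event()
        reader = threading.Thread(target=tail, args=(outfile.name, stop, logger.info))
        reader.start()
        process = Process(target=wrapper, args=(function, return_value, outfile, *args), kwargs=kwargs)
        process.start()
        process.join()
        stop.set()
        reader.join()
    return return_value.get()

Lines from test.sh would then reach the Prefect logger while the command is still running, instead of only after the function returns.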

Additional context

No response

EmilRex commented 1 year ago

@j-tr Would the PREFECT_LOGGING_EXTRA_LOGGERS setting (documentation here) work for your use case?
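For reference, PREFECT_LOGGING_EXTRA_LOGGERS takes a comma-separated list of logger names; Prefect attaches its own log handlers to those standard-library loggers so that their records are forwarded alongside the flow's logs. A minimal sketch (some_third_party_lib is a placeholder; the value must be set before Prefect loads its settings):

import os

# placeholder name; set this before importing/running Prefect
os.environ["PREFECT_LOGGING_EXTRA_LOGGERS"] = "some_third_party_lib"

Note that this only captures records emitted through Python's logging module.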

j-tr commented 1 year ago

I am not sure an extra logger could capture this, because the logs are not coming from any specific Python library; they are written directly to the stdout file descriptor.

EmilRex commented 1 year ago

@j-tr I'm not the right person to determine whether that's feasible or not, but what I can say is that many of our users collect logs at the infrastructure level through their log aggregator of choice. Doing so is probably the most robust way to ensure you are collecting all available logs.

trahloff commented 1 year ago

@EmilRex, as discussed during our recent call, some follow-up information:

zhen0 commented 1 year ago

Adding the needs engineering feedback label here, as it would be great if someone could dig into what's possible.

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

trahloff commented 1 year ago

Still relevant.

WillRaphaelson commented 8 months ago

If we implement this as a setting, e.g. PREFECT_LOGGER_LOG_STDOUT, we'd also want to raise a warning. This setting should also be opt-in.

trahloff commented 8 months ago

@WillRaphaelson, that would work for us. Is this a feature you are potentially planning to work on in the near future?