dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Dead loop caused by `forward_logging` in nested tasks. #8915

Closed trivialfis closed 1 day ago

trivialfis commented 1 day ago

Describe the issue:

This might be a bug in my code, but the behavior depends on Dask, so I'm opening an issue here to confirm.

There are two issues here:

output:

2024-10-28 20:54:29,748 - distributed.worker - INFO - -------------------------------------------------
2024-10-28 20:54:29,749 - distributed.core - INFO - Starting established connection to tcp://10.18.128.0:8786
2024-10-28 20:54:34,452 - distributed.worker - INFO - Starting Worker plugin forward-logging-test-logger
hello print
Hello
Hello
Hello
Hello
Hello
Hello
...

Currently, XGBoost logs the evaluation results on the scheduler process using its own communication channel. I'm trying to redirect the logs to the client process using Dask facilities.

Minimal Complete Verifiable Example:

Start a fresh cluster; this is required to reproduce the error.

dask scheduler --scheduler-file=sched.json
dask worker --scheduler-file=sched.json

from distributed import LocalCluster, Client, get_client
import logging

name = "test-logger"

def get_logger() -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.hasHandlers():
        handler = logging.StreamHandler()
        logger.addHandler(handler)
    return logger

def nested_fn():
    print("hello print", flush=True)
    get_logger().info("Hello")

def worker_fn():
    client = get_client()
    client.forward_logging(get_logger().name)
    client.submit(nested_fn).result()

def main():
    with Client(scheduler_file="sched.json") as client:
        client.submit(worker_fn).result()

if __name__ == "__main__":
    main()

Anything else we need to know?:

Environment:

pentschev commented 1 day ago

What happens here is that you submit worker_fn(), which runs on a worker. Inside it, get_client() returns a Client that lives in the worker process and is NOT the user-facing Client. I suspect what you want is to move client.forward_logging(get_logger().name) into main(), before submitting the task, e.g.:

...
def worker_fn():
    client = get_client()
    client.submit(nested_fn).result()

def main():
    with Client(scheduler_file="sched.json") as client:
        client.forward_logging(get_logger().name)
        client.submit(worker_fn).result()
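The thread does not spell out why "Hello" repeats. One plausible reading, given that the client returned by get_client() lives in the worker process, is a feedback loop: a forwarded record is re-handled by the same logger that forwards it, so it gets forwarded again. The sketch below imitates that with the standard library only. ForwardingHandler, simulate, and the sim.* logger names are made up for illustration and are not Dask APIs, and the hop cap exists only so the example terminates.

```python
import logging


class ForwardingHandler(logging.Handler):
    """Hypothetical stand-in for a worker-side handler that forwards records."""

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def emit(self, record):
        # "Send" the record to whoever subscribed to forwarded logs.
        self.queue.append(record)


def simulate(client_in_worker_process, max_hops=5):
    """Return how many times a single log call gets delivered to the client."""
    queue = []
    worker_logger = logging.getLogger("sim.worker_process")
    user_logger = logging.getLogger("sim.user_process")
    worker_logger.setLevel(logging.INFO)
    worker_logger.propagate = False
    user_logger.propagate = False

    handler = ForwardingHandler(queue)
    worker_logger.addHandler(handler)

    # Assumption: the receiving client re-handles each record with a logger in
    # its own process. If that process is the worker, it is the same logger
    # that carries the forwarding handler.
    dest = worker_logger if client_in_worker_process else user_logger
    try:
        worker_logger.info("Hello")
        hops = 0
        while queue and hops < max_hops:
            dest.handle(queue.pop(0))
            hops += 1
        return hops
    finally:
        worker_logger.removeHandler(handler)


print(simulate(client_in_worker_process=False))  # 1: delivered once, then done
print(simulate(client_in_worker_process=True))   # 5: stops only at the hop cap
```

Under this reading, moving the forward_logging call to the user-facing client breaks the cycle because the receiving logger no longer has the forwarding handler attached.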

trivialfis commented 1 day ago

Thank you for pointing it out. The use of a nested function is by design, to simulate a user calling the XGBoost Dask interface. The user might have an arbitrary number of nested tasks before reaching the forward_logging call.

hendrikmakait commented 1 day ago

Client.forward_logging is designed such that the loggers are only forwarded to the clients that requested them. In order to forward the logging to your user's client, you'd need some API call to happen user-side and initialize forwarding.
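
As a rough sketch of what such a user-side call could look like, the helper below sets up a local handler and then requests forwarding from whatever client it is given. The name init_log_forwarding is hypothetical. The only Dask call it relies on is Client.forward_logging, and the local StreamHandler mirrors get_logger() from the report so that forwarded INFO records have somewhere to go.

```python
import logging


def init_log_forwarding(client, logger_name, level=logging.INFO):
    """Hypothetical helper, meant to be called in the user's own process.

    `client` is expected to be the user-facing distributed.Client; any object
    with a compatible forward_logging(logger_name, level=...) method works.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    # Forwarded records are handled by the same-named logger on the client
    # side, so make sure it has a handler that will actually show them.
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())
    # Ask the cluster to forward records from `logger_name` to this client.
    client.forward_logging(logger_name, level=level)
    return logger
```

A user would call it once with their own Client before submitting work, for example init_log_forwarding(client, "test-logger") inside the with Client(...) block in main().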

trivialfis commented 1 day ago

Ah, thank you for sharing! This is really helpful.