dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Worker profile limited to a short timespan #8653

Open RaphaelRobidas opened 1 month ago

RaphaelRobidas commented 1 month ago

Describe the issue: The worker profile covers only a limited timespan, and older data appears to be lost. For example, with the minimal example below, the total task time is 24 hours (96 jobs of 15 minutes each), but the profile never contains more than 4 h 26 min. At one point during the run, the profile looks like this:

[screenshot: worker profile]

30 minutes later, it looks like this:

[screenshot: worker profile, 30 minutes later]

The previous data is completely gone, as can be seen from the "activity over time" graph at the bottom.

This issue has been occurring for several months, most recently with dask and distributed 2024.4.2. The full discussion is on Discourse: https://dask.discourse.group/t/measuring-the-overall-profile-of-long-runs/1859/11
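As a stopgap, periodically snapshotting the profile from the client side avoids losing older data entirely. This is a minimal sketch, assuming Client.profile(filename=...) writes a static snapshot of the merged worker profile (its docstring documents filename as the file to save the plot to); the snapshot_profiles helper, the file names, and the one-hour period are illustrative, not part of distributed's API:

import threading
import time

from dask.distributed import Client

def snapshot_profiles(client: Client, period_s: int = 3600):
    # Hypothetical helper: dump a profile snapshot every period_s seconds
    # so that data evicted from the dashboard is still on disk.
    i = 0
    while True:
        time.sleep(period_s)
        # Assumption: filename=... saves a static plot of the current profile
        client.profile(filename=f"dask-profile-{i:04d}.html")
        i += 1

# Usage: start as a daemon thread right after creating the client, e.g.
# threading.Thread(target=snapshot_profiles, args=(client,), daemon=True).start()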

Minimal Complete Verifiable Example:

import time
import logging

from dask.distributed import (
    Client,
    LocalCluster,
    get_client,
    as_completed,
    performance_report,
)

NUM_PROCESSES = 16  # threads for the single local worker

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)-8s %(message)s",
)

class DummyManager:
    def run(self):
        logging.info("Starting the manager")

        jobs = list(range(1, 97))  # 96 jobs of 15 minutes each: 24 h of total task time

        client = get_client()
        futs = []
        for j in jobs:
            futs.append(client.submit(self.job, j))

        asc = as_completed(futs, with_results=True)
        for fut, ret in asc:
            logging.info(f"Processing future {fut}: ret={ret}")
            if ret > 0:
                # Never taken in this example, since job() always returns 0
                logging.info(f"Launching a subjob with time {ret}")
                asc.add(client.submit(self.job, ret))
            fut.release()

    def job(self, n):
        # Simulate a long task: 15 minutes of wall time per job
        time.sleep(60 * 15)

        return 0

if __name__ == "__main__":
    # Single in-process worker with NUM_PROCESSES threads
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=NUM_PROCESSES,
        processes=False,
    )

    client = Client(cluster)
    with performance_report(filename=f"dask-performance_{time.time():.0f}.html"):
        try:
            manager = DummyManager()
            manager.run()
        except KeyboardInterrupt:
            logging.info("Stopping the job...")
            cluster.close()
            raise SystemExit(0)
    client.close()
    cluster.close()
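
For reference, the profiler's timing is configurable through dask's config. If the retained history is a fixed number of profile cycles (an assumption on my part, not verified in the source), lengthening distributed.worker.profile.cycle should stretch the covered wall-clock window at the cost of coarser aggregation. Both keys below exist in distributed's default configuration (defaults: interval 10ms, cycle 1000ms); they must be set before the cluster is created:

import dask

# Sampling tick stays at the default; the aggregation cycle is stretched 10x.
# Assumption: a longer cycle proportionally widens the retained profile window.
dask.config.set({
    "distributed.worker.profile.interval": "10ms",
    "distributed.worker.profile.cycle": "10s",
})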

Environment: