AlecThomson / arrakis

Logger issues when moving to prefect2 #27

Closed tjgalvin closed 1 year ago

tjgalvin commented 1 year ago

Each of the individual arrakis processing stages is currently a dask-based pipeline. The workflow is constructed from dask.delayed functions, which build a graph that is then traversed by dask workers.
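
For context, each stage follows roughly this pattern (process_source and run_stage are placeholders, not the actual arrakis functions):

from dask import delayed
from dask.distributed import Client

@delayed
def process_source(source_id):
    # Stand-in for the real per-source work done within a stage
    return source_id * 2

def run_stage(client: Client, source_ids):
    # Build the delayed graph lazily, then hand it to the dask workers
    outputs = [process_source(s) for s in source_ids]
    return client.compute(outputs, sync=True)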

The previous prefect version 1 pipeline has been updated to version 2. This pipeline sequentially calls each of the individual dask workflows. The intent is that some of the prefect 2 features (agents for remotely starting pipelines, the RESTful API) can be leveraged when full RACS processing commences.
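
Roughly speaking (with placeholder stage names, not the actual arrakis entry points), the new wrapper looks something like this:

from prefect import flow

def cutout_stage(field_name: str):
    ...  # placeholder for the existing dask-based cutout workflow

def linmos_stage(field_name: str):
    ...  # placeholder for the existing dask-based linmos workflow

@flow
def process_field(field_name: str):
    # Run the existing dask workflows one after the other
    cutout_stage(field_name)
    linmos_stage(field_name)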

We are creating a single Client/Cluster pair which is supplied to the main prefect workflow, and are relying on the prefect_dask.get_dask_client context manager to pass this compute resource through to the dask workflows started by each of the separate arrakis stages.
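
And a sketch of how that resource is threaded through, assuming the usual prefect-dask pattern (the scheduler address and the run_cutout_workflow name are stand-ins, and the real setup may construct the Client/Cluster slightly differently):

from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

def run_cutout_workflow(field_name: str, client):
    ...  # the existing dask.delayed-based cutout stage

@task
def cutout_stage(field_name: str):
    # Re-use the flow's dask cluster rather than creating a new one per stage
    with get_dask_client() as client:
        run_cutout_workflow(field_name, client=client)

@flow(task_runner=DaskTaskRunner(address="tcp://scheduler:8786"))
def process_field(field_name: str):
    cutout_stage(field_name)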

This setup, though, is losing information emitted by the arrakis logger. The supported methods of attaching prefect log handlers to a module (defined via an export PREFECT_LOGGING_EXTRA_LOGGERS="arrakis" or a prefect logging configuration file) are not being properly propagated through to the code executed within the @dask.delayed functions. Similarly, the logs that should be printed to stdout in these functions are not reaching stdout. This makes debugging difficult. Grrr.
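
A quick way to see what the worker side actually has would be something along these lines (illustrative only, not code in the repo):

import logging

from dask import delayed

@delayed
def inspect_arrakis_logger():
    # Report what the worker process actually sees; from the behaviour above,
    # the expectation is that the prefect handlers attached on the flow-run
    # side (e.g. via PREFECT_LOGGING_EXTRA_LOGGERS) are absent here
    log = logging.getLogger("arrakis")
    return [type(h).__name__ for h in log.handlers], log.level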

I have tried to hijack the arrakis logger while the delayed functions are being executed and manually attach the prefect orion logger handler. However, by this point there is no prefect context that can be obtained (e.g. the flow-run/task-run ids). Unless we want to pass this context information into the dask workflow when we enter some main, as I understand things, relying on the prefect helpers (e.g. get_run_context and get_run_logger) is not possible.
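
To illustrate the failure mode (again, not code in the repo - and as I understand it, get_run_logger raises MissingContextError when there is no active run):

from dask import delayed
from prefect import get_run_logger
from prefect.exceptions import MissingContextError

@delayed
def cutout_with_prefect_logger(source_id):
    try:
        logger = get_run_logger()
    except MissingContextError:
        # No flow-run/task-run context exists on the dask worker, so the
        # prefect helpers have nothing to resolve against
        return None
    logger.info(f"Processing source {source_id}")
    return source_id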

The way I have managed to get information out of the encapsulated dask workflows is to retrieve the distributed.worker logger and use that, similar to the example below.

import logging

from dask import delayed

@delayed
def cutout(...):
    logger = logging.getLogger('distributed.worker')
    logger.setLevel(logging.INFO)

This seems to work reliably, although it emits messages using the distributed.worker formatter.

And in typing all of this up as an MWE, I think the following works just as well:

import logging

from dask import delayed
from arrakis.logger import logger

@delayed
def cutout(...):
    logger.setLevel(logging.INFO)

The real gotcha in all of this seems to be the logging level. There needs to be a setLevel(logging.INFO) call inside the dask.delayed functions - putting it at the top level of each module does not seem to work. The logger object created in arrakis.logger also has this level set, which is all to say I am not sure why it needs repeating. It might be related to how serialisation works? As the dask workers are scaled up and down with the Cluster.adapt method, there might be a need to reset this often?
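
If the per-call reset really is required, one option might be a small wrapper so the setLevel does not have to be repeated inside every stage function (just an idea, not something in arrakis at the moment):

import functools
import logging

def with_worker_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Re-apply the level on every call, since whatever is set at import
        # time does not appear to survive on the (possibly newly adapted)
        # worker processes
        logging.getLogger("arrakis").setLevel(logging.INFO)
        return func(*args, **kwargs)
    return wrapper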

Not sure where the problem is coming from.