MetOffice / dagrunner

Directed acyclic graph (DAG) runner and tools
https://github.com/MetOffice/dagrunner?tab=readme-ov-file#dagrunner
BSD 3-Clause "New" or "Revised" License

INV: Real-time resource recording, usage and feedback mechanism #5

Open cpelley opened 8 months ago

cpelley commented 8 months ago

Dask has the means to manage resources at run-time and autoscale. However, it is unknown how effective/efficient this will be in a real-world scenario with large workflows. To that end, we have a fall-back plan which is intended to provide dask with accurate memory usage for each processing step, while maintaining total separation from the configuration/recipe being executed.

The new framework will provide a means to annotate the Python graph with each step's resource requirement (memory footprint), and also to record each execution step's actual footprint via the logging and monitoring capability (https://github.com/MetOffice/pp_systems_framework/issues/4) to a database or similar, giving more accurate estimates for execution in the next cycle. That is, a feedback mechanism which allows memory footprint requirements to be adjusted so that they evolve to reflect changing circumstances driven by the weather.
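A minimal sketch of the recording half of such a feedback loop (the table name, columns, step/cycle identifiers and the measurement hook are all assumptions here, not existing dagrunner interfaces):

import sqlite3
import resource  # ru_maxrss gives peak RSS of the current process (KiB on Linux)

def record_footprint(db_path, step_name, cycle, peak_bytes):
    """Append one step's measured peak memory to a hypothetical sqlite table."""
    with sqlite3.connect(db_path) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS footprints "
            "(step TEXT, cycle TEXT, peak_bytes INTEGER)"
        )
        conn.execute(
            "INSERT INTO footprints VALUES (?, ?, ?)",
            (step_name, cycle, peak_bytes),
        )

# e.g. after a plugin has run:
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024
record_footprint("footprints.db", "threshold_plugin", "20240101T0000Z", peak)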

sizeof

Register a custom sizeof, for example, for cubes:

import sys

import dask.array as da
from dask.sizeof import sizeof
from iris.cube import Cube

@sizeof.register(Cube)
def sizeof_cube(cube):
    # Report the size of the cube's (lazy) data payload rather than the
    # size of the thin Python wrapper object.
    return cube.lazy_data().nbytes

# e.g. a cube wrapping a lazy 1000x1000 float64 array (~8 MB)
cube = Cube(da.zeros((1000, 1000), dtype="float64"))

print(sys.getsizeof(cube), sizeof(cube))
# 56 8000000

Providing an estimate of the operation's (plugin) footprint

Ideally dask will handle this without too much trouble via spill-over to disk. However, if dask turns out not to be effective enough at this, we can look at passing each task a dummy object representing the expected memory consumption of the operation to be performed.

Using a "dummy" parameter to represent the anticipated memory usage of an operation will influence Dask's scheduling decisions indirectly. This method would involve attaching a proxy or placeholder object that simulates the memory footprint required by the operation and registering a custom sizeof function for that object.

from dask.sizeof import sizeof

class MemoryProxy:
    """Placeholder standing in for an operation's anticipated memory footprint."""
    def __init__(self, estimated_size):
        self.estimated_size = estimated_size

@sizeof.register(MemoryProxy)
def sizeof_memory_proxy(proxy):
    # Dask's memory accounting will see the proxy as occupying this many bytes.
    return proxy.estimated_size
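A minimal sketch of how such a proxy might be threaded through a task, building on the MemoryProxy class above (my_operation, the _memory_hint parameter and the 8 MB estimate are illustrative; this assumes the distributed scheduler accounts for task inputs held in worker memory via the registered sizeof):

from dask import delayed

@delayed
def my_operation(data, _memory_hint=None):
    # _memory_hint plays no part in the computation; it only exists so that a
    # MemoryProxy (with its registered sizeof) sits alongside this task's inputs.
    return [x * 2 for x in data]

some_data = delayed(list)(range(1000))    # stand-in for real input data
proxy = delayed(MemoryProxy)(8_000_000)   # ~8 MB anticipated footprint
result = my_operation(some_data, _memory_hint=proxy)
print(result.compute()[:5])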

However, it's not certain whether this might negatively impact dask's ability to autoscale with spill-over. That is, perhaps this approach would only be appropriate when the estimates are accurate, meaning there is no spill-over to disk.

One possible partial mitigation is simply to stop dask from being able to spill the proxy object to disk:

class MemoryProxy:
    def __init__(self, estimated_size):
        self.estimated_size = estimated_size

    # Prevent serialization, thus making it non-spillable
    def __reduce__(self):
        raise TypeError("MemoryProxy objects should not be serialized or spilled to disk.")

However, dask will still treat the memory represented by the MemoryProxy object as being consumed.

Alternative avenue

It appears that dask delayed (dask.delayed) objects can have set_resources(memory=...) called on them. Could we do something like this:

tasks = [
    obj.set_resources(memory=req)
    for obj, req in zip(delayed_objects, memory_requirements)
]
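Whether set_resources is available will need checking against the dask version in use. An alternative formulation I am more confident about (an assumption here, not something dagrunner already does) uses dask.annotate with an abstract worker resource; it only takes effect with the distributed scheduler and workers started with that resource declared, e.g. dask worker ... --resources "MEMORY=70e9":

import dask
from dask import delayed

@delayed
def process(step_input):
    return step_input * 2

step_inputs = [1, 2, 3]                    # illustrative inputs
memory_requirements = [2e9, 8e9, 1e9]      # bytes, per step

tasks = []
for step_input, req in zip(step_inputs, memory_requirements):
    # Each task declares how much of the worker-level "MEMORY" resource it
    # needs; the scheduler only places it on a worker with that much spare.
    with dask.annotate(resources={"MEMORY": req}):
        tasks.append(process(step_input))

# Annotations can be dropped by graph optimisation, hence optimize_graph=False.
results = dask.compute(*tasks, optimize_graph=False)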

Issues

Dependencies

Background reading

cpelley commented 4 months ago

Recording the memory footprint of plugin execution is handled by https://github.com/MetOffice/dagrunner/issues/5. What remains from this issue is the feedback mechanism referenced above: that is, reading from the sqlite database and wrapping execution in objects whose 'size' reflects their likely footprint.
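A minimal sketch of that read-back side, pairing with the recording sketch earlier in this issue (the table name, columns, step name and default are assumptions, not existing dagrunner code):

import sqlite3

def estimated_footprint(db_path, step_name, default=1_000_000_000):
    """Return the largest recorded peak for a step, or a default if none yet."""
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT MAX(peak_bytes) FROM footprints WHERE step = ?",
            (step_name,),
        ).fetchone()
    return row[0] if row and row[0] is not None else default

# Wrap a step so the scheduler sees a MemoryProxy sized from previous cycles.
estimate = estimated_footprint("footprints.db", "threshold_plugin")
proxy = MemoryProxy(estimate)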