PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Improve dask-scheduler memory management of Prefect `State` objects #5230

Open xuevin opened 2 years ago

xuevin commented 2 years ago

Description

The dask scheduler uses dask.sizeof to estimate how much memory each task result occupies, and it relies on those estimates for scaling and work distribution. A task that returns a large payload can stall scheduling or cause workers to exit. Making the scheduler aware of payload sizes would let it manage this better.

Expected Behavior

State.__sizeof__() should be defined to return the number of bytes stored in the object.

Reproduction

Define a task that returns a large numpy array. The dask scheduler reports its managed memory as 48 B.

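import numpy as np
from prefect import task

@task
def memory_task():
    # 2 million random integers in [0, 9): far larger than the 48 B reported
    x = np.random.randint(9, size=2*10**6)
    return x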

Proposed resolution: State objects currently do not define __sizeof__, so the scheduler assumes the payload is 48 B. Ideally, __sizeof__ should be implemented to account for the size of the result stored in State.result.
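
A minimal sketch of the idea, not the actual Prefect implementation: `PlainState` and `SizedState` below are hypothetical stand-ins for Prefect's `State`, and the sketch uses only `sys.getsizeof` from the standard library (dask.sizeof is understood to fall back to it for types it has no handler for, which is why defining `__sizeof__` would help). A `bytes` payload stands in for the numpy array.

```python
import sys


class PlainState:
    """Hypothetical stand-in for a State with no __sizeof__ defined."""

    def __init__(self, result=None):
        self.result = result


class SizedState(PlainState):
    """Hypothetical stand-in for a State that reports its payload size."""

    def __sizeof__(self):
        # Shallow size of the wrapper itself plus the size of the stored result.
        return object.__sizeof__(self) + sys.getsizeof(self.result)


# A 16 MB payload standing in for a large task result.
payload = bytes(16_000_000)

plain = PlainState(payload)
sized = SizedState(payload)

# Without __sizeof__, only the small wrapper object is counted.
print("plain:", sys.getsizeof(plain), "bytes")
# With __sizeof__, the reported size includes the payload.
print("sized:", sys.getsizeof(sized), "bytes")
```

A real implementation would probably delegate to dask.sizeof for the result rather than `sys.getsizeof`, so that types dask already knows how to measure are counted the same way the scheduler counts them elsewhere.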

zanieb commented 2 years ago

This seems reasonable, I'd review a PR that adds a basic sizeof implementation.