pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Reflect on global communication with tasks in multi-node setup #92

Status: Open. windiana42 opened this issue 1 year ago.

windiana42 commented 1 year ago

From working with Prefect, I realized that it is quite hard to set flags specific to a unit test that influence task execution (e.g. disabling some external actions that won't work in a particular test setup). A particular challenge was that os.environ["xxx"] = "yyy" did not work in a system designed for multi-node execution. It seems the process pool scheduler had already been forked during the import phase (import prefect).
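
The failure mode described above can be reproduced with Python's standard multiprocessing module alone. This is a minimal sketch, not Prefect's actual scheduler: it assumes a POSIX system (the "fork" start method) and uses a hypothetical environment variable name. A worker process copies its parent's environment when it is created, so an os.environ assignment made after the pool exists never reaches those workers.

```python
import multiprocessing
import os

FLAG = "PIPEDAG_DEMO_DISABLE_EXTERNAL"  # hypothetical variable name for this demo


def read_flag(_: int) -> str:
    # Runs inside a worker process and reports what that process sees.
    return os.environ.get(FLAG, "<unset>")


def demo() -> tuple[list[str], list[str]]:
    os.environ.pop(FLAG, None)
    ctx = multiprocessing.get_context("fork")  # POSIX only
    # Pool() forks its workers immediately, like a scheduler created at import time.
    with ctx.Pool(processes=2) as early_pool:
        os.environ[FLAG] = "1"  # set only after the workers already exist
        early = early_pool.map(read_flag, range(2))
    # A pool created after the assignment inherits the updated environment.
    with ctx.Pool(processes=2) as late_pool:
        late = late_pool.map(read_flag, range(2))
    os.environ.pop(FLAG, None)
    return early, late


if __name__ == "__main__":
    early, late = demo()
    print("pool created before assignment:", early)  # ['<unset>', '<unset>']
    print("pool created after assignment: ", late)   # ['1', '1']
```

The same reasoning applies to separate nodes: they never share the parent's os.environ at all, so configuration has to be shipped to them explicitly.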

In pipedag, information is passed to flow construction and to flow execution via the attrs attribute in the pipedag.yaml configuration file.

Questions:

These questions may well lead to the conclusion that nothing needs to change. One solution could be to make ConfigContext.attrs a mutable member variable under the user's full control. Since only flow.run() starts any potential parallelization, the ConfigContext as it stands just before flow.run() is called would be copied to all worker threads/processes/nodes.
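
The proposal above can be modeled in a few lines of plain Python. This is a toy sketch, not pipedag code: UserConfig, run, upload_task and the disable_external_actions flag are hypothetical names, and threads stand in for worker processes or nodes. What it illustrates is that whatever attrs hold at the moment run() is called is what every worker receives.

```python
import copy
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class UserConfig:
    # Hypothetical config whose attrs dict the user may mutate freely before run().
    attrs: dict[str, Any] = field(default_factory=dict)


def run(config: UserConfig, tasks: list[Callable[[dict[str, Any]], Any]]) -> list[Any]:
    # Snapshot attrs at the moment run() is called.
    snapshot = copy.deepcopy(config.attrs)
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Each worker gets its own copy, as a separate process or node would,
        # so edits made inside one task never leak into others or back to config.
        futures = [pool.submit(task, copy.deepcopy(snapshot)) for task in tasks]
        return [f.result() for f in futures]


def upload_task(attrs: dict[str, Any]) -> str:
    # Hypothetical task that skips an external action when the flag is set.
    if attrs.get("disable_external_actions", False):
        return "skipped"
    return "uploaded"


if __name__ == "__main__":
    cfg = UserConfig()
    cfg.attrs["disable_external_actions"] = True  # e.g. set by a unit test
    print(run(cfg, [upload_task, upload_task]))  # ['skipped', 'skipped']
```

The design choice being modeled is "mutable until run(), frozen afterwards": the user never has to reach into running workers, because the copy happens at a single well-defined point.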

NMAC427 commented 1 year ago

All ConfigContext objects are immutable. However, you can create a new config context with modified attrs by calling .evolve on it.

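from pydiverse.pipedag.context import ConfigContext

# ConfigContext is immutable: evolve() returns a modified copy instead of changing it.
config = ConfigContext.get()
new_config = config.evolve(attrs=config.attrs | {"x": "y"})
# flow is a previously constructed pipedag Flow; it runs with new_config active.
with new_config:
    flow.run()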
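
The immutable-plus-evolve pattern can be mimicked with a frozen dataclass and a contextvar. MiniConfig below is a hypothetical, simplified stand-in written for illustration, not pipedag's ConfigContext implementation; it only reproduces the get() / evolve() / with-block usage from the snippet above.

```python
import contextvars
import dataclasses
from types import MappingProxyType
from typing import Any, Mapping

_current: contextvars.ContextVar["MiniConfig"] = contextvars.ContextVar("mini_config")


@dataclasses.dataclass(frozen=True)
class MiniConfig:
    # Hypothetical stand-in for ConfigContext: frozen, so fields can't be reassigned.
    attrs: Mapping[str, Any] = dataclasses.field(
        default_factory=lambda: MappingProxyType({})
    )
    # Tokens used to restore the previous value when a with-block exits.
    _tokens: list = dataclasses.field(
        default_factory=list, init=False, repr=False, compare=False
    )

    def __post_init__(self) -> None:
        # Wrap attrs in a read-only view so the mapping itself is immutable too.
        object.__setattr__(self, "attrs", MappingProxyType(dict(self.attrs)))

    @classmethod
    def get(cls) -> "MiniConfig":
        # Raises LookupError if no MiniConfig is active.
        return _current.get()

    def evolve(self, **changes: Any) -> "MiniConfig":
        # Return a modified copy; self is left untouched.
        return dataclasses.replace(self, **changes)

    def __enter__(self) -> "MiniConfig":
        self._tokens.append(_current.set(self))
        return self

    def __exit__(self, *exc_info: object) -> None:
        _current.reset(self._tokens.pop())


if __name__ == "__main__":
    with MiniConfig(attrs={"env": "test"}):
        config = MiniConfig.get()
        new_config = config.evolve(attrs=config.attrs | {"x": "y"})
        with new_config:
            print(dict(MiniConfig.get().attrs))  # {'env': 'test', 'x': 'y'}
        print(dict(MiniConfig.get().attrs))  # {'env': 'test'}
```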

No, you can't modify the config context within a task before calling flow.run(), because:

  1. Tasks don't get executed before flow.run() gets called.
  2. ConfigContext is immutable and held in a contextvar, so code in an inner scope can't replace or modify the ConfigContext of an outer scope.
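
Point 2 follows from how Python's contextvars module scopes values. In this minimal sketch (the variable name and values are placeholders, not pipedag internals), a "task" rebinds the variable while running in a copied context, and the caller's value is unaffected.

```python
import contextvars

# Placeholder variable standing in for "the current config".
config_var: contextvars.ContextVar[str] = contextvars.ContextVar("config", default="outer")


def task_body() -> str:
    # Rebinding only affects the context this function is running in.
    config_var.set("replaced-by-task")
    return config_var.get()


def demo() -> tuple[str, str]:
    # Run the "task" in a copy of the caller's context, as a scheduler might.
    seen_by_task = contextvars.copy_context().run(task_body)
    # The caller's own context still holds the original value.
    return seen_by_task, config_var.get()


if __name__ == "__main__":
    print(demo())  # ('replaced-by-task', 'outer')
```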