ezmsg-org / ezmsg

Pure-Python DAG-based high-performance SHM-backed pub-sub and multi-processing pattern
https://ezmsg.readthedocs.io/en/latest/
MIT License

gen_to_unit incompatible with multiprocessing #85

Open · cboulay opened this issue 5 months ago

cboulay commented 5 months ago

File "/Users/me/mambaforge/envs/ez/lib/python3.11/multiprocessing/reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) _pickle.PicklingError: Can't pickle <class 'abc.DummyUnit'>: attribute lookup DummyUnit on abc failed

But it works fine if I fully define DummySettings(ez.Settings) and Dummy(Gen).
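
For context, a minimal sketch (independent of ezmsg) of the underlying limitation: pickle serializes a class by reference, so a class created with type() that is not importable under its own name cannot be pickled.

import pickle

# The class object is recorded as (module, qualified name) and re-imported on
# load; "Generated" is not an importable attribute of __main__, so this fails.
GeneratedCls = type("Generated", (), {})

try:
    pickle.dumps(GeneratedCls())
except pickle.PicklingError as err:
    print(err)  # e.g. Can't pickle <class '__main__.Generated'>: attribute lookup Generated on __main__ failed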

I tried providing the __module__ on the dynamic type created at the bottom of the gen_to_unit function...

        type(
            f"{func.__name__.capitalize()}Unit",
            (ez.Unit,),
            {
                ...,
                "__module__": getmodule(func),
                ...
            },
        ),

This gets past the first error and might be a good idea anyway, but a different error remains:

_pickle.PicklingError: Can't pickle <class 'DummyUnit'>: import of module <module 'ezmsg.chad.dummy' from '/Users/me/Code/tmp/ezmsg/ezmsg-chad/src/ezmsg/chad/dummy.py'> failed

Apparently we also (or only) need to provide __reduce__. Relevant SO answer.
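
As an alternative to __reduce__, here is a hedged sketch of the attribute-binding approach; the helper below is hypothetical and not part of gen_to_unit. The idea is that pickle can find the class by reference once __module__ holds the defining module's name and the class is bound in that module under its own name. (The second error above also suggests that __module__ ended up holding the module object itself rather than its name, since inspect.getmodule() returns the module object.)

from inspect import getmodule

def _bind_generated_class(cls: type, func) -> type:
    # Hypothetical helper: make a class created with type() picklable by
    # reference. pickle re-imports cls.__module__ and looks up cls.__qualname__
    # there, so both must point at something importable.
    mod = getmodule(func)
    if mod is not None:
        cls.__module__ = mod.__name__       # the module's *name*, not the module object
        setattr(mod, cls.__name__, cls)     # bind the class so attribute lookup succeeds
    return cls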

cboulay commented 5 months ago

I pip installed multiprocess, which is supposed to be identical to multiprocessing except that it uses dill instead of pickle. Then I modified the import statements in ezmsg.core.backendprocess to use multiprocess instead of multiprocessing. This works!

I don't know if this is the right solution for this project though.
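
For reference, an illustration of the drop-in swap (not the actual import lines from ezmsg.core.backendprocess, which may differ; this only shows that multiprocess exposes the same API and requires pip install multiprocess):

# multiprocess mirrors the stdlib multiprocessing API but serializes with
# dill, which can handle dynamically created classes.
import multiprocess as mp

def work(msg: str) -> None:
    print(msg)

if __name__ == "__main__":
    p = mp.Process(target=work, args=("hello from a child process",))
    p.start()
    p.join()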

cboulay commented 5 months ago

@pperanich, minimal repro (note: it needs #90 to show the working example):

import asyncio
import ezmsg.core as ez
import numpy as np
from dataclasses import replace
from typing import Any, Generator
from ezmsg.util.messages.axisarray import AxisArray
from ezmsg.util.debuglog import DebugLog
from ezmsg.util.gen_to_unit import gen_to_unit
from ezmsg.util.generator import consumer

@consumer
def pow(n: float) -> Generator[AxisArray, AxisArray, None]:
    axis_arr_in = AxisArray(np.array([]), dims=[""])
    axis_arr_out = AxisArray(np.array([]), dims=[""])
    while True:
        axis_arr_in = yield axis_arr_out
        axis_arr_out = replace(axis_arr_in, data=axis_arr_in.data**n)

AutoPowSettings, AutoPow = gen_to_unit(pow)

class MessageSender(ez.Unit):
    OUTPUT = ez.OutputStream(Any)

    @ez.publisher(OUTPUT)
    async def send_data(self):
        yield self.OUTPUT, AxisArray(np.arange(100), dims=["data"])
        await asyncio.sleep(1)
        raise ez.NormalTermination

if __name__ == "__main__":
    comps = {
        "SEND": MessageSender(),
        "POW": AutoPow(3),
        "LOG": DebugLog()
    }
    conns = (
        (comps["SEND"].OUTPUT, comps["POW"].INPUT_SIGNAL),
        (comps["POW"].OUTPUT_SIGNAL, comps["LOG"].INPUT)
    )

    ez.run(
        components=comps,
        connections=conns,
        process_components=(comps["POW"],)  # Comment this line out and the example works.
    )

griffinmilsap commented 2 months ago

Can verify this is still an issue. Maybe the best path forward is to add multiprocess to the optional dependencies and use it if installed. We could also add an environment variable to force use of the stdlib multiprocessing in case multiprocess gets installed by some other module.
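
A rough sketch of what that could look like; the environment variable name below is a placeholder, not an existing ezmsg setting:

import os

# Prefer the dill-backed multiprocess package when it is installed, unless the
# user explicitly opts back into the stdlib via an environment variable.
# (EZMSG_FORCE_STDLIB_MULTIPROCESSING is a hypothetical name.)
if os.environ.get("EZMSG_FORCE_STDLIB_MULTIPROCESSING"):
    import multiprocessing as mp
else:
    try:
        import multiprocess as mp
    except ImportError:
        import multiprocessing as mp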