DAGWorks-Inc / hamilton

Hamilton helps data scientists and engineers define testable, modular, self-documenting dataflows, that encode lineage/tracing and metadata. Runs and scales everywhere python does.
https://hamilton.dagworks.io/en/latest/
BSD 3-Clause Clear License
1.85k stars, 123 forks

Multi-processing pickle issue with pickling driver within Hamilton #1093

Closed skrawcz closed 2 months ago

skrawcz commented 2 months ago

Current behavior

The code below fails. It runs Hamilton within Hamilton: an outer driver fans out tasks, and each task executes an inner driver.

Stack Traces

Pickling error: cannot pickle 'module' object.
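
For context, the same class of error can be shown with the standard library alone. This is only a minimal sketch, not the Hamilton code path: `fake_worker` and the `try_pickle` helper are hypothetical names introduced for illustration. It shows that a module object, or any container that holds one, is rejected by `pickle`, which is what a multiprocessing executor relies on to ship task inputs to another process.

```python
import pickle
import types


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the exception raised."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:
        return exc


# Hypothetical stand-in for a user module such as the `worker` module below.
fake_module = types.ModuleType("fake_worker")

# A bare module object cannot be pickled.
module_error = try_pickle(fake_module)

# Neither can a container that merely references one.
holder_error = try_pickle({"module": fake_module})

# Plain data, such as the module's name, pickles without trouble.
name_error = try_pickle({"module_name": "fake_worker"})

print(type(module_error).__name__, module_error)
print(type(holder_error).__name__, holder_error)
print(name_error)
```

Threads share memory and never need to serialize their inputs, which is consistent with the report that the multithreading executor is unaffected.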

Steps to replicate behavior

The mapper-worker-reducer pattern works well with the multithreading executor, but it does not work with MultiProcessingExecutor. The error is: cannot pickle 'module' object. Here is a minimal reproduction:

# worker.py
def double(a: int) -> int:
    return a * 2

# mapper.py

from hamilton.htypes import Parallelizable, Collect
from typing import Any

def mapper(
    drivers: list,
    inputs: list,
    final_vars: list = [],
) -> Parallelizable[dict]:
    for dr, input_ in zip(drivers, inputs):
        yield {
            "dr": dr,
            "final_vars": final_vars or dr.list_available_variables(),
            "input": input_,
        }

def worker(mapper: dict) -> dict:
    _dr = mapper["dr"]
    _inputs = mapper["input"]
    _final_var = mapper["final_vars"]
    return _dr.execute(final_vars=_final_var, inputs=_inputs)

def reducer(worker: Collect[dict]) -> Any:
    return worker

# main script
import mapper
import worker
from hamilton import driver
from hamilton.execution import executors

drivers = []
inputs = []
for i in range(4):
    dr = driver.Builder().with_modules(worker).build()
    drivers.append(dr)
    inputs.append({'a': i})

dr = (
    driver.Builder()
    .with_modules(mapper)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiProcessingExecutor(8))
    .build()
)
dr.execute(
    final_vars=["reducer"],
    inputs={"drivers": drivers, "inputs": inputs, "final_vars": ['double']},
)
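
One possible way to sidestep this kind of error, offered as an assumption rather than as the fix adopted in this issue, is to send each task a plain-data description (for example a module name) and to import the module inside the worker. The sketch below uses only the standard library, with `statistics` standing in for the user's `worker` module. `make_task` and `run_task` are hypothetical helpers, not Hamilton APIs. In the Hamilton reproduction, the step marked in the comment is where an inner driver would be built and executed.

```python
import importlib
import pickle


def make_task(module_name, inputs, final_vars):
    """Describe the work with plain data only: no module or driver objects."""
    return {"module_name": module_name, "inputs": inputs, "final_vars": final_vars}


def run_task(task):
    """Worker side: import the module by name, then compute the requested values."""
    module = importlib.import_module(task["module_name"])
    # Assumption: in the Hamilton case, an inner driver would be built from
    # `module` and executed here. This sketch just calls functions by name.
    return {
        name: getattr(module, name)(**task["inputs"])
        for name in task["final_vars"]
    }


# `statistics` is a stand-in for the user's module; its functions take `data`.
task = make_task("statistics", {"data": [1, 2, 3]}, ["mean", "median"])

# The task survives a pickle round trip, as it would when crossing processes.
restored = pickle.loads(pickle.dumps(task))
result = run_task(restored)
print(result)
```

The trade-off is that each worker pays the cost of importing the module and rebuilding its own state, in exchange for inputs that serialize cleanly.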

Library & System Information

Latest.

Expected behavior

This should work.

skrawcz commented 2 months ago

From https://github.com/MolCrafts/molexp/pull/8