cgarciae / pypeln

Concurrent data pipelines in Python
https://cgarciae.github.io/pypeln
MIT License

how to use on_start functions with arguments #98

Closed: alpae closed this issue 1 year ago

alpae commented 1 year ago

Hi @cgarciae

I'm trying to use an on_start function that takes an extra argument. Judging from the code in Stage.run, it seems you planned to allow additional arguments besides worker_info, but I don't see a way to actually pass them:

def run(self) -> tp.Iterable:

    worker_info = WorkerInfo(index=0)

    on_start_args: tp.List[str] = (
        pypeln_utils.function_args(self.on_start) if self.on_start else []
    )
    on_done_args: tp.List[str] = (
        pypeln_utils.function_args(self.on_done) if self.on_done else []
    )

    if self.on_start is not None:
        on_start_kwargs = dict(worker_info=worker_info)
        kwargs = self.on_start(
            **{
                key: value
                for key, value in on_start_kwargs.items()
                if key in on_start_args
            }
        )

It seems you check which arguments on_start requests, but on_start_kwargs is hard-coded to contain only worker_info. Any suggestions on how to solve this?

Thanks, Adrian
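The filtering in the quoted Stage.run can be modelled with a small stand-in, which makes the limitation concrete. This is a sketch, not pypeln's code: `call_on_start` and the `WorkerInfo` class are hypothetical, and `inspect.signature` stands in for `pypeln_utils.function_args`, whose implementation we have not checked. Because only `worker_info` is ever offered, an on_start that declares any other required parameter (here `path`) is called without it and fails.

```python
import inspect


class WorkerInfo:
    # Hypothetical stand-in for pypeln's WorkerInfo; only `index` is modelled.
    def __init__(self, index):
        self.index = index


def call_on_start(on_start, worker_info):
    # Mirror the quoted Stage.run: offer a fixed set of kwargs and keep only
    # the ones whose names appear in on_start's signature.
    requested = inspect.signature(on_start).parameters
    available = dict(worker_info=worker_info)
    kwargs = {key: value for key, value in available.items() if key in requested}
    return on_start(**kwargs)


def on_start_with_info(worker_info):
    # Requests worker_info, so it receives it.
    return dict(worker_index=worker_info.index)


def on_start_without_args():
    # Requests nothing, so it is called with no arguments.
    return dict(ready=True)


def on_start_with_path(path):
    # `path` is never among the offered kwargs, so it is never supplied.
    return dict(path=path)


print(call_on_start(on_start_with_info, WorkerInfo(index=3)))  # {'worker_index': 3}
print(call_on_start(on_start_without_args, WorkerInfo(index=0)))  # {'ready': True}
try:
    call_on_start(on_start_with_path, WorkerInfo(index=0))
except TypeError as err:
    print("TypeError:", err)
```

Under this model, the names of on_start's parameters only select among values the runner already has; they cannot introduce new ones.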

cgarciae commented 1 year ago

Hey @alpae. That code is just a very general way of checking whether the user's on_start requests the worker_info argument. What other arguments do you plan on using? If it's a custom object, maybe you can pass it to on_start via a closure? E.g.:

def get_on_start(my_object):
    # my_object is captured by the closure, so on_start itself needs no extra parameters
    def on_start():
        # use my_object here
        ...
    return on_start

on_start = get_on_start(my_object)
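A fuller version of the closure idea, applied to the file-path case discussed in this thread, might look like the sketch below. As we understand pypeln's documentation, on_start may return a dict whose entries are passed to f as keyword arguments; `run_single_worker` is a hypothetical single-worker stand-in for that behaviour (not pypeln), and a small JSON file replaces the HDF5 file so the example needs no h5py.

```python
import json
import os
import tempfile


def get_on_start(path):
    # Factory: the returned on_start takes no parameters, so a runner that
    # only offers worker_info passes it nothing; `path` comes from the closure.
    def on_start():
        with open(path) as fh:
            config = json.load(fh)
        # Keys of the returned dict are meant to become keyword arguments of f.
        return dict(config=config)

    return on_start


def f(x, config):
    return x * config["scale"]


def run_single_worker(f, items, on_start):
    # Hypothetical stand-in (not pypeln): run on_start once, then forward
    # its dict to every call of f.
    kwargs = on_start() or {}
    return [f(x, **kwargs) for x in items]


# Write a small JSON file so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "config.json")
    with open(path, "w") as fh:
        json.dump({"scale": 10}, fh)
    results = run_single_worker(f, [1, 2, 3], get_on_start(path))

print(results)  # [10, 20, 30]
```

With pypeln itself, the factory's result would be passed as `on_start=get_on_start(path)` to a stage such as `pl.process.map(f, stage, workers=2)`; the file is then opened inside each worker rather than in the parent process.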

alpae commented 1 year ago

Thanks, yes, I think that should work. I also found another way, using a lambda function, that worked for me. In my case it was simply the path to an HDF5 file that had to be loaded in the subprocess. Thanks for the cool library, by the way.
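The lambda approach, and `functools.partial` as a related alternative, can be sketched as follows. `load_dataset`, `open_dataset` and the file names are hypothetical placeholders for opening the HDF5 file. Both callables declare no parameters, so a signature-based filter like the one in Stage.run passes them nothing; they differ in when `path` is read.

```python
import functools
import inspect


def load_dataset(path):
    # Hypothetical loader standing in for opening an HDF5 file.
    return {"source": path}


def open_dataset(path):
    # Module-level on_start body: returns the kwargs dict intended for f.
    return dict(dataset=load_dataset(path))


path = "data.h5"  # hypothetical file name

# Option 1: a lambda (normally written inline as on_start=lambda: ...).
# It looks up the global `path` when called, not when defined.
on_start_lambda = lambda: open_dataset(path)

# Option 2: functools.partial binds the current value of `path` now.
on_start_partial = functools.partial(open_dataset, path)

print(list(inspect.signature(on_start_lambda).parameters))  # []
print(list(inspect.signature(on_start_partial).parameters))  # []

path = "other.h5"
print(on_start_lambda())  # {'dataset': {'source': 'other.h5'}}
print(on_start_partial())  # {'dataset': {'source': 'data.h5'}}
```

One further difference: a `functools.partial` over a module-level function can be pickled by the standard `pickle` module, whereas a lambda or nested function cannot. That may matter if worker processes are started with the spawn method, although we have not checked how pypeln hands on_start to its workers.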