Netflix / metaflow

Open Source Platform for developing, scaling and deploying serious ML, AI, and data science systems
https://metaflow.org
Apache License 2.0

TypeError: __init__() got an unexpected keyword argument 'use_cli' #503

Open Vineet-Sharma29 opened 3 years ago

Vineet-Sharma29 commented 3 years ago

I am trying to implement a text processing pipeline. Since I need to run it over a list of files, I am using an __init__ method as follows:

from metaflow import FlowSpec, step

class TextProcessing(FlowSpec):
  # custom constructor to carry the per-file input
  def __init__(self, filename):
    self.filename = filename
    super().__init__()

  @step
  def start(self):
    ...

However, on running the workflow I am getting the following error:

2021-05-02 17:54:39.851 [1619958258880108/merge_1/8 (pid 28142)] Task is starting.
2021-05-02 17:54:42.306 [1619958258880108/merge_1/8 (pid 28142)] <flow TextProcessing step merge_1> failed:
2021-05-02 17:54:42.315 [1619958258880108/merge_1/8 (pid 28142)] Internal error
2021-05-02 17:54:42.349 [1619958258880108/merge_1/8 (pid 28142)] Traceback (most recent call last):
2021-05-02 17:54:42.350 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/metaflow/cli.py", line 987, in main
2021-05-02 17:54:42.350 [1619958258880108/merge_1/8 (pid 28142)] start(auto_envvar_prefix='METAFLOW', obj=state)
2021-05-02 17:54:42.351 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 829, in __call__
2021-05-02 17:54:42.351 [1619958258880108/merge_1/8 (pid 28142)] return self.main(*args, **kwargs)
2021-05-02 17:54:42.543 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 782, in main
2021-05-02 17:54:42.543 [1619958258880108/merge_1/8 (pid 28142)] rv = self.invoke(ctx)
2021-05-02 17:54:42.543 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
2021-05-02 17:54:42.544 [1619958258880108/merge_1/8 (pid 28142)] return _process_result(sub_ctx.command.invoke(sub_ctx))
2021-05-02 17:54:42.544 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
2021-05-02 17:54:42.544 [1619958258880108/merge_1/8 (pid 28142)] return ctx.invoke(self.callback, **ctx.params)
2021-05-02 17:54:42.544 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 610, in invoke
2021-05-02 17:54:42.544 [1619958258880108/merge_1/8 (pid 28142)] return callback(*args, **kwargs)
2021-05-02 17:54:42.544 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/click/decorators.py", line 33, in new_func
2021-05-02 17:54:42.545 [1619958258880108/merge_1/8 (pid 28142)] return f(get_current_context().obj, *args, **kwargs)
2021-05-02 17:54:42.545 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/metaflow/cli.py", line 504, in step
2021-05-02 17:54:42.545 [1619958258880108/merge_1/8 (pid 28142)] max_user_code_retries)
2021-05-02 17:54:42.545 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/metaflow/task.py", line 383, in run_step
2021-05-02 17:54:42.545 [1619958258880108/merge_1/8 (pid 28142)] input_obj = Inputs(self._clone_flow(inp) for inp in inputs)
2021-05-02 17:54:42.546 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/metaflow/datastore/datastore.py", line 34, in __init__
2021-05-02 17:54:42.546 [1619958258880108/merge_1/8 (pid 28142)] self.flows = list(flows)
2021-05-02 17:54:42.546 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/metaflow/task.py", line 383, in <genexpr>
2021-05-02 17:54:42.546 [1619958258880108/merge_1/8 (pid 28142)] input_obj = Inputs(self._clone_flow(inp) for inp in inputs)
2021-05-02 17:54:42.546 [1619958258880108/merge_1/8 (pid 28142)] File "/home/vineet/Documents/projects/celery-venv/lib/python3.6/site-packages/metaflow/task.py", line 200, in _clone_flow
2021-05-02 17:54:42.546 [1619958258880108/merge_1/8 (pid 28142)] x = self.flow.__class__(use_cli=False)
2021-05-02 17:54:42.547 [1619958258880108/merge_1/8 (pid 28142)] TypeError: __init__() got an unexpected keyword argument 'use_cli'
2021-05-02 17:54:42.547 [1619958258880108/merge_1/8 (pid 28142)] 
2021-05-02 17:54:42.548 [1619958258880108/merge_1/8 (pid 28142)] Task failed.

I saw a similar issue here but in my case I don't think using parameters would help.

romain-intel commented 3 years ago

Parameters would probably be the best way to solve this. You could define a parameter called filename which you could then pass to your flow using --filename (as an example). I am not sure I understand why you think parameters would not be a good fit in this case. You could also use IncludeFile as an alternative (it would include the actual file and save it).
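
For illustration, a minimal sketch of the Parameter suggestion above; the flow name is taken from the original post, while the help text and default value are assumptions:

from metaflow import FlowSpec, Parameter, step

class TextProcessing(FlowSpec):
    # filename becomes a flow-level Parameter instead of an __init__ argument
    filename = Parameter("filename",
                         help="Path of the file to process",
                         default="input.txt")

    @step
    def start(self):
        print("Processing", self.filename)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    TextProcessing()

Assuming the file is saved as text_processing.py, it can be run as python text_processing.py run --filename some_file.txt.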

Vineet-Sharma29 commented 3 years ago

Thanks for replying @romain-intel. In this case, I have to run multiple instances of the pipeline over a list of files in a directory, which I am doing programmatically and then passing the filename. I think that parameters need to be passed via the CLI, which isn't feasible when there are many files.

romain-intel commented 3 years ago

I see. Another possible approach would be to pass the name of the directory as a parameter and then do a foreach over the files inside it (so the filename would be passed as a regular input); a sketch of that approach follows. I will see if I can come up with another solution as well. FlowSpec's __init__ only takes the use_cli argument (which defaults to True), so you could also try accepting and forwarding that in your own __init__, but I am not sure if it will fully work.
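
A minimal sketch of the directory-parameter plus foreach idea, assuming a flat directory of input files (the parameter and step names are illustrative, not from the thread):

import os
from metaflow import FlowSpec, Parameter, step

class TextProcessing(FlowSpec):
    # the directory, not the individual file, is the CLI-level parameter
    input_dir = Parameter("input_dir",
                          help="Directory containing the files to process",
                          default="data")

    @step
    def start(self):
        # fan out: one process_file task per file in the directory
        self.filenames = sorted(os.listdir(self.input_dir))
        self.next(self.process_file, foreach="filenames")

    @step
    def process_file(self):
        # self.input holds the current filename for this foreach branch
        self.filename = self.input
        self.next(self.join)

    @step
    def join(self, inputs):
        self.processed = [inp.filename for inp in inputs]
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    TextProcessing()

Each foreach task receives its own filename through self.input, so no custom __init__ is needed.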

vineetsharma883 commented 3 years ago

@romain-intel I think using foreach won't be apt in my case. I have a pipeline like this:

                                 ___F------
                     ______ D---|          |  
                    |           |___G---|  |__>  
          ____B-----|                   |----->H
         |          |______E_________________> ^
      A -|                                     |
         |____C________________________________|

Foreach requires joining all the branches, whereas in this case, if I spawn task A (the extract-doc task) over a list of items (filenames), it still branches into more tasks like B and C.

Even handling the use_cli argument error, which happened during the join operation, won't help. I tried simplifying my workflow to:

A -> B -> H, where H is the task that loads the output into a text file,

which does not have any branch or merge. The code won't run over all the files in the list but only for the first item, since Metaflow at present does not have DAG scheduler support.

savingoyal commented 3 years ago

@vineetsharma883 Can you elaborate on the DAG scheduler support? There is an integration with AWS Step Functions and another one on the deck with Argo.

Regarding text processing over multiple files - you can choose whether you want to do a foreach over all files, use a different grouping strategy, or just process the entire directory in one shot. In your case, you can process all the files in step A, or you can do a foreach just for step A and have a corresponding join before again forking into B and C; a sketch of the latter is below.
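
A minimal sketch of that second option, limiting the foreach to step A and joining before the fork into B and C; the step names loosely follow the letters in the diagram above and the rest is assumed:

import os
from metaflow import FlowSpec, Parameter, step

class TextProcessing(FlowSpec):
    input_dir = Parameter("input_dir", default="data")

    @step
    def start(self):
        self.filenames = sorted(os.listdir(self.input_dir))
        # foreach only for step A; everything after the join runs once
        self.next(self.extract_doc, foreach="filenames")

    @step
    def extract_doc(self):          # step A, one task per file
        self.doc = self.input
        self.next(self.join_docs)

    @step
    def join_docs(self, inputs):    # join the foreach before branching again
        self.docs = [inp.doc for inp in inputs]
        self.next(self.step_b, self.step_c)

    @step
    def step_b(self):               # step B
        self.next(self.join_bc)

    @step
    def step_c(self):               # step C
        self.next(self.join_bc)

    @step
    def join_bc(self, inputs):      # merge B and C, then continue downstream
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    TextProcessing()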