spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0
17.89k stars, 2.4k forks

Failed to return in running luigi.build in luigi tasks #3208

Open gavincyi opened 2 years ago

gavincyi commented 2 years ago

I have a workflow that triggers another luigi.build from within a task's run method (see below). Both the parent and the child luigi.build calls use multiple workers.

import luigi

class Task2(luigi.Task):
    def run(self):
        print("*"*10, "done task2")

class Task1(luigi.Task):
    def run(self):
        luigi.build([Task2()], local_scheduler=True, workers=2)
        print("*"*10, "done task1")

luigi.build([Task1()], local_scheduler=True, workers=2)

The scheduler in the child process (Task.run) got stuck and never returned. I killed the process, and it seems to me that the worker queue failed to pull the task.

2022-11-12 01:06:00,411 INFO Informed scheduler that task   Task2__99914b932b   has status   PENDING
2022-11-12 01:06:00,411 INFO Done scheduling tasks
2022-11-12 01:06:00,411 INFO Running Worker with 2 processes
^C2022-11-12 01:06:40,606 INFO Worker Worker(salt=4670658361, workers=2, host=slurm-node6.development-singapore.chorus, username=gchan, pid=27743) was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
  File "test_luigi.py", line 37, in <module>
    luigi.build([Task1()], local_scheduler=True, workers=2)
  File "/home/gchan/intranet/environments/20220905_211953_12c7ddd0-0e8b-4bcd-b13b-b7e44cd1f344/lib/python3.8/site-packages/luigi/interface.py", line 239, in build
    luigi_run_result = _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)
  File "/home/gchan/intranet/environments/20220905_211953_12c7ddd0-0e8b-4bcd-b13b-b7e44cd1f344/lib/python3.8/site-packages/luigi/interface.py", line 173, in _schedule_and_run
    success &= worker.run()
  File "/home/gchan/intranet/environments/20220905_211953_12c7ddd0-0e8b-4bcd-b13b-b7e44cd1f344/lib/python3.8/site-packages/luigi/worker.py", line 1242, in run
    self._handle_next_task()
  File "/home/gchan/intranet/environments/20220905_211953_12c7ddd0-0e8b-4bcd-b13b-b7e44cd1f344/lib/python3.8/site-packages/luigi/worker.py", line 1101, in _handle_next_task
    self._task_result_queue.get(
  File "/opt/chorus-python38/lib/python3.8/multiprocessing/queues.py", line 107, in get
    if not self._poll(timeout):
  File "/opt/chorus-python38/lib/python3.8/multiprocessing/connection.py", line 257, in poll
    return self._poll(timeout)
  File "/opt/chorus-python38/lib/python3.8/multiprocessing/connection.py", line 424, in _poll
    r = wait([self], timeout)
  File "/opt/chorus-python38/lib/python3.8/multiprocessing/connection.py", line 931, in wait
    ready = selector.select(timeout)
  File "/opt/chorus-python38/lib/python3.8/selectors.py", line 415, in select
    fd_event_list = self._selector.poll(timeout)
KeyboardInterrupt

Currently I am using the latest luigi version 3.1.1 and Python 3.8.12.

lallea commented 2 years ago

luigi.build() is intended to be called from your main function, not from run(). If you want Task2 to run before Task1, add it to requires() for Task1. See https://luigi.readthedocs.io/en/stable/tasks.html#task-run for an example.

gavincyi commented 2 years ago

We have built an internal scheduler library on top of luigi. Users pass a configuration that specifies their pipelines at the business level, and the library schedules the corresponding tasks in luigi.

The problem is that we have no control over users' tasks, and they can start another level of luigi scheduler inside Task.run. Also, if you regard luigi as a scheduler that runs tasks in dependency trees, it makes total sense for users to spawn a new tree from a node.

Meanwhile, I struggle to understand why the no_lock parameter is set to True in the luigi.build function but defaults to False in the command-line entry point.

lallea commented 2 years ago

Ok. Luigi is a simple tool, and that's one of its primary strengths in comparison to the alternatives. Recursive invocation is simply outside the design scope and this is not a functionality bug. Feel free to submit a doc PR to clarify this limitation if you want.

It is possible to build services on top of Luigi, as you have done, since Luigi is simple and not opinionated. We should not add features for each such case, however. It would complicate the core. It would be reasonable to refactor or expose existing features, or add optional functionality. But for the things you build on top, you will need to work within Luigi's design constraints. At least IMHO.

I don't know anything about no_lock.

gavincyi commented 2 years ago

Thanks for your detailed explanation.

I understand it would be a huge pain for contributors to maintain open source software if users requested a change every time they needed to work around their own corner cases. At the moment I am not requesting any change in luigi. I am only asking why the scheduler froze in my case, so that we can accommodate it in our library design. It seems to me that it is related to the parent PID process lock.

Two pieces of evidence suggest that it might be related to the luigi process lock:

  1. Run the scheduler with a single worker (without multiprocessing in the background)

Just change the child scheduler to run with workers=1, so that only one process acquires the process lock. No deadlock.

import luigi

class Task2(luigi.Task):
    def run(self):
        print("*"*10, "done task2")

class Task1(luigi.Task):
    def run(self):
        luigi.build([Task2()], local_scheduler=True, workers=1)
        print("*"*10, "done task1")

luigi.build([Task1()], local_scheduler=True, workers=2)

  2. Run the child scheduler in a new process

Spawning the child scheduler in a separate process avoids sharing the same process lock and then it works perfectly.

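import subprocess
import luigi

class Task2(luigi.Task):
    def run(self):
        print("*" * 10, "done task2")

class Task1(luigi.Task):
    def run(self):
        # Run the child scheduler through the luigi CLI in a separate process.
        subprocess.run(
            [
                "luigi",
                "--local-scheduler",
                "--workers",
                "2",
                "--module",
                "test_luigi_subprocess",
                "Task2",
            ],
            check=True,
        )
        print("*" * 10, "done task1")

# Guard the entry point: the child process imports this module via --module,
# and an unguarded build() would run again on import.
if __name__ == "__main__":
    luigi.build([Task1()], local_scheduler=True, workers=2)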
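The subprocess workaround above could be wrapped in a small helper for a library that sits on top of luigi. This is only a sketch: build_luigi_command and run_luigi_in_subprocess are hypothetical names, and it assumes luigi is installed in the current interpreter so that `python -m luigi` is available.

```python
import subprocess
import sys


def build_luigi_command(module, task, workers=2, local_scheduler=True):
    """Return the argv list that runs `task` from `module` via the luigi CLI."""
    # sys.executable keeps the child in the same Python environment as the parent.
    cmd = [sys.executable, "-m", "luigi", "--module", module, task,
           "--workers", str(workers)]
    if local_scheduler:
        cmd.append("--local-scheduler")
    return cmd


def run_luigi_in_subprocess(module, task, **kwargs):
    """Run a luigi build in a fresh interpreter instead of nesting luigi.build()."""
    # check=True raises CalledProcessError only if the child exits non-zero.
    subprocess.run(build_luigi_command(module, task, **kwargs), check=True)
```

Inside Task1.run this would be called as run_luigi_in_subprocess("test_luigi_subprocess", "Task2"). Note that whether a failed task produces a non-zero exit code depends on luigi's retcode configuration, so check=True alone may not surface task failures.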

It'd be great if anyone could share more details or previous discussions about how luigi uses the process lock, or a luigi design doc covering the arguments no_lock, take_lock and lock_size.