spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.

Scalability problem: unworkable scheduler slowdown when requiring many tasks #1750

Closed (proycon closed this issue 6 years ago)

proycon commented 8 years ago

I have run into a scalability problem where the scheduler causes an unworkable delay when many tasks are scheduled. Have I run into an inherent limitation in Luigi? In my use case I have a task that reads an input directory and requires() child tasks for each file in that input directory. I have a simple example case that just lowercases all files and reproduces the problem:

import os
import glob
import luigi

class LowercaseFile(luigi.Task):
    inputfile = luigi.Parameter()
    outputdir = luigi.Parameter()

    def run(self):
        # Write a lowercased copy of the input file into the output directory.
        with self.output().open('w') as f_out:
            with open(self.inputfile, 'r', encoding='utf-8') as f_in:
                f_out.write(f_in.read().lower())

    def output(self):
        return luigi.LocalTarget(os.path.join(self.outputdir, os.path.basename(self.inputfile)))

class LowercaseDir(luigi.Task):
    inputdir = luigi.Parameter()
    outputdir = luigi.Parameter()

    def requires(self):
        # One child task per *.txt file in the input directory.
        return [LowercaseFile(inputfile=inputfile, outputdir=self.outputdir)
                for inputfile in glob.glob(self.inputdir + '/*.txt')]

    def run(self):
        # Marker file so the wrapper task itself counts as complete.
        with self.output().open('w') as f:
            f.write("done")

    def output(self):
        return luigi.LocalTarget("done")

When I run this with five workers and feed it a directory of 300,000 input files, the scheduler only manages to hand out new work about once every two seconds (roughly 166 hours for the whole run). The child tasks finish far quicker than that, which implies only one of my five workers is effectively doing anything. With a small number of files (1,000 or fewer) everything works fast and fine.
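For reference, I kick it off roughly like this (a minimal driver sketch; it assumes the two task classes above live in a module named lowercase, and the paths are just examples):

import luigi
from lowercase import LowercaseDir  # hypothetical module name for the tasks above

luigi.build(
    [LowercaseDir(inputdir='scaletest.txtdir', outputdir='out2')],
    workers=5,
    local_scheduler=True,  # it behaves the same for me against the central scheduler
)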

It doesn't really matter whether I use the local scheduler or the central one, or whether I enable --parallel-scheduling on the local one.

Am I missing something? Is there a way to remedy this scalability problem, or have I hit a limit in Luigi itself? I was planning to use it on big data collections, for which 300,000 files is still fairly small.

Some output:

INFO: Informed scheduler that task   LowercaseFile_scaletest_txtdir_out2_7c1f2aea5e   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 5 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 299981
DEBUG: Asking scheduler for work...
INFO: [pid 31350] Worker Worker(salt=482450273, workers=5, host=mhysa, username=proycon, pid=28147) running   LowercaseFile(inputfile=scaletest.txtdir/153157.txt, outputdir=out2)
INFO: [pid 31350] Worker Worker(salt=482450273, workers=5, host=mhysa, username=proycon, pid=28147) done      LowercaseFile(inputfile=scaletest.txtdir/153157.txt, outputdir=out2)
DEBUG: Pending tasks: 299980
DEBUG: Asking scheduler for work...
INFO: [pid 31356] Worker Worker(salt=482450273, workers=5, host=mhysa, username=proycon, pid=28147) running   LowercaseFile(inputfile=scaletest.txtdir/29528.txt, outputdir=out2)
INFO: [pid 31356] Worker Worker(salt=482450273, workers=5, host=mhysa, username=proycon, pid=28147) done      LowercaseFile(inputfile=scaletest.txtdir/29528.txt, outputdir=out2)
DEBUG: Pending tasks: 299979
erikbern commented 8 years ago

Luigi probably doesn't scale to 100,000+ tasks – that was not one of its design goals.

proycon commented 8 years ago

That is rather unfortunate. I have done some profiling of Scheduler.get_work() and wonder whether this limitation could be overcome by not considering all pending tasks every time, but instead partitioning them into smaller queues and moving on to the next queue only when one is done. That may not combine well with certain other features such as task priorities, though.

(300k tasks, Intel i7-4770K):

get_pending_tasks:     1.1205673217773438e-05 s
list(relevant_tasks):  0.012180805206298828 s
tasks.sort():          0.39243364334106445 s
iteration over tasks:  0.9191298484802246 s
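To make the idea more concrete, here is a toy sketch of the kind of partitioning I mean (purely illustrative, not Luigi's actual scheduler code; as said, it probably would not combine well with global priorities):

from collections import deque

def partition(tasks, bucket_size=1000):
    # Split the pending set into fixed-size buckets once, at scheduling time.
    tasks = list(tasks)
    return deque(tasks[i:i + bucket_size] for i in range(0, len(tasks), bucket_size))

def next_task(buckets, is_schedulable):
    # Scan only the front bucket, so the cost per call is bounded by the
    # bucket size rather than by the total number of pending tasks.
    while buckets:
        bucket = buckets[0]
        if not bucket:
            buckets.popleft()   # this bucket is done, move on to the next one
            continue
        for i, task in enumerate(bucket):
            if is_schedulable(task):
                del bucket[i]
                return task
        return None             # nothing in the current bucket is runnable yet
    return None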
erikbern commented 8 years ago

Yeah, I think that's very much possible – it just hasn't been a use case so far. But I'm pretty sure it's possible to make get_work run in O(1). If you're interested in graph algorithms (topological sort etc.), feel free to take a stab and see if you can do it!
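As a sketch of the general idea (not tied to the scheduler's actual data structures): keep a Kahn-style in-degree per task, push tasks whose count reaches zero onto a ready queue, and let get_work just pop from that queue:

from collections import deque

def build_ready_state(deps):
    # deps: {task: set of tasks it depends on}; assumes every task appears as a key.
    indegree = {t: len(d) for t, d in deps.items()}
    dependents = {t: [] for t in deps}
    for t, d in deps.items():
        for dep in d:
            dependents[dep].append(t)
    ready = deque(t for t, n in indegree.items() if n == 0)
    return indegree, dependents, ready

def on_task_done(task, indegree, dependents, ready):
    # When a task finishes, any dependent whose last dependency it was becomes ready.
    for t in dependents[task]:
        indegree[t] -= 1
        if indegree[t] == 0:
            ready.append(t)

def get_work(ready):
    # O(1) per call: just pop the next ready task.
    return ready.popleft() if ready else None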

proycon commented 8 years ago

That sounds like an interesting challenge. For now I approached the issue in a somewhat crude way, by identifying the current O(n) bottlenecks and working around them, at the cost of losing features such as priorities and resources, and of more arbitrary scheduling (no longer sorted by time). With this new fast_scheduler option enabled, get_work() is effectively O(1). In my example this results in about 50 tasks per second (as opposed to the problematic 0.5 tasks per second I experienced initially). It's not very thoroughly tested yet and I'm not entirely sure what else I might have broken in the process.

The bulk of the task iteration went into calls to _upstream_status(), which as far as I can tell only serves to compute an accurate count of pending tasks to pass back to the worker. But the worker doesn't seem to use the n_unique_pending and n_pending_tasks counts for anything except checking whether they are non-zero, aside from logging. In my implementation these counts are therefore no longer exact: I break out of the iteration as soon as a schedulable task is found.
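Schematically, the change amounts to something like this (simplified, with made-up names, not the actual patch):

def get_work_fast(pending_tasks, is_schedulable):
    # Stop iterating as soon as one schedulable task is found, instead of
    # walking the whole pending list to produce exact counts.
    n_pending = 0
    for task in pending_tasks:
        n_pending += 1               # now only a lower bound, not the true count
        if is_schedulable(task):
            return task, n_pending   # early exit: no full O(n) pass per call
    return None, n_pending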

erikbern commented 8 years ago

Cool – I think it's possible to combine the O(1) scheduling with all the existing bells and whistles – but it's probably not easy :)

Tarrasch commented 8 years ago

While I do admire your attempts to refine the algorithm, I'm pretty sure the quick workaround is simply to either set a --worker-task-limit or somehow utilize the Range module. There's no problem with re-cronning something every minute.
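For example, something along these lines (the limit is arbitrary, and the module name lowercase is just a placeholder for wherever the example tasks live):

luigi --module lowercase LowercaseDir --inputdir scaletest.txtdir --outputdir out2 --workers 5 --worker-task-limit 10000

Each invocation then only takes on at most that many tasks, and re-cronning picks up the rest.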

rjpower commented 7 years ago

This same issue seems to crop up for us quite frequently. It's frustrating to have to work around the scheduler in every application, and re-cronning can be expensive as well (statting a large directory to identify the new work needed). Is there a generic solution to this that we could apply consistently (other than a faster scheduler)?

Rather than having the scheduler redo all of the prioritization every time get_work is called, could that work be performed in the background? It appears there's a lot of computation that is shared across workers.
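As a rough sketch of what I mean (purely illustrative, not a proposed patch): a background thread could periodically refresh a ready queue, and get_work would just pop from it:

import queue
import threading
import time

ready = queue.Queue()
already_queued = set()

def refresh_loop(compute_ready_tasks, interval=1.0):
    # Redo the expensive sorting/filtering once per interval, shared across all
    # workers, instead of once per get_work call.
    while True:
        for task in compute_ready_tasks():
            if task not in already_queued:
                already_queued.add(task)
                ready.put(task)
        time.sleep(interval)

def get_work():
    # Each worker request then becomes a cheap queue pop.
    try:
        return ready.get_nowait()
    except queue.Empty:
        return None

# e.g. threading.Thread(target=refresh_loop, args=(find_schedulable_tasks,), daemon=True).start(),
# where find_schedulable_tasks is a placeholder for the existing prioritization logic.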

stale[bot] commented 6 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.