spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0
17.85k stars 2.39k forks

task collection scheme seems to invalidate `batch_method=sum` #3241

Open kstech-roadie opened 1 year ago

kstech-roadie commented 1 year ago

TL;DR: Task instances are apparently never treated as unique per instance. Instead they are unique by a task_id, a string built from the class name and parameter values. This creates Task instance parsimony, which would be a nice optional feature. It does not appear to be optional, however, which limits the framework's flexibility by conflicting with many use cases for the batching mechanism. The batch_method feature seems to have been designed only with min and max in mind, both of which tolerate the parsimonious instantiation approach. It breaks batching functions that rely on all values being present, such as len, sum, average, variance, etc.
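To illustrate the point above, here is a minimal sketch in plain Python. It is a simplified model, not Luigi's actual code: the `task_id` helper and its string format are assumptions made up for the illustration, and the parameter name `count` is hypothetical.

```python
# Simplified model of identity-by-parameters (NOT Luigi's implementation).
def task_id(family, params):
    """Build an id string from the class name and parameter values."""
    parts = ", ".join(f"{k}={v}" for k, v in sorted(params.items()))
    return f"{family}({parts})"

# Four callers request resources; three of them ask for the same count.
requested = [1, 1, 1, 2]

# Keying tasks by id collapses the identical requests into a single entry.
scheduled = {task_id("Resource", {"count": n}): n for n in requested}
surviving = list(scheduled.values())

# min and max give the same answer on the collapsed values...
same_max = max(surviving) == max(requested)
same_min = min(surviving) == min(requested)

# ...but sum and len do not, because the duplicates are gone.
collapsed_total = sum(surviving)
true_total = sum(requested)
```

In this model only two of the four requests survive, so any batch function that depends on every value being present sees the wrong input.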

I'm passing a count of requested resources via IntParameter to a class named Resource(luigi.Task), which creates these resources against an API concurrently and then outputs their IDs for the requesting Task to use. Many dependencies call this task with their desired count of resources passed into the IntParameter. I am declaring this parameter as IntParameter(batch_method=sum) and it's working very well...

...except for the frequently occurring case of asking for the same number of resources. All the Resource(1) requests turn into a single Task and the specified batch_method=sum never happens. The summation only happens for unique Resource/Param combinations.

Is there any way around this issue? Initially I tried adding a "salt" Param and passing in values from randint. I also tried generating various fixed salts and hashes on objects, classes and modules. If the salt param is significant, then nothing is summation-batched. If it's insignificant, then I'm back to the old behavior. I also tried overriding task_id. None of this works. I'm looking for some kind of solution that unblocks what seems like a perfectly fitting use case for Luigi.

UPDATE 1:

I can force int values to be unique by sending them as FloatParameter with unique decimal values and then casting back to int when they arrive in my Task's run method. This allows batch_method=sum to do its job without interference from the Task instance parsimony behavior. I would prefer to extend the Task or Parameter classes to fold this behavior in, so that the consumer is unaware of how their CountParameter(luigi.Parameter) class makes this happen. Tips and pointers appreciated.
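The arithmetic behind this workaround can be sketched in plain Python without Luigi. The names `SALT_STEP` and `salted` are hypothetical, and the step size is an assumption chosen to match the hack shown later in this post.

```python
# Sketch of the float-salt idea: make equal integer counts distinct,
# sum them, then recover the integer total.
SALT_STEP = 1e-9  # assumed step size; must stay tiny relative to 1

def salted(counts):
    """Add a growing fractional salt so no two values compare equal."""
    return [n + (i + 1) * SALT_STEP for i, n in enumerate(counts)]

requested = [1, 1, 1, 2]
values = salted(requested)

# Every value is now unique, so nothing is collapsed as a duplicate.
all_unique = len(set(values)) == len(values)

# The accumulated salt is far below 0.5, so rounding recovers the true sum.
total = round(sum(values))
```

The accumulated salt grows with the number of requests, so this only holds while the summed salts stay well under 0.5. With a cumulative step of 1e-9 that leaves room for thousands of requests, but it is a limit worth keeping in mind.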

UPDATE 2:

If I could just get my CountParameter to somehow return int values to the Task I would be content with this hack:

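from luigi import Parameter


class CountParameter(Parameter):
    _salt_base = 1e-09  # increment added for each integral value seen
    _salt = 0           # running salt, grows with every normalize() call

    def normalize(self, x):
        # Integral values get a unique fractional salt so that equal
        # counts no longer produce identical parameter values.
        if x == int(x):
            self._salt += self._salt_base
            return x + self._salt
        # Already-salted values pass through unchanged.
        return x

    def parse(self, x):
        # Values arriving as strings are read as floats.
        return float(x)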

Using this parameter class I can declare a Task like:

class Resource(Task):
    pm = CountParameter(batch_method=sum)

And inside my run method:

    def run(self):
        pm = int(self.pm)
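One way to keep that cast in a single place is a read-only property on the task, sketched here with a plain stand-in class rather than a real luigi.Task. The names `ResourceSketch` and `count` are hypothetical. This does not make the parameter itself return an int, it only hides the conversion from the rest of the task body.

```python
class ResourceSketch:
    """Stand-in for the Resource task; holds the salted float in pm."""

    def __init__(self, pm):
        self.pm = pm

    @property
    def count(self):
        # round() rather than int() so a value landing just below a whole
        # number is not truncated to the wrong integer.
        return int(round(self.pm))

    def run(self):
        # The rest of the task only ever sees the integer count.
        return [f"resource-{i}" for i in range(self.count)]


task = ResourceSketch(pm=3.000000006)
created = task.run()
```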

I've spent a fair bit of time reading the code and I'm not seeing another way, but I would love to have a cleaner solution to this.

Pros:

Cons:

Really, it is this last detail that irks me. I don't care much if the task_ids in the console output look crazy. But I really need to be able to cast these floats back to int before passing them back to the calling Task. Either that, or find a whole new workaround for this issue.

kstech-roadie commented 1 year ago

The Task/Parameter parsimony design decision irks me. It seems to be embedded in the core assumptions of this framework, with no way to disable it. It annoys me that the very identity of a Task instance is defined by its class name and parameters. This goes against the object-oriented design that the framework builds on and benefits from. While relying on unique object identity itself, the framework cuts off and buries this functionality, preventing the user from benefiting from it. This would have made a nice optional feature, but as it stands the decision seems to cut off a lot of flexibility. I am starting to think I should have just built my project in RxPy or something that exposes the underlying language features and leaves them open to extension, rather than forcing an opinionated view of what a Task or a Parameter is and how it may be used.

I would love to be wrong about this point of view. Please correct me if I am mistaken and I'm overlooking a framework feature that allows for this.

kstech-roadie commented 1 year ago

For anyone who comes across this post and reads this far, I would just say that Luigi is a funky, clunky, hacky, magical system that can enable simple workflows with heavier processing needs. If you are building complex workflows that are more of the cooperative concurrency flavor, take a look at reactivex, rxpy and especially aioreactive, which really brings it all together in a tidy package. I'm not sure why Luigi has so many stars and forks. It seems to occupy a space where it's neither computationally powerful like the distributed platforms Airflow and Spark, nor programmatically robust/flexible like ReactiveX. It is very specifically single-node multiprocessing. I did not appreciate that when I started.