Bogdanp / dramatiq

A fast and reliable background task processing library for Python 3.
https://dramatiq.io
GNU Lesser General Public License v3.0
4.26k stars 302 forks source link

Feature request: create pipeline of groups #103

Open AndreaOrru opened 6 years ago

AndreaOrru commented 6 years ago

Currently, it's possible to create a group where the children are pipelines: children (Iterator[Message|group|pipeline]) – A sequence of messages, groups or pipelines.

However, it doesn't seem to be possible to create a pipeline with group as children: children (Iterator[Message|pipeline]) – A sequence of messages or pipelines. Child pipelines are flattened into the resulting pipeline.

Any plan to add this feature? Or any suggestions on how to implement it?

Bogdanp commented 5 years ago

This isn't currently possible because groups don't have any notion of a completion callback. i.e. to be able to use a group inside a pipeline you'd have to be able to send a message to the next actor in line a group completes. It may be possible to extend groups to support something like this, but I'll have to think about it more.

Can you tell me about your use case in the mean time?

ryanhiebert commented 5 years ago

Here's my use-case for this feature. We have a group where the functions gather some basic data about a bunch of courses at a school's online system. Once it's done, we figure out what users are in those courses and get that data in a callback function. We need to ensure that this happens after that group is done, or else we won't have the full list of users. Our use-case doesn't require the results to be passed along, we're just using it for timing, though I'm sure others' use-cases might want to actually pass the results along.

ryanhiebert commented 5 years ago

I've been reading the code for the past few hours to see if I can implement this feature. I've also been reading Celery's code to figure out how they are doing this marshaling with Redis. I don't have a solid plan yet for implementation, but I'm working on it.

I've got to say that this project's code is an absolute pleasure to read. The difference between this code and Celery's is night and day, and it's not just because of the additional functionality that Celery tries to offer. The care that you've taken on this code shows, and it's going to make the job of implementing this feature very targeted and light. This is excellent work. Thank you, @Bogdanp.

ryanhiebert commented 5 years ago

As I've read the implementation, it's become clear that, strictly speaking, the result middleware isn't required in order to make the task pipeline work. Celery absolutely requires the result storage in order to make chords (groups with a callback) work. I have an idea to continue that tradition, but I think there is reasonable concern for it to be a bad idea, so I wanted to bring it up, and see if I can get anyone else's thoughts on the approach.


First off, I think it's important to have a vision of what Celery does, or at least how I'd envision to implement a similar approach to Celery's in our code. This description is both imprecise and wrong, but close enough to help understand how it works.

When running a group, each message will be encoded with the callback message and the group id. Upon starting the group, the messages in the group will be counted, and stored in the result backend in a key related to the group id.

When each message processing completes successfully, the count of tasks is decremented in the result backend, and if the total count is 0 after that has happened, then the group callback is run. The read should happen atomically after the write. This may be more difficult to do with some result backends.


Obviously, porting that implementation from Celery requires the result backend, or at least something like it. Whatever we do, we're going to have to find a way to marshal the group, so that we can tell when the group has completed processing all of its messages.

My possibly hair-brained idea is to create a queue in the broker for each group that has a callback to marshal. There would be enqueued one message for each message in the group, and the messages that would be put on this per-group queue would be empty messages, without any data. As the messages in the group are processed in the other queue, each time a single message will be acked from this special queue. Once there are no more left in the group queue, then we know that the group has completed, and we can call the callback task.

Since it's not my target use-case, this doesn't currently consider how this would interact with the result backend to pass along the results of each task in the group. But that certainly would require the result backend in order to handle gathering the results of all the tasks in the group. I'll leave that exercise for the moment.

The main challenge I currently see with this approach is the problem of creating too many queues for running groups. One thing that can somewhat mitigate that challenge is that groups that do not have any callback need not create this temporary queue.

I'm not sure if these temporary queues can be set to automatically expire, similarly to how we can set redis keys to expire. Certainly this could be possible with the Redis broker, but I don't know if that's possible with RabbitMQ, or if it could be expected in general of brokers.


OK, so what do you think of that idea? Is it a tenable solution, or is it too risky to create a queue for each group that has a callback task? Is there another option that might make it possible to do this without a result backend?

ryanhiebert commented 5 years ago

It does look like at least RabbitMQ is capable of setting task and queue TTLs. https://www.rabbitmq.com/ttl.html#queue-ttl

Bogdanp commented 5 years ago

You can't get an accurate queue size from RMQ via the AMQP protocol because the number of received-but-not-acknowledged messages in a queue are not reported so I don't think this approach would end up being very stable, regardless of the issue of how many queues get created.

I think you have to have some type of atomic synchronisation between the nodes to make this sort of feature work so one option is a sort of "mesh" middleware where the nodes discover and talk to each other. This type of thing is really hard to get right. Another approach is to use some shared state -- as you mentioned -- like an ACID DB, Redis, Memcached or anything with some sort of atomic compare-and-set mechanism (Redis works because it's single threaded, memcached has a CAS operation).

I think the latter is what we should do. And I think a nice way to do it is add a "semaphore" rate limiter to the rate_limits package and then build on top of that. Upon being run, the group would initialise the semaphore, decrementing it after each successful task in the group. Once the count is zero, then it would enqueue the callback. This is sSimilar to what backends with atomic operations do in Celery, but it has the advantage of being a relatively simple solution to the problem and giving us a new primitive that we could potentially use in other code (think spawning many tasks at once and controlling exactly when they all fire by decrementing the semaphore, etc.).

Bogdanp commented 5 years ago

I should mention I definitely don't have the time to implement this right now so if that sounds good to you then please go ahead and implement it!

ryanhiebert commented 5 years ago

You can't get an accurate queue size from RMQ via the AMQP protocol

I don't think I technically need to get the size, I only need to check if there's another message to read. So I guess it would start at n-1, so that the last task would not get a task off of the queue. It would immediately be ack'd for this use-case, so I don't think that's too terrible of a concern, necessarily.

one option is a sort of "mesh" middleware where the nodes discover and talk to each other. This type of thing is really hard to get right.

Agreed. I'm definitely against attempting such a complicated scheme. I'm not smart enough to implement something like that. :smirk:

And I think a nice way to do it is add a "semaphore" rate limiter to the rate_limits package and then build on top of that.

That sounds like a really neat idea. Worth creating an issue for the semaphore even if we could (or even do) go with the queuing approach. It's definitely a plus that it splits up the implementation into independent parts.

ryanhiebert commented 5 years ago

@Bogdanp : With #134 completed, how do you envision this communicating with the rate limiter backend, so it can instantiate the barrier? Should the Pipeline middleware take the rate limiter backend as an optional argument? Since Pipeline is a default middle, how do you envision the setup of the middleware to implement this feature? I'll take a whack at implementing this, but the better feel I can get for what you're going to have in mind, the more likely it'll be that I implement this in a way you find acceptable.

ryanhiebert commented 5 years ago

The experiment I'm currently working on is to add a new middleware, Groups, that knows about Pipelines, but doesn't attempt to do anything that Pipelines does for itself, so that Groups can be added to the middleware without needing to modify or remove Pipelines. So far I think its going to be a workable solution. This middleware will need to take a rate limiter backend as an argument.

ryanhiebert commented 5 years ago

Unfortunately, I think that groups and pipelines are just too intertwined to attempt to keep them in separate middlewares. And, unfortunately, messages as well. I'm working on an alternative design that would be a replacement usage entirely for the current composition classes. Once I've got that, then I'll see what I can do to bring this new concept more in line with what currently exists, to see to what extent I can avoid making life difficult for users. I want to get the design right first, though.

ryanhiebert commented 5 years ago

I've been finding this to be a uniquely challenging problem to solve. I am making some progress. My current iteration is combining pipeline and groups into a single middleware as I mentioned I was going to do previously. In addition, my current work in progress wraps messages in a custom class, and for the moment does not support dealing with results in any way, including, and perhaps especially between tasks in the pipeline. I'm doing my best to make it easy to add that functionality in later, though.

It's still not complete enough to open a pull request for review to allow us to find out how it can best be integrated into the existing APIs, but I hope to have it in a reviewable state in the near future. With the holidays fast approaching, if it doesn't happen pretty soon, I will quite likely not be working on it over the holidays, since I'm going to be traveling.

gjeusel commented 3 years ago

Hey @Bogdanp @ryanhiebert ! Hope you're all doing well !

I wanted to know the status of this task ? What are the missing pieces ?

ryanhiebert commented 3 years ago

I'm not working on it anymore. I'm so frustrated with Celery's signatures that I'm questioning whether the design is anything more than a footgun. I've been investigation entirely other options, but I don't have anything specific to point you to. I still use Celery's signatures extensively, and I'm not using Dramatiq personally, even now.

Bogdanp commented 3 years ago

@gjeusel group completion callbacks were added in https://github.com/Bogdanp/dramatiq/pull/150/files , but I never got a chance to document them. More work is needed to get this integrated with pipelines, but if all you need is to run a task after a group completes, then you can use that work. See the tests for some examples.

gjeusel commented 3 years ago

Indeed, I saw the completion callback addition ! However my need is the pipeline of groups ! At the moment I'm forcing actors to have results in the backend, and wait manually for results to be available in the backend before starting another group.

Would you have any chance of finishing this feature anytime soon @Bogdanp ? 😃

Bogdanp commented 3 years ago

I'm unlikely to work on it any time soon, because I don't need anything like it. I'd be happy to review and merge if someone does the work, though.

aW3st commented 7 months ago

@Bogdanp I think I have an idea about how to implement this. Are you still open to a pull request?

santiago-roig commented 1 month ago

I maintain an open source pipe-lining library which aims to facilitate complex workflows. Under the hood it uses celery (which I've grown to disdain). Anyways I need to be able to run pipelines consisting of "groups" and have been working on switching from celery to dramatiq. I think that I've accomplished this by implementing a custom composition parallel_pipeline and custom middleware. See below:

composition.py contains parallel pipelines which takes in a list of lists of messages that can run together in parallel. It starts the first "group" of messages in parallel and then relies on middleware to continue the next group.

from __future__ import annotations

import copy
import typing
from typing import TYPE_CHECKING
from uuid import uuid4

from dramatiq.broker import get_broker

from pypeline.barrier import LockingParallelBarrier
from pypeline.dramatiq import REDIS_URL

if TYPE_CHECKING:
    from dramatiq.message import Message

class parallel_pipeline:
    """Chain actors together, passing the result of one actor to the
    next one in line.

    Parameters:
      children(typing.List[typing.List[Message]]): A sequence of messages or
        pipelines.  Child pipelines are flattened into the resulting
        pipeline.
      broker(Broker): The broker to run the pipeline on.  Defaults to
        the current global broker.
    """

    messages: list[Message]

    def __init__(self, messages: typing.List[typing.List[Message]], broker=None):
        self.broker = broker or get_broker()
        self.messages = messages

        execution_graph = []

        for message_group in self.messages:
            sub_execution_group = []
            group_completion_uuid = str(uuid4())
            for m in message_group:
                m.options["group_completion_uuid"] = group_completion_uuid
                message_dict = copy.deepcopy(m.asdict())
                sub_execution_group.append(message_dict)
            # Last item in the group is the id of the group to be executed
            execution_graph.append(sub_execution_group)

        for message_group in self.messages:
            for m in message_group:
                m.options["execution_graph"] = execution_graph

    def __len__(self):
        """Returns the length of the parallel_pipeline."""
        raise NotImplemented()

    def __str__(self):  # pragma: no cover
        raise NotImplemented()

    @property
    def completed(self):
        raise NotImplemented()

    @property
    def completed_count(self):
        raise NotImplemented()

    def run(self, *, delay=None):
        """Run this parallel_pipeline.

        Parameters:
          delay(int): The minimum amount of time, in milliseconds, the
            parallel_pipeline should be delayed by. If both parallel_pipeline's delay and
            first message's delay are provided, the bigger value will be
            used.

        Returns:
          parallel_pipeline: Itself.
        """
        starting_group = self.messages[0]

        completion_uuid = starting_group[0].options["group_completion_uuid"]
        locking_parallel_barrier = LockingParallelBarrier(
            REDIS_URL, task_key=completion_uuid, lock_key=f"{completion_uuid}-lock"
        )
        locking_parallel_barrier.set_task_count(len(starting_group))

        for m in starting_group:
            self.broker.enqueue(m, delay=delay)

        return self

    def get_result(self, *, block=False, timeout=None):
        raise NotImplemented()

    def get_results(self, *, block=False, timeout=None):
        raise NotImplemented()

The middleware here waits for messages to finish processing and then they "decrement" their count from redis. I opted to write a custom "barrier" which uses redis locking mechanisms to avoid race conditions where any two jobs execute this at the same time.

import copy
import os

from dramatiq.middleware import Middleware

from pypeline.barrier import LockingParallelBarrier

PARALLEL_PIPELINE_CALLBACK_BARRIER_TTL = int(
    os.getenv("DRAMATIQ_PARALLEL_PIPELINE_CALLBACK_BARRIER_TTL", "86400000")
)

class ParallelPipeline(Middleware):
    def __init__(self, redis_url):
        self.redis_url = redis_url

    def after_process_message(self, broker, message, *, result=None, exception=None):
        from dramatiq.message import Message

        if exception is None:
            group_completion_uuid = message.options.get("group_completion_uuid")
            if group_completion_uuid:
                locking_parallel_barrier = LockingParallelBarrier(
                    self.redis_url,
                    task_key=group_completion_uuid,
                    lock_key=f"{group_completion_uuid}-lock",
                )
                try:
                    locking_parallel_barrier.acquire_lock(
                        timeout=PARALLEL_PIPELINE_CALLBACK_BARRIER_TTL
                    )
                    remaining_tasks = locking_parallel_barrier.decrement_task_count()
                finally:
                    locking_parallel_barrier.release_lock()

                if remaining_tasks <= 0:
                    execution_graph = message.options.get("execution_graph")

                    for i in range(len(execution_graph)):
                        message_group = execution_graph[i]

                        # Check if the current group matches the group_completion_uuid
                        if (
                            message_group[0]["options"]["group_completion_uuid"]
                            == group_completion_uuid
                        ):
                            # Check if there is a next group
                            if i + 1 < len(execution_graph):
                                next_group = execution_graph[i + 1]

                                completion_uuid = next_group[0]["options"][
                                    "group_completion_uuid"
                                ]
                                locking_parallel_barrier = LockingParallelBarrier(
                                    self.redis_url,
                                    task_key=completion_uuid,
                                    lock_key=f"{completion_uuid}-lock",
                                )
                                locking_parallel_barrier.set_task_count(len(next_group))
                                for message in next_group:
                                    message["options"][
                                        "execution_graph"
                                    ] = copy.deepcopy(execution_graph)
                                    broker.enqueue(Message(**message))

Finally here's the custom barrier I use.

import time

import redis

class LockingParallelBarrier:
    def __init__(self, redis_url, task_key="task_counter", lock_key="task_lock"):
        # Connect to Redis using the provided URL
        self.redis = redis.StrictRedis.from_url(redis_url, decode_responses=True)
        self.task_key = task_key
        self.lock_key = lock_key

    def acquire_lock(self, timeout=5):
        """Acquire a lock using Redis."""
        while True:
            if self.redis.set(self.lock_key, "locked", nx=True, ex=timeout):
                return True
            time.sleep(0.1)

    def release_lock(self):
        """Release the lock in Redis."""
        self.redis.delete(self.lock_key)

    def set_task_count(self, count):
        """Initialize the task counter in Redis."""
        self.redis.set(self.task_key, count)

    def decrement_task_count(self):
        """Decrement the task counter in Redis."""
        return self.redis.decr(self.task_key)

    def get_task_count(self):
        """Get the current value of the task counter."""
        return int(self.redis.get(self.task_key) or 0)

I am still working on some things as you can see in the code above, but so far I can take a chain of actor's messages and run them in parallel groups over a pipeline. I will have this implemented in the next few weeks within my open source code as an "extension" to dramatiq, but I figured I'd drop the comment here since I've also been searching for this functionality and finally decided to just write it myself.

TODOs: