Koed00 / django-q

A multiprocessing distributed task queue for Django
https://django-q.readthedocs.org
MIT License

Multiple Queues? #107

Open nicolastobo opened 8 years ago

nicolastobo commented 8 years ago

Hello!

I had a look at Django-Q and didn't find anything about multiple queues. Does this exist, or is it on the roadmap?

Nice work by the way.

Koed00 commented 8 years ago

Currently there is no support for multiple queues. Although we do use separate queues in testing by setting the list_key variable, it is not possible for the workers to read from multiple queues based on priority. I'm not a fan of adding features just for the sake of a named feature, so if you can describe a good use case for what you want, I'll be happy to take a look at it and add it to the roadmap.

nicolastobo commented 8 years ago

Of course: I need to use async tasks for 2 things: long-running calculation tasks (call them A) and sending emails, which need to go out quickly.

But if there are 10 A functions in the queue, the emails wouldn't be sent for minutes or hours.

But if you see another way to do this, I'm all ears.

Koed00 commented 8 years ago

This is fairly easy to accomplish; it's just that it would also require separate worker clusters for each queue. In fact, most of the code is already in place to do this; I would just have to make it a little bit more accessible.

It gets complicated when you have only one cluster working from several queues. Of course you can tell the workers to always pull from the higher priority queue first, but in your case it might happen that all workers are tied up doing long calculations on queue A, so the high priority queue still has to wait.

Dual queue, dual clusters:

from django_q.brokers import get_broker
from django_q.tasks import async
from django_q.cluster import Cluster

broker_A = get_broker('Priority_A')
broker_B = get_broker('Priority_B')

# queue to Priority_A
async('math.floor', 1.5, broker=broker_A)

# queue to Priority_B
async('math.copysign', 1, -1, broker=broker_B)

# start cluster A
c_a = Cluster(broker_A).start()

# start cluster B
c_b = Cluster(broker_B).start()

Of course this code will get stuck running the first cluster, because the clusters need to be started in separate threads. That would just need a minor change to the management command to make this work.
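
For illustration only, a rough sketch of what starting them in separate threads could look like (untested; it assumes it runs somewhere Django is already set up, e.g. inside a management command, and reuses the queue names from the snippet above):

import threading

from django_q.brokers import get_broker
from django_q.cluster import Cluster

def start_cluster(queue_name):
    # each cluster reads only from its own queue / list_key
    Cluster(get_broker(queue_name)).start()

# start each cluster in its own thread, so the first start() call
# cannot block the second one
for queue_name in ('Priority_A', 'Priority_B'):
    threading.Thread(target=start_cluster, args=(queue_name,)).start()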

nicolastobo commented 8 years ago

Thank you! I'll test it and let you know.

nicolastobo commented 8 years ago

That works perfectly. I just created a new command called "qclusters":

from optparse import make_option

from django.core.management.base import BaseCommand
from django.utils.translation import ugettext as _

from django_q.cluster import Cluster
from django_q.brokers import get_broker

from my_project.task_managers import clusters_names

class Command(BaseCommand):
    # Translators: help text for qcluster management command
    help = _("Starts MyProject Clusters.")

    option_list = BaseCommand.option_list + (
        make_option('--run-once',
                    action='store_true',
                    dest='run_once',
                    default=False,
                    help='Run once and then stop.'),
    )

    def handle(self, *args, **options):
        for c in clusters_names:
            q = Cluster(get_broker(c))
            q.start()
            if options.get('run_once', False):
                q.stop()

clusters_names is defined like this:

clusters_names = ['email', 'A']

Koed00 commented 8 years ago

I'm happy that worked out. Until now I've mostly been using separate queues for parallel testing. Let me think a bit about how to integrate this into the project. You're running this on the same machine, but I can imagine other people might want a separate cluster instance per queue. Probably an optional --queue keyword for the qcluster command, which can take either a single queue name or a list, would be the way to go.

nicolastobo commented 8 years ago

I'm following the project on GitHub, and if you update anything about it, I would be happy to test it!

Eagllus commented 8 years ago

@nicolastobo I tried using your code, but it seems that if I run python manage.py qclusters (your command), the processes can no longer be stopped with Ctrl+C.

@Koed00 I noticed that since Django 1.8 option_list is deprecated and you should override the add_arguments method instead:

def add_arguments(self, parser):
    parser.add_argument(
        '--run-once',
        action='store_true',
        dest='run_once',
        default=False,
        help='Run once and then stop.')
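
Putting the two snippets together, the qclusters command above might look something like this with add_arguments (an untested sketch, using the same names as before):

from django.core.management.base import BaseCommand
from django.utils.translation import ugettext as _

from django_q.brokers import get_broker
from django_q.cluster import Cluster

from my_project.task_managers import clusters_names

class Command(BaseCommand):
    # Translators: help text for qclusters management command
    help = _("Starts MyProject Clusters.")

    def add_arguments(self, parser):
        parser.add_argument(
            '--run-once',
            action='store_true',
            dest='run_once',
            default=False,
            help='Run once and then stop.')

    def handle(self, *args, **options):
        # one cluster per named queue, as in the original command
        for name in clusters_names:
            cluster = Cluster(get_broker(name))
            cluster.start()
            if options.get('run_once', False):
                cluster.stop()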

danmoz commented 7 years ago

Just wondering if there was any progress on this? I was going to implement the solution @nicolastobo posted but I noticed #137 (scheduled tasks run once per cluster) and I'm worried there might be other side effects.

Would love to see this solved as I've got the same problem (long running tasks block higher priority tasks in the queue) and I don't want to go back to Celery (!)

Perhaps a simpler solution would be to allow users to optionally assign a task priority as an integer, which causes the worker to always execute the highest-priority tasks first? I'd be happy to submit a PR.

techdragon commented 6 years ago

I'm also looking at this. My use case is splitting tasks between 'immediate' tasks triggered by Django post_save signals, which I was planning to use Redis for, and less 'urgent' things such as daily scheduled tasks, where the additional visibility of the ORM broker is useful and the performance overhead is not an issue.

jordanmkoncz commented 6 years ago

Also looking for a way to achieve this. My main use case is that I have lots of tasks which are quick to execute (sending emails, sending SMS, etc.) and which I need processed ASAP, plus a few other tasks that are very long running (5 minutes or so). A large number of these long-running tasks can be queued up at the same time, and I want to avoid having all my workers stuck processing them, blocking all of the smaller tasks until the long-running ones are done.

kbuilds commented 5 years ago

Also looking for a good way to do this. Same use-case as others.

BuckinghamIO commented 4 years ago

I'm also looking for a feature to handle this. It's certainly a valid use case; otherwise you have to try scaling out with more workers, which eventually stops working if they can't process the tasks fast enough.

allcaps commented 3 years ago

I also had the requirement to have two queues. One for quick notification tasks and one for long running management tasks. I wanted them both to use the Django ORM broker.

I created a model that has a one-to-one relation with django_q.OrmQ and stores a name. A NamedBroker only processes tasks with its own name.

To run each cluster:

python manage.py qcluster --settings settings.notification
python manage.py qcluster --settings settings.management

Here is the code:

# models.py

from django.db import models

class OrmQExtension(models.Model):
    """Extends OrmQ with a name field"""
    orm_q = models.OneToOneField("django_q.OrmQ", on_delete=models.CASCADE, related_name="extension")
    name = models.CharField("name", max_length=255)

    def __str__(self):
        return self.name

# broker.py

from time import sleep

from django.utils import timezone
from django_q.brokers.orm import ORM, _timeout
from django_q.conf import Conf

class NamedBroker(ORM):
    def queue_size(self) -> int:
        return (
            self.get_connection()
            .filter(key=self.list_key, lock__lte=_timeout(), extension__name=self.name)
            .count()
        )

    def lock_size(self) -> int:
        return (
            self.get_connection()
            .filter(key=self.list_key, lock__gt=_timeout(), extension__name=self.name)
            .count()
        )

    def purge_queue(self):
        return (
            self.get_connection()
            .filter(key=self.list_key, extension__name=self.name)
            .delete()
        )

    def enqueue(self, task):
        from some_app.models import OrmQExtension  # noqa

        package = self.get_connection().create(
            key=self.list_key, payload=task, lock=_timeout()
        )
        OrmQExtension.objects.create(orm_q=package, name=self.name)
        return package.pk

    def dequeue(self):
        tasks = self.get_connection().filter(
            key=self.list_key, lock__lt=_timeout(), extension__name=self.name
        )[0:Conf.BULK]
        if tasks:
            task_list = []
            for task in tasks:
                if (
                    self.get_connection()
                    .filter(id=task.id, lock=task.lock, extension__name=self.name)
                    .update(lock=timezone.now())
                ):
                    task_list.append((task.pk, task.payload))
                # else don't process, as another cluster has been faster than us on that task
            return task_list
        # empty queue, spare the cpu
        sleep(Conf.POLL)

class NotificationBroker(NamedBroker):
    name = "notification"

class ManagementBroker(NamedBroker):
    name = "management"

# Create a task

from django_q.tasks import async_task
async_task('some.task.function', broker=NotificationBroker())

# Create a schedule

from django_q.models import Schedule
from django_q.tasks import schedule

schedule(
    func="django.core.management.call_command",
    args="('some_command',)",
    name="Some name",
    schedule_type=Schedule.DAILY,
    q_options={"broker": ManagementBroker()},
)

# settings/base.py
...
Q_CLUSTER = {
    "name": "DjangoORM",
    ...
    "orm": "default",
}

# settings/notification.py

from .base import *  # noqa

Q_CLUSTER.update({"broker_class": "some_app.broker.NotificationBroker"})

# settings/management.py

from .base import *  # noqa

Q_CLUSTER.update({"broker_class": "some_app.broker.ManagementBroker"})

jpaav commented 3 years ago

I ran into issues while overriding the ORM class until I added my new instance variables to __getstate__ and __setstate__. Just a heads up for anyone trying this approach to get multiple ORM Queues.
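
For anyone hitting the same thing, a minimal sketch of what that could look like (assuming the base broker is pickled through __getstate__/__setstate__, as described above; here name is an instance variable rather than the class attribute used in the earlier snippet):

from django_q.brokers.orm import ORM

class NamedBroker(ORM):
    # variant of the broker above where `name` is an instance variable
    def __init__(self, name="default", *args, **kwargs):
        self.name = name
        super().__init__(*args, **kwargs)

    def __getstate__(self):
        # parent state plus our own extra attribute
        return super().__getstate__(), self.name

    def __setstate__(self, state):
        parent_state, self.name = state
        super().__setstate__(parent_state)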

Thinh-NT commented 2 years ago

Hi everyone, I'm not sure if my problem can be solved with background tasks, since I'm new to them. My problem is that I want to create a task per user (the task is importing data, which takes a lot of time). Please let me know if you have any updates on this, thank you very much.
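
For what it's worth, creating a task per user is just a matter of enqueuing one async_task call per user; a minimal sketch (import_user_data is a hypothetical task function in my_app/tasks.py):

from django_q.tasks import async_task

def queue_user_imports(users):
    # enqueue one background import task per user
    for user in users:
        async_task("my_app.tasks.import_user_data", user.pk)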

Shadercloud commented 2 years ago

Is there not a simple way to do this? Such as:

async_task('tasks.send_email', label=emails)
async_task('tasks.process_image', label=images)
python manage.py qcluster --label=emails
python manage.py qcluster --label=images

Why is this not a thing? It seems like basic functionality to not want every task in the same processing queue (especially if you have 1 server processing 1 queue and another processing another queue)

(Note @allcaps code works fine, but this seems like a ton of extra work to do something so basic)

Also it would be nice to have the ability to run a qcluster that processes all labels, a list of multiple labels, or even all labels except certain ones, basically the same way Laravel queues work.

nickpolet commented 2 years ago

Would love the functionality that @Shadercloud mentioned above. Really handy for splitting up multiple tasks into different servers, with long running power hungry tasks running on a high powered machine, and smaller/shorter tasks running on a lower powered machine.

@Koed00 Are you still interested in this functionality?

woshiyanghai commented 1 week ago

> Would love the functionality that @Shadercloud mentioned above. Really handy for splitting up multiple tasks into different servers, with long running power hungry tasks running on a high powered machine, and smaller/shorter tasks running on a lower powered machine.
>
> @Koed00 Are you still interested in this functionality?

Have you found a better way here?