cenkalti / kuyruk

⚙️ Simple task queue for Python
https://kuyruk.readthedocs.org/
MIT License

Batch tasks sending #51

Closed frol closed 7 years ago

frol commented 7 years ago

As far as I can see from the code and from my experiments, Kuyruk creates a new connection to send every single task. This easily becomes a bottleneck when you generate a list of tasks in a loop:

import tasks

for i in range(1000):
    tasks.echo("Hello Kuyruk %d" % i)

It seems that if I could pass an external "channel" to Task.send_to_queue, it would be possible to reuse the existing connection:

import tasks

with tasks.echo.kuyruk.channel() as channel:
    for i in range(1000):
        tasks.echo.send_to_queue(args=("Hello Kuyruk %d" % i, ), channel=channel)

Ideally, it would be great to have a reusable connection (with auto-reconnection on connection errors) by default.
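To make the "reusable connection with auto-reconnection" idea concrete, here is a minimal sketch. It is not Kuyruk code: `ReusableChannel`, `FakeChannel` and the `publish(queue, body)` method are hypothetical names, and the fake channel only stands in for a real broker channel so the example runs without RabbitMQ.

```python
class ReusableChannel:
    """Hypothetical wrapper: keeps one channel open and reopens it on failure."""

    def __init__(self, open_channel, max_retries=1):
        self._open_channel = open_channel  # callable returning a channel-like object
        self._max_retries = max_retries
        self._channel = None

    def publish(self, queue, body):
        for attempt in range(self._max_retries + 1):
            try:
                if self._channel is None:
                    # Lazily (re)open the channel only when it is needed.
                    self._channel = self._open_channel()
                return self._channel.publish(queue, body)
            except ConnectionError:
                # Drop the broken channel so the next attempt opens a new one.
                self._channel = None
                if attempt == self._max_retries:
                    raise


sent = []    # messages that reached the fake "broker"
opened = []  # every fake channel that was opened


class FakeChannel:
    """Stand-in for a broker channel, used only to exercise the wrapper."""

    def __init__(self):
        opened.append(self)
        self.broken = False

    def publish(self, queue, body):
        if self.broken:
            raise ConnectionError("connection lost")
        sent.append((queue, body))


channel = ReusableChannel(FakeChannel)
for i in range(3):
    channel.publish("kuyruk", "Hello Kuyruk %d" % i)

opened[-1].broken = True  # simulate a dropped connection
channel.publish("kuyruk", "Hello Kuyruk 3")  # reconnects transparently

print(len(opened), len(sent))
```

The first three messages share one channel, and the fourth triggers a single reconnect. A real implementation would also have to decide what to do when a publish fails after the broker has already received the message, since a blind retry could then send a duplicate.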

Currently, I work around this limitation with Dask by sending the tasks from threads:

import dask.bag

import tasks

indexes = dask.bag.from_sequence(range(1000))

indexes.map(lambda i: tasks.echo("Hello Kuyruk %d" % i)).compute()

With a ping of around 18 ms to the RabbitMQ server, I can schedule only ~2 tasks/second with a simple loop, ~37 tasks/second using Dask, ~40 tasks/second when I reuse the channel (with a manually patched send_to_queue), and ~238 tasks/second when I combine Dask with a reusable connection:

import dask.bag

import tasks

indexes = dask.bag.from_sequence(range(1000), npartitions=40)

def send_in_batch(indexes):
    with tasks.echo.kuyruk.channel() as channel:
        for i in indexes:
            tasks.echo.send_to_queue(args=["Hello Kuyruk %d" % i], ch=channel)

indexes.map_partitions(send_in_batch).compute()
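A rough back-of-the-envelope check of the figures above: dividing the time spent per task by the 18 ms ping gives the effective number of network round trips each task costs. The helper name `round_trips_per_task` is made up for this sketch, and reading the result as "round trips" assumes that network latency dominates the cost, which I have not verified.

```python
PING_SECONDS = 0.018  # measured ping to the RabbitMQ server


def round_trips_per_task(tasks_per_second, ping_seconds=PING_SECONDS):
    """Effective round trips per task, assuming latency dominates the cost."""
    seconds_per_task = 1.0 / tasks_per_second
    return seconds_per_task / ping_seconds


measurements = [
    ("simple loop", 2),
    ("Dask threads", 37),
    ("reused channel", 40),
    ("Dask + reused channel", 238),
]

for label, rate in measurements:
    print("%-22s %6.1f round trips/task" % (label, round_trips_per_task(rate)))
```

Under that assumption the simple loop pays for roughly 28 round trips per task, which is consistent with a full connection setup and teardown around every send, while a reused channel is down to about 1.4. The Dask figures are effective values only, because the threads overlap their waiting time.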

Another approach might also be valid: define a helper for batch scheduling, so the API would look something like this:

kuyruk.send_tasks(
    (tasks.echo, ["Hello Kuyruk %d" % i])
    for i in range(1000)
)

or add a helper method on the Task (I am not a fan of Celery, but this is how they handle it):

kuyruk.send_tasks(
    tasks.echo.subtask("Hello Kuyruk %d" % i)
    for i in range(1000)
)
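One way the subtask-style helper could work internally is sketched below. Everything in it is hypothetical (`SubTask`, `Task.subtask`, `FakeKuyruk`, `send_tasks`), and a plain list plays the role of the channel so that the example runs without a broker. It only illustrates the idea of describing the calls first and then publishing them all through a single channel.

```python
from contextlib import contextmanager


class SubTask:
    """Hypothetical description of one call: a task plus its arguments."""

    def __init__(self, task, args):
        self.task = task
        self.args = args


class Task:
    def __init__(self, name):
        self.name = name

    def subtask(self, *args):
        # Nothing is sent here; the call is only described.
        return SubTask(self, list(args))


class FakeKuyruk:
    """Stand-in for the Kuyruk app; counts how many channels get opened."""

    def __init__(self):
        self.channels_opened = 0
        self.published = []

    @contextmanager
    def channel(self):
        self.channels_opened += 1
        yield self.published  # a plain list plays the role of the channel

    def send_tasks(self, subtasks):
        # Open the channel once, then publish every description through it.
        with self.channel() as ch:
            for st in subtasks:
                ch.append({"task": st.task.name, "args": st.args})


kuyruk = FakeKuyruk()
echo = Task("echo")
kuyruk.send_tasks(echo.subtask("Hello Kuyruk %d" % i) for i in range(1000))

print(kuyruk.channels_opened, len(kuyruk.published))
```

Because `send_tasks` accepts any iterable, the generator expression is consumed lazily inside the `with` block, so the 1000 descriptions never have to be held in memory at once.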

It would be great to hear other thoughts on this issue.

cenkalti commented 7 years ago

@frol We also need batch sending sometimes. That would be good.

I like the Celery approach the most. I did a quick implementation at https://github.com/cenkalti/kuyruk/pull/52. Should we merge this?