taskiq-python / taskiq

Distributed task queue with full async support
MIT License

`kiq().with_labels(delay=X)` in documents not a valid invocation? #279

Open pahrohfit opened 5 months ago

pahrohfit commented 5 months ago

The documentation for Kicker shows .with_labels(delay=1):

async def main():
    # This task was initially assigned to broker,
    # but this time it is going to be sent using
    # the second broker with additional label `delay=1`.
    task = await my_async_task.kicker().with_broker(second_broker).with_labels(delay=1).kiq()
    print(await task.get_result())

But this doesn't appear to be a valid invocation of taskiq, just an example of attaching a label? With delay=600, this code executes on the worker immediately rather than after a delay.

Is the intention to use the Scheduler to achieve this functionality? If so, the docs should probably use some other, more generic label in the example instead of delay, since delay seems to be used only in cli.scheduler.run.delayed_send(). Or, better yet, maybe delay should become a standard part of the kiq() invocation. That would remove the dependency on a single running Scheduler for anyone who wants something lightweight and not remotely crontab-based, and it would be a cleaner resolution to #187.

reuben-dutton commented 5 months ago

I can't speak to the intent with regards to Scheduler, but the delay label has a use in the taskiq-aio-pika plugin library here, which is only relevant if you intend on using RabbitMQ as a message broker.

pahrohfit commented 5 months ago

> I can't speak to the intent with regards to Scheduler, but the delay label has a use in the taskiq-aio-pika plugin library here, which is only relevant if you intend on using RabbitMQ as a message broker.

Ahhhh ... just read through it and see that it's expected to be implemented at the broker layer.
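
To illustrate what "implemented at the broker layer" means, here is a toy sketch in plain asyncio. `ToyMessage`, `PlainBroker`, and `DelayAwareBroker` are hypothetical names invented for this example and are not taskiq classes. The only point is that a label is inert metadata unless the broker receiving the message chooses to act on it.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToyMessage:
    """Hypothetical stand-in for a queued task message."""

    task_name: str
    labels: Dict[str, Any] = field(default_factory=dict)


class PlainBroker:
    """Delivers every message immediately and never looks at labels."""

    def __init__(self) -> None:
        self.delivered: List[str] = []

    async def kick(self, message: ToyMessage) -> None:
        self.delivered.append(message.task_name)


class DelayAwareBroker(PlainBroker):
    """Honors a `delay` label (in seconds) before delivering."""

    async def kick(self, message: ToyMessage) -> None:
        delay = float(message.labels.get("delay", 0))
        if delay <= 0:
            await super().kick(message)
            return
        # Schedule delivery on the event loop instead of blocking the caller.
        loop = asyncio.get_running_loop()
        loop.call_later(delay, self.delivered.append, message.task_name)


async def demo() -> Dict[str, List[str]]:
    plain, aware = PlainBroker(), DelayAwareBroker()
    message = ToyMessage("my_async_task", labels={"delay": 0.05})
    await plain.kick(message)
    await aware.kick(message)
    # Right after kicking: the plain broker has delivered, the aware one has not.
    snapshot = {"plain": list(plain.delivered), "aware": list(aware.delivered)}
    await asyncio.sleep(0.1)  # give the delayed delivery time to fire
    snapshot["aware_later"] = list(aware.delivered)
    return snapshot


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a broker like `PlainBroker`, setting `delay=600` changes nothing, which matches the behavior described at the top of this issue.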

s3rius commented 5 months ago

If you want delayed tasks, I'd suggest using the scheduler and scheduling the task by time. The scheduler is compatible with any broker, but it has to run as a separate service.

I will update the docs later; for now, here's a discussion showing how you can use it: https://github.com/orgs/taskiq-python/discussions/275

Arutemu64 commented 5 months ago

Do I understand correctly that the scheduler polls its sources once a minute, so if I want to schedule a new task with a delay of a few seconds using RedisScheduleSource, it can take up to a minute for the scheduler to send it to the workers? That doesn't sound very good; I hope I'm wrong.
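
To make the concern concrete, here is a rough sketch of the arithmetic. It assumes a scheduler that wakes exactly at the start of every minute and sends whatever is due at that moment. That alignment is an assumption made for illustration, not something taken from the taskiq source, and `next_poll` and `dispatch_lag` are hypothetical helper names.

```python
from datetime import datetime, timedelta


def next_poll(after: datetime, interval: timedelta = timedelta(minutes=1)) -> datetime:
    """Return the first poll tick at or after `after`.

    Assumes ticks are aligned to multiples of `interval` since midnight.
    """
    midnight = after.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = after - midnight
    ticks = -(-elapsed // interval)  # ceiling division on timedeltas
    return midnight + ticks * interval


def dispatch_lag(requested: datetime) -> timedelta:
    """How long a task due at `requested` waits for the next poll tick."""
    return next_poll(requested) - requested


if __name__ == "__main__":
    # A task kicked at 12:00:00 with a 3-second delay is due at 12:00:03.
    due = datetime(2024, 1, 1, 12, 0, 3)
    print(next_poll(due), dispatch_lag(due))
```

Under that assumption, a task due three seconds after a tick waits another 57 seconds, so the effective delay is a full minute rather than three seconds.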

s3rius commented 4 months ago

No, you're correct. It can take up to a minute for the default scheduler to send a task. If you want to add a delay of a few seconds, I'd suggest something like this:

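import asyncio
from typing import Any, Coroutine


async def _sub_delay(seconds: float, coro: Coroutine[Any, Any, Any]) -> Any:
    # Sleep on the client side, then run the coroutine (e.g. send the task).
    await asyncio.sleep(seconds)
    return await coro


def delayed_await(seconds: float, coro: Coroutine[Any, Any, Any]) -> "asyncio.Task[Any]":
    # Must be called from inside a running event loop.
    # Keep a reference to the returned task: the loop holds only weak
    # references, so an unreferenced task may be garbage-collected early.
    loop = asyncio.get_running_loop()
    return loop.create_task(_sub_delay(seconds, coro))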

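This lets you add a small delay on the client side before sending a task, without blocking the caller. Be aware that this technique is only good for small delays: the pending send lives in the client process's memory, so it is lost if that process exits before the delay elapses.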

The usage is like this:

delayed_await(3, my_task.kiq("arg1", 2))