tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License

Job is not performed if there is a scheduled job #69

Closed markus-zoezi closed 1 year ago

markus-zoezi commented 1 year ago

I have problems getting saq to work properly when using scheduled tasks.

As soon as I enqueue() a Job with scheduled set to something other than 0, I run into problems.

The scheduled job gets executed, but any jobs I try to enqueue before the scheduled job runs are silently ignored. They are not even performed after the scheduled job; they are simply gone.

I've tried using multiple workers, but that doesn't help. Any help debugging this, or understanding how to set this up, would be appreciated.

tobymao commented 1 year ago

can you give me a minimal reproducible setup script?

markus-zoezi commented 1 year ago

I think I actually found something. I have the following function. key is normally None, so Job gets passed key=None, which it doesn't seem to like. If I don't pass key at all, it seems to work much better.

import os

async def create_task(host, method, params, time=None, key=None):
    from saq import Queue, Job
    queue = Queue.from_url(os.getenv('REDIS'))

    job = Job(
        timeout=0,
        function='rpc',
        scheduled=time or 0,
        key=key,  # key is normally None here, and is still passed explicitly
        kwargs=dict(
            host=host,
            data={
                'method': method,
                'params': params
            }
        )
    )
    await queue.enqueue(job)

tobymao commented 1 year ago

key is used to avoid duplicate jobs: if you set the same value on multiple jobs, only one will run. this seems to be your issue.

tobymao commented 1 year ago

you don't need to pass in key, it's set automatically as a uuid
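
To illustrate why an explicit key=None can make later jobs vanish, here is a toy model of the behaviour described above. It is a sketch only: ToyJob and ToyQueue are hypothetical stand-ins, not saq's classes, and the model assumes the default key is generated only when no key argument is passed at all.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyJob:
    """Hypothetical stand-in for a job; not saq's real Job class."""
    function: str
    # The default key is a fresh uuid, generated only when no key is passed.
    key: Optional[str] = field(default_factory=lambda: uuid.uuid1().hex)


class ToyQueue:
    """Hypothetical in-memory queue that rejects jobs whose key is already pending."""

    def __init__(self):
        self.pending = {}

    def enqueue(self, job):
        # A job whose key is already pending is treated as a duplicate and dropped.
        if job.key in self.pending:
            return None
        self.pending[job.key] = job
        return job


queue = ToyQueue()

# Explicit key=None overrides the uuid default, so both jobs share the key None.
first = queue.enqueue(ToyJob("rpc", key=None))
second = queue.enqueue(ToyJob("rpc", key=None))

# Omitting key lets each job get its own uuid, so both are accepted.
third = queue.enqueue(ToyJob("rpc"))
fourth = queue.enqueue(ToyJob("rpc"))
```

In this model, second is None because it collides with first, while third and fourth are both accepted. That matches the symptom in the report: a pending scheduled job holding the shared key would cause every later job with the same key to be dropped.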

markus-zoezi commented 1 year ago

Yes I understand that now. Thanks!

markus-zoezi commented 1 year ago

So my updated function becomes:

import os

async def create_task_ecs(host, method, params, time=None, key=None):
    from saq import Queue, Job
    queue = Queue.from_url(os.getenv('REDIS'))

    job_args = dict(
        timeout=0,
        function='rpc',
        scheduled=time or 0,
        kwargs=dict(
            host=host,
            data={
                'method': method,
                'params': params
            }
        )
    )
    # Only pass key when the caller supplied one, so the automatic uuid is used otherwise
    if key:
        job_args['key'] = key

    job = Job(**job_args)
    await queue.enqueue(job)

And that seems to work much better.
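
The same "only pass what was actually set" pattern can be factored into a small helper. This is a sketch, not part of saq: without_none is a hypothetical name, and the argument values are placeholders taken from the function above.

```python
def without_none(**kwargs):
    """Return the keyword arguments with None-valued entries removed."""
    return {name: value for name, value in kwargs.items() if value is not None}


# key=None is dropped, so a constructor called with **job_args would fall back
# to its own default for key. Falsy but meaningful values such as 0 are kept.
job_args = without_none(
    timeout=0,
    function="rpc",
    scheduled=0,
    key=None,
)
```

Unlike the `if key:` check, this filters only None, so an empty-string key would still be passed through. Which of the two is preferable depends on whether an empty key should count as "not set".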