tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License

duplicate jobs displayed in monitor #101

Closed: younseoryu closed this issue 10 months ago

younseoryu commented 10 months ago

[Screenshot of http://localhost:8080/queues/default, taken 2023-11-18 at 1:55 AM]

As you can see in the screenshot, there are two rows for a job with the same key.

How to reproduce:

Run Redis with Docker (a quick connectivity check follows these steps):

  1. docker pull redis
  2. docker run --name redis-broker -p 6379:6379 -d redis
  3. confirm it's running with docker ps
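
Optionally, you can confirm the broker is reachable from Python before going further. This is a minimal sketch, assuming the redis-py package (pip install redis) is installed; it is not part of saq itself or of the original report:

# check_redis.py (optional sanity check)
import asyncio

from redis import asyncio as aioredis

async def main():
    redis = aioredis.from_url("redis://localhost:6379")
    print(await redis.ping())  # prints True when the broker is reachable
    await redis.aclose()  # use close() on redis-py < 5.0

asyncio.run(main())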

Within the same folder, create two files, worker.py and main.py:

# worker.py
import asyncio

from saq import CronJob, Queue

# all task functions take a context dict and keyword arguments
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def cron(ctx):
    print("i am a cron job")

async def startup(ctx):
    print("i am a start up")

async def shutdown(ctx):
    # ctx["db"] is never set in this file; see the note after the settings dict
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

async def sleeper(ctx, *, a):
    await asyncio.sleep(a)
    return {"a": a}

async def adder(ctx, *, a, b):
    await asyncio.sleep(1)
    return a + b

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test, adder, sleeper],
    "concurrency": 10,
    # "cron_jobs": [CronJob(cron, cron="* * * * * */5")], # run every 5 seconds
    "startup": startup,
    # "shutdown": shutdown,
    # "before_process": before_process,
    # "after_process": after_process,
}
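
Note that shutdown and before_process read ctx["db"], which nothing in this example sets; that is why they stay commented out in settings. If you enabled them, startup would first need to put a client into the shared context. A minimal sketch, where Database is a hypothetical stand-in for a real async client, not a saq or redis API:

# only needed if the db-using hooks above are uncommented
async def startup(ctx):
    ctx["db"] = Database()  # hypothetical async client
    print("i am a start up")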
# main.py
import asyncio
import time

from saq import Queue

async def cron_job(ctx):
    # unused in this reproduction
    print("executing cron job")

async def enqueue(count):
    # note: this creates a second Queue instance, separate from the worker's
    queue = Queue.from_url("redis://localhost")
    for _ in range(count):
        await queue.enqueue("sleeper", a=5, timeout=40)

if __name__ == "__main__":
    now = time.time()
    asyncio.run(enqueue(count=1))
    print(time.time() - now)

Then:

  1. run saq worker.settings --web
  2. run python3 main.py
  3. open http://localhost:8080/queues/default; you will see two identical rows in the jobs section
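
To see whether the duplication is in the stored queue state or only in the monitor's rendering, you can query the same data the web UI reads. This is a minimal sketch; it assumes Queue.info(jobs=True) returns a dict whose "jobs" entries carry "key" and "status" fields, an assumption based on how the web monitor is populated, so check it against your saq version:

# inspect_queue.py
import asyncio

from saq import Queue

async def main():
    queue = Queue.from_url("redis://localhost")
    # the web monitor builds its jobs table from this call
    info = await queue.info(jobs=True)
    for job in info["jobs"]:
        print(job.get("key"), job.get("status"))
    await queue.disconnect()

asyncio.run(main())

If only one key comes back, the second row is an artifact of the monitor rather than a duplicate job in Redis.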