tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License

Issues in the sample project #25

Closed: aprilahijriyan closed this issue 2 years ago

aprilahijriyan commented 2 years ago

Hi!

I'm trying the sample project in the examples folder.

I found an error that keeps appearing in my terminal.

Here are the full logs:

ERROR:saq:Traceback (most recent call last):
  File "/Users/user/Projects/saq/saq/worker.py", line 228, in process
    await job.finish(Status.COMPLETE, result=result)
  File "/Users/user/Projects/saq/saq/job.py", line 218, in finish
    await self.queue.finish(self, status, result=result, error=error)
  File "/Users/user/Projects/saq/saq/queue.py", line 378, in finish
    await pipe.execute()
  File "/Users/user/Projects/saq/venv/lib/python3.9/site-packages/redis-4.3.1-py3.9.egg/redis/asyncio/client.py", line 1334, in execute
    conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)
  File "/Users/user/Projects/saq/venv/lib/python3.9/site-packages/redis-4.3.1-py3.9.egg/redis/asyncio/connection.py", line 1490, in get_connection
    async with self._lock:
  File "/Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/locks.py", line 14, in __aenter__
    await self.acquire()
  File "/Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/locks.py", line 120, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-3508' coro=<Worker.process() running at /Users/user/Projects/saq/saq/worker.py:228> cb=[Worker._process(), _gather.<locals>._done_callback() at /Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop

The error above appears many times in my terminal.
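
For context, this class of RuntimeError is raised by asyncio whenever a task running in one event loop awaits a Future that was created in a different event loop. A minimal, standalone illustration (my own sketch, not the saq or redis-py code path from the traceback):

import asyncio

async def make_future():
    # the Future is bound to the currently running event loop
    return asyncio.get_running_loop().create_future()

async def await_future(fut):
    # awaiting a Future that belongs to another (already closed) loop
    await fut

fut = asyncio.run(make_future())  # first event loop
asyncio.run(await_future(fut))    # second loop: RuntimeError ... attached to a different loop

In the traceback above, the pending Future is the waiter created by the asyncio.Lock inside redis-py's connection pool, and the task awaiting it (Worker.process) apparently runs in a different loop.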

Here are the commands to reproduce:

Terminal 1:

docker run -p 6379:6379 redis

Terminal 2:

saq examples.simple.settings --web

Terminal 3:

python examples/simple.py

I made a slight modification to examples/simple.py:

import asyncio
import random
import time

from saq import CronJob, Queue

async def sleeper(ctx, *, a):
    await asyncio.sleep(a)
    print("sleeper", a)
    return {"a": a}

async def adder(ctx, *, a, b):
    await asyncio.sleep(1)
    print("adder", a, b)
    return a + b

async def cron_job(ctx):
    print("excuting cron job")

settings = {
    "functions": [sleeper, adder],
    "concurrency": 100,
    "cron_jobs": [CronJob(cron_job, cron="* * * * * */5")],
}

async def enqueue(func, **kwargs):
    queue = Queue.from_url("redis://localhost")
    for _ in range(10000):
        await queue.enqueue(func, **{k: v() for k, v in kwargs.items()})

async def main():
    now = time.time()
    await enqueue("sleeper", a=random.random)
    await enqueue("adder", a=random.random, b=random.random)
    print(time.time() - now)

if __name__ == "__main__":
    asyncio.run(main())
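
For comparison (my own variant, not part of the original reproduction): the enqueue helper above builds a new Queue.from_url(...) connection on every call. A sketch that creates a single Queue inside the loop started by asyncio.run() and reuses it for both batches, using only the calls already shown in the script:

import asyncio
import random
import time

from saq import Queue

async def enqueue(queue, func, **kwargs):
    # reuse the Queue (and its underlying Redis connection) passed in by the caller
    for _ in range(10000):
        await queue.enqueue(func, **{k: v() for k, v in kwargs.items()})

async def main():
    # one Queue, created inside the running event loop
    queue = Queue.from_url("redis://localhost")
    now = time.time()
    await enqueue(queue, "sleeper", a=random.random)
    await enqueue(queue, "adder", a=random.random, b=random.random)
    print(time.time() - now)

if __name__ == "__main__":
    asyncio.run(main())

Note that, per the traceback, the error is raised from Worker.process() in the worker (Terminal 2), so the enqueue script may not be the trigger.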

Question:

How many queues can saq support?

Thanks!

tobymao commented 2 years ago

I ran your script and I'm unable to reproduce it.

tobymao commented 2 years ago

In terms of "queues", do you mean enqueued items? If so, it can support as many as you can store in Redis.

Closing for now as I cannot reproduce; I'll reopen if you can send me something to debug.