tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License
532 stars 37 forks

Running multiple workers #114

Closed: barrelltech closed this issue 6 months ago

barrelltech commented 6 months ago

In the comparison with arq, you mention that you can easily run multiple workers to leverage CPU cores.

I assumed this implied some logic for distributing jobs between workers. However, if I run multiple worker processes with the same settings object, only the first worker started ever executes tasks.

What am I missing? Is it just that you can assign different functions to different workers?

barakalon commented 6 months ago

Can you share more information on your setup?

barrelltech commented 6 months ago

I'm still playing around with things, so I've just been adding various log functions with time.sleep and asyncio.sleep calls, starting a worker, and enqueueing jobs with a test script. I no longer have access to the exact test I ran, but it was something similar to:

app/saq/test.py

import asyncio
from app.saq import queue

async def main():
    for i in range(50):
        await queue.enqueue("log_n", n=i)
        await queue.enqueue("log_pause", i=i)

if __name__ == "__main__":
    asyncio.run(main())

app/saq/__init__.py

from saq import Queue
import asyncio
import os
import time
from icecream import ic

async def log_pause(ctx, *, i):
    ic(f"start:log_pause:{i}")
    time.sleep(5)  # deliberately blocking, to see how the worker behaves
    ic(f"finish:log_pause:{i}")

async def log_n(ctx, *, n):
    ic(f"start:log_n:{n}")
    for i in range(n):
        await asyncio.sleep(0.05)
        ic(f"loop:log_n:{n}:{i}")

FUNCTIONS = [log_pause, log_n]
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
queue = Queue.from_url(REDIS_URL)

async def startup(ctx):
    pass

async def shutdown(ctx):
    pass

async def before_process(ctx):
    pass

async def after_process(ctx):
    pass

settings = {
    "queue": queue,
    "functions": FUNCTIONS,
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
    "concurrency": 5,
}

I run multiple processes with saq app.saq.settings -v, then run the test file with python app/saq/test.py; however, only one of the processes ever handles tasks.

If this sounds like a bug, I can recreate the issue and send you the exact code, but I'm assuming I'm missing something fundamental 😂

barrelltech commented 6 months ago

btw the library is amazing and I'm thoroughly enjoying it 😇 thanks for everything!

barakalon commented 6 months ago

Exactly how are you running the worker processes? Works fine for me.

barrelltech commented 6 months ago

Well I'll be darned, it's working for me now.

I think the issue might have come from my playing around with concurrency=1. Does the value of that setting refer to concurrency across all processes combined?

Either way, sorry for not catching that myself!
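
For posterity, my current understanding (worth verifying against the docs) is that concurrency is a per-worker-process limit, so two processes with concurrency=5 can run up to 10 jobs at once. A minimal sketch of starting a worker programmatically under that assumption, using the Worker class saq exports:

import asyncio

from saq import Worker

from app.saq import FUNCTIONS, queue

async def main():
    # Each Worker applies its own concurrency limit; running this
    # script N times allows up to N * 5 jobs in flight overall.
    worker = Worker(queue, functions=FUNCTIONS, concurrency=5)
    await worker.start()

if __name__ == "__main__":
    asyncio.run(main())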

PS: I'm integrating SAQ into my project. If you could provide an example of proper error handling with exponential backoff, I'd be happy to formalize it and submit a PR for the docs. I'll attach the first production task I'm working on below, in case it helps.

If not, I'll figure it out from the source code and try to submit a PR anyway as payback for asking stupid questions 😉

async def block_gambit(ctx, *, uuid):
    """
    1. Fetch the language, text, and id for the block
    2. Get the canon information from gambit
    3. Process the canon information into words and spans
    4. Update the block's words and spans
    """
    # Fetch the block
    if not (block := await db.fetch_block(uuid, conn=ctx["db_conn"])):
        raise Exception(f"Block {uuid} not found")
    ic(block)

    # Fetch the canon object
    gambit = await ctx["httpx_client"].post(
        f"{PHRASING_GAMBIT_URL}/canon",
        json={"text": block["text"], "language": block["language"]},
    )
    ic(gambit)

    if gambit.status_code != 200:
        raise Exception(
            f"Canon API failed with status {gambit.status_code} for block {block_uuid}"
        )

    # Parse the response
    try:
        canon = gambit.json()
    except Exception:
        raise Exception(f"Canon API response not JSON for block {block_uuid}")
    ic(canon)

    # Process the canon information into spans and words
    spans, words = await process_canon(
        block["text"], block["language"], canon, conn=ctx["db_conn"]
    )
    ic(spans, words)

    # Update the block with the spans and words
    return await db.update_block(
        block["id"], data={"spans": spans, "words": words}, conn=ctx["db_conn"]
    )

tobymao commented 6 months ago

@barrelltech there's no such thing as a stupid question!

tobymao commented 6 months ago

https://github.com/tobymao/saq/blob/c89c141c64d3315394679d537f8b6120e466887b/saq/utils.py#L31
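
That helper computes the delay for each retry attempt: roughly base_delay * 2 ** attempts, optionally capped and jittered. A rough sketch of the idea (not the exact source, see the link above):

import random

def backoff_delay(attempts, base_delay, max_delay=None, jitter=True):
    # Delay doubles with each attempt: base, 2*base, 4*base, ...
    delay = base_delay * 2 ** attempts
    if max_delay is not None:
        delay = min(delay, max_delay)
    if jitter:
        # Randomize so many failing jobs don't all retry at the same instant
        delay = delay * random.random()
    return delay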

barrelltech commented 6 months ago

Yes, I saw that, so I know it's possible, but it's unclear from the docs how best to use retries, error handling, and exponential backoff together.

But I can figure it out and I'll report back! Thanks for the support

tobymao commented 6 months ago

I think saq will automatically retry jobs for you with exponential backoff.
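
Something along these lines should work for the task above (a sketch, untested; check the Job dataclass for the exact field names, but retries, retry_delay, and retry_backoff are the relevant ones, and enqueue splits Job fields out from the function kwargs):

from app.saq import queue

async def enqueue_block_gambit(uuid):
    # enqueue_block_gambit is just an illustrative helper name
    await queue.enqueue(
        "block_gambit",
        uuid=uuid,           # passed through to the task as a kwarg
        retries=5,           # attempts before the job is marked failed
        retry_delay=1.0,     # base delay between attempts, in seconds
        retry_backoff=True,  # grow the delay exponentially between attempts
    )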