tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License

Long-running blocking code getting swept. #132

Closed · v3ss0n closed 1 month ago

v3ss0n commented 1 month ago

When a worker job takes a long time and blocks, it gets swept. How do I avoid that?

I am trying to run it in a thread with asyncio.to_thread, but it does not seem compatible with SAQ:

  File "/usr/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "/run/media/nozander/8775bc56-c0e8-4aa4-bfd7-e7a3de973eb5/skyward-ai/SWParse/src/swparse/tasks.py", line 73, in parse_pdf_page_markdown_s3
    markdown, doc_images, out_meta = await coro
                                    ^^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/run/media/nozander/8775bc56-c0e8-4aa4-bfd7-e7a3de973eb5/skyward-ai/SWParse/.venv/lib/python3.12/site-packages/saq/worker.py", line 260, in process
    result = await asyncio.wait_for(task, job.timeout if job.timeout else None)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/tasks.py", line 519, in wait_for
    async with timeouts.timeout(timeout):
  File "/usr/lib/python3.12/asyncio/timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

Code (task):

import asyncio

from s3fs import S3FileSystem  # inferred import; the kwargs below match s3fs
from saq.types import Context  # inferred import

async def parse_pdf_page_markdown_s3(ctx: Context, *, s3_url: str, page: int) -> str:
    from swparse.convert import pdf_markdown

    s3 = S3FileSystem(
        # asynchronous=True,
        endpoint_url=storage.ENPOINT_URL,
        key=MINIO_ROOT_USER,
        secret=MINIO_ROOT_PASSWORD,
        use_ssl=False,
    )

    with s3.open(s3_url, mode="rb") as doc:
        data = doc.read()

    # run the blocking parser off the event loop in a worker thread
    coro = asyncio.to_thread(pdf_markdown, data, start_page=page, max_pages=1)
    markdown, doc_images, out_meta = await coro
    return markdown

I also tried running it in an executor; that doesn't work either:

    coro = loop.run_in_executor(
        None, functools.partial(pdf_markdown, data, start_page=page, max_pages=1)
    )
    markdown, doc_images, out_meta = await coro

It runs fine without asyncio.to_thread.

tobymao commented 1 month ago

you can increase the timeout of a job to avoid it getting swept.
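The timeout is a per-job setting, so it can be passed when enqueuing. A minimal sketch (the `queue` object and the 600-second value are assumptions, not from this thread):

# give the job 10 minutes before the worker sweeps it
await queue.enqueue(
    "parse_pdf_page_markdown_s3",
    s3_url=s3_url,
    page=page,
    timeout=600,  # seconds; assumed value
)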

v3ss0n commented 1 month ago

> you can increase the timeout of a job to avoid it getting swept.

So I have to define it in the Worker config? Can you point me to which setting I have to increase?

Are there better ways to do that for blocking calls? Would it also happen with async jobs? Why don't those executors work?

tobymao commented 1 month ago

If a job is async, you can just use async calls, and then they won't block. If your code isn't async, define your function without async and it will automatically run in a thread for you.
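A minimal sketch of both styles (toy names; `slow_parse` stands in for a blocking call like `pdf_markdown`):

import asyncio
import time

from saq.types import Context


def slow_parse(path: str) -> str:
    time.sleep(30)  # stand-in for blocking CPU/IO work
    return "markdown"


# async task: only await non-blocking calls, or push blocking work to a thread
async def parse_async(ctx: Context, *, path: str) -> str:
    return await asyncio.to_thread(slow_parse, path)


# sync task: no `async def`; the worker runs it in an executor thread for you
def parse_sync(ctx: Context, *, path: str) -> str:
    return slow_parse(path)

Either way the job is still subject to its timeout, so long-running work needs the timeout raised as well.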

tobymao commented 1 month ago

https://github.com/tobymao/saq/blob/master/saq/job.py#L82
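That links to the Job timeout field. Note how the worker applies it in the traceback above (`asyncio.wait_for(task, job.timeout if job.timeout else None)`): a timeout of 0 is falsy and becomes None, so `timeout=0` disables the deadline for that job entirely. A sketch, with assumed names:

# timeout=0: no deadline at all, so only use it if the job is trusted to finish
await queue.enqueue("parse_pdf_page_markdown_s3", s3_url=s3_url, page=page, timeout=0)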

v3ss0n commented 1 month ago

Ah, thanks, gonna try.

v3ss0n commented 1 month ago

I tried running it without async, but as soon as the task starts it gets cancelled:

Error processing job Job<function=parse_pdf_markdown_s3, kwargs={'s3_url': 'swparse/0ZDZAV.3V4C5J_GS35F0617W_GS-35F-0617W-5-28-2024-832574.PDF'}, queue=swparse, id=saq:job:swparse:a46271fe-5b28-11ef-9f87-d481d7f96db4, scheduled=0, progress=0.0, start_ms=15, attempts=1, status=Status.ACTIVE, meta={}>
Traceback (most recent call last):
  File "/usr/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "/run/media/nozander/8775bc56-c0e8-4aa4-bfd7-e7a3de973eb5/skyward-ai/SWParse/.venv/lib/python3.12/site-packages/saq/worker.py", line 306, in wrapped
    return await loop.run_in_executor(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/run/media/nozander/8775bc56-c0e8-4aa4-bfd7-e7a3de973eb5/skyward-ai/SWParse/.venv/lib/python3.12/site-packages/saq/worker.py", line 260, in process
    result = await asyncio.wait_for(task, job.timeout if job.timeout else None)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/tasks.py", line 519, in wait_for
    async with timeouts.timeout(timeout):
  File "/usr/lib/python3.12/asyncio/timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

I made sure there is no async in there:

def parse_pdf_page_markdown_s3(ctx: Context, *, s3_url: str, page: int) -> str:
    from swparse.convert import pdf_markdown

    s3 = S3FileSystem(
        # asynchronous=True,
        endpoint_url=storage.ENPOINT_URL,
        key=MINIO_ROOT_USER,
        secret=MINIO_ROOT_PASSWORD,
        use_ssl=False,
    )

    with s3.open(s3_url, mode="rb") as doc:
        markdown, doc_images, out_meta = pdf_markdown(doc.read(), start_page=page, max_pages=1)
    logger.error("parse_pdf_markdown_s3")
    logger.error(s3_url)
    logger.error(markdown)
    return markdown

The worker is still running the tasks, I can see the logs going, but I cannot get any results.

v3ss0n commented 1 month ago

Looks like a bug, gonna file a separate one.