taskiq-python / taskiq

Distributed task queue with full async support

RuntimeError: read() called while another coroutine is already waiting for incoming data #323

Closed · shoang22 closed this 2 months ago

shoang22 commented 2 months ago

Hello,

Thanks for all your amazing work. I have a task that uses Langchain's RecursiveCharacterTextSplitter to split text into chunks, counts the chunks, and stores the count in Redis:

# services.py

broker = NatsBroker(
    settings.nats_urls.split(","),
    queue="fastapi_app_queue",
).with_result_backend(
    RedisAsyncResultBackend(settings.redis_url),
)

taskiq_fastapi.init(broker, "src.app:get_app")

@broker.task(task_name="blocker")
async def nonblocking_call(text: str, task_id: str, db: RedisTaskiqDep):
    splitter = RecursiveCharacterTextSplitter()
    text_chunks = await run_in_threadpool(splitter.split_text, text=text)

    n_chunks = 0
    for _ in text_chunks:
        n_chunks += 1

    await db.setex(task_id, 600, n_chunks)

This is called from the following route:

# routes.py

router = APIRouter()

@router.post("/block", response_model=BaseResponse)
async def get_blocked(
    file: UploadFile = File(...),
):
    task_id = str(uuid.uuid4())
    await broker.startup()
    text = (await file.read()).decode()

    data = BlockRequestBody(text=text, task_id=task_id)
    await nonblocking_call.kiq(**data.model_dump())

    return BaseResponse(response_id=task_id)

When I make multiple calls to /block in a row, I expect each call to return a response with its task_id right away. Not only does that not happen, but I also get the following error:

nats: encountered error
Traceback (most recent call last):
  File "/home/user/workspace/playground/toy_fastapi_app/.venv/lib/python3.10/site-packages/nats/aio/client.py", line 2093, in _read_loop
    b = await self._transport.read(DEFAULT_BUFFER_SIZE)
  File "/home/user/workspace/playground/toy_fastapi_app/.venv/lib/python3.10/site-packages/nats/aio/transport.py", line 171, in read
    return await self._io_reader.read(buffer_size)
  File "/usr/lib/python3.10/asyncio/streams.py", line 669, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.10/asyncio/streams.py", line 487, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

Is there something I'm doing wrong? I'm not sure what to do at this point.

Here's the full repo for reference: https://github.com/shoang22/toy_fastapi_app

s3rius commented 2 months ago

Hi, and thanks for your interest in the project. I wonder where these logs are from: the FastAPI app or the workers? It doesn't seem like a problem with taskiq, but rather with nats-py.

https://github.com/nats-io/nats.py/blob/d9524199442a53eae813be386c6bbfe8ca9666e8/nats/aio/client.py#L2125

It seems like two tasks within the same event loop are trying to read from the same socket, because I don't see anything in your code that reads from a socket directly.
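
For illustration, here is a minimal asyncio-only reproduction of that error, with no NATS involved: two coroutines calling read() on the same StreamReader. The throwaway local server below is just scaffolding for the demo, not anything from the report.

import asyncio


async def main() -> None:
    # A server that never sends anything, so readers just sit and wait.
    server = await asyncio.start_server(lambda r, w: None, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)

    # The first task starts waiting for incoming data on the socket...
    first = asyncio.create_task(reader.read(1024))
    await asyncio.sleep(0.1)

    try:
        # ...and a second read() on the same StreamReader fails immediately.
        await reader.read(1024)
    except RuntimeError as exc:
        # read() called while another coroutine is already waiting for incoming data
        print(exc)

    first.cancel()
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()


asyncio.run(main())

Calling broker.startup() on every request can put the app in a similar situation, since each startup may spin up another NATS reader against the same connection state.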

@chandr-andr, you better know natspy, have you ever faced such issue before?

s3rius commented 2 months ago

@shoang22, @chandr-andr has noticed that you run broker.startup() on every request. Please don't do that. Run startup only once per application, during application startup. That is likely what's causing the issue.
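
A minimal sketch of doing that with FastAPI's lifespan hook could look like this. The module paths, the get_app factory name, and the is_worker_process guard are assumptions based on the snippets above, not code from the issue:

# src/app.py

from contextlib import asynccontextmanager

from fastapi import FastAPI

from src.routes import router      # assumed import path
from src.services import broker    # assumed import path


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the broker exactly once when the app starts. Skip inside taskiq
    # worker processes, which manage the broker lifecycle themselves.
    if not broker.is_worker_process:
        await broker.startup()
    yield
    if not broker.is_worker_process:
        await broker.shutdown()


def get_app() -> FastAPI:
    app = FastAPI(lifespan=lifespan)
    app.include_router(router)
    return app

With the broker started in the lifespan, the await broker.startup() line inside the /block handler can simply be removed.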

shoang22 commented 2 months ago

@s3rius @chandr-andr yep, that was indeed the issue. Removing it seems to fix the problem.
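
For anyone finding this later, the fix is just dropping the per-request startup call from the handler; everything else stays as posted above:

# routes.py

@router.post("/block", response_model=BaseResponse)
async def get_blocked(
    file: UploadFile = File(...),
):
    task_id = str(uuid.uuid4())
    text = (await file.read()).decode()

    data = BlockRequestBody(text=text, task_id=task_id)
    await nonblocking_call.kiq(**data.model_dump())

    return BaseResponse(response_id=task_id)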