nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0
838 stars, 173 forks

nats.errors.TimeoutError: nats: timeout #572

Closed: michaelmohamed closed this issue 1 month ago

michaelmohamed commented 1 month ago

Observed behavior

I am using NATS with FastStream, and I randomly get the error below. Everything seems to carry on normally until, eventually, processing stops.

Has anyone else run into this?

TimeoutError: nats: timeout
Traceback (most recent call last):
  File "/usr/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/nats/aio/client.py", line 1027, in _request_new_style
    msg = await asyncio.wait_for(future, timeout)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/subscriber/usecase.py", line 338, in consume
    await h.call(
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/subscriber/call_item.py", line 172, in call
    raise e
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/subscriber/call_item.py", line 164, in call
    result = await call(message)
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/middlewares/base.py", line 73, in consume_scope
    await self.after_consume(err)
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/middlewares/base.py", line 54, in after_consume
    raise err
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/middlewares/base.py", line 64, in consume_scope
    result = await call_next(await self.on_consume(msg))
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/wrapper/call.py", line 201, in decode_wrapper
    return await func(msg)
  File "/usr/local/lib/python3.9/dist-packages/fast_depends/use.py", line 148, in injected_wrapper
    r = await real_model.asolve(
  File "/usr/local/lib/python3.9/dist-packages/fast_depends/core/model.py", line 530, in asolve
    response = await run_async(call, *final_args, **final_kwargs)
  File "/usr/local/lib/python3.9/dist-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/faststream/utils/functions.py", line 53, in to_async_wrapper
    return await call_or_await(func, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
  File "/code/main.py", line 248, in handle
    await publisher_utility_poles.publish(
  File "/usr/local/lib/python3.9/dist-packages/faststream/nats/publisher/usecase.py", line 153, in publish
    return await call(message, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/middlewares/base.py", line 115, in publish_scope
    await self.after_publish(err)
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/middlewares/base.py", line 90, in after_publish
    raise err
  File "/usr/local/lib/python3.9/dist-packages/faststream/broker/middlewares/base.py", line 102, in publish_scope
    result = await call_next(
  File "/usr/local/lib/python3.9/dist-packages/faststream/nats/publisher/producer.py", line 162, in publish
    await self._connection.publish(
  File "/usr/local/lib/python3.9/dist-packages/nats/js/client.py", line 125, in publish
    msg = await self._nc.request(
  File "/usr/local/lib/python3.9/dist-packages/nats/aio/client.py", line 991, in request
    msg = await self._request_new_style(
  File "/usr/local/lib/python3.9/dist-packages/nats/aio/client.py", line 1040, in _request_new_style
    raise errors.TimeoutError
nats.errors.TimeoutError: nats: timeout
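The traceback shows where the error originates: FastStream's publisher calls nats.py's JetStream publish (nats/js/client.py), which sends the message as a request and then waits in _request_new_style, via asyncio.wait_for, for the server's acknowledgement. When that wait expires, nats.py raises nats.errors.TimeoutError. A common mitigation is to retry the publish with backoff. The sketch below is a minimal, stdlib-only illustration and is not part of nats.py or FastStream; publish_with_retry and its parameters are hypothetical names. It catches asyncio.TimeoutError on the assumption (worth checking against your installed nats.py) that nats.errors.TimeoutError subclasses it.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def publish_with_retry(
    publish: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Await publish() up to `attempts` times, backing off after each timeout.

    Hypothetical helper: `publish` is any zero-argument coroutine function,
    e.g. lambda: js.publish(subject, payload, timeout=10).
    Assumes nats.errors.TimeoutError subclasses asyncio.TimeoutError.
    """
    for attempt in range(attempts - 1):
        try:
            return await publish()
        except asyncio.TimeoutError:
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
    # Final attempt (always at least one call): a timeout here propagates.
    return await publish()
```

Retrying is not free: if the first publish actually reached the stream and only the acknowledgement was late, a retry stores the message twice unless each message carries a Nats-Msg-Id header, so that JetStream's duplicate window can discard the repeat.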

This is roughly my code:

import asyncio

from faststream import FastStream
from faststream.nats import NatsBroker, PullSub

# BROKER_URL, the stream/subject/group names, logger, InputTopicType and
# OutputTopicType are defined elsewhere in the application.

# Set the message processing time (seconds)
MSG_PROCESSING_TIME = 10

# Initialize the broker
broker = NatsBroker(
    BROKER_URL,
    ping_interval=5,
    graceful_timeout=MSG_PROCESSING_TIME + 1,
)

# Initialize the publisher
publisher = broker.publisher(
    stream=OUTPUT_STREAM,
    subject=OUTPUT_SUBJECT,
    timeout=10,
)

# Initialize the FastStream app
app = FastStream(broker=broker, logger=logger)

@broker.subscriber(
    durable=GROUP_ID,
    stream=INPUT_STREAM,
    subject=INPUT_SUBJECT,
    pull_sub=PullSub(batch_size=1, timeout=5),
)
async def handle(data: InputTopicType) -> None:
    ...
    # this code can take up to 10 seconds to run
    # (it also builds output_subject_data)
    ...

    # Publish the bbox to the output topic
    await publisher.publish(OutputTopicType(**output_subject_data))

async def main():
    await app.run()  # blocking method

if __name__ == "__main__":
    asyncio.run(main())

I am running NATS on Kubernetes (k8s).
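The handler comment in the code notes that processing can take up to 10 seconds. One thing worth ruling out (an assumption, not something confirmed in this thread) is whether that work is synchronous code running directly inside the async handler: while it runs, the event loop cannot process the server's acknowledgement or PING/PONG traffic, so pending requests can time out even when the server is healthy. The sketch below is a minimal, stdlib-only illustration of offloading such work with asyncio.to_thread (available since Python 3.9, the version in the traceback). blocking_work, heartbeat and run_handler are hypothetical stand-ins for the real processing, for the client's background traffic, and for the handler.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    """Hypothetical stand-in for the handler's slow, synchronous processing."""
    time.sleep(seconds)  # blocks whichever thread runs it
    return "done"


async def heartbeat(ticks: list, interval: float) -> None:
    """Stands in for background work (acks, pings) that needs the event loop."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run_handler(offload: bool) -> tuple:
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks, 0.01))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if offload:
        # Run the blocking call in a worker thread; the loop stays responsive.
        result = await asyncio.to_thread(blocking_work, 0.2)
    else:
        # Calling it directly freezes the loop for the whole 0.2 s.
        result = blocking_work(0.2)
    tick_count = len(ticks)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return result, tick_count
```

In the real handler this would look something like result = await asyncio.to_thread(process, data) before publisher.publish(...), where process is a hypothetical synchronous function doing the work. If the work is already properly async, this does not apply.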

Expected behavior

No timeouts.

Server and client version

nats-server: v2.10.16

Host environment

No response

Steps to reproduce

No response

itssimon commented 1 month ago

Yes, I also get a lot of these timeouts for no obvious reason. Load on the system is minimal, the NATS server logs show nothing, and the network is otherwise fine. At the same time, the "nats: missing response token" error is raised as well. I just had to upgrade my Sentry subscription because of the volume of errors ingested due to this :/

I'm also running NATS Server on Kubernetes.

Possibly related: https://github.com/nats-io/nats.py/issues/391
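Seeing both errors together fits how nats.py's new-style request appears to work (this is a reading of _request_new_style and should be verified against the installed version). Each request appends a unique token to a shared reply inbox and stores its pending future in a token map. On timeout, the client removes that entry. If the reply handler has already removed it, for example because the response arrived at almost the same moment, the client reports "nats: missing response token" through the error callback and still raises the timeout. The sketch below is a simplified, stdlib-only model of that token-map pattern, not nats.py's actual code; ResponseRouter and its methods are hypothetical names.

```python
import asyncio
import itertools
from typing import Dict, List, Tuple


class ResponseRouter:
    """Simplified model of token-based request/reply correlation.

    Hypothetical class: one shared reply inbox, one future per request token.
    """

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}
        self._counter = itertools.count()
        self.errors: List[str] = []  # stands in for an error callback

    def new_request(self) -> Tuple[str, asyncio.Future]:
        token = f"tok{next(self._counter)}"
        fut = asyncio.get_running_loop().create_future()
        self._pending[token] = fut
        return token, fut

    def on_response(self, token: str, payload: str) -> None:
        """Called when a reply arrives on the shared inbox."""
        fut = self._pending.pop(token, None)
        if fut is None or fut.done():
            return  # unknown or late reply: nobody is waiting, drop it
        fut.set_result(payload)

    async def wait(self, token: str, fut: asyncio.Future, timeout: float) -> str:
        try:
            return await asyncio.wait_for(fut, timeout)
        except asyncio.TimeoutError:
            if self._pending.pop(token, None) is None:
                # The reply handler removed the entry first (the race).
                self.errors.append("missing response token")
            raise
```

The key property is cleanup on both paths: whichever side finishes first (the reply handler or the timeout) removes the token, so a late reply is simply dropped instead of leaking a map entry.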

michaelmohamed commented 1 month ago

I see the token error as well.

michaelmohamed commented 1 month ago

@itssimon, are you using FastStream or vanilla nats.py?

itssimon commented 1 month ago

I don't use FastStream, just vanilla nats.py.