Here is my minimal setup:
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

# Or you can use PubSubBroker if you need broadcasting
broker = ListQueueBroker(
    url="redis://localhost:6379",
).with_result_backend(
    result_backend=redis_async_result,
)
I start this worker and then kill it with Ctrl+C:
taskiq worker config.taskiq_scheduler:broker
[2024-11-08 18:13:02,105][taskiq.worker][INFO ][MainProcess] Pid of a main process: 22684
[2024-11-08 18:13:02,105][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2024-11-08 18:13:02,109][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 22686
[2024-11-08 18:13:02,110][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 22687
^CWorker process interrupted.
Shutting down the broker.
Worker process interrupted.
Shutting down the broker.
[2024-11-08 18:13:03,078][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionError('Connection closed by server.')])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_redis/redis_broker.py", line 129, in listen
| yield (await redis_conn.brpop(self.queue_name))[
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 616, in execute_command
| return await conn.retry.call_with_retry(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 62, in call_with_retry
| await fail(error)
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 603, in _disconnect_raise
| raise error
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
| return await do()
| ^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 590, in _send_command_parse_response
| return await self.parse_response(conn, command_name, **options)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 637, in parse_response
| response = await connection.read_response()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/connection.py", line 543, in read_response
| response = await self._parser.read_response(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
| response = await self._read_response(disable_decoding=disable_decoding)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
| raw = await self._readline()
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/base.py", line 221, in _readline
| raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
| redis.exceptions.ConnectionError: Connection closed by server.
+------------------------------------
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionError('Connection closed by server.')])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_redis/redis_broker.py", line 129, in listen
| yield (await redis_conn.brpop(self.queue_name))[
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 616, in execute_command
| return await conn.retry.call_with_retry(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 62, in call_with_retry
| await fail(error)
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 603, in _disconnect_raise
| raise error
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
| return await do()
| ^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 590, in _send_command_parse_response
| return await self.parse_response(conn, command_name, **options)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 637, in parse_response
| response = await connection.read_response()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/connection.py", line 543, in read_response
| response = await self._parser.read_response(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
| response = await self._read_response(disable_decoding=disable_decoding)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
| raw = await self._readline()
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/base.py", line 221, in _readline
| raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
| redis.exceptions.ConnectionError: Connection closed by server.
+------------------------------------
Shouldn't it be a silent exit? Exactly the same thing happens with the NATS backend, but not with RabbitMQ.
Logs for the taskiq-nats case are below:
[2024-11-08 15:11:07,684][taskiq.worker][INFO ][MainProcess] Pid of a main process: 9387
[2024-11-08 15:11:07,684][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2024-11-08 15:11:07,688][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 9389
[2024-11-08 15:11:07,690][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 9390
^C[2024-11-08 15:13:45,281][taskiq.process-manager][DEBUG ][MainProcess] Got signal 2.
[2024-11-08 15:13:45,283][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
Worker process interrupted.
Worker process interrupted.
Shutting down the broker.
Shutting down the broker.
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError()])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._run
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError()])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 189, in next_msg
| msg = await future
| ^^^^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
| return await fut
| ^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/queues.py", line 158, in get
| await getter
| asyncio.exceptions.CancelledError
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_nats/broker.py", line 232, in listen
| nats_messages: typing.List[NatsMessage] = await self.consumer.fetch(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1073, in fetch
| msg = await self._fetch_one(expires, timeout, heartbeat)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1125, in _fetch_one
| msg = await self._sub.next_msg(timeout=deadline)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 196, in next_msg
| raise errors.ConnectionClosedError
| nats.errors.ConnectionClosedError: nats: connection closed
+------------------
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 189, in next_msg
| msg = await future
| ^^^^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
| return await fut
| ^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/queues.py", line 158, in get
| await getter
| asyncio.exceptions.CancelledError
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_nats/broker.py", line 232, in listen
| nats_messages: typing.List[NatsMessage] = await self.consumer.fetch(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1073, in fetch
| msg = await self._fetch_one(expires, timeout, heartbeat)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1125, in _fetch_one
| msg = await self._sub.next_msg(timeout=deadline)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 196, in next_msg
| raise errors.ConnectionClosedError
| nats.errors.ConnectionClosedError: nats: connection closed
+------------------------------------
------------------
[2024-11-08 15:13:45,439][root][DEBUG ][MainProcess] Got event: ShutdownAction()
[2024-11-08 15:13:45,439][taskiq.process-manager][DEBUG ][MainProcess] Process manager closed.
Interestingly enough, there is no issue when the result backend is not used:
from taskiq_nats import PullBasedJetStreamBroker

broker = PullBasedJetStreamBroker(
    servers="nats://localhost:4222",
)
logs:
taskiq worker config.taskiq_scheduler:broker
[2024-11-08 18:32:59,106][taskiq.worker][INFO ][MainProcess] Pid of a main process: 25309
[2024-11-08 18:32:59,106][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2024-11-08 18:32:59,110][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 25311
[2024-11-08 18:32:59,111][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 25312
^CWorker process interrupted.
Worker process interrupted.
Shutting down the broker.
Shutting down the broker.
[2024-11-08 18:32:59,753][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
However, with taskiq-redis it makes no difference whether a result backend is configured or not.
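To make the "silent exit" I am asking about concrete, here is a rough, self-contained sketch of the behaviour I would expect. It does not use taskiq, redis or nats at all: `FakeConnectionClosed`, `fake_listen`, `quiet_listen` and the `shutting_down` flag are all made-up names for illustration, and I am only assuming that the connection error shows up because the broker connection is closed while the listener is still waiting for a message.

```python
import asyncio


class FakeConnectionClosed(Exception):
    """Hypothetical stand-in for the redis/nats 'connection closed' errors."""


async def fake_listen(closed: asyncio.Event):
    """Hypothetical listener: blocks until the connection is closed, then fails."""
    await closed.wait()
    raise FakeConnectionClosed("connection closed")
    yield b""  # unreachable; only here to make this an async generator


async def quiet_listen(closed: asyncio.Event, shutting_down: asyncio.Event) -> str:
    """Consume messages; a closed connection during shutdown is a normal exit."""
    try:
        async for _message in fake_listen(closed):
            pass
    except FakeConnectionClosed:
        if not shutting_down.is_set():
            raise  # outside of shutdown this is a real error and should surface
        return "clean exit"
    return "listener finished"


async def main() -> str:
    closed, shutting_down = asyncio.Event(), asyncio.Event()
    task = asyncio.create_task(quiet_listen(closed, shutting_down))
    await asyncio.sleep(0)  # let the listener start waiting
    shutting_down.set()     # what an interrupt handler would do first
    closed.set()            # then the broker connection goes away
    return await task


result = asyncio.run(main())
print(result)
```

The point of the flag is that the same error is still raised when the connection drops unexpectedly, so only the expected shutdown case is silenced.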