mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Robust_connection and RuntimeError "Writer is None" #288

Open rsaleev opened 4 years ago

rsaleev commented 4 years ago

Is it expected that robust_connection with the arguments reconnection_interval=0.5 and timeout=2 throws an exception when the connection couldn't be re-established during exchange.publish(Message(...))?

RuntimeError: Writer is None
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib/python3.7/asyncio/tasks.py:596> exception=RuntimeError('Writer is None')>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/tasks.py", line 223, in __step
    result = coro.send(None)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 603, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.7/dist-packages/aiormq/tools.py", line 86, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.7/dist-packages/aiormq/connection.py", line 136, in drain
    raise RuntimeError("Writer is %r" % self.writer)
RuntimeError: Writer is None
socket.send() raised exception. 

When I use just the Connection class and wrap the connection with:

from aio_pika import ExchangeType, connect


async def start(self):  # method name assumed; the original snippet omits it
    # Retry until a plain (non-robust) connection succeeds.
    # ConnectionRefusedError, ConnectionResetError and ConnectionAbortedError
    # are all subclasses of ConnectionError, so catching it covers all four.
    while self.cnx is None:
        try:
            self.cnx = await connect(
                f"amqp://{self.user}:{self.password}@{self.host}/",
                loop=self.loop,
                timeout=2,
            )
        except ConnectionError:
            continue
    else:
        # while/else: runs once the loop ends without `break`,
        # i.e. after a connection has been established
        self.connected = True
        self.ch = await self.cnx.channel()
        if self.exchange_type == 'fanout':
            self.ex = await self.ch.declare_exchange(self.exchange, ExchangeType.FANOUT)
        elif self.exchange_type == 'topic':
            self.ex = await self.ch.declare_exchange(self.exchange, ExchangeType.TOPIC)
        elif self.exchange_type == 'direct':
            self.ex = await self.ch.declare_exchange(self.exchange, ExchangeType.DIRECT)
        if self.queue is not None:
            self.queue = await self.ch.declare_queue(self.queue_name, durable=True)
            await self.queue.bind(self.ex, self.bind)
        return self

I see no exceptions when running this code. I use it to reconnect to the server if the connection is lost while sending a message. I know I've reinvented the wheel, but it works as I expected.
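The loop above retries immediately and without limit, so it keeps hammering the broker while it is unreachable. Below is a minimal sketch of the same reconnect-until-success idea with a capped exponential backoff. `retry_with_backoff` and its parameters are hypothetical names, and the operation is passed in as a zero-argument coroutine function so the sketch does not depend on aio-pika itself.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    initial_delay: float = 0.5,
    max_delay: float = 30.0,
    max_attempts: Optional[int] = None,
) -> T:
    """Await `operation()` until it succeeds (hypothetical helper).

    Sleeps after each failure, doubling the delay up to `max_delay`.
    With max_attempts=None it retries forever, like the loop above.
    """
    delay = initial_delay
    attempt = 0
    while True:
        attempt += 1
        try:
            return await operation()
        except retry_on:
            if max_attempts is not None and attempt >= max_attempts:
                raise  # give up and surface the last error
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)
```

For example, `await retry_with_backoff(lambda: connect(url, timeout=2))` could stand in for the `while self.cnx is None` loop, with `url` being the connection string built above. Setting `max_attempts` lets the caller decide what to do when the broker stays down instead of blocking forever.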

remram44 commented 3 years ago

I'm also getting RuntimeError: Writer is None a lot under network load. I also get ChannelInvalidStateError: <Channel: "1"> closed from publish().

remram44 commented 3 years ago
2020-11-19 22:41:15,434 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,435 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,435 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,436 INFO aio_pika.robust_connection: Connection to amqp://datamart:******@rabbitmq:5672// closed. Reconnecting after 5 seconds.
2020-11-19 22:41:15,437 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,437 WARNING asyncio: socket.send() raised exception.
2020-11-19 22:41:15,438 ERROR asyncio: Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.7/asyncio/tasks.py:623> exception=RuntimeError('Writer is None')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.7/site-packages/aiormq/tools.py", line 86, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.7/site-packages/aiormq/connection.py", line 138, in drain
    raise RuntimeError("Writer is %r" % self.writer)
RuntimeError: Writer is None

nhumrich commented 3 years ago

Any update on this issue?

mosquito commented 3 years ago

@nhumrich unfortunately not; there is too much that still has to be investigated.

nhumrich commented 3 years ago

@mosquito So, I tracked this down to being an issue with the RobustChannel not actually being robust. When a channel dies, you can call await channel.reopen() and it fixes everything, but I would expect RobustChannel to handle this itself. Instead, the channel just dies, and reopen has to be called manually. Close callbacks aren't called when a channel is closed non-gracefully, so you can't just add a callback to re-open.

You can reproduce by creating a channel, then doing something that will close the channel such as binding a queue that doesn't exist to an exchange.
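Building on the finding that await channel.reopen() brings a dead channel back, one possible workaround until RobustChannel handles this itself is to catch the dead-channel error, reopen the channel, and retry the call once. This is a hypothetical helper, not part of aio-pika; the exception types are passed in so the sketch runs without aiormq installed (with aiormq you would pass `aiormq.exceptions.ChannelInvalidStateError`).

```python
from typing import Awaitable, Callable, Protocol, Tuple, Type, TypeVar

T = TypeVar("T")


class Reopenable(Protocol):
    # Anything with an async reopen(), such as an aio-pika channel
    async def reopen(self) -> None: ...


async def call_with_reopen(
    channel: Reopenable,
    operation: Callable[[], Awaitable[T]],
    dead_channel_errors: Tuple[Type[BaseException], ...],
) -> T:
    """Run `operation`; if it fails because the channel is closed,
    reopen the channel and retry exactly once (hypothetical helper)."""
    try:
        return await operation()
    except dead_channel_errors:
        await channel.reopen()
        return await operation()
```

For instance, `await call_with_reopen(channel, lambda: channel.declare_queue(name, passive=True), (aiormq.exceptions.ChannelInvalidStateError,))`, where `name` is a placeholder. Retrying only once avoids looping forever when the retried operation is itself what makes the broker close the channel.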

Artimi commented 3 years ago

Just for the record: I ran into this when, due to a failure in my program, I did not ack a message. 30 minutes later I saw this in the RabbitMQ log:

2021-06-15 11:16:00.456 [warning] <0.6808.6> Consumer ctag1.cf784dcdb9f4fd0e427d8d29bc731e18 on channel 1 has timed out waiting for delivery acknowledgement. Timeout used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
2021-06-15 11:16:00.458 [error] <0.6808.6> Channel error on connection <0.6799.6> (10.233.64.217:57120 -> 10.233.64.223:5672, vhost: '/', user: 'user'), channel 1:
operation none caused a channel exception precondition_failed: delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more

And then I got this exception

Task exception was never retrieved
future: <Task finished name='Task-222840' coro=<MessagingToPubsubAdapter.remove_mapping() done, defined at /usr/local/lib/python3.8/site-packages/rabbit_messaging/adapters.py:133> exception=ChannelInvalidStateError('writer is None')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/rabbit_messaging/adapters.py", line 136, in remove_mapping
    await self._consumer.unsubscribe(routing_key)
  File "/usr/local/lib/python3.8/site-packages/rabbit_messaging/consumer.py", line 122, in unsubscribe
    await self._queue.unbind(self._exchanges[schema], routing_key)
  File "/usr/local/lib/python3.8/site-packages/aio_pika/robust_queue.py", line 102, in unbind
    result = await super().unbind(
  File "/usr/local/lib/python3.8/site-packages/aio_pika/queue.py", line 184, in unbind
    return await asyncio.wait_for(
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
    return await fut
  File "/usr/local/lib/python3.8/site-packages/aiormq/channel.py", line 751, in queue_unbind
    return await self.rpc(
  File "/usr/local/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/usr/local/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/usr/local/lib/python3.8/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

I "fixed" it by acking all messages, but the underlying problem still remains.
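The log above shows RabbitMQ closing channel 1 after a delivery stayed unacknowledged for 1800000 ms (30 minutes), which then surfaced as "writer is None". aio-pika's `async with message.process():` context manager is meant to settle every message; the library-free sketch below shows the same ack-or-reject pattern. `handle_safely` and the `Acknowledgeable` protocol are hypothetical names chosen for illustration.

```python
from typing import Awaitable, Callable, Protocol


class Acknowledgeable(Protocol):
    # Minimal message interface assumed for this sketch
    async def ack(self) -> None: ...
    async def reject(self, requeue: bool = False) -> None: ...


async def handle_safely(
    message: Acknowledgeable,
    handler: Callable[[Acknowledgeable], Awaitable[None]],
    requeue_on_error: bool = False,
) -> None:
    """Ack on success, reject on failure, so a handler error can never
    leave the delivery unacknowledged (hypothetical helper)."""
    try:
        await handler(message)
    except Exception:
        await message.reject(requeue=requeue_on_error)
        raise  # still let the caller see the failure
    else:
        await message.ack()
```

This only guarantees that a failing handler does not leave the delivery pending; a handler that simply runs longer than the broker's timeout will still trip it.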

kaya-zekioglu commented 2 years ago

Replying to @nhumrich's comment above (RobustChannel does not reopen a channel that dies, so await channel.reopen() has to be called manually):

I am encountering the same situation. Minimal reproduction:

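import asyncio
import contextlib
import aio_pika
import aiormq

url = "amqp://guest:guest@localhost/"  # placeholder broker URL
subject_that_exists = "existing-queue"  # placeholder queue names
subject_that_doesnt_exist = "missing-queue"

async def main():
    async with contextlib.AsyncExitStack() as exit_stack:
        connection = await aio_pika.connect_robust(url)
        await exit_stack.enter_async_context(connection)
        channel = await connection.channel()
        await channel.declare_queue(subject_that_exists, passive=True)  # works fine
        with contextlib.suppress(aiormq.exceptions.ChannelNotFoundEntity):
            # raises ChannelNotFoundEntity; RabbitMQ closes the channel
            await channel.declare_queue(subject_that_doesnt_exist, passive=True)
        with contextlib.suppress(aiormq.exceptions.ChannelInvalidStateError):
            # raises ChannelInvalidStateError: writer is None
            await channel.declare_queue(subject_that_exists, passive=True)
        await channel.reopen()
        await channel.declare_queue(subject_that_exists, passive=True)  # works fine

asyncio.run(main())
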
mosquito commented 2 years ago

@kaya-zekioglu that's a good example, but it doesn't reproduce this issue. robust_connection protects against network issues, while your example shows a business-logic error. RabbitMQ closes the channel when the user performs an invalid action; that is correct behavior, not a bug. When you manually call the reopen() method, you force the channel to be re-created.

As described in the original report, this issue occurs while a reconnection is being performed.

Of course, I could rewrite it to call reopen() whenever the channel is closed for any reason, but that might have unexpected side effects.

themanifold commented 2 years ago

Regarding @mosquito's remark that reopening the channel whenever it closes might have unexpected side effects:

@mosquito How about you make it optional?

mosquito commented 2 years ago

@themanifold I think that's the wrong way to do it.

kaya-zekioglu commented 2 years ago

Replying to @mosquito's explanation above:

Thank you for the explanation. I understand now that what I encountered is not necessarily related to this issue.

bokolob commented 2 years ago

Hello. I have the same issue. However, my consumer is started with the no_ack flag.

In spite of this, I get 'operation none caused a channel exception precondition_failed: delivery acknowledgement on channel 1 timed out.'

Then the channel is closed.

bokolob commented 2 years ago

Is it safe to have different consumers (with different queues) on the same channel?

mosquito commented 2 years ago

Replying to @bokolob's question about running different consumers on the same channel:

@bokolob yes, it's legal and safe. This issue will be fixed in #381, I hope 😅.

Sorry, everyone, this is a really big piece of work that I will have to do. Unfortunately, work and private life take up more time than I would like, and there is not much free time left, especially for changes as big as #381.

ltalirz commented 1 year ago

Since https://github.com/mosquito/aio-pika/pull/436 (rework of #381) was released in aio-pika 7.0, has anyone checked whether it has resolved this issue?