mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

raise RuntimeError('Event loop is closed') #445

Open nuri35 opened 2 years ago

nuri35 commented 2 years ago

i am getting this error how can i solve it Consumers started Consumer 0: Queue empty. Stopping. Consumer 1: Queue empty. Stopping. Consumer 2: Queue empty. Stopping. Consumer 3: Queue empty. Stopping. Consumer 4: Queue empty. Stopping. Consumer 5: Queue empty. Stopping. Consumer 7: Queue empty. Stopping. Consumer 6: Queue empty. Stopping. Consumer 9: Queue empty. Stopping. Consumer 8: Queue empty. Stopping. Exception ignored in: <function _ProactorBasePipeTransport.del at 0x000001E41C5EEEF0> Traceback (most recent call last): File "C:\Users\LENOVO\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 116, in del self.close() File "C:\Users\LENOVO\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 108, in close self._loop.call_soon(self._call_connection_lost, None) File "C:\Users\LENOVO\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 745, in call_soon self._check_closed() File "C:\Users\LENOVO\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 510, in _check_closed raise RuntimeError('Event loop is closed') RuntimeError: Event loop is closed

and How can I run the publish function inside the consume function?

async def consume(consumer_id) -> None:

    async with channel_pool.acquire() as channel:  # type: aio_pika.Channel

        queue = await channel.declare_queue(
            queue_name, durable=False, auto_delete=False,
        )

    while True :
        try:
            m = await queue.get(timeout=300 * 10)
            message = m.body.decode('utf-8')

            try :
                j = json.loads(message)
                print(j)
            except Exception as e:
                print("error" % (e,))
                raise e

        except aio_pika.exceptions.QueueEmpty:
            print("Consumer %s: Queue empty. Stopping." % consumer_id)
            break
nuri35 commented 2 years ago

@mosquito ?

mosquito commented 2 years ago

I can't give an answer for this example. The example should be at least a runable script. If we take the code from the examples, then there are no such errors in it?

nuri35 commented 2 years ago

@mosquito https://github.com/nuri35/template-validation-1/blob/master/rabbitmq_validator/__init__.py this is my example but give aio_pika.exceptions.QueueEmpty

My goal is to ensure that data from 2 different queues goes to a single consumer How can I run the publisher function inside after running the consumer function? @mosquito