Polyconseil / aioamqp

AMQP implementation using asyncio
Other
280 stars 88 forks source link

Catching this error after RabbitMQ server is stopped: Connection lost exc=ConnectionResetError(104, 'Connection reset by peer') #147

Closed MaxOvcharov closed 6 years ago

MaxOvcharov commented 7 years ago

I run my consumer:

# -*- coding: utf-8 -*-
"""
    Webhook service for sending message from
    mobile clients to PGTA via mobile backend
"""
import aiohttp
import aioamqp
import asyncio
import async_timeout
import functools
import os

from utils import load_config
from settings import BASE_DIR, WH_ADDRESS, logger, parse_args_for_init_worker

# Worker options are parsed once, at import time.
opt = parse_args_for_init_worker()
# Pause in seconds between webhook delivery attempts (passed to asyncio.sleep
# in send_webhook).
WH_TIMEOUT = opt.wh_timeout
# NOTE(review): not referenced anywhere in the visible code; presumably the
# number of workers to start -- confirm against the rest of the project.
WORKERS_NUM = opt.worker_number

async def send_webhook(body, properties):
    """Notify the webhook endpoint about a received message, with retries.

    Issues up to five GET requests to ``WH_ADDRESS`` (one-second timeout
    each) and stops at the first HTTP 200 response. After a non-200
    response, a connection error or a timeout it sleeps ``WH_TIMEOUT``
    seconds and tries again.

    Args:
        body: Message payload; it is only logged here.
        properties: Message properties; ``message_id`` is logged on success
            and ``items()`` on failure.

    Returns:
        None. The outcome is only logged. Any unexpected exception is logged
        and swallowed, so the caller is never interrupted.
    """
    wh_is_send = False
    logger.debug("Received msg: %r" % body)
    try:
        async with aiohttp.ClientSession() as session:
            for _ in range(5):
                try:
                    with async_timeout.timeout(1, loop=session.loop):
                        async with session.get(WH_ADDRESS) as resp:
                            if resp.status == 200:
                                wh_is_send = True
                                break
                    # Reached only for a non-200 answer: wait, then retry.
                    logger.debug("PGTA STATUS CODE: %s. Sleep: %s sec."
                                 % (resp.status, WH_TIMEOUT))
                    await asyncio.sleep(WH_TIMEOUT)
                except aiohttp.client_exceptions.ClientConnectorError as e:
                    logger.debug("Handle ERROR: %s. \nSleep: %s sec" % (e, WH_TIMEOUT))
                    await asyncio.sleep(WH_TIMEOUT)
                except asyncio.TimeoutError:
                    # async_timeout signals expiry with asyncio.TimeoutError.
                    # Without this branch a single slow request would fall
                    # through to the outer handler and cancel every remaining
                    # attempt instead of being retried.
                    logger.debug("Handle ERROR: request timed out. \nSleep: %s sec"
                                 % (WH_TIMEOUT,))
                    await asyncio.sleep(WH_TIMEOUT)
            if wh_is_send:
                logger.debug("SENDING SUCCESS: MSG_ID - %s" % properties.message_id)
            else:
                # NOTE(review): this branch calls properties.items() while the
                # success branch reads properties.message_id -- confirm the
                # properties object really provides items(); if it does not,
                # the AttributeError ends up in the handler below.
                logger.debug("SENDING FAIL: MSG_ID - %s" % properties.items())
    except Exception as e:
        # Last-resort guard: delivery problems must not propagate to the caller.
        logger.debug("Handle ERROR: %s" % e)

async def callback(channel, body, envelope, properties):
    """Consumer callback: forward the message to the webhook, then ack it.

    Written as a native coroutine (like ``send_webhook``) because the
    generator-based ``@asyncio.coroutine`` decorator no longer exists in
    Python 3.11+.

    Args:
        channel: Channel the message arrived on; used to send the ack.
        body: Message payload, handed to ``send_webhook``.
        envelope: Delivery metadata; supplies ``delivery_tag`` for the ack.
        properties: Message properties, handed to ``send_webhook``.
    """
    await send_webhook(body, properties)
    # send_webhook neither returns a status nor raises on failure, so the
    # message is acknowledged whether or not the webhook call succeeded.
    await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)

async def worker(conf):
    """Connect to RabbitMQ and start consuming the ``client_msg_q`` queue.

    Written as a native coroutine because the generator-based
    ``@asyncio.coroutine`` decorator no longer exists in Python 3.11+.

    Args:
        conf: Mapping with a ``'rabbitmq'`` entry that holds ``host``,
            ``port``, ``login`` and ``password``.

    Returns:
        None. If the connection attempt raises
        ``aioamqp.AmqpClosedConnection`` the error is logged and the
        coroutine returns without consuming. Other errors propagate to the
        caller.
    """
    rb_conf = conf.get('rabbitmq')
    try:
        # Only the connection attempt is guarded.
        _transport, protocol = await aioamqp.connect(host=rb_conf['host'],
                                                     port=rb_conf['port'],
                                                     login=rb_conf['login'],
                                                     password=rb_conf['password'])
    except aioamqp.AmqpClosedConnection as e:
        logger.error('Handle ERROR: %s' % e)
        return
    logger.debug("WORKER IS CREATED")

    channel = await protocol.channel()
    await channel.queue(queue_name='client_msg_q', durable=True)
    # prefetch_count=1: request one unacknowledged message at a time.
    await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
    await channel.basic_consume(callback, queue_name='client_msg_q')

def main():
    """Load the configuration, start the consumer and run the event loop.

    A ``ConnectionRefusedError`` raised while starting the worker is logged.
    The event loop is closed exactly once on every exit path.
    """
    # load config from yaml file
    conf = load_config(os.path.join(BASE_DIR, "config/settings.yml"))
    loop = asyncio.get_event_loop()
    try:
        # worker(conf) is already awaitable; no coroutine/partial wrapper is
        # needed to hand it to the loop.
        loop.run_until_complete(worker(conf))
        # NOTE(review): worker returns None both after a successful start and
        # after a logged connection failure, so the loop below is entered in
        # either case.
        loop.run_forever()
    except ConnectionRefusedError as e:
        logger.error('Handle ERROR: %s' % e)
    finally:
        loop.close()

# Start the consumer only when this file is executed as a script.
if __name__ == '__main__':
    main()

How can I restart my current worker after the RabbitMQ server starts again? In this version of my consumer code I have to restart it manually after every server restart.

MaxOvcharov commented 7 years ago

Now I have another question. If I have a cluster of two nodes and one node crashes, how can I reconnect to the other node from my current script?

singulared commented 6 years ago

@MaxOvcharov I think you should use some load-balancer like haproxy for this case.

RemiCardona commented 6 years ago

Sorry for the long wait.

To complement what @singulared said, this feature is beyond the scope of both aioamqp and rabbit itself. There are a number of ways of doing this: you could implement a round robin yourself in your consumer program, you could use various DNS tricks, you could use a TCP load balancer, and possibly another dozen solutions and variants.

Cheers

avarf commented 4 months ago

Hi, I have the exact same problem. Can you point me to an example of any of those solutions? The program hangs when the connection is closed; the exception handlers in my own code do not catch it, and the logs show that it just hangs inside aioamqp:

2024-02-20 12:27:39,835 ~~INFO  ~~__main__             ~~Awaiting RPC requests ~~[processor.py:376]
2024-02-20 12:27:50,768 ~~WARNING  ~~aioamqp.protocol     ~~Connection lost exc=ConnectionResetError(104, 'Connection reset by peer') ~~[protocol.py:115]
2024-02-20 12:27:50,769 ~~INFO  ~~aioamqp.protocol     ~~Close connection ~~[protocol.py:314]

The only solution is to kill my component so it starts again.

My code:

  while True: #reconnection loop

    try:
      transport, protocol = await aioamqp.connect(host=o.hostname, port=o.port, 
                                                  login=o.username, password=o.password,
                                                  virtualhost=o.path[1:])

      channel = await protocol.channel()

      await channel.queue_declare(queue_name=queue_name)
      #bind to shared queue
      await channel.queue_bind(queue_name=queue_name, exchange_name=exchange, routing_key=queue_name)
      #bind to unique queue
      await channel.queue_bind(queue_name=queue_name, exchange_name=exchange, routing_key=service_id)
      await channel.basic_consume(on_request, queue_name=queue_name)

      log.info('Awaiting RPC requests')

      #wait till on_close event
      await on_close.wait()

      log.info('close amqp connection')
      await protocol.close()
      transport.close()
    except (OSError, aioamqp.exceptions.AmqpClosedConnection) as e: #AMQP server not running or starting up
      log.error('AMQP connection error: %s', e)
      if not max_attempts:
        raise ProcessorException('AMPQ error: max attempt reached: %s', e)
      log.info('retry in %s seconds', backoff)        
      await asyncio.sleep(backoff)

      max_attempts-=1
      backoff*=2
    except ConnectionResetError as e:
        log.error('Connection reset by broker: %s', e)
        log.info('Retrying indefinitely until the broker is up...')
        await asyncio.sleep(retry_interval)  # Adjust the retry interval as needed
    except Exception as e:
        log.error('Unexpected exception: %s', e)
        import traceback
        traceback.print_exc()
    else:
      log.info("inside break")
      break
  log.info("after while")