eandersson / amqpstorm

Thread-safe Python RabbitMQ Client & Management library
https://www.amqpstorm.io/
MIT License

Connection was closed by remote server: CONNECTION_FORCED #35

Closed: cp2587 closed this issue 7 years ago.

cp2587 commented 7 years ago

Hello,

We recently upgraded the library from 1.1.7 to 2.1.3, and we now get several errors we did not have before. One of them is the following:

    self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
    self._channel.write_frames(frames_out)
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
    self.check_for_errors()
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
    self._connection.check_for_errors()
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
    raise self.exceptions[0]
AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'

The RabbitMQ server logs show the following: client unexpectedly closed TCP connection

Finally, here is the code I use to create an AMQP socket (the logging handler then uses this socket to send log records to the queue):

class AMQPStormSocket(object):

    def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                 queue_is_durable, exchange_type, fallback_call):

        # create connection & channel
        self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
        self.channel = self.connection.channel()

        # create an exchange, if needed
        self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
        # create a queue, if needed
        self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
        # bind it
        self.channel.queue.bind(queue=queue, exchange=exchange)

        # needed when publishing
        self.exchange = exchange

        self.fallback_call = fallback_call

    def sendall(self, data):
        try:
            self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
        except Exception as e:
            self.fallback_call(e)

    def close(self):
        try:
            self.channel.close()
            self.connection.close()
        except Exception:
            pass

Do you have any idea how to fix these errors?

eandersson commented 7 years ago

Interesting. Have you tried downgrading to 1.1.7 to see whether the errors really go away?

eandersson commented 7 years ago

Technically, broker forced connection closure with reason 'shutdown' means that the RabbitMQ server was shut down. Is RabbitMQ itself perhaps crashing?

cp2587 commented 7 years ago

No, the instance is completely fine, and this error only happens on some of the servers (actually, the ones running AMQPStorm inside their Celery tasks). I will try downgrading.

cp2587 commented 7 years ago

Sometimes I also get a different error: AMQPConnectionError: Connection was closed by remote server: CHANNEL_ERROR - expected 'channel.open'

eandersson commented 7 years ago

I'll see if I can reproduce this tonight.

Some general questions that could help to narrow this down.

cp2587 commented 7 years ago

Thank you very much for your help. I am using RabbitMQ 3.6.6. The servers are on the same network but not on the same host (they communicate over private IP addresses). I am using AMQPStorm because it is thread-safe (my logger runs inside a sub-thread, so I need the socket to be thread-safe).

Here is the full code for the logger + socket:

class AMQPLogstashHandler(SocketHandler):
    """AMQP Log Format handler

    If RabbitMQ is down, this handler will retry every 30 seconds until it is up again.

    :param host: AMQP host (default 'localhost')
    :param port: AMQP port (default 5672)
    :param username: AMQP user name (default 'guest', which is the default for
        RabbitMQ)
    :param password: AMQP password (default 'guest', which is the default for
        RabbitMQ)

    :param exchange: AMQP exchange. Default 'logging.gelf'.
        A queue binding must be defined on the server to prevent
        log messages from being dropped.
    :param exchange_type: AMQP exchange type (default 'fanout').
    :param durable: AMQP exchange is durable (default False)
    :param virtual_host: AMQP virtual host (default '/').

    :param tags: list of tags for a logger (default is None).
    :param message_type: The type of the message (default logstash).
    :param version: version of logstash event schema (default is 0).

    :param extra_fields: Send extra fields on the log record to graylog
        if true (the default)
    :param fqdn: Use fully qualified domain name of localhost as source
        host (socket.getfqdn()).
    :param facility: Replace facility with specified value. If specified,
        record.name will be passed as `logger` parameter.
    """

    def __init__(self, sendgrid_credentials, host='localhost', port=5672, username='guest', password='guest',
                 exchange='logstash', queue='logstashQueue', exchange_type='fanout', virtual_host='/',
                 message_type='logstash', tags=None, durable=True, queue_is_durable=True, extra_fields=True,
                 fqdn=False, facility=None):

        SocketHandler.__init__(self, host, port)

        # AMQP parameters
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.exchange_type = exchange_type
        self.exchange = exchange
        self.exchange_is_durable = durable
        self.queue = queue
        self.queue_is_durable = queue_is_durable
        self.virtual_host = virtual_host

        # retry when rabbitmq gets unavailable
        self.closeOnError = 1

        # Extract Logstash parameters
        self.tags = tags or []
        self.formatter = WiremindLogstashFormatter(message_type, tags, fqdn)

        # Standard logging parameters
        self.extra_fields = extra_fields
        self.fqdn = fqdn
        self.facility = facility

        # fallback
        self.fallback_handler = WiremindMailHandler(
            mailhost='smtp.sendgrid.net', fromaddr='no-reply@wiremind.fr', toaddrs=['dev@wiremind.fr'],
            subject='logstash critical', credentials=sendgrid_credentials)
        self.fallback_timeout = 10 * 60  # 10 min
        self.fallback_called_time = None

    def makeSocket(self, **kwargs):
        try:
            return AMQPStormSocket(self.host, self.port, self.username, self.password, self.virtual_host, self.exchange,
                                   self.queue, self.exchange_is_durable, self.queue_is_durable,
                                   self.exchange_type, self.fallback_call)
        except Exception as e:
            self.fallback_call(e)

    def makePickle(self, record):
        return self.formatter.format(record)

    def fallback_call(self, exception):
        # Send an email to alert that something went wrong with logstash AMQP
        if not self.fallback_called_time or time.time() - self.fallback_called_time > self.fallback_timeout:
            # This condition prevents an infinite loop (for instance, when redis is not activated)
            self.fallback_called_time = time.time()
            fn, lno, func = logger.findCaller()
            msg = 'Failed to connect to logstash AMQP on %s:\n %s' % (socket.gethostname(), exception)
            record = logger.makeRecord(self.name, logging.CRITICAL, fn, lno, msg, [], sys.exc_info(), func, {})
            self.fallback_handler.emit(record)
        raise socket.error(safe_str(exception))  # make sure SocketHandler handles the failure correctly

class AMQPStormSocket(object):

    def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                 queue_is_durable, exchange_type, fallback_call):

        # create connection & channel
        self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
        self.channel = self.connection.channel()

        # create an exchange, if needed
        self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
        # create a queue, if needed
        self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
        # bind it
        self.channel.queue.bind(queue=queue, exchange=exchange)

        # needed when publishing
        self.exchange = exchange

        self.fallback_call = fallback_call

    def sendall(self, data):
        try:
            self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
        except Exception as e:
            self.fallback_call(e)

    def close(self):
        try:
            self.channel.close()
            self.connection.close()
        except Exception:
            pass

The handler is initialized by the parent thread.

cp2587 commented 7 years ago

Hello,

I tested librabbitmq, creating a connection/channel in every sub-thread, but I keep getting similar errors, so I guess your library is not at fault.

I now think the problem is that RabbitMQ closes the socket after a while if it has not received any messages, so I am going to try the heartbeat functionality.
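A minimal sketch of opening the connection with an explicit heartbeat, assuming amqpstorm's `Connection` accepts a `heartbeat` keyword (interval in seconds) alongside the `virtual_host` and `timeout` keywords already used in `AMQPStormSocket`. The `open_connection` helper and its `connection_factory` parameter are hypothetical; the factory defaults to `amqpstorm.Connection` and is injectable only so the sketch can run without a broker. The `timeout=10` default is an illustrative value, not a recommendation from the library.

```python
def open_connection(host, username, password, port=5672, virtual_host='/',
                    heartbeat=60, timeout=10, connection_factory=None):
    """Open an AMQP connection that requests an explicit heartbeat.

    connection_factory is a hypothetical injection point; when omitted,
    amqpstorm.Connection is used (assumed to accept heartbeat=<seconds>).
    """
    if connection_factory is None:
        import amqpstorm  # imported lazily so a fake factory needs no broker
        connection_factory = amqpstorm.Connection
    # Same positional order as amqpstorm.Connection(host, user, password, port)
    return connection_factory(host, username, password, port,
                              virtual_host=virtual_host,
                              heartbeat=heartbeat,
                              timeout=timeout)


# Usage with a stand-in factory that only records its arguments.
class RecordingFactory(object):
    def __call__(self, *args, **kwargs):
        self.args, self.kwargs = args, kwargs
        return 'fake-connection'


factory = RecordingFactory()
conn = open_connection('localhost', 'guest', 'guest', heartbeat=30,
                       connection_factory=factory)
```

RabbitMQ's heartbeat documentation describes heartbeats both as a way to detect dead peers and as protection against network equipment that drops idle TCP connections, which is the idle-closure scenario suspected here.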

Since I am planning to use AMQPStorm, I have a few questions to better understand how to use it in my use case. Would you mind answering them here? Here is the list:

Thanks for your help and your work on this library.

eandersson commented 7 years ago

I recently moved to a new country, so I haven't been as active as I normally would be, but I can try to put together a robust example for you in the coming days (or, worst case, weeks).

cp2587 commented 7 years ago

Don't worry, I am starting to get my head around your code and managed to retry on connection loss, so my questions are now more out of curiosity than about solving my original problem :p

For no_ack=True, where do I set this? I am not sure it helps in my case (I debugged it, and apparently the RabbitMQ server does not write anything back when I publish a message with basic.publish, so that's fine). Can't I use write_frames (note the 's') to send multiple frames in a single batch?

PS: Congratulations on your new position :) Blizzard, is it? That's a really nice company (especially if you are a gamer, haha)

eandersson commented 7 years ago

For no_ack=True, where do I set this?

Sorry, regarding no_ack=True, I was actually confusing it with confirm_deliveries, which is not enabled by default.

If you call channel.confirm_deliveries() before publishing, RabbitMQ will reply every time you publish a message. That is good for reliability, but it comes at the cost of performance.

Can't I use write_frames (note the 's') to send multiple frames in a single batch?

You could send multiple frames in a single batch, but it would require some custom code.

Maybe something like this, but I haven't had time to test it yet.

import amqpstorm
from pamqp import specification
from pamqp import header as pamqp_header
from amqpstorm.basic import Basic

messages = ['message1', 'message2']
exchange = 'hello'
routing_key = 'queue1'

# Build the method, header and body frames for every message up front,
# so they can all be written to the socket in a single call.
frames = []
for body in messages:
    properties = {}
    # Encode the body to bytes, updating properties as needed (private helper).
    body = Basic._handle_utf8_payload(body, properties)
    properties = specification.Basic.Properties(**properties)
    method_frame = specification.Basic.Publish(exchange=exchange,
                                               routing_key=routing_key,
                                               mandatory=False,
                                               immediate=False)
    header_frame = pamqp_header.ContentHeader(body_size=len(body),
                                              properties=properties)

    frames.append(method_frame)
    frames.append(header_frame)
    # Split the body into one or more body frames (private helper).
    for body_frame in Basic._create_content_body(body):
        frames.append(body_frame)

connection = amqpstorm.Connection(....)
channel = connection.channel()
# Write all collected frames in one batch.
channel.write_frames(frames)
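The loop above relies on `Basic._create_content_body` to split each body into body frames. As a standalone illustration of that idea (not amqpstorm's actual code), the sketch below splits a payload so every chunk fits in one frame. It assumes the AMQP 0-9-1 framing overhead of 8 bytes per frame (a 7-byte header plus the frame-end octet, both counted in `frame_max`); `split_body` and `FRAME_OVERHEAD` are hypothetical names, and the `131072` default is RabbitMQ's default `frame_max`.

```python
FRAME_OVERHEAD = 8  # 7-byte frame header + 1 frame-end octet (AMQP 0-9-1)


def split_body(body, frame_max=131072):
    """Split a bytes payload into chunks that each fit in one body frame."""
    max_payload = frame_max - FRAME_OVERHEAD
    if max_payload <= 0:
        raise ValueError('frame_max too small: %d' % frame_max)
    # An empty body yields no body frames; only the header carries size 0.
    return [body[start:start + max_payload]
            for start in range(0, len(body), max_payload)]


chunks = split_body(b'abcdefghij', frame_max=12)
```

With the default, a 300,000-byte log message would need three body frames, each appended to `frames` right after its header frame.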

PS: Congratulations on your new position :) Blizzard, is it? That's a really nice company (especially if you are a gamer, haha)

Haha, thanks! I have actually been working here for 8 years; I just moved from the French office to the US office.