jasonrbriggs / stomp.py

“stomp.py” is a Python client library for accessing messaging servers (such as ActiveMQ or RabbitMQ) using the STOMP protocol (versions 1.0, 1.1 and 1.2). It can also be run as a standalone, command-line client for testing.
Apache License 2.0

Receiver loop ended when reconnecting multiple times #337

Open deepsworld opened 3 years ago

deepsworld commented 3 years ago

The reconnection works as expected for the first disconnection, but the receiver loop ends if the broker gets disconnected again. Is this expected behavior? I am trying to keep a persistent connection to the broker. The connection needs to reconnect an unlimited number of times in case of a broker restart. Please let me know if you need more information on this. Thanks!

Broker restarts: disconnected

ConnectionRefusedError: [Errno 111] Connection refused
INFO:stomp.py:Attempting connection to host <>, port 61614
WARNING:stomp.py:Could not connect to host <>, port 61614
Traceback (most recent call last):
  File "/home/ml/dpatel/miniconda3/envs/sinet37/lib/python3.7/site-packages/stomp/transport.py", line 737, in attempt_connection
    self.socket = socket.create_connection(host_and_port, self.__timeout)
  File "/home/ml/dpatel/miniconda3/envs/sinet37/lib/python3.7/socket.py", line 728, in create_connection
    raise err
  File "/home/ml/dpatel/miniconda3/envs/sinet37/lib/python3.7/socket.py", line 716, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
INFO:stomp.py:Attempting connection to host <>, port 61614
WARNING:stomp.py:Could not connect to host <>, port 61614
Traceback (most recent call last):
  File "/home/ml/dpatel/miniconda3/envs/sinet37/lib/python3.7/site-packages/stomp/transport.py", line 737, in attempt_connection
    self.socket = socket.create_connection(host_and_port, self.__timeout)
  File "/home/ml/dpatel/miniconda3/envs/sinet37/lib/python3.7/socket.py", line 728, in create_connection
    raise err
  File "/home/ml/dpatel/miniconda3/envs/sinet37/lib/python3.7/socket.py", line 716, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

Reconnection okay

INFO:stomp.py:Attempting connection to host <>, port 61614
INFO:stomp.py:keepalive: autodetected linux-style support
INFO:stomp.py:keepalive: activating linux-style support
INFO:stomp.py:keepalive: using system defaults
INFO:stomp.py:keepalive: set 'enable' option to 1 on socket
INFO:stomp.py:Established connection to host <>, port 61614
INFO:stomp.py:Starting receiver loop (<Thread(Thread-2, started daemon 140292711454464)>)
INFO:stomp.py:Created thread <Thread(Thread-2, started daemon 140292711454464)> using func <function default_create_thread at 0x7f9888a05680>
INFO:stomp.py:Sending frame: 'STOMP'
INFO:stomp.py:Received frame: 'CONNECTED', len(body)=0
INFO:stomp.py:Sending frame: 'SUBSCRIBE'
[11/16/2020 15:22:17] INFO: Listening to Detecting queue with selector: JMSCorrelationID IN ('1')

Broker restarts again: the receiver loop ended

INFO:stomp.py:Receiver loop ended

FYI: this is the wrapper we are using to reconnect. The listener's on_disconnected handler calls the reconnect function.

import logging
import ssl

import stomp


class Broker(object):
    """Wrapper around the original stomp connection with setup and reconnect with credentials
    """
    def __init__(self, server, user, passwd, port=61614):
        self.server = server
        self.port = port
        self.user = user
        self.passwd = passwd
        self.conn = self.setup(server, port, user, passwd)
        self.subscriptions = {}

    def get_listener(self, name):
        return self.conn.get_listener(name)

    def set_listener(self, listener_name, listener):
        self.conn.set_listener(listener_name, listener)

    def setup(self, server, port, user, passwd):
        conn = stomp.Connection(
            host_and_ports=[(server, port)],
            auto_decode=True,
            keepalive=True,
            reconnect_attempts_max=-1,
            reconnect_sleep_initial=2,
            reconnect_sleep_increase=2,
            reconnect_sleep_jitter=0.5
        )  # heartbeats=(4000, 4000)
        conn.set_ssl(for_hosts=[(server, port)], ssl_version=ssl.PROTOCOL_TLS)
        logging.info(f'Connecting to {server}:{port} with user: {user}')
        conn.connect(user, passwd, wait=True)
        return conn

    def reconnect(self):
        self.conn.connect(self.user, self.passwd, wait=True)

djairhogeuens commented 3 years ago

I have encountered exactly the same issue and this was also quite unexpected for me... I had to set up a work-around to handle this.

deepsworld commented 3 years ago

@djairhogeuens If you don't mind, can you please share the workaround? Is it to restart the broker thread and reconnect?

xiandong79 commented 3 years ago

@djairhogeuens If you don't mind, can you please share the workaround?

D [2020-11-30 21:36:57,859] [1606743417.859224] nothing received, raising CCE
I [2020-11-30 21:36:57,859] [1606743417.859558] Receiver loop ended

Yes, the reconnect does not work sometimes.

djairhogeuens commented 3 years ago

@deepsworld @xiandong79 For a publisher, the workaround I implemented was to catch the NotConnectedException when trying to send a message to the broker and then call the connect function again. I did not register any listener for the connection. For a subscriber, I did register a listener for the connection and implemented the on_disconnected handler in the listener to create an entirely new connection instance and reconnect.

This results in a stable retry mechanism on both producer and subscriber side for me that can always recover from disconnects.
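The publisher-side workaround described above could look roughly like the sketch below. This is an illustration, not the code actually used: `send_with_reconnect`, its parameters, and the retry limits are hypothetical names. With stomp.py, `retry_on` would presumably be `stomp.exception.NotConnectedException` and `reconnect` something like the `Broker.reconnect` method from the wrapper earlier in this thread.

```python
import time


def send_with_reconnect(conn, reconnect, destination, body,
                        retry_on, attempts=3, delay=1.0):
    """Send one message; if retry_on is raised, reconnect and try again.

    conn      -- object with a send(destination=..., body=...) method
    reconnect -- zero-argument callable that re-establishes the connection
    retry_on  -- exception class (or tuple) that signals "not connected"
    Returns the number of reconnects that were needed.
    """
    for attempt in range(attempts):
        try:
            conn.send(destination=destination, body=body)
            return attempt
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller see the error
            time.sleep(delay)  # give the broker a moment to come back
            reconnect()
```

The helper only retries on the exception type it is given, so unrelated errors still propagate immediately.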

A structural fix would be welcome though.
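For the subscriber side, a minimal sketch of "create an entirely new connection instance in on_disconnected" might look like this. The class name, the `make_connection` factory, and the default subscription id are assumptions made for illustration. The factory is expected to return an already connected client (for example, what `Broker.setup` returns), and in real use the class would probably derive from `stomp.ConnectionListener`.

```python
class ReconnectingSubscriber:
    """Rebuilds the whole connection when the old one reports a disconnect."""

    def __init__(self, make_connection, destination, sub_id="1"):
        # make_connection is a hypothetical zero-argument factory that
        # returns a new, already connected client object.
        self.make_connection = make_connection
        self.destination = destination
        self.sub_id = sub_id
        self.conn = None
        self.start()

    def start(self):
        # Drop the old connection object and start over with a fresh one.
        self.conn = self.make_connection()
        # Register this object so the new connection also reports disconnects.
        self.conn.set_listener("reconnect", self)
        self.conn.subscribe(destination=self.destination,
                            id=self.sub_id, ack="auto")

    def on_disconnected(self):
        # Called by the connection when the link to the broker is lost.
        self.start()
```

If the broker is still down when `on_disconnected` fires, the factory will raise, so the factory itself needs to retry or the call needs to be wrapped in a retry loop.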

deepsworld commented 3 years ago

I just noticed there is a listener function for the receiver loop as well, which could be used to reconnect to the broker. I haven't tested it yet, though.

def on_receiver_loop_completed(self, frame):
    """
    Called when the connection receiver_loop has finished.
    """
    pass
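
An untested sketch of how this hook might be used to trigger a reconnect is shown below. The class name, the `reconnect` callable, and the retry parameters are hypothetical. The reconnect runs on a separate thread here as a precaution, on the assumption that the callback is invoked from the receiver thread that is shutting down.

```python
import threading
import time


class ReceiverLoopReconnector:
    """Calls a reconnect function whenever the receiver loop finishes."""

    def __init__(self, reconnect, delay=2.0, max_attempts=None):
        self.reconnect = reconnect        # e.g. Broker.reconnect from the wrapper above
        self.delay = delay                # seconds to wait between attempts
        self.max_attempts = max_attempts  # None means keep trying forever
        self.worker = None

    def on_receiver_loop_completed(self, frame):
        # Hand the reconnect off to a new thread so this callback returns promptly.
        self.worker = threading.Thread(target=self._retry, daemon=True)
        self.worker.start()

    def _retry(self):
        attempt = 0
        while self.max_attempts is None or attempt < self.max_attempts:
            attempt += 1
            try:
                self.reconnect()
                return
            except Exception:
                # Broad catch for the sketch only; narrow this in real code.
                time.sleep(self.delay)
```

The receiver loop also ends on a deliberate disconnect, so a real listener would need a flag to tell an intentional shutdown from a broker restart.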