Open yuxifu opened 6 years ago
Hello! Please share the code you are using. I will re-open this issue if I can reproduce. Thanks!
It's not always reproducible. Basically this is what I do:
def consume_until_empty_or_expire(self, channel, callback=None, expires_sec=0, echo=False):
    """
    Consume the messages in ``self.queue_name`` until no message is
    available or the time budget is used up.

    channel: object providing ``basic_get`` and ``basic_ack``.
    callback: func(event_data) returning an EventProcessingResult value.
        Defaults to ``self._default_consume_until_empty_callback``.
        ``event_data`` is a dict with the keys 'frame' and 'header'
        (``vars()`` of the two frames) and 'body' (the JSON-decoded body).
    expires_sec: quit consuming after certain time. <=0, never expires.
    echo: when true, print the routing key and result of each message.

    Returns a dict with the keys 'success' (always True), 'total',
    'processed', 'returned', 'skipped', 'message_count' (taken from the
    last frame received) and 'elapsed_time_sec'.
    """
    start_time = time.time()
    if callback is None:
        callback = self._default_consume_until_empty_callback
    counters = {
        'total': 0,
        'processed': 0,
        'returned': 0,
        'skipped': 0,
        'message_count': 0,
    }

    def summary():
        # Measure the elapsed time at the moment of returning so that the
        # time spent inside the last basic_get call is included.
        report = {
            'success': True,
            'elapsed_time_sec': time.time() - start_time,
        }
        report.update(counters)
        return report

    while True:
        if expires_sec > 0 and time.time() - start_time >= expires_sec:
            return summary()
        method_frame, header_frame, body = channel.basic_get(
            queue=self.queue_name)
        if method_frame is None or method_frame.NAME == 'Basic.GetEmpty':
            return summary()

        event_data = {
            'frame': vars(method_frame),
            'header': vars(header_frame),
            'body': json.loads(body),
        }
        counters['message_count'] = event_data['frame']['message_count']
        counters['total'] += 1
        result = callback(event_data)
        if result == EventProcessingResult.PROCESSED:
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)
            counters['processed'] += 1
        elif result == EventProcessingResult.SKIPPED:
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)
            counters['skipped'] += 1
        elif result == EventProcessingResult.RETURED:
            # NOTE(review): the message is only counted here; it is neither
            # acked nor rejected on this channel - confirm that is intended.
            counters['returned'] += 1
        if echo:
            # print() with one argument behaves the same on Python 2 and 3;
            # format() also copes with a result that is not a string.
            print('{}: {}'.format(event_data['frame']['routing_key'], result))
Nothing really fancy. Probably this is related:
# Request queue expiry only when a positive TTL is configured. The factor
# 60000 presumably converts minutes (per the attribute name) to milliseconds.
if instance.queue_ttl_min > 0:
    queue_args = {"x-expires": instance.queue_ttl_min * 60000}
# NOTE(review): indentation was reconstructed. queue_args has to be bound
# before this excerpt for the call below to work when queue_ttl_min <= 0 -
# confirm against the full source.
channel.queue_declare(
    instance.queue_name,
    durable=instance.queue_durable,
    arguments=queue_args)
I did not really see the problem until I started using "x-expires". It could be a coincidence, though.
Hi, I'm getting the same messages : Duplicate callback found for "0:Connection.Unblocked"
did you solve this?
@benizri-ofir - please provide code that can reproduce this, or at the very least, a working set of code or steps that might reproduce the issue.
Hi did you resolve that bug? I'm getting same stuff with BlockingConnection
@nkborisov no, I never received the requested information. There is no evidence at this point of a bug in Pika.
@lukebakken Thank you for your answer. Let me try to describe my case, though I'm not sure that it's actually a bug in pika; perhaps I'm simply using the library incorrectly. I have the following simple code:
class QueueConnection(object):
    """Base class owning one pika ``BlockingConnection`` serviced by a thread.

    ``start()`` connects and launches a worker thread running ``_loop()``,
    which repeatedly calls ``_process_queue()`` (to be provided by
    subclasses) while the connection is open and calls ``_reconnect()``
    once it is closed. ``stop()`` closes the connection and joins the thread.
    """

    def __init__(self, pair_node_id, conn_params, msg_queue, is_master):
        """Store the configuration; no connection is opened here."""
        self.is_master = is_master
        self.reconnect_delay = 1  # sec
        self.node_id = pair_node_id
        self.thread = None
        self.is_running = False
        self.connection = None
        self.attempts_count = 1
        self.channel = None
        self.conn_params = conn_params
        self.msg_queue = msg_queue
        # NOTE(review): nothing in this class assigns a real name; presumably
        # subclasses set it (e.g. via retrieve_queue_name) before start().
        self.queue_name = None

    @staticmethod
    def retrieve_queue_name(node_id, pair_node_id, is_master):
        """Return '<low>_<high>_TX_QUEUE' (master) or '<low>_<high>_RX_QUEUE'.

        The two node ids are converted to int and sorted, so both members of
        a pair build the same prefix.
        """
        nodes = sorted([int(node_id), int(pair_node_id)])
        queue_name = "{}_{}_".format(nodes[0], nodes[1])
        queue_name += "TX_QUEUE" if is_master else "RX_QUEUE"
        return queue_name

    def start(self):
        """Connect and start the worker thread.

        Raises RuntimeError when the instance is already running.
        """
        # Check before connecting: otherwise a second start() would replace
        # the live connection/channel and only then raise.
        if self.is_running:
            raise RuntimeError("{} is already running for node #{}".format(self.__class__.__name__, self.node_id))
        self._connect(self.conn_params)
        self.is_running = True
        self.thread = Thread(target=self._loop, name="Thread [{}({})]".format(self.__class__.__name__, self.node_id))
        self.thread.start()

    def stop(self):
        """Close the connection and wait up to 30 seconds for the worker."""
        if not self.is_running or self.thread is None:
            return
        # Clear the flag first so the worker loop does not see the closed
        # connection while still "running" and reconnect.
        self.is_running = False
        self._disconnect()
        try:
            self.thread.join(timeout=30)
        except RuntimeError:
            # join() raises RuntimeError for an invalid join (e.g. joining
            # the current thread), not for a timeout.
            log.warning("Thread[{}({})] join timeout".format(self.__class__.__name__, self.node_id))
            return
        # A timed-out join() simply returns, so liveness must be checked.
        if self.thread.is_alive():
            log.warning("Thread[{}({})] join timeout".format(self.__class__.__name__, self.node_id))

    def _connect(self, conn_params):
        """Open the connection and channel, declare the queue, set prefetch to 1."""
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name, auto_delete=False, durable=False)
        self.channel.basic_qos(prefetch_count=1)

    def _disconnect(self):
        """Cancel consuming, delete the queue and close channel and connection.

        Does nothing when there is no connection or it is already closed.
        """
        if self.connection is None or self.connection.is_closed:
            return
        self.channel.cancel()
        # The queue has to be deleted while the channel is still open.
        self.channel.queue_delete(queue=self.queue_name)
        self.channel.close()
        self.connection.close()

    def _reconnect(self):
        """Try once to re-establish the connection.

        On failure the error is logged, the method sleeps for
        ``reconnect_delay`` seconds and increments ``attempts_count``; on
        success ``attempts_count`` is reset to 1.
        """
        log.info('Connection recovering {} with node #{}, attempt {}...'.format(self.__class__.__name__,
                                                                                 self.node_id,
                                                                                 self.attempts_count))
        try:
            log.info('Connection was deleted')
            self._connect(self.conn_params)
        except Exception as e:
            # e.message does not exist on Python 3; report the exception
            # type and its text instead.
            log.error('Exception caught while reconnecting attempt: {}: {}'
                      ' (next will be performed after {} seconds...)'.format(type(e).__name__, e,
                                                                             self.reconnect_delay))
            sleep(self.reconnect_delay)
            self.attempts_count += 1
            return
        self.attempts_count = 1
        log.info('Connection {} with node #{} was successfully recovered'.format(self.__class__.__name__,
                                                                                 self.node_id))

    def _process_queue(self):
        """Handle the queue once; subclasses must override this."""
        raise NotImplementedError()

    def _loop(self):
        """Worker body: process the queue while open, reconnect once closed."""
        assert self.connection is not None
        loop_name = self.__class__.__name__
        while self.is_running:
            if self.connection.is_open:
                try:
                    self._process_queue()
                except pika.exceptions.ConnectionClosed:
                    # The next iteration sees the closed connection and
                    # goes through _reconnect().
                    continue
                except pika.exceptions.AMQPChannelError as exc:
                    log.error("{} channel error {}".format(loop_name, exc))
                    continue
                except pika.exceptions.AMQPConnectionError as exc:
                    log.error("{} connection error {}".format(loop_name, exc))
                    continue
                except NotImplementedError:
                    # No point in looping without an implementation.
                    log.error("{} process_queue method isn't implemented".format(loop_name))
                    self.is_running = False
                    self._disconnect()
            elif self.connection.is_closed:
                self._reconnect()
That class is the base for two subclasses, SenderConnection and ReceiverConnection (just dummy handlers of user queues). The top-level code creates one instance of each class with the following connection parameters:
# Connection settings shared by both connection instances; the blocked
# connection timeout is twice the heartbeat interval.
conn_params = pika.ConnectionParameters(
    host=rabbit_host,
    port=int(rabbit_port),
    socket_timeout=1,
    heartbeat=server_heartbeat_interval,
    blocked_connection_timeout=server_heartbeat_interval * 2,
    credentials=ExternalCredentials(),
    ssl=True,
    ssl_options=ssl_opts,
)
Next, I reproduced a connection failure by force-closing the connection from the RabbitMQ management plugin. As a result, I got the following log every time:
2020-01-14 07:50:48,855|WRN|7f25c6a9a700|pika.callback|Duplicate callback found for "0:Connection.Blocked" 2020-01-14 07:50:48,856|WRN|7f25c6a9a700|pika.callback|Duplicate callback found for "0:Connection.Unblocked" 2020-01-14 07:50:48,856|ERR|7f25c6a9a700|pika.adapters.blocking_connection|Connection close detected; result=BlockingConnection__OnClosedArgs(connection=<SelectConnection CLOSED socket=None params=
>, reason_code=320, reason_text='CONNECTION_FORCED - Closed via management plugin') 2020-01-14 07:50:48,857|WRN|7f25c6a9a700|queue_connection|ReceiverQueueConnection connection with node #2 lost 2020-01-14 07:50:48,857|INF|7f25c6a9a700|queue_connection|Connection recovering ReceiverQueueConnection with node #2, attempt 1... 2020-01-14 07:50:48,859|INF|7f25c6a9a700|queue_connection|Connection was deleted 2020-01-14 07:50:48,903|INF|7f25c6a9a700|queue_connection|Connection ReceiverQueueConnection with node #2 was successfully recovered
"Duplicate callback" log warning appears every time just before expected message about the connection closing event. By debugging i understood that the warning message always appears before any calls of BlockingConnection constructors and destructors. It seems very strange for me.
I wouldn't worry too much about it. See #894 and #192
At some point I may have time to track that warning down to fix it. Not any time soon.
The problem also occurred in my case (for both the blocked and unblocked callbacks). I did some additional tracing, and it looks like during the Connection::_on_terminate call the block/unblock callbacks are unregistered and a connection reset is performed, during which the block/unblock callbacks are re-registered. However, Callback::remove has a bug: if you do not provide a callback_value, it does not remove all callbacks for the given prefix/key, contrary to what its documentation says:
def remove(self, prefix, key, callback_value=None, arguments=None):
    """Remove a callback from the stack by prefix, key and optionally
    the callback itself. **If you only pass in prefix and key, all
    callbacks for that prefix and key will be removed.**
    :param str or int prefix: The prefix for keeping track of callbacks with
    :param str key: The callback key
    :param method callback_value: The method defined to call on callback
    :param dict arguments: Optional arguments to check
    :rtype: bool
    """
    # NOTE(review): indentation was reconstructed from a flattened paste;
    # confirm the nesting against the upstream source.
    # Entries are deleted only inside this branch, i.e. only when a specific
    # callback_value was passed in.
    if callback_value:
        offsets_to_remove = list()
        # Walk the list from the last entry to the first, so the collected
        # offsets are in descending order and deleting one entry does not
        # shift the offsets still to be deleted.
        for offset in xrange(len(self._stack[prefix][key]), 0, -1):
            callback_dict = self._stack[prefix][key][offset - 1]
            if (callback_dict[self.CALLBACK] == callback_value and
                    self._arguments_match(callback_dict, [arguments])):
                offsets_to_remove.append(offset - 1)
        for offset in offsets_to_remove:
            try:
                LOGGER.debug('Removing callback #%i: %r', offset,
                             self._stack[prefix][key][offset])
                del self._stack[prefix][key][offset]
            except KeyError:
                pass
    # With callback_value omitted, the branch above is skipped and no entry
    # is deleted in this method, despite what the docstring promises; the
    # only call made is _cleanup_callback_dict (its body is not shown here).
    self._cleanup_callback_dict(prefix, key)
    return True
The solution would be to call the remove_all method when callback_value is not provided:
def remove(self, prefix, key, callback_value=None, arguments=None):
    """Remove a callback from the stack by prefix, key and optionally
    the callback itself. If you only pass in prefix and key, all
    callbacks for that prefix and key will be removed.

    :param str or int prefix: The prefix for keeping track of callbacks with
    :param str key: The callback key
    :param method callback_value: The method defined to call on callback
    :param dict arguments: Optional arguments to check
    :rtype: bool
    """
    if callback_value:
        offsets_to_remove = list()
        # Walk the list from the last entry to the first, so the collected
        # offsets are in descending order and deleting one entry does not
        # shift the offsets still to be deleted.
        for offset in xrange(len(self._stack[prefix][key]), 0, -1):
            callback_dict = self._stack[prefix][key][offset - 1]
            if (callback_dict[self.CALLBACK] == callback_value and
                    self._arguments_match(callback_dict, [arguments])):
                offsets_to_remove.append(offset - 1)
        for offset in offsets_to_remove:
            try:
                LOGGER.debug('Removing callback #%i: %r', offset,
                             self._stack[prefix][key][offset])
                del self._stack[prefix][key][offset]
            except KeyError:
                pass
        self._cleanup_callback_dict(prefix, key)
    else:
        # No specific callback given: drop every callback registered for
        # this prefix/key, as the docstring promises.
        self.remove_all(prefix, key)
    return True
Apparently this problem was introduced with #192
We are using BlockingConnection and basic_get with no callback function. How come we get a lot of Duplicate callback found for "0:Connection.Unblocked" errors? We use pika 0.11.2. Any ideas or thoughts?