galaxyproject / pulsar

Distributed job execution application built for Galaxy
https://pulsar.readthedocs.io
Apache License 2.0

Allow recovery from rabbitmq restart #324

Closed mvdbeek closed 1 year ago

mvdbeek commented 1 year ago

`ConnectionForced` looks like a very deliberate closure on the broker's side, so handling it may not fix all of https://github.com/galaxyproject/pulsar/issues/316, but a controlled restart of a RabbitMQ server results in:

Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]: Exception in thread consume-kill-pyamqp://pulsar_au:********@gat-4.eu.galaxy.training:5671//pulsar/pulsar_au?ssl=1:
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]: Traceback (most recent call last):
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     self.run()
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/usr/lib/python3.10/threading.py", line 953, in run
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     self._target(*self._args, **self._kwargs)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/pulsar/messaging/bind_amqp.py", line 52, in drain
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     __drain(name, queue_state, pulsar_exchange, callback)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/pulsar/messaging/bind_amqp.py", line 100, in __drain
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     pulsar_exchange.consume(name, callback=callback, check=queue_state)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/pulsar/client/amqp_exchange.py", line 119, in consume
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     connection.drain_events(timeout=self.__timeout)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/kombu/connection.py", line 318, in drain_events
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     return self.transport.drain_events(self.connection, **kwargs)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/kombu/transport/pyamqp.py", line 101, in drain_events
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     return connection.drain_events(**kwargs)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/amqp/connection.py", line 522, in drain_events
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     while not self.blocking_read(timeout):
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/amqp/connection.py", line 528, in blocking_read
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     return self.on_inbound_frame(frame)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/amqp/method_framing.py", line 53, in on_frame
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     callback(channel, method_sig, buf, None)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/amqp/connection.py", line 534, in on_inbound_method
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     return self.channels[channel_id].dispatch_method(
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     listener(*args)
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:   File "/mnt/pulsar/venv/lib/python3.10/site-packages/amqp/connection.py", line 664, in _on_close
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]:     raise error_for_code(reply_code, reply_text,
Apr 20 09:20:12 gat-24.oz.galaxy.training pulsar[71616]: amqp.exceptions.ConnectionForced: (0, 0): (320) CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'

This PR fixes that.
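
For illustration only, here is a minimal sketch of the general recovery pattern: treat a forced close as recoverable, drop the connection, and reconnect instead of letting the consumer thread die. This is not the actual patch. `consume_with_recovery`, `FakeConnection`, and the local `ConnectionForced` class are hypothetical stand-ins (the real exception is `amqp.exceptions.ConnectionForced`, as shown in the traceback), chosen so the example runs without a broker.

```python
import time


class ConnectionForced(Exception):
    """Hypothetical stand-in for amqp.exceptions.ConnectionForced."""


class FakeConnection:
    """Hypothetical connection that replays a shared, scripted list of events."""

    def __init__(self, events):
        self._events = events
        self.closed = False

    def drain_events(self):
        event = self._events.pop(0)
        if isinstance(event, Exception):
            raise event  # simulate the broker closing the connection
        return event

    def close(self):
        self.closed = True


def consume_with_recovery(connect, handle, should_continue,
                          retry_delay=0.0, sleep=time.sleep):
    """Drain events until should_continue() is false.

    A ConnectionForced error ends the current connection but not the loop:
    the connection is closed and a new one is opened on the next pass.
    Returns the number of reconnects that were needed.
    """
    reconnects = 0
    while should_continue():
        connection = connect()
        try:
            while should_continue():
                handle(connection.drain_events())
        except ConnectionForced:
            # Broker forced the close (e.g. a controlled restart): back off, retry.
            reconnects += 1
            sleep(retry_delay)
        finally:
            connection.close()
    return reconnects


# Usage: the scripted broker "restarts" after the first message.
events = ["job-1", ConnectionForced("shutdown"), "job-2", "job-3"]
received = []
connections = []


def connect():
    conn = FakeConnection(events)
    connections.append(conn)
    return conn


reconnects = consume_with_recovery(connect, received.append, lambda: bool(events))
print(received, reconnects)
```

In a real deployment the retry delay would be non-zero, and other connection errors would probably need the same treatment, which is why this alone may not cover everything in the linked issue.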