celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

Failover with alternates/list of hostnames in Connection objects does not work #357

Open ngie-eign opened 10 years ago

ngie-eign commented 10 years ago

The alternates support in kombu.Connection does not cycle to the next host when it tries to connect. Instead, the transport layer raises a socket error and the connection attempt dies, which seems to defeat the purpose of failover support:

# script -c 'set -x; for TRANSPORT in amqp pyamqp; do export TRANSPORT; nosetests -v test_kombu_alt_support.py; done'
Script started on Thu 10 Apr 2014 01:23:42 PM PDT
+ for TRANSPORT in amqp pyamqp
+ export TRANSPORT
+ nosetests -v test_kombu_alt_support.py
Creating vhost "test" ...
...done.
Creating user "test" ...
...done.
Setting permissions for user "test" in vhost "test" ...
...done.
test_kombu_alt_support.test_conn ... ok
test_kombu_alt_support.test_conn_with_alterative ... ERROR
Deleting vhost "test" ...
...done.
Deleting user "test" ...
...done.

======================================================================
ERROR: test_kombu_alt_support.test_conn_with_alterative
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/opt/python-2.7/lib/python2.7/site-packages/nose/case.py", line 197, in runTest
    self.test(*self.arg)
  File "/mnt/test_kombu_alt_support.py", line 54, in test_conn_with_alterative
    SimpleQueue(conn, 'myqueue')
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/simple.py", line 125, in __init__
    compression=compression)
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 84, in __init__
    self.revive(self._channel)
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 218, in revive
    self.declare()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 104, in declare
    self.exchange.declare()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/entity.py", line 163, in declare
    return self.channel.exchange_declare(
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/abstract.py", line 115, in channel
    channel = self._channel = channel()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
    value = self.__value__ = self.__contract__()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/connection.py", line 773, in default_channel
    self.connection
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/connection.py", line 758, in connection
    self._connection = self._establish_connection()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/connection.py", line 713, in _establish_connection
    conn = self.transport.establish_connection()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/transport/librabbitmq.py", line 125, in establish_connection
    conn = self.Connection(**opts)
  File "/opt/python-2.7/lib/python2.7/site-packages/librabbitmq/__init__.py", line 188, in __init__
    self.connect()
ConnectionError: Error opening socket: hostname lookup failed

----------------------------------------------------------------------
Ran 2 tests in 5.799s

FAILED (errors=1)
+ for TRANSPORT in amqp pyamqp
+ export TRANSPORT
+ nosetests -v test_kombu_alt_support.py
Creating vhost "test" ...
...done.
Creating user "test" ...
...done.
Setting permissions for user "test" in vhost "test" ...
...done.
test_kombu_alt_support.test_conn ... ok
test_kombu_alt_support.test_conn_with_alterative ... ERROR
Deleting vhost "test" ...
...done.
Deleting user "test" ...
...done.

======================================================================
ERROR: test_kombu_alt_support.test_conn_with_alterative
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/opt/python-2.7/lib/python2.7/site-packages/nose/case.py", line 197, in runTest
    self.test(*self.arg)
  File "/mnt/test_kombu_alt_support.py", line 54, in test_conn_with_alterative
    SimpleQueue(conn, 'myqueue')
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/simple.py", line 125, in __init__
    compression=compression)
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 84, in __init__
    self.revive(self._channel)
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 218, in revive
    self.declare()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 104, in declare
    self.exchange.declare()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/entity.py", line 163, in declare
    return self.channel.exchange_declare(
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/abstract.py", line 115, in channel
    channel = self._channel = channel()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
    value = self.__value__ = self.__contract__()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/connection.py", line 773, in default_channel
    self.connection
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/connection.py", line 758, in connection
    self._connection = self._establish_connection()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/connection.py", line 713, in _establish_connection
    conn = self.transport.establish_connection()
  File "/opt/python-2.7/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
    conn = self.Connection(**opts)
  File "/opt/python-2.7/lib/python2.7/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/opt/python-2.7/lib/python2.7/site-packages/amqp/transport.py", line 294, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/opt/python-2.7/lib/python2.7/site-packages/amqp/transport.py", line 75, in __init__
    socket.SOCK_STREAM, SOL_TCP):
gaierror: [Errno -2] Name or service not known

----------------------------------------------------------------------
Ran 2 tests in 0.871s

FAILED (errors=1)

Script done on Thu 10 Apr 2014 01:23:49 PM PDT
# cat test_kombu_alt_support.py
import os
import subprocess

from kombu import Connection
from kombu.simple import SimpleQueue

HOSTNAME = 'localhost'
VHOST = 'test'
USERID = 'test'
PASSWORD = 'test'

conn_dict = {
    'hostname': HOSTNAME,
    'userid': USERID,
    'ssl': False,
    'virtual_host': VHOST,
    'password': PASSWORD,
    'transport': os.environ.get('TRANSPORT', 'amqp'),
}

conn_dict_with_alt = dict(conn_dict, **{
    'hostname': 'a.bogus.host',
    'alternates': conn_dict['hostname'],
})

def setup():
    assert os.geteuid() == 0, 'must be root'
    commands = (
        ['rabbitmqctl', 'add_vhost', VHOST],
        ['rabbitmqctl', 'add_user', USERID, PASSWORD],
        ['rabbitmqctl', 'set_permissions', '-p', VHOST, USERID, '.*', '.*', '.*'],
    )
    try:
        for command in commands:
            subprocess.check_call(command)
    except:
        teardown()
        raise

def teardown():
    commands = (
        ['rabbitmqctl', 'delete_vhost', VHOST],
        ['rabbitmqctl', 'delete_user', USERID],
    )
    for command in commands:
        subprocess.call(command)

def test_conn():
    with Connection(**conn_dict) as conn:
        SimpleQueue(conn, 'myqueue')

def test_conn_with_alterative():
    with Connection(**conn_dict_with_alt) as conn:
        SimpleQueue(conn, 'myqueue')
#
ngie-eign commented 10 years ago

I tried adding a call to conn.ensure_connection(max_retries=len(conn.alt)) before the SimpleQueue call, but that failed too.
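
For reference, the behaviour expected from failover can be sketched without kombu at all. The snippet below is only an illustration of "try the next host when one fails". It is not kombu's implementation, and connect_with_failover and fake_connect are hypothetical names made up for this sketch (Python 3 syntax):

```python
import itertools
import socket


def connect_with_failover(hosts, connect, max_retries=None):
    """Try hosts in round-robin order until connect(host) succeeds.

    Hypothetical helper: `connect` is any callable that returns a
    connection object or raises OSError (socket.gaierror is a subclass).
    """
    if not hosts:
        raise ValueError('at least one host is required')
    if max_retries is None:
        # By default, give every host exactly one chance.
        max_retries = len(hosts)
    last_error = None
    for attempt, host in enumerate(itertools.cycle(hosts)):
        if attempt >= max_retries:
            break
        try:
            return connect(host)
        except OSError as exc:
            # Remember the failure and move on to the next host.
            last_error = exc
    raise last_error


def fake_connect(host):
    """Stand-in for a real transport: the bogus host fails DNS lookup."""
    if host == 'a.bogus.host':
        raise socket.gaierror(-2, 'Name or service not known')
    return 'connected to ' + host


result = connect_with_failover(['a.bogus.host', 'localhost'], fake_connect)
```

With the two hosts from the test script, the first attempt fails the same way the pyamqp traceback does, and the second attempt reaches localhost.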

ngie-eign commented 10 years ago

I figured it out. The problem is that Connection.switch always assumes each alternate is a URL, not a bare hostname. I have a patch that allows alternates to be a list of hostnames, which I'll provide in my open pull request.
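
Until such a patch is available, one possible workaround is to turn the bare hostnames into full URLs before handing them to Connection. The helper below is a minimal sketch using only the standard library (Python 3 syntax). hostname_to_url is a hypothetical name, not part of kombu, and the credentials are the ones from the test script above:

```python
from urllib.parse import quote


def hostname_to_url(hostname, userid, password, virtual_host, scheme='amqp'):
    """Build a broker URL from separate connection parameters.

    Hypothetical helper: values that already look like URLs are
    returned unchanged, so mixed lists are handled too.
    """
    if '://' in hostname:
        return hostname
    return '{0}://{1}:{2}@{3}/{4}'.format(
        scheme,
        quote(userid, safe=''),        # escape characters such as '@' or ':'
        quote(password, safe=''),
        hostname,
        quote(virtual_host, safe=''),  # a vhost may itself contain '/'
    )


urls = [
    hostname_to_url(host, 'test', 'test', 'test')
    for host in ('a.bogus.host', 'localhost')
]
```

The resulting list could then be tried as the primary URL plus alternates, for example Connection(urls[0], alternates=urls[1:]). This assumes that alternates given as full URLs are switched to correctly, which is what the diagnosis above suggests but which was not tested in this report.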