benjamin-hodgson / asynqp

An AMQP library for asyncio
MIT License
84 stars 29 forks source link

do not pass host and port to loop.create_connection when there's a sock #14

Closed urbaniak closed 9 years ago

urbaniak commented 9 years ago

asyncio create_connection allows to pass a sock param with already open file descriptor number to use it instead of opening new connection to host/port.

It's raising an Exception: "host/port and sock can not be specified at the same time" when trying to pass sock as a keyword argument to asynqp.connect().

That's an easy way to allow both approaches work fine and be compliant with asyncio api.

Passing open socket descriptor is a common practice to restart an application without loosing any data from already open socket - it's used by gunicorn, uwsgi, nginx, etc.

gkocjan commented 9 years ago

:+1:

benjamin-hodgson commented 9 years ago

Looks great! :+1: Thanks very much for the contribution. Please could you add a unit test so that future commits don't accidentally cause regressions on this issue? Some simple instructions on how to get set up with the tests are in CONTRIBUTING.md (I can help you get set up if you run into problems :smile: )

urbaniak commented 9 years ago

Generally the integration test (cause it's not possible to unit test that, it's just about passing right params) for that should look like:

    transport = self.connection.channel_factory.protocol.transport

    # self.connection is pausing reading
    transport.pause_reading()

    # make a new asynqp.connection using the existing fd
    sock = transport._sock_fd
    new_connection = self.loop.run_until_complete(asyncio.wait_for(asynqp.connect(sock=sock), 0.2))

Have looked for a while on the integration_tests.py, but I'm not familiar at all with the contexts test framework.

Can you give some hints how that should be integrated there?

benjamin-hodgson commented 9 years ago

Something like this ought to do the trick (I just pasted and adjusted an existing, similar test):

class WhenConnectingWithAnExistingSocket:
    def given_the_loop_and_a_socket(self):
        self.loop = asyncio.get_event_loop()
        self.sock = socket.socket()  # set up the socket appropriately

    def when_I_connect(self):
        self.connection = self.loop.run_until_complete(asyncio.wait_for(asynqp.connect(sock=self.sock), 0.2))

    def it_should_connect(self):
        assert self.connection is not None

    def cleanup_the_connection(self):
        self.loop.run_until_complete(asyncio.wait_for(self.connection.close(), 0.2))

I am a little confused about what you're trying to achieve in your example. It looks like you are trying to set up a second Connection which uses the same socket as an existing Connection. That doesn't seem like a good idea to me. Why are you trying to do that?

urbaniak commented 9 years ago

The first connection stops receiving data (pause_reading method), then we're spawning a new instance of deamon (fork and execvp) and passing the socket (using environment variable) the new instance (just a fd number), which starts receiving data. The first instance is then killed.

That's the similar approach as gunicorn is using to achieve zero-downtime upgrades.

Haven't spawn the new processes, etc here, cause we're only testing passing the socked fd to new connection instance in the same process.

benjamin-hodgson commented 9 years ago

Okay, that makes sense. It might be worth exposing transport and protocol as (public, documented) attributes on Connection for that sort of reason. Have you had any luck getting a test to work?

urbaniak commented 9 years ago

Have found other issues with re-using the previous socket (authentication, subscription to queues) and decided to not go that way, so closing the issue.

benjamin-hodgson commented 9 years ago

No problem. Thanks for the contribution. I will probably cherry-pick your commit and add my own tests when I get time for it (since I think passing an already-open socket is still useful).