Polyconseil / aioamqp

AMQP implementation using asyncio

Ack messages with multiple connections #44

Closed thijsterlouw closed 8 years ago

thijsterlouw commented 9 years ago

Use case: my Python application connects to multiple RabbitMQ servers. They are not clustered, and our events are sharded across them. We want to consume events from all of these servers, and the events need to be acked.

Problem: channel.basic_consume(queue_name, callback=callback) takes a callback that looks like: def callback(body, envelope, properties): so inside the callback you have no idea which server the event came from. Ticket https://github.com/Polyconseil/aioamqp/issues/39 suggests creating a separate channel to ack. The problem is that I don't know which server to create that channel for.

Initially I thought I would just create a lambda for the callback and bind the ack channel to it, but then I found: https://github.com/Polyconseil/aioamqp/commit/ff74ee7359910dd4647be58f11499e159549ce7a So that's probably not going to work out. I wonder why this change was made anyway.

I'm pretty new to Python, so I am not sure what I'm supposed to do in this case. Should I wrap these functions in objects, with the callback pointing to a member function of the object? The object would then have access to the server/channel, so we would be able to send an ack.

dzen commented 9 years ago

Hello @thijsterlouw,

Thank you for reporting your use case. This issue really shows that we didn't solve the problem correctly. For instance, we should probably pass the channel instance to the callback so it can ack this specific message. The problem is that this channel is currently flooded by incoming messages and won't send anything to the server :/

thijsterlouw commented 9 years ago

Hi @dzen, thanks for your quick reply. I've written a little bit of Python code to test the idea, and it seems wrapping my code in objects would be a good way forward. That way I can pass my context around in the objects and have each object create a reply channel to ack the messages (as #39 suggested).

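import asyncio

# Python 3.4-style generator-based coroutine, so it can use "yield from"
@asyncio.coroutine
def tester():
    print('tester')
    x = MyClass(1)
    y = MyClass(2)
    print('created classes')
    # A bound method of a coroutine function is still a coroutine function
    print(asyncio.iscoroutinefunction(x.f))
    val_x = yield from x.f()
    val_y = yield from y.f()
    print('x: {0}, y: {1}'.format(val_x, val_y))

class MyClass:
    def __init__(self, i):
        self.i = i

    @asyncio.coroutine
    def f(self):
        return 'hello world {0}'.format(self.i)

if __name__ == '__main__':
    print('main')
    loop = asyncio.get_event_loop()
    # run_until_complete returns once tester() is done; calling
    # run_forever() afterwards would block and never reach cleanup
    loop.run_until_complete(tester())
    loop.close()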

thijsterlouw commented 9 years ago

I have everything set up nicely: one instance of a class for each server, which wraps the callback and provides the necessary context. Acking now works great (even when using the same channel). So for me: problem solved :)

Initially I had a small application bug where I didn't ack invalid events, but that was easily fixed.
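
For readers landing here later, the per-server wrapper described above could look roughly like the sketch below. It is only an illustration, not the code from this thread: FakeChannel, FakeEnvelope, ServerConsumer and the server names are hypothetical stand-ins so the example runs without a broker, and it uses the newer async/await syntax instead of yield from. The callback keeps the (body, envelope, properties) signature quoted in the first post.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a channel; records the acked delivery tags."""

    def __init__(self, server):
        self.server = server
        self.acked = []

    async def basic_client_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class FakeEnvelope:
    """Hypothetical stand-in for the envelope passed to the callback."""

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag


class ServerConsumer:
    """One instance per server; the bound method carries the context."""

    def __init__(self, channel):
        self.channel = channel

    async def callback(self, body, envelope, properties):
        # self.channel tells us which server delivered this message.
        await self.channel.basic_client_ack(envelope.delivery_tag)


async def demo():
    consumers = [ServerConsumer(FakeChannel(name))
                 for name in ("rabbit-a", "rabbit-b")]
    # Simulate one delivery per server.
    for tag, consumer in enumerate(consumers, start=1):
        await consumer.callback(b"event", FakeEnvelope(tag), None)
    return {c.channel.server: c.channel.acked for c in consumers}


result = asyncio.run(demo())
print(result)
```

With a real library, consumer.callback (the bound method) would be what gets registered as the consume callback, so each server gets its own context without any lambda.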

dzen commented 8 years ago

Hi @thijsterlouw,

I tried a little thing: the callback is converted to a Task, and the channel instance is passed to the callback. This way it's possible to ack the message from the coroutine called when a message is received. The only problem is that we will create a Task for each received message. The event loop could get very BIG if your coroutine is slow.

I've got a publisher which publishes at 20k msg/s. My computer gets very slow :(
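
To give a rough idea of why one Task per message can pile up, here is a small sketch using plain asyncio only. It is not aioamqp's dispatcher: slow_callback, the 10 ms delay and the message count are assumptions picked for illustration, and it uses the newer async/await syntax.

```python
import asyncio


async def slow_callback(body):
    # Hypothetical slow consumer: each message takes 10 ms to handle.
    await asyncio.sleep(0.01)


async def demo(n_messages):
    tasks = []
    for i in range(n_messages):
        # One Task per incoming message, as described above.
        tasks.append(asyncio.ensure_future(slow_callback(i)))
    # No task has had a chance to run yet, so all of them are pending.
    pending_at_peak = sum(1 for t in tasks if not t.done())
    await asyncio.gather(*tasks)
    return pending_at_peak


peak = asyncio.run(demo(1000))
print(peak)
```

When messages arrive faster than the callback finishes, the number of pending tasks keeps growing in the same way, which is what makes the event loop heavy.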

dzen commented 8 years ago

[Image: screenshot from 2015-09-29 18 15 49]

dzen commented 8 years ago

@mpaolini hello. Any thoughts on this PR / issue?

mpaolini commented 8 years ago

@dzen will have a look later this weekend

dzen commented 8 years ago

Hello @thijsterlouw,

Did you try this branch against your code?

Beware, the definition of your callback should now look like:

@asyncio.coroutine
def callback(channel, body, envelope, properties):
    # stuff
    yield from channel.basic_client_ack(envelope.delivery_tag)
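
As a sketch of what this signature buys in the multi-server case: a single callback can serve several connections, because the channel that delivered the message is handed in. StubChannel and the server names below are hypothetical stand-ins (no broker involved), SimpleNamespace stands in for the envelope, and the code uses the newer async/await syntax rather than yield from.

```python
import asyncio
from types import SimpleNamespace


class StubChannel:
    """Hypothetical stand-in for a channel; records the acked delivery tags."""

    def __init__(self, name):
        self.name = name
        self.acked = []

    async def basic_client_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


async def callback(channel, body, envelope, properties):
    # The channel that delivered the message is the one that acks it.
    await channel.basic_client_ack(envelope.delivery_tag)


async def demo():
    first, second = StubChannel("server-1"), StubChannel("server-2")
    # Simulate one delivery from each server through the same callback.
    await callback(first, b"a", SimpleNamespace(delivery_tag=7), None)
    await callback(second, b"b", SimpleNamespace(delivery_tag=3), None)
    return first.acked, second.acked


acked = asyncio.run(demo())
print(acked)
```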

ObjReponse commented 8 years ago

Hi, I tried out that branch and I have a question: how do I perform a BASIC_ACK? Neither basic_client_ack nor basic_server_ack worked.

dzen commented 8 years ago

Do you have a problem using basic_client_ack? What error is generated?

ObjReponse commented 8 years ago

Yes, it does not work. No error, no ack, and the task is still in the queue.

dzen commented 8 years ago

Can you paste some code?

ObjReponse commented 8 years ago

@dzen sorry for misleading you. I forgot the yield from; when I use yield from ch.basic_client_ack... it works. Sorry again.
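
For anyone hitting the same "no error, no ack" symptom: calling a coroutine function without yield from (or await in newer Python) only creates a coroutine object, and its body never runs. A minimal sketch of that behaviour, with a hypothetical StubChannel standing in for the real channel:

```python
import asyncio


class StubChannel:
    """Hypothetical stand-in for a channel; records the acked delivery tags."""

    def __init__(self):
        self.acked = []

    async def basic_client_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


async def demo():
    ch = StubChannel()
    # Calling without awaiting only builds a coroutine object.
    forgotten = ch.basic_client_ack(1)
    acked_without_await = list(ch.acked)
    forgotten.close()  # discard it explicitly instead of leaving it un-awaited

    # Awaiting actually runs the body, so the ack is recorded.
    await ch.basic_client_ack(2)
    return acked_without_await, ch.acked


before, after = asyncio.run(demo())
print(before, after)
```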

dzen commented 8 years ago

No problem. I haven't written enough documentation yet. It's planned.

dzen commented 8 years ago

Hello @thijsterlouw. Did you try the latest release (0.5.1, since we had a packaging problem)?

dzen commented 8 years ago

@thijsterlouw you now have the channel handle to ack a message correctly.