majek / puka

Puka - the opinionated RabbitMQ client
https://github.com/majek/puka
Other
182 stars 34 forks source link

Puka with green threads #46

Closed istalker2 closed 11 years ago

istalker2 commented 11 years ago

In our project we use both puka and green threads via eventlet library. I.e. puka is imported using eventlet.patcher.import_patched('puka') instead of "import puka" and that gives a great value as we can now write simple synchronous code with client.wait(...) and still have puka not blocking our single thread.

The only non-obvious issue that I saw using puka in such way it that you cannot ack mesages while another green thread is waiting for another message to be received.

Suppose we have 2 green threads. Green-thread 1: basic_consume_promise = self.client.basic_consume( queue='test', prefetch_count=1) while True: msg = self.client.wait(basic_consume_promise) eventlet.spawn(self.message_received, msg)

Green-thread 2: def message_received(self, msg): eventlet.sleep(1) self.client.basic_ack(msg)

The problem with the code above is that the order of execution is p = basic_consume() msg = wait(p) msg = wait(p) basic_ack(msg)

and because basic_ack get called after wait() is waiting for another message data writen by basic_ack to send buffer remains unsent.

The fix is to always wait for writes in select() event if send buffer is empty at the moment. I.e.

def wait(self, promise_numbers, timeout=None, raise_errors=True): .... r, w, e = select.select((self,), (self,), #!!! if self.needs_write() else (), (self,), 0) ... while True: ... r, w, e = select.select([self], [self],#!!! if self.needs_write() else [], [self], td) AFAIU this wouldn't harm normal single thread use cases in any way but would make puka useful for scenarios involving green-threads

majek commented 11 years ago

Yes, but with select([self], [self], [self]) won't you be spinning 100% if there is nothing in the write buffer?

istalker2 commented 11 years ago

Yes, you are right. Silly mistake :)