barryp / py-amqplib

Python client for the Advanced Message Queuing Procotol (AMQP)
GNU Lesser General Public License v2.1
24 stars 10 forks source link

Support for timeout #11

Open GoogleCodeExporter opened 9 years ago

GoogleCodeExporter commented 9 years ago
Versions: py-amqplib 0.6.1, RabbitMQ 1.5.4, Python 2.6.4

I would like to perform the following:

  chan.basic_consume(..)
  heartbeat_deadline = new_deadline(...) # Compute a time instant
  while 1:
    next_deadline = max(0, heartbeat_deadline - time.time())
    chan.wait(timeout = next_deadline)
    # We get there if a message was processed,
    # or if nothing was received for the specified amount of time.
    # Check if we reached our deadline:
    if time.time() > heartbeat_deadline:
       chan.basic_publish(...)
       heartbeat_deadline = new_deadline(...)

The purpose is to allow me to run a loop to receive new messages from
a queue synchronously, but that still allow me to send message back to
the server at regular interval (like heartbeat or something like that.)

Could py-ampqlib support that easily? I've not checked sources in detail,
but I think _AbstractTransport._read(self, n) could support an additionnal
parameter 'timeout', which would be used only for receiving the header
of a new frame (other uses would set timeout to None) if .wait() specified
a timeout itself, using select() before the read() operation.

But maybe I misusing the library. Is that another way to obtain the same
behavior? I thought about writing a second process (and thus opening
an additionnal TCP connection) but I'm not really happy with this idea.
I would prefer to perform all the stuff from a single process.

Original issue reported on code.google.com by fjolli...@gmail.com on 21 Jan 2010 at 2:37

GoogleCodeExporter commented 9 years ago
Here is a quick hackish patch which implement timeout. It doesn't handle SSL
(should be easy to add). It seems to work, but I do not guarantee anything.
DO NOT USE IT IN PRODUCTION!

It allows me to do:

    chan.basic_consume(queue = 'some_queue', callback = callback)
    while 1:
        try:
            chan.wait(timeout = 5)
        except amqp.Timeout:
            # Do something when timeout occurs.
            chan.basic_publish(...)

The way I coded it, the timeout apply to each attempt to read a frame 
internally.
Meaning that if a frame in read, but queued because it is not handled by 
.wait(),
the time spent to handle this first frame is not deduced from the timeout. Also,
a frame which started to be read will block until read entirely. Not sure about 
to
avoid that however. The timeout only apply when waiting for the begining of the
frame.

I send a patch because I've not studied how patch submission are handled on
Google Code. Hope it's ok.

Original comment by fjolli...@gmail.com on 21 Jan 2010 at 3:41

GoogleCodeExporter commented 9 years ago
[deleted comment]
GoogleCodeExporter commented 9 years ago
[deleted comment]
GoogleCodeExporter commented 9 years ago
Another version of the patch. This one fix a bug which occurs when something
was already buffered. Timeout applied anyway which was wrong. Works quite nicely
for me actually now. But before anyone can review it, it should NOT be used in
production.

Original comment by fjolli...@gmail.com on 21 Jan 2010 at 4:59

Attachments:

GoogleCodeExporter commented 9 years ago
Yea, this is really necessary for out application; where we are replacing
multiprocessing pipes & queues with AMQ & Rabbit.  We need to occasionally drop 
out
of the message wait to do some local work.

Original comment by awill...@whitemice.org on 27 Apr 2010 at 8:14

GoogleCodeExporter commented 9 years ago
amqplib-0.6.1-timeout-rev3.patch seems to work for me.
----
        try:
            self._channel.wait(timeout=timeout)
        except AMQTimeOut:
            return None
        except Exception, e:
            raise e

Original comment by awill...@whitemice.org on 27 Apr 2010 at 8:44

GoogleCodeExporter commented 9 years ago
Not having timeouts is a real problem for our application as well and we have 
to use
some lame hacks to get around it. I'm reluctant to use a patch when the author
specifically said "DO NOT USE IN PRODUCTION". It would be really good for this 
to be
added to the official amqplib package.

Original comment by emp....@gmail.com on 27 Apr 2010 at 11:21

GoogleCodeExporter commented 9 years ago
carrot now has support timeouts in amqplib:
http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

Could this be merged into amqplib itself?

Original comment by emp....@gmail.com on 10 May 2010 at 11:10

GoogleCodeExporter commented 9 years ago
The carrot solution doesn't look anywhere near as robust as fjolliton's patch 
as it forces a timeout onto the transport socket which is not catered for in 
the current _read() code.

FWIW: http://github.com/lbt/py-amqplib/tree/proposed

Original comment by the....@gmail.com on 6 Jul 2010 at 10:32

GoogleCodeExporter commented 9 years ago
When implementing this patch on amqplib 1.0.2 (latest as of now) on Python 3 
then the few import statements in the patch must have a dot first, i.e.:

from .transport import Timeout

instead of

from transport import Timeout

Original comment by fgunder...@gmail.com on 7 Feb 2012 at 2:46

GoogleCodeExporter commented 9 years ago
Adding this patch has the nice side-effect that one may kill the process with 
ctrl+c and get a clean exit point. To catch the ctrl+c my code looks like this 
now:

            try:
                chan.wait(timeout=2);
            except amqp.Timeout:
                pass
            except KeyboardInterrupt:
                log.log('Interrupted by ctrl+c')
                runme = False
            except Exception as ex:
                log.log('Fatal exception in main loop: ' + str(ex))

Original comment by fgunder...@gmail.com on 5 Mar 2012 at 8:36

GoogleCodeExporter commented 9 years ago
After running with this exact code in two worker processes for a few weeks with 
about 10K messages per day, it seems that frequent timeouts (i.e. a low value 
such as 2 seconds shown above) will occasionally (a few times per day in my 
situation) introduce 10-20 seconds delays in the processing. By using a 20 sec 
timeout instead, this problem almost goes away. Although causality is not 
proved, this one change caused a major improvement.

Original comment by fgunder...@gmail.com on 24 Mar 2012 at 9:22