majek / puka

Puka - the opinionated RabbitMQ client
https://github.com/majek/puka

memory leak - RPC pattern #57

Closed: fishtek closed this issue 9 years ago

fishtek commented 9 years ago

Hello,

An application I have written uses Puka in the RPC pattern and is experiencing a memory leak when many messages pass through it.

I believe this is probably the same problem as the issue "Serious memory leak - if same channel used to consume & publish".

I was able to reproduce this problem with Puka 0.0.7 using slightly modified versions of the example code, rpc_client.py and rpc_server.py.

Here is the modified server:

```python
#!/usr/bin/env python
import puka

client = puka.Client("amqp://admin:admin@localhost/")
promise = client.connect()
client.wait(promise)

promise = client.queue_declare(queue='rpc_queue')
client.wait(promise)

print " [x] Awaiting RPC requests"
consume_promise = client.basic_consume(queue='rpc_queue', prefetch_count=1)
while True:
    msg_result = client.wait(consume_promise)
    n = int(msg_result['body'])

    print " [.] fib(%s)" % (n,)
    response = '99'  # Don't actually calculate anything, we want speed!

    # This publish doesn't need to be synchronous.
    client.basic_publish(exchange='',
                         routing_key=msg_result['headers']['reply_to'],
                         headers={'correlation_id':
                                  msg_result['headers']['correlation_id']},
                         body=str(response))
    client.basic_ack(msg_result)
```

Here is the modified client:

```python
#!/usr/bin/env python
import puka
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.client = client = puka.Client("amqp://admin:admin@localhost/")
        promise = client.connect()
        client.wait(promise)

        promise = client.queue_declare(exclusive=True)
        self.callback_queue = client.wait(promise)['queue']

        self.consume_promise = client.basic_consume(queue=self.callback_queue,
                                                    no_ack=True)

    def call(self, n):
        correlation_id = str(uuid.uuid4())
        # We don't need to wait on promise from publish, let it happen async.
        self.client.basic_publish(exchange='',
                                  routing_key='rpc_queue',
                                  headers={'reply_to': self.callback_queue,
                                           'correlation_id': correlation_id},
                                  body=str(n))
        while True:
            msg_result = self.client.wait(self.consume_promise)
            if msg_result['headers']['correlation_id'] == correlation_id:
                return int(msg_result['body'])

fibonacci_rpc = FibonacciRpcClient()

while True:
    print " [x] Requesting fib(30)"
    response = fibonacci_rpc.call(30)
    print " [.] Got %r" % (response,)
```
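
The two scripts implement the usual RPC-over-queues handshake: the client publishes a request carrying `reply_to` and a fresh `correlation_id`, the server answers on the `reply_to` queue echoing that id, and the client skips any reply whose id does not match. The following is a minimal Python 3 sketch of that handshake with no broker at all, assuming an in-memory dict of `queue.Queue` objects as a stand-in for RabbitMQ's default exchange (which routes by queue name). `QUEUES`, `publish`, `serve`, `call` and `run_demo` are hypothetical names, not Puka APIs.

```python
import queue
import threading
import uuid

# Hypothetical in-memory "broker": queue name -> queue.Queue. Publishing with
# exchange='' routes by queue name, which this dict lookup imitates.
QUEUES = {'rpc_queue': queue.Queue(), 'callback': queue.Queue()}


def publish(routing_key, headers, body):
    QUEUES[routing_key].put({'headers': headers, 'body': body})


def serve(stop):
    """Like the server above: answer on reply_to, echoing correlation_id."""
    while not stop.is_set():
        try:
            msg = QUEUES['rpc_queue'].get(timeout=0.05)
        except queue.Empty:
            continue
        publish(msg['headers']['reply_to'],
                {'correlation_id': msg['headers']['correlation_id']},
                '99')  # fixed response, as in the modified server


def call(n, callback_queue='callback'):
    """Like FibonacciRpcClient.call: publish, then skip replies for other ids."""
    correlation_id = str(uuid.uuid4())
    publish('rpc_queue',
            {'reply_to': callback_queue, 'correlation_id': correlation_id},
            str(n))
    while True:
        reply = QUEUES[callback_queue].get(timeout=5)
        if reply['headers']['correlation_id'] == correlation_id:
            return int(reply['body'])


def run_demo(n):
    """Start the fake server in a thread, pre-load one stale reply, make one call."""
    stop = threading.Event()
    worker = threading.Thread(target=serve, args=(stop,), daemon=True)
    worker.start()
    try:
        # A leftover reply with a foreign correlation_id must be skipped.
        publish('callback', {'correlation_id': 'stale'}, '0')
        return call(n)
    finally:
        stop.set()
        worker.join()
```

Calling `run_demo(30)` returns `99`: the stale reply is read first and discarded, then the matching reply is returned. Nothing in this sketch models Puka's promise bookkeeping, which is where the leak discussed below comes from.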

Not sure if it matters, but I'm running RabbitMQ 3.3.5 on Erlang R16B03.

Memory usage of both the client and the server keeps increasing for as long as the two programs run.
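
One way to check that growth like this comes from Python objects that are never released is to run the suspect loop under the standard library's `tracemalloc` module and compare traced memory before and after. A minimal Python 3 sketch follows; `traced_growth`, `leaky_step` and `clean_step` are hypothetical names, and `leaky_step` only imitates a structure that keeps one entry per message, it is not Puka code.

```python
import tracemalloc

_kept = []  # hypothetical stand-in for a table that is never cleaned up


def leaky_step():
    _kept.append(bytearray(100))  # one entry retained per "message"


def clean_step():
    bytearray(100)  # allocated and freed immediately


def traced_growth(step, iterations):
    """Bytes of traced Python memory still alive after calling step() repeatedly."""
    tracemalloc.start()
    try:
        before, _ = tracemalloc.get_traced_memory()
        for _ in range(iterations):
            step()
        after, _ = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return after - before
```

Comparing `traced_growth(leaky_step, 1000)` with `traced_growth(clean_step, 1000)` should show the first growing roughly in proportion to the iteration count while the second stays near zero.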

Thanks

majek commented 9 years ago

Thanks for the report. Here is what happens.

You are not waiting on the promises returned by basic_publish. Unfortunately, Puka has no way of knowing whether you intend to wait for them, so it keeps the state of every single promise until it has been "waited" for.
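
The retention behaviour described here can be illustrated with a toy model. This is a hypothetical Python 3 sketch, not Puka's actual implementation: `ToyClient`, `pending` and `publish_many` are invented names, and the real `basic_publish` takes `exchange`, `routing_key`, `headers` and `body`. It only assumes what the explanation states: every operation gets a promise whose state is kept until someone waits on it.

```python
import itertools


class ToyClient:
    """Hypothetical model of the behaviour described above, not Puka's real code.

    Every operation returns a promise number and parks its result in a table;
    the entry is removed only when the caller waits on that promise.
    """

    def __init__(self):
        self._ids = itertools.count(1)
        self.pending = {}

    def basic_publish(self, body):
        promise = next(self._ids)
        self.pending[promise] = {'body': body}  # kept until waited for
        return promise

    def wait(self, promise):
        return self.pending.pop(promise)  # waiting releases the stored state


def publish_many(client, count, wait_each):
    """Publish `count` messages and report how many promise entries remain."""
    for i in range(count):
        promise = client.basic_publish(str(i))
        if wait_each:
            client.wait(promise)  # wait right after publishing
    return len(client.pending)
```

With `wait_each=False`, `publish_many(ToyClient(), 1000, False)` leaves 1000 entries behind, one per message, which is the unbounded growth seen in both scripts. With `wait_each=True` the table is empty after every iteration.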

There are two workarounds:

fishtek commented 9 years ago

Majek, thank you!

I did as you suggested and placed a client.wait() on the promise returned by basic_publish right after that call. Memory usage now stays steady instead of creeping up!

I also expected a performance hit, since the publish is now fully synchronous, but it seems to run at the same speed: the RabbitMQ server reports over 1460 messages per second, so I think this solution will certainly do the trick!

Thanks again!