capnproto / pycapnp

Cap'n Proto serialization/RPC system - Python bindings
BSD 2-Clause "Simplified" License

Memory leak in TwoPartyClient or Promises? #98

Open clintfred opened 8 years ago

clintfred commented 8 years ago

First, thanks for the python binding!

Are there any known issues with memory leaks in TwoPartyClient or Promises? I have a batch of threads making a simple RPC call in a loop and the memory usage just continues to rise and rise over time.

Here is the thread target I'm running

def poll_endpoint(polling_interval_secs, endpoint, redis_pub, stop_event, queue=None):
    try:
        while not stop_event.isSet():
            hs_client = create_client(endpoint)
            start = time.time()
            promise = hs_client.deepStatus()
            response = promise.wait()

            after = time.time()
            work_time = after - start
            sleep_time = polling_interval_secs - work_time

            if sleep_time > 0:
                time.sleep(sleep_time)
            else:
                print "WARNING: thread falling behind"

            del start, after, work_time, sleep_time
    except Exception, e:
        print "problem starting client for endpoint " + str(endpoint)
        print e

I've tried moving the create_client call outside of the loop, to no effect. The Python memory_profiler module attributes the ever-increasing memory usage to the line with the while-loop condition. I've also replaced that condition with a simple counter and run some tests, just to rule out something with the events.
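
A minimal sketch of how that memory_profiler attribution is typically set up (the decorator is memory_profiler's real API; the elided body and the script name are placeholders):

from memory_profiler import profile

@profile
def poll_endpoint(polling_interval_secs, endpoint, redis_pub, stop_event, queue=None):
    # ... body as in the snippet above ...
    pass

# Running e.g. `python -m memory_profiler myscript.py` (or just running the
# script with `profile` imported as above) prints a line-by-line table of
# memory increments when the decorated function returns, which is the kind
# of attribution that pointed at the while-loop condition line.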

I noticed pycapnp/capnp/lib/capnp.pyx has a few "TODO:MEMORY" markers specifically around Promises and Event Loops, so I thought I'd reach out here.

Thanks in advance for your help.

jparyani commented 8 years ago

Ya, there's some complicated memory-handling logic for TwoPartyClient/Capabilities/Promises, and it's very possible accidental cyclical references have been introduced. The "TODO:MEMORY" warnings mostly have to do with creating TwoPartyClients, which are known to leak their "network" objects because sometimes the global EventLoop will access the underlying network even after the client is gone. There are possible fixes for this, but I never got around to testing what worked, and it wasn't a high priority since it's bad practice to create multiple clients to the same resource anyway.

What I think may actually be happening in your case is that you're using threads (did you call capnp.create_event_loop(threaded=True) somewhere in your code?). If so, and if you're creating a new thread on every call, this will create a new global EventLoop for each thread and never clean it up. Perhaps pycapnp should grow some logic to check for terminated threads and clean up their event loops, but for now a good workaround may be to use a thread pool.
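
A minimal sketch of that thread-pool workaround, assuming the threaded event-loop setup quoted elsewhere in this thread; create_client and deepStatus are taken from the snippet above, everything else is illustrative:

import threading
import Queue  # `queue` on Python 3

import capnp

capnp.remove_event_loop()
capnp.create_event_loop(threaded=True)

poll_requests = Queue.Queue()

def worker():
    # Each worker is created once and reused for every request it pulls off
    # the queue, so its per-thread EventLoop is created once instead of once
    # per call. Reusing one client per endpoint (rather than one per request)
    # would be better still, per the note above about multiple clients.
    while True:
        endpoint = poll_requests.get()
        if endpoint is None:              # sentinel: shut this worker down
            break
        client = create_client(endpoint)  # the poster's own helper
        client.deepStatus().wait()        # same RPC as in the original loop

pool = [threading.Thread(target=worker) for _ in range(4)]
for t in pool:
    t.daemon = True
    t.start()

Callers then just enqueue endpoints with poll_requests.put(endpoint), and putting one None per worker shuts the pool down.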

clintfred commented 8 years ago

Thanks for the quick response.

Yes. At the top of my file I call:

capnp.remove_event_loop()
capnp.create_event_loop(threaded=True)

I don't think I'm creating a thread per call... The code I posted before is for a long-lived thread that is created once when the application starts. I start one per server instance.

t = threading.Thread(target=poll_endpoint, args= (polling_interval, endpoint, redis_pub, stop_event, _q))

That is, in my simple testing, I was connecting to a single server, from a single thread. The thread is created on startup and the while loop

while not stop_event.isSet():

keeps the thread active until the event is set causing all threads to exit.

Perhaps I misunderstood your comment, or perhaps it's bad practice to use pycapnp in a long-lived thread like this?

clintfred commented 8 years ago

oops, accidentally closed the issue. reopening!

jparyani commented 8 years ago

Hmm ya, I'd agree then. It does seem to be leaking Promises.

clintfred commented 8 years ago

Anyone have any idea how to start debugging this? I'm willing to do some legwork, but I am somewhat new to Python and have only a cursory knowledge of how calling out to C libraries works.

I'm also open to any workarounds that might allow my team to keep using pycapnp's RPC implementation while this gets fixed. Otherwise we're going to have to drop something else in there to maintain forward progress 👎

jparyani commented 8 years ago

I'm really sorry, but I took another look and nothing is jumping out at me as the cause of the leak. I imagine it's a cycle being made somewhere with references to parent objects, but I'm unable to track it down. If anyone knows of good memory debugging tools for python, I'd be happy to hear suggestions.
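
A minimal sketch of one such approach, using the objgraph package (which also appears further down in this thread); make_one_rpc_call is a hypothetical stand-in for a single client call:

import gc

import objgraph

gc.collect()
objgraph.show_growth(limit=10)   # record baseline object counts

make_one_rpc_call()              # hypothetical: one RPC round trip

gc.collect()
objgraph.show_growth(limit=10)   # types whose counts keep climbing are leak suspects

Types that keep growing across repeated calls are the candidates for a reference cycle; objgraph can then render the back-references keeping an instance alive.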

clintfred commented 8 years ago

Jason -- Thanks for the response and the second look. We were also able to see a memory leak doing simple fromBytes calls, but I was unable to reproduce this in a simple test case, so I'm less confident in how to report this. I'm only mentioning it as it seems the two may be related.

Given that we don't have any deep Py/C experts on our team, we moved on to ProtoBuf. I really liked Capnp and the utility provided by pycapnp, but we don't seem to have a way forward at this time.

I'll continue to monitor this project and thread. Thanks for the binding and all your work. Good luck as the library continues to mature.

kaos commented 7 years ago

@jparyani I've also run across memory leak issues similar to what @clintfred describes.

I've put together a little test script (attached at the bottom), based on the threading test from your tests, that exhibits the memory leak for diagnosis. So far I'm not much further along than having easily reproducible test code to muck about with, but I've found objgraph to be a tidy tool for diagnosing objects and references.

Using that, I can see these objects get created (and possibly leaked) on each RPC invocation in the test code:

>>> import objgraph
>>> objgraph.show_growth()
function                       1270     +1270
wrapper_descriptor             1211     +1211
builtin_function_or_method      831      +831
method_descriptor               742      +742
dict                            668      +668
weakref                         592      +592
tuple                           497      +497
getset_descriptor               371      +371
list                            263      +263
member_descriptor               221      +221
>>> loop()
(<test_capability_capnp:TestInterface.foo$Results reader (x = "250100")>, 50001)
>>> objgraph.show_growth()
wrapper_descriptor     1220        +9
getset_descriptor       375        +4
member_descriptor       224        +3
weakref                 594        +2
dict                    670        +2
tuple                   498        +1
_Response                 2        +1
_StructSchema             6        +1

Inspecting the reference chain for _StructSchema renders this image: [objgraph back-reference graph attached in the original comment]

Quite handy. I'll keep digging; just wanted to post a heads-up in case you've been down this alley already, or perhaps something pops up and says "Hellu" to you ;)
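
A graph like the one referenced above can be produced with objgraph's back-reference helpers; a minimal sketch (the chosen instance, depth, and filename are arbitrary, and Graphviz must be installed for the rendering):

import objgraph

# Pick one of the _StructSchema instances that show_growth() flagged and
# render the chain of references keeping it alive.
schemas = objgraph.by_type('_StructSchema')
if schemas:
    objgraph.show_backrefs(schemas[-1], max_depth=5,
                           filename='structschema_backrefs.png')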

After running 50k loop iterations, top reports these stats for the process:

PID    COMMAND   %CPU TIME     #TH #WQ #PORTS MEM   PURG CMPRS PGRP  PPID  STATE    BOOSTS %CPU_ME %CPU_OTHRS UID       FAULTS  COW  MSGSENT MSGRECV SYSBSD   SYSMACH CSW     PAGEINS IDLEW POWER USER #MREGS RPRVT VPRVT VSIZE KPRVT KSHRD
90325  python2.7 0.0  00:06.81 2   0   14+    427M+ 0B   0B    90325 41104 sleeping *0[1+] 0.00000 0.00000    582949443 111857+ 512+ 103+    48+     1057862+ 295+    101512+ 147+    0     0.0   myself N/A    N/A   N/A   N/A   N/A   N/A  

Test script I use:

#!/usr/bin/env python

import capnp, threading, socket
import test_capability_capnp, gc
import objgraph, code

gc.set_debug(gc.DEBUG_LEAK)

capnp.remove_event_loop(True)
capnp.create_event_loop(True)

def memory_info(prefix = None, collect = False):
    import gc
    from pprint import pprint
    pprint(
        dict(
            prefix = prefix,
            counts = gc.get_count(),
            threshold = gc.get_threshold(),
            garbage = len(gc.garbage)
        )
    )

    if collect:
        collected = gc.collect()
        print prefix, ': collected: ', collected
        if collected:
            memory_info ('post collect')

def dump_garbage():
    """
    show us what's the garbage about
    """

    # force collection
    print "\nGARBAGE:"
    gc.collect()

    print "\nGARBAGE OBJECTS:"
    for x in gc.garbage:
        s = str(x)
        if len(s) > 80: s = s[:80]
        print type(x),"\n  ", s

def test_loop():
    timeout = 1
    counter = 0

    while True:
        counter = counter + 1
        print 'sleep... ', counter
        capnp.getTimer().after_delay(timeout * 1000 * 1000 * 1000).wait()

class Server(test_capability_capnp.TestInterface.Server):

    def __init__(self, val=1):
        self.val = val

    def foo(self, i, j, **kwargs):
        return str(i * 5 + self.val)

class SimpleRestorer(test_capability_capnp.TestSturdyRefObjectId.Restorer):

    def restore(self, ref_id):
        assert ref_id.tag == 'testInterface'
        return Server(100)

def test_thread():
    read, write = socket.socketpair(socket.AF_UNIX)

    def run_server():
        restorer = SimpleRestorer()
        server = capnp.TwoPartyServer(write, restorer)
        capnp.wait_forever()

    print 'Start server...'

    server_thread = threading.Thread(target=run_server)
    server_thread.daemon = True
    server_thread.start()

    print 'Start client...'

    client = capnp.TwoPartyClient(read)

    print 'Setup client...'

    ref = test_capability_capnp.TestSturdyRefObjectId.new_message(tag='testInterface')
    cap = client.restore(ref)
    cap = cap.cast_as(test_capability_capnp.TestInterface)

    print 'Run test loop...'

    counter = 0
    def loop():
        return call_server (cap, counter), counter + 1

    while counter < (50 * 1000):
        response, counter = loop()
        if counter % (10 * 1000) == 0:
            print 'foo({}) == {}'.format(counter, response.x)
            memory_info(counter, True)

    code.interact(local=locals())

def call_server (cap, i):
    remote = cap.foo(i)
    return remote.wait()

if __name__ == '__main__':
    test_thread()
    dump_garbage()

kaos commented 7 years ago

Using valgrind, I got a bunch of pointers to memory leaks, but they're a bit too involved for me to dig through, so we've opted to work around this issue rather than solve it (sadly, that means abandoning the use of capnproto in our project).