zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.77k stars 2.36k forks source link

Reconnecting to a PUB socket causes memory leakage #171

Closed chuckremes closed 13 years ago

chuckremes commented 13 years ago

Code to reproduce:

https://gist.github.com/839709

It's Ruby but it could be translated to C by someone competent in that language. Here are the basic steps to repro:

  1. Create a Publisher in a thread. Let it publish a single byte every 25ms.
  2. Create a set of Subscribers (32 in the example). Each thread creates SUB socket that subscribes to everything. Every 100ms the SUB socket is closed and a new one is created and connected. Any message received is just closed. Note that the subscriber(s) should easily be able to keep pace with a publisher sending only a single byte every 25ms; that is, no real queueing is going to happen here.
  3. Put a FORWARDER device between the Publisher and Subscribers. I recommend putting this in a separate program so you can watch its memory footprint continually grow while the other program containing the publisher & subscriber levels off and stops growing.

This bug exists for both :ipc and :tcp transport. I did not test :inproc transport.

The resident size (rsize) of the forwarder device program will grow at a rate of about 20MB per minute. You can tell this isn't a message queue buffering issue due to a) HWM set to 1 for all sockets, b) LINGER set to 0 for all sockets, c) killing the publisher/subscriber program does not cause the forwarder program heap to shrink, d) restarting the publisher/subscriber program, the heap will start growing again immediately (long before there would be time for the large heap to get filled again).

My use-case is as follows. I have a trade backtesting system that activates a new "strategy" when it receives a buy/sell signal. Part of the strategy is a small component that contains a single SUB socket and subscribes to all trades. The backtester runs much faster than real-time (e.g. 1 second of wall clock time is 1 hour inside the system). So a short-lived strategy that only survives a few minutes within the backtester may shutdown the trade subscriber within 50-1000 milliseconds of connecting. This can happen very often and very rapidly. And a single PUB socket could not be shared amongst several strategies since their subscriptions are all unique.

chuckremes commented 13 years ago

Here is a C version of the program that illustrates the leak.

https://gist.github.com/841318

I didn't use a forwarder this time but it could be easily added. I recommend using one because debugging a program with just two sockets (the forwarder) will be far simpler than debugging one where there are dozens due to all of the threading.

hintjens commented 13 years ago

Minimal version of that C program: https://gist.github.com/843774

Still shows the symptom (memory usage increasing) after commit d4e418.

hintjens commented 13 years ago

Some notes on this issue:

So issue seems to be that socket closure isn't happening properly until zmq_term, and opening/closing many sockets in a loop will cause memory leaks.

Opening and closing sockets very quickly will cause system file handles to run out, see https://gist.github.com/843786:

Too many open files
rc == 0 (mailbox.cpp:374)
Aborted (core dumped)
chuckremes commented 13 years ago

Please share the code that inits/terminates the context in the subscriber to avoid the leak. I modified my local copy to do that yet I still see the same leak.

chuckremes commented 13 years ago

I'm still fairly certain this leak is on the PUB socket and not the SUB socket. Double-check my work by putting a forwarder device in between the PUB/SUB sockets in the example. You will see the memory footprint grow on the FORWARDER while it remains mostly static on the original program. That indicates to me that the PUB socket in the forwarder is the location of the leak.

sustrik commented 13 years ago

Ok, one more try. Can you check whether following patch helps?

diff --git a/src/xpub.cpp b/src/xpub.cpp
index 630450b..224079e 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -29,7 +29,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
     fq (this)
 {
     options.type = ZMQ_XPUB;
-    options.requires_in = true;
+    options.requires_in = false;
     options.requires_out = true;
 }

@@ -40,9 +40,9 @@ zmq::xpub_t::~xpub_t ()
 void zmq::xpub_t::xattach_pipes (class reader_t *inpipe_,
     class writer_t *outpipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (inpipe_ && outpipe_);
+    zmq_assert (!inpipe_ && outpipe_);
     dist.attach (outpipe_);
-    fq.attach (inpipe_);
+    //  fq.attach (inpipe_);
 }

 void zmq::xpub_t::process_term (int linger_)
chuckremes commented 13 years ago

Success!!!!!

!!!!!!

sustrik commented 13 years ago

Fixed in master in 67b1f14.