yishenggudou closed this issue 11 years ago.
have all messages left the buffer? (closing sockets does not mean that messages are not present)
@minrk How do I tell whether the messages have left the buffer?
If they have arrived at the peer (the PULL socket), then they have left.
Also note that often the process will not release memory to the OS, though it is available (as indicated by subsequent allocations not actually increasing the memory footprint).
It is very difficult to figure out if there is an issue here without sample code and platform information, so please do provide more information.
Can you provide example code that reproduces this issue? I've done plenty of tests with many GB of throughput, and no leaks detected.
Please provide platform and version (Python, pyzmq, and libzmq) information as well.
@minrk OK, this is my send code. The socket binds to a tcp port, and the zmq version is 2.2.1:
In [2]: zmq.zmq_version()
Out[2]: '2.2.1'
In [3]: zmq.zmq_version_info()
Out[3]: (2, 2, 1)
import storm_conf
import sys

def stdin():
    for line in sys.stdin:
        yield line

STDIN = stdin()

def cradle():
    u"""
    """
    import zmq
    context = zmq.Context(10)
    pusher = context.socket(zmq.PUSH)
    pusher.setsockopt(zmq.HWM, 10)
    pusher.setsockopt(zmq.SWAP, 20*1024*1024*8)
    pusher.setsockopt(zmq.SNDBUF, 10240)
    pusher.setsockopt(zmq.AFFINITY, 1)
    pusher.bind(storm_conf.PIPE_CRADLE)
    while True:
        try:
            task = STDIN.next()
            pusher.send(task)
        except:
            sys.exit()
            break

if __name__ == "__main__":
    cradle()
and this is my receive code:
import os
import sys
import storm_conf
import json
DIR_PATH = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
sys.path[0:0] = [os.path.split(DIR_PATH)[0]]
from realtime.parse.parse import Qparse
from upload.upload import up2jdata
from tools.merge import Merge
from tools.tasks import Q
from conf import realtime as realtime_conf
from tools.qfile import split_file_by_date
from tools.paths import buffile
import zmq
from time import sleep
def clean():
    u"""
    """
    context = zmq.Context()
    socket_in = context.socket(zmq.PULL)
    socket_in.setsockopt(zmq.HWM, storm_conf.HWM_CLEAN)
    #socket.bind(storm_conf.PIPE_PORT)
    socket_in.connect(storm_conf.PIPE_CRADLE)
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect(storm_conf.PIPE_ROUTER_IN)
    n = 0
    merge = Merge(mergerule=realtime_conf.REALTIME_MERGE_RULE)
    while True:
        task = socket_in.recv()
        #print task
        if task is None:
            sleep(1)
            continue
        task = Qparse(task)
        if task is None:
            continue
        buff = [task['date_5m_s'], task['customer'], task['idc'], task['isp'], task['area'], task['contlength'], 1, task['HIT'], task['MISS']]
        if n < 400000:
            merge.add(buff)
            n += 1
            #print n
        else:
            print "upload to jdata"
            n = 0
            path = buffile()
            merge.to_file(path)
            sys.stdout.write(">> file" + str(path))
            merge.reset()
            files = split_file_by_date(path)
            sys.stdout.write('files:>>' + str(files))
            for date, f in files.iteritems():
                Q.enqueue(up2jdata, f.name, date, "cache_realtime_bw", ',', keep=True)
                #up2jdata(f.name, date, "cache_realtime_bw", ',', keep=True)
        del buff
        del task

if __name__ == "__main__":
    clean()
I run the send code to read logs from syslog-ng and send them to the worker (receive) code, with 40 worker processes across two machines. The send process uses 10 GB of memory after running for one day:
[root@host strom]# ps aux|grep cradle_ng.py
root 10966 71.3 22.2 11297940 10970608 ? Sl Nov08 1106:16 python2.7 /data/qlog/strom/cradle_ng.py
[root@host strom]# free -m
total used free shared buffers cached
Mem: 48256 25833 22423 0 688 12816
-/+ buffers/cache: 12328 35927
Swap: 4094 38 4055
[root@host strom]# uname -a
Linux localhost.localdomain 2.6.18-308.el5 #1 SMP Fri Jan 27 17:17:51 EST 2012 x86_64 x86_64 x86_64 GNU/Linux
[root@host strom]# ls /usr/local/lib/libzmq.*
/usr/local/lib/libzmq.a /usr/local/lib/libzmq.la /usr/local/lib/libzmq.so /usr/local/lib/libzmq.so.1 /usr/local/lib/libzmq.so.1.0.1
Try reducing it to a minimal test case that others can actually run and confirm. For instance, remove your STDIN read, and on your workers, do nothing but while True: socket_in.recv().
There are too many factors to get a good handle on a zmq or pyzmq bug here, if there is one. For instance, I see you have tweaked SNDBUF, SWAP, and HWM, all of which indicate that you are generating messages to send faster than you can send them. If that's really true, then your memory will grow because you are not flushing stdin fast enough (regardless of zmq).
How many messages/sec does this send? How big are messages? What happens if you don't set SWAP or SNDBUF or HWM? How many messages are typically outstanding, not yet sent? Do you really need 160 MB of swap to stay afloat?
All in all, none of these are actually pyzmq questions, and this should probably be directed at zeromq-dev. Also note: SWAP was removed in libzmq-3, so any bug in SWAP is not likely to be addressed, if that is the issue here.
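A minimal harness along the lines suggested above might look like this (a sketch, not the original scripts: the inproc endpoint name, message count, and threading setup are illustrative). The sender does nothing but send, and the worker does nothing but recv, so any remaining memory growth would point at zmq itself rather than the surrounding code:

```python
import threading
import zmq

ENDPOINT = "inproc://minimal-test"   # hypothetical endpoint for the sketch
N = 10000

ctx = zmq.Context()
bound = threading.Event()
received = [0]

def sender():
    push = ctx.socket(zmq.PUSH)
    push.bind(ENDPOINT)
    bound.set()                      # inproc needs bind before connect
    for _ in range(N):
        push.send(b"message")        # no stdin, no socket options: just send
    push.close()

def worker():
    bound.wait()
    pull = ctx.socket(zmq.PULL)
    pull.connect(ENDPOINT)
    for _ in range(N):
        pull.recv()                  # do nothing but recv
        received[0] += 1
    pull.close()

threads = [threading.Thread(target=sender), threading.Thread(target=worker)]
for t in threads:
    t.start()
for t in threads:
    t.join()
ctx.term()
print("received %d of %d messages" % (received[0], N))
```

Watching the RSS of this process while scaling N up is a far more conclusive test than the full pipeline, because it removes parsing, merging, and upload queues from the picture.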
@minrk I want to send log files with pyzmq at 200 Mb/s, maybe 100,000 messages/s, so I set SNDBUF, SWAP, and HWM. Now I use the PUB/SUB pattern and don't set any socket options.
The pub code: https://gist.github.com/4085500, the sub code: https://gist.github.com/4085510, the router code: https://gist.github.com/4085511. I use this code to transmit a raw file:
time cat /data/temp/buff/2012/201211/20121111/20121111*|python2.7 test_zmq_pub.py
python2.7 test_zmq_router.py
python2.7 test_zmq_sub.py > test.recv.buff
The /data/temp/buff/2012/201211/20121111/20121111* files total 5 GB. If I run two send processes (time cat /data/temp/buff/2012/201211/20121111/20121111* | python2.7 test_zmq_pub.py), sending the file twice, the messages pile up in the router, and its memory usage goes up and never comes down until the sending finishes.
I'm not sure what you mean by 'the message will block in router', but it makes perfect sense for the router's memory to grow, because it is receiving messages faster than it can send them (hence, they are filling up buffers). If you wait after sending the whole file (easier if you use a smaller file, such as 100MB or 1GB), the router device will eventually finish relaying messages, and its memory usage returns to idle (20-40 MB).
So this is definitely not the issue you initially described (failing to release unused memory), it's just the classic zeromq slow subscriber problem. Setting HWM on your device sockets changes the buildup from happening in the router's SUB to happening in the upstream PUB, and setting HWM in PUB eliminates memory buildup altogether, but the way it does that is to drop messages when HWM is hit (PUSH blocks on HWM, but SUB drops messages).
If PUSH/PULL is really what you use, then here's a version of your scripts that has no memory growth problems. HWM does it all, and setting SWAP would ruin it by changing the HWM behavior.
I've never used ZMQ_SWAP, but since it is removed in zmq3, I wouldn't recommend using that as a solution (in part because it may not even work).
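The PUSH-side HWM behavior can be seen in a tiny sketch (an assumption here: this uses the SNDHWM/RCVHWM option names from newer pyzmq; on 2.x the single zmq.HWM option covers both directions). The PULL peer never calls recv(), simulating a slow worker, so non-blocking sends start raising zmq.Again once the small queue limit is hit instead of letting memory grow without bound:

```python
import zmq

ctx = zmq.Context()

# PULL side exists but never receives, like a stalled worker.
pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.RCVHWM, 1)
pull.bind("inproc://hwm-demo")

push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 1)
push.connect("inproc://hwm-demo")

sent = 0
try:
    for _ in range(1000):
        push.send(b"x", zmq.NOBLOCK)   # raises zmq.Again once the HWM fills
        sent += 1
except zmq.Again:
    pass

print("messages queued before hitting HWM:", sent)

push.close(0)
pull.close(0)
ctx.term()
```

A blocking send() in the same situation would simply wait instead of raising, which is exactly the backpressure that keeps the sender's memory flat.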
Since this is confirmed as not a pyzmq issue I am closing it, but feel free to ask more questions. zeromq-dev is really a better place for general zeromq usage questions like this one, as I am typically the only one here, and I am far from a zeromq expert.
I use zmq's PUSH pattern to send messages and the PULL pattern to receive them, but when I close the socket, the process still keeps its high memory usage. Once the process's memory usage goes up, it never comes down.
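A shutdown sketch along the lines discussed above (an illustration, not the exact fix for this report): setting LINGER before close() bounds how long pending messages are kept, and term() lets libzmq free its internal buffers — though, as noted earlier in the thread, the OS may still report a high RSS afterwards, because freed heap memory is not always returned to the OS:

```python
import zmq

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)
push.bind("inproc://shutdown-demo")

pull = ctx.socket(zmq.PULL)
pull.connect("inproc://shutdown-demo")

push.send(b"pending")            # leave a message sitting in the queues

# LINGER=0: drop anything still queued instead of waiting indefinitely.
push.setsockopt(zmq.LINGER, 0)
pull.setsockopt(zmq.LINGER, 0)
push.close()
pull.close()
ctx.term()                       # returns promptly: nothing left lingering
print("context terminated cleanly")
```

Without LINGER, close() keeps undelivered messages alive and ctx.term() can block until they are flushed, which is one reason memory appears to "stay high" after closing a socket.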