Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

A fast spout and a slow bolt in my topology. The slow bolt always hangs. #149

Closed aq2004723 closed 9 years ago

aq2004723 commented 9 years ago

For instance, if the bolt writes data into MySQL, that makes it slow. Soon I found no new data in MySQL.

What's wrong with that?

imcom commented 9 years ago

I have a very similar situation here. I am guessing that the inbound queue of the bolt is full, or some other queues have overflowed, so the bolt hangs. I have already spent weeks on this but gathered no really useful intel...

imcom commented 9 years ago

Actually, I am seeing an OutOfMemory error each time after the topology has been running for about 1-2 hours. I have not found the root cause of the memory leak; the heap dump just shows many tuples/messages in the topology, with some bolts hung forever on SynchronizedQueue objects or the like. It looks like either the bolt/subprocess died without ShellBolt knowing, or there is a block somewhere down the path.

When memory exhaustion happens, I can always find a bolt in the following state:

Name: Thread-103-l3_stats_filter_aggregator
State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@da7a922
Total blocked: 0  Total waited: 59,389

Immediate help wanted!

poros commented 9 years ago

If you happen to emit tuples into the topology much faster than you are able to process them, Storm itself will start to run out of memory, because it needs both to hold the tuples in the downstream bolts' inbound queues and to keep track of all of them in the ackers.

The problem is not only Pyleus's fault (Pyleus is only responsible for slowing things down, given the multilang overhead); it lies deeper, at the Storm level.

You may want to try either increasing the parallelism of your bolts or inserting a sleep call in your spout in order to slow it down. If the database is the bottleneck instead, consider moving to a faster key-value store, if possible.
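As a rough illustration of the sleep-in-the-spout workaround, here is a minimal pyleus spout sketch following the style of the project README; the `fetch_next_record` helper and the 0.01 s delay are placeholders, not part of the suggestion beyond the sleep call itself.

```python
import time

from pyleus.storm import Spout


def fetch_next_record():
    # Placeholder for the real data source (e.g. a Kafka consumer poll).
    return "some payload"


class ThrottledSpout(Spout):

    OUTPUT_FIELDS = ['record']

    def next_tuple(self):
        record = fetch_next_record()
        self.emit((record,))
        # Crude throttle: cap the emission rate so the downstream bolts'
        # inbound queues are not filled faster than they can drain.
        time.sleep(0.01)


if __name__ == '__main__':
    ThrottledSpout().run()
```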

imcom commented 9 years ago

Yeah, dude, I am already using Redis for DB-like operations; I do not want to use bolt memory for this because it does not scale as well as Redis. However, I still have some questions, some of which might be a little outside Pyleus's scope and more related to Storm anyway:

1) I already disabled the acker in my topology, so Storm should not be tracking tuples in the ackers, as I imagine it. Am I correct about this?

2) I understand that Storm has to keep all the not-yet-processed tuples in the downstream inbound queues, but isn't it strange that the downstream bolts just hang on the full queues forever? They only need to pick tuples off the queues, and there should be plenty of those in there... am I missing something here? I have heard that Storm uses the same queue for commands, coordination, and data, so if the queue is full of data and no other messages can be sent, do the bolts just wait on some condition forever? I would have thought they should at least read something out of the queue.

imcom commented 9 years ago

Also, is there any way I can monitor the size of the aforementioned queues? I tried JMX and also the Graphite metrics plugins but had no luck... And would it be a good idea to buffer the fast spout's output in an extra layer of bolts and emit the tuples at a constant rate towards the following bolts?

mzbyszynski commented 9 years ago

Are you using the max_spout_pending setting in your topology? That will throttle the output of your spout.
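For context, a minimal sketch of where that setting could live in a pyleus topology YAML; the topology and module names are made up, 1000 is an arbitrary value, and if a given pyleus version does not accept max_spout_pending as a top-level key, the underlying Storm option it maps to is topology.max.spout.pending. Also worth noting: the cap only throttles spouts whose tuples are anchored (emitted with a tup_id) and tracked by ackers.

```yaml
name: my_topology

# Caps how many emitted-but-not-yet-acked tuples a spout may have in flight.
max_spout_pending: 1000

topology:

    - spout:
        name: kafka-spout
        module: my_topology.kafka_spout

    - bolt:
        name: aggregator-bolt
        module: my_topology.aggregator_bolt
        groupings:
            - shuffle_grouping: kafka-spout
```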

imcom commented 9 years ago

Hi @mzbyszynski, yes, I used max_spout_pending but it does not ease the situation. I think it is not a throttling issue; please refer to my following comment.

imcom commented 9 years ago

Hi guys, more findings today:

write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\264-2748381353226709512\246stream"..., 1048576) = 63
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\264-1491252504059631995\246stream"..., 1048576) = 63
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\2632950208721902025632\246stream\253"..., 1048576) = 62
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\2638313233637446736666\246stream\253"..., 1048576) = 62
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\264-2701657939574195409\246stream"..., 1048576) = 63
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\264-3064939443133503355\246stream"..., 1048576) = 63
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\2639075995340754665021\246stream\253"..., 1048576) = 62
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\264-8726533184469558620\246stream"..., 1048576) = 63
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\264-4222159716794756541\246stream"..., 1048576) = 63
write(1, "\201\247command\244sync", 14) = 14
read(0, "\205\242id\263-855439931644089241\246stream\246"..., 1048576) = 74
write(1, "\204\247command\244emit\247anchors\220\255need_tas"..., 4096) = 4096
write(1, "\37\t6g\4\0006\16j\24Vg\4\0006\22\1\26\00456*\1\26\0005>\36\t\35\274>"..., 3072000) = 3072000
write(1, "\0007F\232\nJ\371\4\0007F\230\n:z\0:\251\0:u\3\215\371\00471\16\226\10>\371"..., 2542) = 2542
write(1, "\204\247command\244emit\247anchors\220\255need_tas"..., 4096) = 4096
write(1, "\4\0006\16j\24Vg\4\0006\22\1\26\00456*\1\26\0005>\36\t\35\274>\225$\1\274"..., 3072000

It seems like the Python bolt is having trouble writing data to STDOUT... Is it because the tuple size is too large, or something else?

poros commented 9 years ago

Hi @imcom, quick answers to your questions:

1) If you disabled ackers and you are emitting both unanchored tuples from bolts and tuples without tup_ids from spouts, Storm should not track your tuples.

2) Yes, you're right, that is weird...
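For reference, this is roughly what untracked emits look like with the pyleus API; the class and field names below are illustrative. Omitting tup_id in the spout and anchors in the bolt leaves Storm with nothing to track in the ackers.

```python
from pyleus.storm import Spout, SimpleBolt


class UntrackedSpout(Spout):

    OUTPUT_FIELDS = ['record']

    def next_tuple(self):
        # No tup_id passed: the tuple is not anchored, so the ackers
        # (if any) have nothing to track for it.
        self.emit(("some payload",))


class PassThroughBolt(SimpleBolt):

    OUTPUT_FIELDS = ['record']

    def process_tuple(self, tup):
        # No anchors passed: the emitted tuple is not tied to the input
        # tuple, so a downstream failure will not trigger a replay.
        self.emit((tup.values[0],))


# In a real pyleus project each component lives in its own module and ends
# with a __main__ guard calling .run(), as in the pyleus README examples.
```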

About the tuple size: during my experiments I tried to send 1 MB tuples at a consistent rate and it worked smoothly (albeit with terrible performance), so I don't really know whether that can be the cause of your problem or not.

imcom commented 9 years ago

Hi @poros, I am using the default Kafka spout in this case. I think Storm should not be tracking my tuples, but I have no idea how to confirm this... From what I've seen, the Kafka spout itself acks tuples pretty quickly, with no hesitation.

I have slightly larger tuples in my topology, around 1-2 MB each, and they work OK. I've managed to eliminate/mitigate the memory issue by breaking my operations down. More precisely, each time I receive a tuple in my bolt, the tuple is extracted first (a Google protobuf compacted message), which leaves me with 10k+ items to deal with for each one, and the tuples arrive every minute (for now; it might be faster in the future). Instead of processing all the items in the process_tuple function, I save them in memory and take advantage of tick tuples to process them smoothly, one at a time. A similar pattern has been applied throughout my topology: a fast process_tuple that acks immediately, and then my logic applied to the buffered data on tick.

Now I am able to reach my goal with reasonable delay, and the topology no longer suffers from the "memory leak".
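A minimal sketch of that pattern, assuming a pyleus SimpleBolt with tick tuples enabled for the bolt in the topology YAML (tick_freq_secs) and the initialize()/process_tick() hooks available; decode_items and handle_item are placeholders for the protobuf extraction and the per-item logic, and ITEMS_PER_TICK is just a knob (set to 1 here to mirror the one-at-a-time approach described above).

```python
from collections import deque

from pyleus.storm import SimpleBolt


def decode_items(blob):
    # Placeholder for the protobuf extraction; returns an iterable of items.
    return blob.split(',')


def handle_item(item):
    # Placeholder for the per-item business logic.
    pass


class BufferingBolt(SimpleBolt):

    # How many buffered items to drain on every tick tuple.
    ITEMS_PER_TICK = 1

    def initialize(self):
        self.pending = deque()

    def process_tuple(self, tup):
        # Keep process_tuple cheap: just unpack the message and buffer the
        # items. SimpleBolt acks the input tuple once this method returns.
        self.pending.extend(decode_items(tup.values[0]))

    def process_tick(self):
        # Drain a bounded slice of the buffer on each tick, so the heavy
        # work is spread over time instead of blocking the inbound queue.
        for _ in range(min(self.ITEMS_PER_TICK, len(self.pending))):
            handle_item(self.pending.popleft())


if __name__ == '__main__':
    BufferingBolt().run()
```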

[screenshot: 2015-09-29, 10:31 am]

[screenshot: 2015-09-29, 10:32 am]

I am using Redis and InfluxDB in my topology. I have three Redis instances on two physical servers, not in cluster mode, and I manage the sharding through my own configuration.

[screenshot: 2015-09-29, 10:34 am]

poros commented 9 years ago

Cool, I'm happy you solved your problem! :) I'm tempted to close this issue. Do you believe there is still something to be done at the Pyleus end that is not already tracked in #60?

imcom commented 9 years ago

Just speaking from my experience, this issue should be closed; similar issues are more likely caused by either the topology design or Storm's intrinsic nature. #60 is certainly a potential risk, but whenever someone runs into that problem it also indicates that the structure of the topology has room for refinement, and very likely the problem can be tackled there.