Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

Using kafka-spout, bolt dies after a few hours of running #108

Open jswetzen opened 9 years ago

jswetzen commented 9 years ago

There's a problem I've been unable to solve for a very long time now, so I need to ask about it. I have Kafka set up with a script that feeds it lines from a CSV file. Then I have a Pyleus topology with the Kafka spout, a line reader bolt that splits the lines by comma and prints them to a log file, and finally a "black hole" endpoint bolt that just accepts all the tuples and emits nothing. It's the simplest topology I could come up with for testing.

My problem is this: it runs fine for a few hours (anywhere from two to seven), but then it suddenly crashes. I have rate-limited the Kafka input so it's constant at about 700 tuples/sec, and looking at the Kafka process in jconsole I can see that the input is matched by the output, byte for byte. Storm memory usage fluctuates between 200 and 400 MB but never goes above that. Here is the traceback from the bolt that crashes; it's the line reader bolt right after the spout.

ERROR:pyleus.storm.component:Exception in bolt.run
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/pyleus/storm/component.py", line 233, in run
    self.run_component()
  File "/usr/lib/python2.7/site-packages/pyleus/storm/bolt.py", line 49, in run_component
    self._process_tuple(tup)
  File "/usr/lib/python2.7/site-packages/pyleus/storm/bolt.py", line 42, in _process_tuple
    return self.process_tuple(tup)
  File "/tmp/f9b90a1e-da22-46a3-8893-15c6475c851b/supervisor/stormdist/log_readings-1-1427806732/resources/log_readings/meter_line_reader.py", line 42, in process_tuple
    self.emit(meterposition, anchors=[tup], need_task_ids=False)
  File "/usr/lib/python2.7/site-packages/pyleus/storm/bolt.py", line 148, in emit
    self.send_command('emit', command_dict)
  File "/usr/lib/python2.7/site-packages/pyleus/storm/component.py", line 321, in send_command
    self._serializer.send_msg(command_dict)
  File "/usr/lib/python2.7/site-packages/pyleus/storm/serializers/msgpack_serializer.py", line 50, in send_msg
    self._output_stream.flush()
IOError: [Errno 32] Broken pipe

I'm running in local mode, and Storm reports a java.lang.RuntimeException: Acked a non-existent or already acked/failed id:

7919789 [Thread-9-aimir-line-reader] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: 4750536584590284839
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: 4750536584590284839
        at backtype.storm.task.ShellBolt.execute(ShellBolt.java:157) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 6 common frames omitted
Caused by: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: 4750536584590284839
        at backtype.storm.task.ShellBolt.handleAck(ShellBolt.java:186) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.task.ShellBolt.access$200(ShellBolt.java:64) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.task.ShellBolt$1.run(ShellBolt.java:111) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 1 common frames omitted
7919791 [Thread-9-aimir-line-reader] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: 4750536584590284839
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: 4750536584590284839
        at backtype.storm.task.ShellBolt.execute(ShellBolt.java:157) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 6 common frames omitted
Caused by: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: 4750536584590284839
        at backtype.storm.task.ShellBolt.handleAck(ShellBolt.java:186) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.task.ShellBolt.access$200(ShellBolt.java:64) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.task.ShellBolt$1.run(ShellBolt.java:111) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 1 common frames omitted
7919914 [Thread-9-aimir-line-reader] INFO  backtype.storm.util - Halting process: ("Worker died")
pyleus local: error: [StormError] Storm command failed. Run with --verbose for more info.

Has anyone else encountered anything similar? I'm running the latest pyleus version and Storm 0.9.2-incubating.
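For readers decoding the Python traceback above: Errno 32 is EPIPE, the error raised when a process writes to a pipe that no longer has a reader. A multilang bolt talks to Storm's ShellBolt over its stdin/stdout pipes, so the "Broken pipe" may simply be a consequence of the Java side going away rather than the root cause. The following is a minimal sketch of that failure mode, assuming a POSIX system; the function name is hypothetical and none of this is Pyleus code.

```python
import errno
import os


def write_after_reader_closed():
    """Write to a pipe whose read end is already closed; return the errno raised."""
    read_fd, write_fd = os.pipe()
    os.close(read_fd)  # the reader goes away (stand-in for the Storm worker)
    try:
        os.write(write_fd, b"emit")  # no reader left, so the OS reports EPIPE
    except OSError as exc:  # BrokenPipeError is a subclass of OSError on Python 3
        return exc.errno
    finally:
        os.close(write_fd)
    return None


if __name__ == "__main__":
    # True when the write failed with "Broken pipe"
    print(write_after_reader_closed() == errno.EPIPE)
```

In the traceback the error surfaces at `flush()` rather than `write()` because the bolt's output stream is buffered, so the failing write only reaches the operating system when the buffer is flushed.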

jswetzen commented 9 years ago

I haven't been able to solve this, and it's extremely hard to debug (you have to run it for up to seven hours to see whether it crashes). Unfortunately, the result is that I'm giving up on Pyleus for the master's thesis I'm writing right now. I have lost too much time, and using Storm with Trident works without crashing. I hope Pyleus will continue to mature and provide a fully functioning alternative for Python enthusiasts in the future.

poros commented 9 years ago

Sorry @jswetzen, I missed this one in the flow of my emails. :( And I'm sorry you're going to abandon Pyleus due to this bug. :(

Have you tried using the json serializer instead of the messagepack one? We wrote the messagepack serializer ourselves, so we may have introduced exotic bugs that the Storm community hasn't experienced.
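For context on what the json option means at the wire level: Storm's multilang protocol, as documented by Storm, sends each message as JSON text followed by a line containing only `end`. The sketch below illustrates that framing with in-memory streams. It is not the Pyleus serializer itself, and the names `send_msg` and `read_msg` here are illustrative only.

```python
import io
import json


def send_msg(stream, msg):
    """Write one message: JSON text, then a line containing only 'end'."""
    stream.write(json.dumps(msg))
    stream.write("\nend\n")
    stream.flush()


def read_msg(stream):
    """Read lines up to the 'end' marker and decode them as one JSON message."""
    lines = []
    for line in stream:
        line = line.rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))


if __name__ == "__main__":
    buf = io.StringIO()  # stands in for the pipe between the bolt and Storm
    send_msg(buf, {"command": "ack", "id": "4750536584590284839"})
    buf.seek(0)
    print(read_msg(buf))
```

Because every message is delimited by a text marker, a reader can resynchronise on line boundaries, which is one way this framing differs from a binary stream.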

jswetzen commented 9 years ago

Thanks for your reply @poros. Changing the serializer is one thing I have not tried. I can run a final test and see if that makes a difference.

jswetzen commented 9 years ago

It worked! It ran through all my 31 million lines without crashing!

I wonder what kind of speed loss I get since messagepack is supposed to be a lot faster. But this probably means that there's a bug somewhere in the messagepack code!
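One rough way to estimate the serialization cost in isolation is to time a round trip of a representative message with each library. This is only a sketch: the sample message is made up for illustration, `msgpack` is a third-party package that may not be installed, and the numbers say nothing about end-to-end topology throughput.

```python
import json
import timeit

try:
    import msgpack  # third-party; optional for this sketch
except ImportError:
    msgpack = None

# Hypothetical emit message; the field values are invented for illustration.
SAMPLE = {
    "command": "emit",
    "anchors": ["4750536584590284839"],
    "tuple": ["meter-1", "2015-03-31T12:00:00", 42.5],
}


def round_trip_seconds(dumps, loads, number=10000):
    """Total seconds to serialize and deserialize SAMPLE `number` times."""
    return timeit.timeit(lambda: loads(dumps(SAMPLE)), number=number)


def compare(number=10000):
    """Return timings keyed by serializer name; msgpack only if importable."""
    results = {"json": round_trip_seconds(json.dumps, json.loads, number)}
    if msgpack is not None:
        results["msgpack"] = round_trip_seconds(
            msgpack.packb, msgpack.unpackb, number
        )
    return results


if __name__ == "__main__":
    for name, seconds in compare().items():
        print(name, seconds)
```

At roughly 700 tuples/sec, as in the test described earlier in this thread, it is worth checking whether serialization is a bottleneck at all before worrying about the difference.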

poros commented 9 years ago

Happy to hear that the json serializer sorted things out!

A bug that only shows up after hours of running is a pain to tackle. If you come across any new useful info, please don't hesitate to update this issue.

hpxiaoming commented 7 years ago

I ran into the same problem, but I don't know how to select the json serializer. Could you give me some help? Thank you!