Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

Using Pyleus with PyPy throws "Acked a non-existent or already acked/failed id" error #89

Open victorpoluceno opened 9 years ago

victorpoluceno commented 9 years ago

I have a topology that works perfectly with CPython, but to get better performance I'm testing Pyleus with PyPy, and I've stumbled on a very strange bug. After running this topology with PyPy for a while, Pyleus breaks with this error:

99517 [Thread-11-commit] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: -1661235904777352941
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_72]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: -1661235904777352941
    at backtype.storm.task.ShellBolt.execute(ShellBolt.java:157) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 6 common frames omitted
Caused by: java.lang.RuntimeException: Acked a non-existent or already acked/failed id: -1661235904777352941
    at backtype.storm.task.ShellBolt.handleAck(ShellBolt.java:186) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.task.ShellBolt.access$200(ShellBolt.java:64) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.task.ShellBolt$1.run(ShellBolt.java:111) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 1 common frames omitted

As far as I can tell, this is the only problem preventing Pyleus from working with PyPy. Does anyone use PyPy with Pyleus, or know what causes this error?

It is worth noting that this error goes away when I reduce the size of each tuple that goes through Storm.
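
To make the error message concrete: the JVM-side ShellBolt roughly keeps track of the tuples it has handed to the Python subprocess and matches each incoming ack against them by id. The sketch below is a simplified Python model of that bookkeeping, not Storm's actual code; the class PendingTuples and its methods are hypothetical, and ids are kept as strings by assumption. An ack for an id that is not pending (never sent, already acked, or altered in transit) produces the same message as in the log above.

```python
class PendingTuples:
    """Hypothetical, simplified model of the JVM-side bookkeeping:
    tuples sent to the Python subprocess are remembered by id until
    the subprocess acks them. Not Storm's real implementation."""

    def __init__(self):
        self._pending = {}

    def emit_to_subprocess(self, tuple_id, values):
        # Remember the tuple so a later ack can be matched to it.
        self._pending[tuple_id] = values

    def handle_ack(self, tuple_id):
        # An id that is unknown (never sent, already acked, or mangled
        # in transit) cannot be matched, which is what the error reports.
        if tuple_id not in self._pending:
            raise RuntimeError(
                "Acked a non-existent or already acked/failed id: %s" % tuple_id)
        return self._pending.pop(tuple_id)


tracker = PendingTuples()
tracker.emit_to_subprocess("-1661235904777352941", ["payload"])
tracker.handle_ack("-1661235904777352941")      # matched and removed
try:
    tracker.handle_ack("-1661235904777352941")  # second ack: no longer pending
except RuntimeError as exc:
    print(exc)  # prints: Acked a non-existent or already acked/failed id: -1661235904777352941
```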

poros commented 9 years ago

We never explicitly designed Pyleus to be compatible with PyPy, so I'm afraid I don't have more insight into that. As far as I know, there might even be other issues when running Pyleus on PyPy.

Could this be a conversion problem with the message id, caused by MessagePack or json/simplejson under PyPy? That number looks a bit weird to me. Did you try both serializers?
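
If the id were ever handled as a number rather than as text on its way back to the JVM, it could be silently changed. The id in the log fits in a signed 64-bit integer, but it is larger than the 2**53 range a double represents exactly. This sketch compares an exact round trip through the stdlib json module with two hypothetical lossy paths (lossy_float_round_trip and lossy_int32_round_trip are made-up names for illustration); it does not claim that msgpack on PyPy actually takes either path.

```python
import json

# The id from the Storm log above.
TUPLE_ID = -1661235904777352941

INT64_MIN, INT64_MAX = -2**63, 2**63 - 1


def fits_in_int64(value):
    """True if value can be represented as a signed 64-bit integer."""
    return INT64_MIN <= value <= INT64_MAX


def json_round_trip(value):
    """Encode and decode with the stdlib json module (exact for Python ints)."""
    return json.loads(json.dumps(value))


def lossy_float_round_trip(value):
    """Hypothetical lossy path: the integer passes through an IEEE 754
    double, which keeps only 53 bits of mantissa."""
    return int(float(value))


def lossy_int32_round_trip(value):
    """Hypothetical lossy path: only the low 32 bits survive,
    reinterpreted as a signed 32-bit integer."""
    low = value & 0xFFFFFFFF
    return low - 2**32 if low >= 2**31 else low


print(fits_in_int64(TUPLE_ID))                         # True
print(json_round_trip(TUPLE_ID) == TUPLE_ID)           # True
print(lossy_float_round_trip(TUPLE_ID) == TUPLE_ID)    # False
print(lossy_int32_round_trip(TUPLE_ID) == TUPLE_ID)    # False
```

Either lossy result would no longer match any pending id on the JVM side, which is exactly the kind of mismatch the ack error describes.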

victorpoluceno commented 9 years ago

Yes, you are right! It works perfectly with json as the serializer. It seems the problem is with msgpack on PyPy.
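
For comparison, the traceback further down shows Pyleus building the ack command with 'id': tup.id. Assuming the JSON serializer follows Storm's standard JSON multilang framing (one JSON object followed by a line containing end), an ack travels roughly as shown here, with the id as text. frame_multilang_message and parse_multilang_message are hypothetical helpers, and the msgpack serializer's binary format is not modeled.

```python
import json

TERMINATOR = "\nend\n"


def frame_multilang_message(command):
    """Serialize one command as JSON followed by an 'end' line
    (assumed Storm JSON multilang framing)."""
    return json.dumps(command) + TERMINATOR


def parse_multilang_message(raw):
    """Inverse of frame_multilang_message for a single framed message."""
    if not raw.endswith(TERMINATOR):
        raise ValueError("message is not terminated by an 'end' line")
    return json.loads(raw[:-len(TERMINATOR)])


# Usage: an ack for the id from the Storm log, kept as a string.
ack = {"command": "ack", "id": "-1661235904777352941"}
wire = frame_multilang_message(ack)
print(repr(wire))
assert parse_multilang_message(wire) == ack
```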

Also, to add more information to the issue: I got this other exception when running with msgpack as the serializer:

2015-02-09 22:27:58,798 29432 MainProcess 140567940040480 MainThread ERROR pyleus.storm.component Exception in bolt.run
Traceback (most recent call last):
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/pyleus_venv/site-packages/pyleus/storm/component.py", line 233, in run
    self.run_component()
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/pyleus_venv/site-packages/pyleus/storm/bolt.py", line 46, in run_component
    self._process_tuple(tup)
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/pyleus_venv/site-packages/pyleus/storm/bolt.py", line 39, in _process_tuple
    return self.process_tuple(tup)
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/jabba/processor/topology/commit.py", line 20, in process_tuple
    self.process(tup)
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/jabba/processor/topology/commit.py", line 32, in process
    self.update(tup)
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/jabba/processor/topology/commit.py", line 67, in update
    self.ack(tup)
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/pyleus_venv/site-packages/pyleus/storm/bolt.py", line 64, in ack
    'id': tup.id,
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/pyleus_venv/site-packages/pyleus/storm/component.py", line 321, in send_command
    self._serializer.send_msg(command_dict)
  File "/var/storm/supervisor/stormdist/topology-production-13-1423520840/resources/pyleus_venv/site-packages/pyleus/storm/serializers/msgpack_serializer.py", line 50, in send_msg
    self._output_stream.flush()
IOError: [Errno 32] Broken pipe: '<fdopen>'
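
[Errno 32] is EPIPE on Linux: the Python process wrote to a pipe whose reading end had been closed. One plausible reading, not confirmed in this thread, is that the JVM side had already died with the ack error above, so the next flush from the bolt failed. This POSIX-only sketch reproduces the error with os.pipe; write_to_closed_pipe is a hypothetical helper name.

```python
import errno
import os


def write_to_closed_pipe(payload):
    """Write to a pipe whose read end is already closed; return the errno.

    CPython ignores SIGPIPE by default, so the failed write surfaces as an
    exception (BrokenPipeError on Python 3) instead of killing the process.
    """
    read_fd, write_fd = os.pipe()
    os.close(read_fd)  # the reader (e.g. the JVM side) has gone away
    try:
        os.write(write_fd, payload)
    except OSError as exc:
        return exc.errno
    finally:
        os.close(write_fd)
    return None


print(write_to_closed_pipe(b"x") == errno.EPIPE)  # True on POSIX systems
```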