pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.
http://streamparse.readthedocs.io/
Apache License 2.0

TypeError when changing the serializer #428

Open kbruegge opened 6 years ago

kbruegge commented 6 years ago

Hello,

I'm trying to use the msgpack serializer with streamparse. Here is my current configuration:

{
    "serializer": "msgpack",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "",
            "ssh_password": "",
            "nimbus": "",
            "workers": [],
            "log": {
                "path": "",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "virtualenv_root": ""
        }
    }
}
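The traceback shows streamparse's run.py passing this setting straight to the Python component (cls(serializer=args.serializer)). It does not, by itself, change what the Java ShellBolt writes. In stock Storm that is the JSON multilang protocol: one JSON document followed by a line containing only "end". Here is a minimal sketch of that framing, assuming the standard multilang wire format. write_message and read_message are hypothetical helper names. The handshake values are placeholders; only the pidDir/conf/context keys come from the traceback.

```python
import io
import json

# Hypothetical helpers illustrating Storm's JSON multilang framing:
# each message is one JSON document followed by a line containing "end".


def write_message(stream, msg):
    """Serialize msg as JSON and terminate it with an 'end' line."""
    stream.write(json.dumps(msg) + "\nend\n")


def read_message(stream):
    """Read lines up to the 'end' terminator and decode them as JSON."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before an 'end' line was read")
        line = line.rstrip("\n")
        if line == "end":
            return json.loads("\n".join(lines))
        lines.append(line)


# Placeholder handshake; only the key names come from the traceback.
handshake = {"pidDir": "/tmp/pids", "conf": {}, "context": {}}

pipe = io.StringIO()
write_message(pipe, handshake)
pipe.seek(0)
decoded = read_message(pipe)
print(decoded["pidDir"])  # /tmp/pids
```

Because the message starts with a plain "{" character, a reader expecting any other encoding will misinterpret it from the very first byte.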

I also added a dependency to the project.clj:

(defproject wordcount "0.0.1-SNAPSHOT"
  :resource-paths ["_resources"]
  :target-path "_build"
  :min-lein-version "2.0.0"
  :jvm-opts ["-client"]
  :dependencies  [[org.apache.storm/storm-core "1.2.1"]
                  [org.apache.storm/flux-core "1.2.1"]
                  [org.msgpack/msgpack "0.6.12"]]
  :jar-exclusions     [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
  :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
  )

I get the following exception when running the topology with sparse run:

14061 [Thread-20-count_bolt-executor[3 3]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: org.apache.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
Serializer Exception:
Traceback (most recent call last):
  File "/usr/local/bin/streamparse_run", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.6/site-packages/streamparse/run.py", line 45, in main
    cls(serializer=args.serializer).run()
  File "/usr/local/lib/python3.6/site-packages/pystorm/component.py", line 483, in run
    storm_conf, context = self.read_handshake()
  File "/usr/local/lib/python3.6/site-packages/pystorm/component.py", line 311, in read_handshake
    pid_dir, _conf, _context = msg['pidDir'], msg['conf'], msg['context']
TypeError: 'int' object is not subscriptable

    at org.apache.storm.utils.ShellProcess.launch(ShellProcess.java:96) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.task.ShellBolt.prepare(ShellBolt.java:153) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]
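One plausible reading of this TypeError assumes the Python side was decoding msgpack while the Java side still sent its default JSON. The handshake on the wire then begins with the byte "{" (0x7B, decimal 123). In the MessagePack format, every byte from 0x00 to 0x7F is a complete "positive fixint" object that encodes its own value. A msgpack reader would therefore return the integer 123 as the entire first message, and msg['pidDir'] on an int fails exactly as shown. The sketch below covers only a small subset of the MessagePack spec. classify_msgpack_lead_byte is a hypothetical helper, not part of pystorm or the msgpack package.

```python
import json


def classify_msgpack_lead_byte(byte):
    """Return (type, value) for a MessagePack lead byte (small subset of the spec)."""
    if byte <= 0x7F:
        return ("positive fixint", byte)  # the byte is the whole object
    if 0x80 <= byte <= 0x8F:
        return ("fixmap", byte & 0x0F)  # value = number of key/value pairs
    if 0x90 <= byte <= 0x9F:
        return ("fixarray", byte & 0x0F)  # value = number of elements
    if 0xA0 <= byte <= 0xBF:
        return ("fixstr", byte & 0x1F)  # value = string length in bytes
    return ("other", None)  # remaining types are not handled in this sketch


# What a JSON-speaking Java side would write (placeholder values).
wire = (json.dumps({"pidDir": "/tmp/pids", "conf": {}, "context": {}}) + "\nend\n").encode()

kind, value = classify_msgpack_lead_byte(wire[0])
print(kind, value)  # positive fixint 123

# Same failure mode as the traceback: subscripting an int.
try:
    value["pidDir"]
except TypeError as exc:
    print(exc)  # 'int' object is not subscriptable
```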

Thanks for your help.

dan-blanchard commented 6 years ago

What Java implementation of the msgpack serializer are you using? There is no official one yet, and the one from the now-abandoned Pyleus project caused mysterious issues for us when we used it under load.

kbruegge commented 6 years ago

Hmm, I guess we are a bit confused about how to do this at all. We thought changing the serializer in config.json would suffice, but I can't find any docs on how to do it properly. We are trying to serialize small numpy arrays and scalars. We found this PR and thought msgpack was part of pystorm now: https://github.com/pystorm/pystorm/pull/1
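The default JSON serializer cannot serialize numpy arrays or numpy scalars as-is. One workaround is to convert them to plain Python lists and numbers before emitting. This is an assumption about your setup, not something pystorm does for you. numpy arrays and numpy scalar types both provide tolist(), so the sketch duck-types on that method and runs without numpy installed. to_json_safe and FakeArray are hypothetical names used only for illustration.

```python
import json


def to_json_safe(value):
    """Recursively convert numpy-like values into JSON-serializable builtins."""
    if hasattr(value, "tolist"):  # numpy arrays and numpy scalars
        return value.tolist()
    if isinstance(value, dict):
        return {k: to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    return value


class FakeArray:
    """Stand-in for numpy.ndarray so the example runs without numpy."""

    def __init__(self, data):
        self._data = data

    def tolist(self):
        return [list(row) for row in self._data]


values = ("sensor-1", FakeArray([(1.0, 2.0), (3.0, 4.0)]), 0.5)
payload = json.dumps(to_json_safe(values))
print(payload)  # ["sensor-1", [[1.0, 2.0], [3.0, 4.0]], 0.5]
```

In a bolt you would apply this to the tuple values before passing them to emit. Small arrays survive the round trip as nested lists, at the cost of losing the dtype.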

dan-blanchard commented 6 years ago

Yeah, the serializer setting isn't really documented in streamparse (or pystorm) because we were waiting on official upstream support for msgpack that has yet to materialize. When we first added it to pystorm, Pyleus was still under active development and had created the necessary Java-side serializer, but that code is very out of date at this point and doesn't work on Storm 1.x.

There's an old Storm PR (apache/Storm#1136), but someone else will probably need to pick up the work there and update it to a more modern version of the Java MessagePack library.
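There is no official Java-side msgpack serializer. One defensive option is therefore a pre-flight check that flags any non-JSON "serializer" value in config.json before a topology is run. This is a hypothetical helper: check_serializer, the warning text, and the choice to treat a missing key as "json" are all assumptions, not streamparse features. It reads only the "serializer" key from the config in the original report.

```python
import json

# Assumption: only JSON works without an extra Java-side multilang serializer.
SUPPORTED_WITHOUT_JAVA_PLUGIN = {"json"}


def check_serializer(config_text):
    """Return a warning string if config.json requests a non-JSON serializer."""
    config = json.loads(config_text)
    # Assumption: a missing "serializer" key means the JSON default.
    serializer = config.get("serializer", "json")
    if serializer not in SUPPORTED_WITHOUT_JAVA_PLUGIN:
        return (
            f"serializer={serializer!r} needs a matching Java multilang "
            "serializer on the Storm side, and no official msgpack one exists"
        )
    return None


print(check_serializer('{"serializer": "msgpack"}'))
```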