pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.
http://streamparse.readthedocs.io/
Apache License 2.0

Parallelism hint not propagated to Flux YAML Topology #364

Closed: rwatler closed this issue 7 years ago

rwatler commented 7 years ago

Parallelism hints set on spouts and bolts in the Python topology DSL are not included in the Flux YAML that is passed to Storm when a topology is run locally with sparse run. This appears to be an oversight in the code (see below), so it should be reproducible with any topology, including the sample wordcount topology.

For example, the following bolt declaration in the Python DSL:

    uniques_bolt = \
        UniquesBolt.spec(name=b'uniques_bolt',
                         inputs={redis_spout[b'events']: Grouping.SHUFFLE},
                         par=2)

Resulted in this YAML DSL:

    - className: org.apache.storm.flux.wrappers.bolts.FluxShellBolt
      configMethods:
      - args:
        - events
        - [event]
        name: setNamedStream
      constructorArgs:
      - [streamparse_run, bolts.uniques.UniquesBolt]
      id: uniques_bolt

There is no parallelism hint here. The log files generated by the running bolts confirmed that only one executor was running after invoking sparse run:

    pystorm_events_uniques_bolt_12_4674.log
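To spot the same problem in other topologies, a small check over the generated component definitions can flag any spout or bolt that lacks a parallelism key. The sketch below works on plain Python dicts shaped like the YAML above (loading the real YAML file, for example with PyYAML, is left out). The helper name is hypothetical, and the fallback value of 1 is an assumption based on our understanding that Flux uses a parallelism of 1 when a component omits the key.

```python
from typing import Any, Dict, List, Tuple

# Assumption: Flux falls back to a parallelism of 1 when the key is absent.
FLUX_DEFAULT_PARALLELISM = 1


def effective_parallelism(
    components: List[Dict[str, Any]],
) -> List[Tuple[str, int, bool]]:
    """Return (id, parallelism Flux would use, whether it was set explicitly).

    `components` is a list of spout/bolt definitions shaped like the Flux
    YAML above. Hypothetical helper, not part of streamparse.
    """
    report = []
    for component in components:
        explicit = "parallelism" in component
        value = component.get("parallelism", FLUX_DEFAULT_PARALLELISM)
        report.append((component["id"], value, explicit))
    return report


# The bolt definition from the unpatched output, written as a Python dict.
unpatched_bolt = {
    "className": "org.apache.storm.flux.wrappers.bolts.FluxShellBolt",
    "configMethods": [{"args": ["events", ["event"]], "name": "setNamedStream"}],
    "constructorArgs": [["streamparse_run", "bolts.uniques.UniquesBolt"]],
    "id": "uniques_bolt",
}

for comp_id, value, explicit in effective_parallelism([unpatched_bolt]):
    if not explicit:
        print(f"{comp_id}: no parallelism key, Flux falls back to {value}")
```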

After a little digging, I patched streamparse/dsl/topology.py to include the parallelism hint:

    # diff -u topology.py.orig topology.py
    --- topology.py.orig    2017-04-28 10:01:40.645000000 -0400
    +++ topology.py 2017-04-28 09:45:53.779000000 -0400
    @@ -210,6 +210,7 @@
                                 spec.outputs[output_stream].output_fields
                             ]
                         })
    +            flux_dict['parallelism'] = spec.par
             else:
                 if spec.component_object.serialized_java is not None:
                     raise TypeError('Flux does not support specifying serialized '
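The effect of the one-line patch can be sketched in isolation. This is not the streamparse implementation: `ShellSpec` and `to_flux_component` are hypothetical stand-ins that keep only the fields visible in this issue (the component name, the shell command and class path, the output streams, and `par`). They show the parallelism hint being copied into the Flux component dict next to the other keys, which is what produces the `parallelism: 2` line in the patched YAML.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ShellSpec:
    """Hypothetical, simplified stand-in for a streamparse component spec."""
    name: str
    execution_command: str
    script: str
    outputs: Dict[str, List[str]] = field(default_factory=dict)  # stream -> fields
    par: int = 1  # assumed default for this sketch


def to_flux_component(spec: ShellSpec) -> Dict[str, Any]:
    """Build a Flux shell-bolt definition from a spec (illustrative only)."""
    flux_dict: Dict[str, Any] = {
        "id": spec.name,
        "className": "org.apache.storm.flux.wrappers.bolts.FluxShellBolt",
        "constructorArgs": [[spec.execution_command, spec.script]],
        "configMethods": [],
    }
    # One setNamedStream call per declared output stream, as in the YAML above.
    for stream, fields in spec.outputs.items():
        flux_dict["configMethods"].append(
            {"name": "setNamedStream", "args": [stream, fields]}
        )
    # The equivalent of the patched line: without it, Flux never sees the hint.
    flux_dict["parallelism"] = spec.par
    return flux_dict


spec = ShellSpec(
    name="uniques_bolt",
    execution_command="streamparse_run",
    script="bolts.uniques.UniquesBolt",
    outputs={"events": ["event"]},
    par=2,
)
print(to_flux_component(spec)["parallelism"])  # 2
```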

Afterwards, the YAML DSL looked like this:

    - className: org.apache.storm.flux.wrappers.bolts.FluxShellBolt
      configMethods:
      - args:
        - events
        - [event]
        name: setNamedStream
      constructorArgs:
      - [streamparse_run, bolts.uniques.UniquesBolt]
      id: uniques_bolt
      parallelism: 2

And the expected number of executor logs appeared:

    pystorm_events_uniques_bolt_16_17514.log
    pystorm_events_uniques_bolt_17_17534.log
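Counting log files per component is a quick way to confirm the fix, as done here. The sketch below assumes the naming pattern seen in these files, `pystorm_<topology>_<component>_<n>_<m>.log`, where `events` appears to be the topology name and the two trailing numbers appear to be a task ID and a process ID. Because component names can contain underscores, it matches the full prefix plus the two trailing numbers instead of splitting on `_`. The helper name is hypothetical.

```python
import re
from typing import Iterable, List


def matching_logs(filenames: Iterable[str], topology: str, component: str) -> List[str]:
    """Return the pystorm log files that belong to one component.

    Assumes names shaped like pystorm_<topology>_<component>_<n>_<m>.log,
    as in the files listed above. Hypothetical helper, not part of streamparse.
    """
    pattern = re.compile(
        rf"^pystorm_{re.escape(topology)}_{re.escape(component)}_\d+_\d+\.log$"
    )
    return sorted(name for name in filenames if pattern.match(name))


after_patch = [
    "pystorm_events_uniques_bolt_16_17514.log",
    "pystorm_events_uniques_bolt_17_17534.log",
]
print(len(matching_logs(after_patch, "events", "uniques_bolt")))  # 2
```

In practice the file names would come from something like `os.listdir()` on the worker's log directory. Logs left over from earlier runs (such as the `_12_4674` file above) also match the pattern, so clear or filter them before counting.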

I am not sure this patch is entirely correct, but it illustrates the problem and a potential solution. For reference, we are using streamparse 3.4.0 on Storm 1.1.0 (CentOS 7.2, Python 2.7).

dan-blanchard commented 7 years ago

Thanks for the detailed report and patch. I believe your solution is correct, so it would be greatly appreciated if you could submit it as a PR.