pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.
http://streamparse.readthedocs.io/
Apache License 2.0

Dictionary inputs fail for BoltSpec with more than 5 inputs #452

Open colebaileygit opened 5 years ago

colebaileygit commented 5 years ago

At the end of my topology, I am creating a bolt roughly as follows:

    # Final node, which captures all data and formats the response
    fulfillment_bolt = JavaBolt.spec(
        full_class_name="bolt.FulfillmentBolt",
        args_list=[],
        inputs={
            nlg_bolt['default']: Grouping.fields('msgId'),
            audio_arousal['default']: Grouping.fields('msgId'),
            audio_emotion['default']: Grouping.fields('msgId'),
            text_sentiment_LIWC['default']: Grouping.fields('msgId'),
            text_sentiment_LSTM['default']: Grouping.fields('msgId'),
            text_sentiment_CNN['default']: Grouping.fields('msgId'),
            text_sentiment_google['default']: Grouping.fields('msgId'),
            text_emotion['default']: Grouping.fields('msgId'),
        },
    )

The snippet above is my current workaround: indexing each input component with ['default'] to get its GlobalStreamId.

The error occurs if I drop the ['default'] and use the component specs themselves as the dictionary keys.

When I limit the inputs to any 5 components, the topology builds successfully. As soon as I add a 6th input component (no matter which one), I get the following error:

    File "topologies/dialog.py", line 9, in <module>
        class Dialog(Topology):
    File "/home/me/anaconda3/lib/python3.6/site-packages/streamparse/dsl/topology.py", line 36, in __new__
        TopologyType.clean_spec_inputs(spec, specs)
    File "/home/me/anaconda3/lib/python3.6/site-packages/streamparse/dsl/topology.py", line 104, in clean_spec_inputs
        if isinstance(stream_id.componentId, ComponentSpec):
    AttributeError: 'JavaBoltSpec' object has no attribute 'componentId'

I found that clean_spec_inputs simply maps each ComponentSpec key to a GlobalStreamId using ['default'], so I'm doing that mapping myself as a workaround for now.
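The workaround can be kept in one place with a small helper that indexes every key with ['default'] before the dict reaches JavaBolt.spec. This is a hedged sketch: with_default_streams, StreamId and StubSpec are hypothetical names, and StubSpec only imitates the one behaviour the workaround relies on (spec['default'] returning a hashable stream ID). The grouping values are placeholder strings rather than real Grouping objects.

    from collections import namedtuple

    # Hypothetical stand-in for streamparse's GlobalStreamId (illustration only).
    StreamId = namedtuple("StreamId", ["componentId", "streamId"])


    class StubSpec:
        """Hypothetical stand-in for a component spec: spec['default'] yields a stream ID."""

        def __init__(self, name):
            self.name = name

        def __getitem__(self, stream):
            return StreamId(self.name, stream)


    def with_default_streams(inputs):
        """Return a new dict keyed by each spec's 'default' stream.

        A fresh dict is built, so the caller's dict is never modified
        while it is being iterated.
        """
        return {spec["default"]: grouping for spec, grouping in inputs.items()}


    # Eight inputs, matching the number of components in the snippet above.
    specs = [StubSpec("component_%d" % i) for i in range(8)]
    inputs = with_default_streams({spec: "fields(msgId)" for spec in specs})

In the topology this would presumably be called as inputs=with_default_streams({nlg_bolt: Grouping.fields('msgId'), ...}), relying on the same ['default'] indexing that the workaround already uses.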

With 6 components, printing fulfillment_bolt.inputs shows that 5 keys have been transformed to GlobalStreamId, but the 6th is still a JavaBoltSpec.

It always seems to be the first key returned by bolt.inputs.items() that has not been transformed.
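One guess, not verified against the streamparse source, is that the keys are being swapped in place while the same dict is iterated. The Python documentation for dictionary views warns that iterating a view while adding or deleting entries may raise a RuntimeError or fail to iterate over all entries, which would fit one key being skipped. The sketch below shows the usual safe pattern for an in-place swap: iterate over a snapshot taken with list(). normalize_inputs, StreamId and StubSpec are hypothetical stand-ins, not streamparse code.

    from collections import namedtuple

    # Hypothetical stand-ins for GlobalStreamId and a component spec.
    StreamId = namedtuple("StreamId", ["componentId", "streamId"])


    class StubSpec:
        def __init__(self, name):
            self.name = name

        def __getitem__(self, stream):
            return StreamId(self.name, stream)


    def normalize_inputs(inputs):
        """Replace every StubSpec key in `inputs` with its 'default' StreamId, in place."""
        # list() takes a snapshot of the items, so deleting and inserting
        # keys below cannot change which entries the loop visits.
        for key, grouping in list(inputs.items()):
            if isinstance(key, StubSpec):
                del inputs[key]
                inputs[key["default"]] = grouping
        return inputs


    inputs = {StubSpec("component_%d" % i): "fields(msgId)" for i in range(8)}
    normalize_inputs(inputs)
    # Any key still left as a spec would show up here.
    leftover = [key for key in inputs if not isinstance(key, StreamId)]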

Any idea what is going wrong here?