python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License
1.24k stars 148 forks source link

missing positional argument: 'topic' in to_mqtt #453

Closed dspruell closed 2 years ago

dspruell commented 2 years ago

Using:

I'm encountering this traceback when trying to invoke Stream.to_mqtt():

Traceback (most recent call last):
  File "test.py", line 47, in <module>
    run()
  File "test.py", line 32, in run
    output = Stream.to_mqtt(
  File "/usr/local/pyvenv/logpipe/lib/python3.8/site-packages/streamz/core.py", line 161, in wrapped
    return func(*args, **kwargs) 
TypeError: __init__() missing 1 required positional argument: 'topic'

Test: https://gist.github.com/dspruell/308b94de3e0bb2df083389cbbb0b0ac1#file-test-py

I may not understand the API right, but:

https://streamz.readthedocs.io/en/latest/api.html#streamz.Stream.to_mqtt

Documented as:

Stream.to_mqtt(host, port, topic, keepalive=60, client_kwargs=None, **kwargs)

In to_mqtt:

https://github.com/python-streamz/streamz/blob/8744c83a0fad1fbd9ee9318d1a79ee538415a4e4/streamz/sinks.py#L252-L253

Is upstream eating one of the args?

martindurant commented 2 years ago

I am not sure how you are calling this, but it should be something like

s = Stream()
s2 = s.map(..).aggregate(..)
s2.to_mqtt(host, port, topic)

where map and aggregate and other elements are whatever stages your pipeline might have. The test shows an example of usage.

If you get this working, please do suggest better documentation!

dspruell commented 2 years ago

@martindurant Thank you! Indeed I was calling it incorrectly. The bulk of the documentation kind of leads me to think that the end of a pipeline typically uses sink() to terminate a pipeline (~ final delivery/output), but network transport outputs obviously work differently. I'll try to see if I can suggest documentation around this. Would an API that uses sink() to allow message output for any configured output (e.g. a MQTT transport) as an argument make any sense (based on it being a consistent pattern for passing messages out from the pipeline and terminating it?

s.map(foo).sink(my_mqtt)

I was also curious to ask if a new release of streamz might be in the works, potentially with MQTT support (maybe as a RC)?

martindurant commented 2 years ago

Right, the sink method is for an ad-hoc function (func: callable A function that will be applied on every element); there are also a bunch of these to_* methods which, instead, create node instances. I guess the docstring for .sink could mention this.

martindurant commented 2 years ago

but yes, we should do a release, to answer the second part of your question :)

martindurant commented 2 years ago

Released 0.6.4