robinhood / faust

Python Stream Processing
Other
6.73k stars 535 forks source link

Awaiting 2 asks #619

Open TsvetanRangelov opened 4 years ago

TsvetanRangelov commented 4 years ago

Checklist

Steps to reproduce

 From within an agent, I wanted to asynchronously send requests but I cannot seem to gather the awaits.

data = await asyncio.gather( agent1.ask(value=article), agent2.ask(value=article) )

Tell us what you did to cause something to happen.

Expected behavior

Expected to get the two results

Actual behavior

Crashes with File "/opt/anaconda3/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1052, in subscribe "You should provide either topics or pattern") ValueError: You should provide either topics or pattern

Full traceback


[2020-07-20 18:30:06,578] [54387] [ERROR] Exception in callback Topic._on_published(message=<FutureMessag...stamp_type=0)>, state={}, producer=<Producer: running >)(<Future finis...stamp_type=0)>)
handle: <Handle Topic._on_published(message=<FutureMessag...stamp_type=0)>, state={}, producer=<Producer: running >)(<Future finis...stamp_type=0)>)> 
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/opt/anaconda3/lib/python3.7/site-packages/faust/topics.py", line 450, in _on_published
    self.app.sensors.on_send_completed(producer, state, res)
  File "/opt/anaconda3/lib/python3.7/site-packages/faust/sensors/base.py", line 251, in on_send_completed
    sensor.on_send_completed(producer, state[sensor], metadata)
KeyError: <Monitor: init >
[2020-07-20 18:30:06,587] [54387] [ERROR] Exception in callback Topic._on_published(message=<FutureMessag...stamp_type=0)>, state={}, producer=<Producer: running >)(<Future finis...stamp_type=0)>)
handle: <Handle Topic._on_published(message=<FutureMessag...stamp_type=0)>, state={}, producer=<Producer: running >)(<Future finis...stamp_type=0)>)> 
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/opt/anaconda3/lib/python3.7/site-packages/faust/topics.py", line 450, in _on_published
    self.app.sensors.on_send_completed(producer, state, res)
  File "/opt/anaconda3/lib/python3.7/site-packages/faust/sensors/base.py", line 251, in on_send_completed
    sensor.on_send_completed(producer, state[sensor], metadata)
KeyError: <Monitor: init >
[2020-07-20 18:30:06,589] [54387] [ERROR] [^--Conductor]: Crashed reason=ValueError('You should provide either `topics` or `pattern`') 
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/opt/anaconda3/lib/python3.7/site-packages/faust/transport/conductor.py", line 238, in _subscriber
    await self.app.consumer.subscribe(await self._update_indices())
  File "/opt/anaconda3/lib/python3.7/site-packages/faust/transport/consumer.py", line 1273, in subscribe
    await self._thread.subscribe(topics=topics)
  File "/opt/anaconda3/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 513, in subscribe
    listener=self._rebalance_listener,
  File "/opt/anaconda3/lib/python3.7/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/opt/anaconda3/lib/python3.7/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/opt/anaconda3/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1052, in subscribe
    "You should provide either `topics` or `pattern`")
ValueError: You should provide either `topics` or `pattern````

# Versions
python 3.7.6
faust 1.10.4
kafka 2.12
* Python version
* Faust version
* Operating system
* Kafka version
* RocksDB version (if applicable)
Locustv2 commented 3 years ago

@TsvetanRangelov were you able to figure out this issue/