Polyconseil / aioamqp

AMQP implementation using asyncio

Error while consuming from multiple queues: Waiter already exists #105

Closed farefernandez closed 5 years ago

farefernandez commented 8 years ago

I'm trying to set up my code to be able to consume from multiple queues concurrently, using one channel.

Apart from connecting and creating the channel, this is the relevant part of the code:

tasks = []
for u in users:
    tasks.append(channel.basic_consume(callback, queue_name=u.name))
await asyncio.gather(*tasks)

Unfortunately, when I run it, I receive the following "Waiter already exists" error:

  File "manage.py", line 10, in <module>
    execute_from_command_line(sys.argv)
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/__init__.py", line 367, in execute_from_command_line
    utility.execute()
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/__init__.py", line 359, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/base.py", line 305, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/base.py", line 356, in execute
    output = self.handle(*args, **options)
  File "/code/callgen/management/commands/filldata.py", line 79, in handle
    loop.run_until_complete(self.connect(loop))
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/code/callgen/management/commands/filldata.py", line 75, in connect
    await asyncio.gather(*tasks)
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
    future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 617, in basic_consume
    'basic_consume', frame, request, no_wait, timeout=timeout)
  File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 196, in _write_frame_awaiting_response
    f = self._set_waiter(waiter_id)
  File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 41, in _set_waiter
    raise exceptions.SynchronizationError("Waiter already exists")
aioamqp.exceptions.SynchronizationError: Waiter already exists

Is there a way to consume from multiple queues using one channel?

Thanks!
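The last frames of the traceback suggest that the channel registers its pending reply under the fixed name 'basic_consume', so a second basic_consume issued through asyncio.gather before the first has been answered would collide with it. The following is only a toy model of that behaviour, not aioamqp's code: `ToyChannel`, `consume_concurrently` and `consume_sequentially` are hypothetical names, and the local `SynchronizationError` is a stand-in for the library's exception. It sketches why awaiting the calls one after another might avoid the error on a single channel.

```python
import asyncio


class SynchronizationError(Exception):
    """Stand-in for aioamqp.exceptions.SynchronizationError (assumption)."""


class ToyChannel:
    """Hypothetical model: at most one pending waiter per method name."""

    def __init__(self):
        self._waiters = {}
        self.consumers = []

    async def basic_consume(self, callback, queue_name):
        # A second call arriving while the first is still pending is rejected.
        if "basic_consume" in self._waiters:
            raise SynchronizationError("Waiter already exists")
        self._waiters["basic_consume"] = queue_name
        try:
            # Yield to the event loop, as if waiting for the broker's reply.
            await asyncio.sleep(0)
            self.consumers.append(queue_name)
        finally:
            del self._waiters["basic_consume"]


async def consume_concurrently(channel, callback, queue_names):
    # Mirrors the snippet in the report: all calls are in flight at once.
    await asyncio.gather(
        *(channel.basic_consume(callback, queue_name=q) for q in queue_names)
    )


async def consume_sequentially(channel, callback, queue_names):
    # Each call completes before the next one is issued.
    for q in queue_names:
        await channel.basic_consume(callback, queue_name=q)
```

In this model the concurrent variant raises as soon as the second call starts, while the sequential variant registers every queue. Whether the real library behaves the same way in every version is not confirmed by this thread.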

farefernandez commented 8 years ago

Someone suggested that I open a channel for each queue, since waiting for messages from a specific queue would pretty much lock the entire channel. Do you think this is correct?
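For reference, the one-channel-per-queue suggestion could be sketched roughly as below. It assumes `protocol` is the protocol object returned by `aioamqp.connect()` and that `protocol.channel()` opens a new channel; `consume_each_on_own_channel` is a hypothetical helper name, and `callback` and the queue names are whatever the application already uses. Whether a dedicated channel per queue is actually required is not settled in this thread.

```python
async def consume_each_on_own_channel(protocol, callback, queue_names):
    """Open one channel per queue and start a consumer on each.

    Assumption: `protocol` comes from aioamqp.connect() and exposes an
    awaitable channel() method; nothing else about it is relied upon.
    """
    channels = {}
    for name in queue_names:
        # Each queue gets its own channel, so no two basic_consume calls
        # ever share a channel's pending-reply slot.
        channel = await protocol.channel()
        await channel.basic_consume(callback, queue_name=name)
        channels[name] = channel
    return channels
```

The returned mapping keeps a reference to every channel so that they can be closed later.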

nhumrich commented 6 years ago

I am running into this issue while trying to listen to only a single queue with a single channel.