benjamin-hodgson / asynqp

An AMQP library for asyncio
MIT License
84 stars 29 forks source link

How to consume from multiple queues concurrently #84

Closed farefernandez closed 8 years ago

farefernandez commented 8 years ago

I'm trying to read and process messages from multiple queues concurrently. Below you can find the code that I'm using to set-up queues, messages and callbacks.

The code works fine, but now I'd like to issue an http call using aiohttp in my process_msg callback. The thing is: process_msg it is not a coroutine hence I cannot await aiohttp coroutines.

Is there a way to schedule the execution of a coroutine instead of a simple callback when calling queue.consume?

#!/usr/bin/env python3

import asyncio
import asynqp

USERS = {'betty', 'bob', 'luis', 'tony'}

def process_msg(msg):
    print('>> {}'.format(msg.body))
    msg.ack()

async def connect():
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
    channel = await connection.open_channel()
    exchange = await channel.declare_exchange('inboxes', 'direct')

    # we have 10 users. Set up a queue for each of them
    # use different channels to avoid any interference
    # during message consumption, just in case.
    for username in USERS:
        user_channel = await connection.open_channel()
        queue = await user_channel.declare_queue('Inbox_{}'.format(username))
        await queue.bind(exchange, routing_key=username)
        await queue.consume(process_msg)

    # deliver 10 messages to each user
    for username in USERS:
        for msg_idx in range(10):
            msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
            exchange.publish(msg, routing_key=username)

loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()
tvoinarovskyi commented 8 years ago

It can be done fairy easy using loop.create_task. Didn't run code below thou.

#!/usr/bin/env python3

import asyncio
import asynqp

USERS = {'betty', 'bob', 'luis', 'tony'}
TASKS = set([])

def process_msg(msg):
    loop = asyncio.get_event_loop()
    task = loop.create_task(process_msg_coro(msg))
    task.add_done_callback(lambda t: TASKS.remove(t))
    TASKS.add(task)

async def process_msg_coro(msg):
    print('>> {}'.format(msg.body))
    # You can now do await's here
    msg.ack()

async def connect():
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
    channel = await connection.open_channel()
    exchange = await channel.declare_exchange('inboxes', 'direct')

    # we have 10 users. Set up a queue for each of them
    # use different channels to avoid any interference
    # during message consumption, just in case.
    for username in USERS:
        user_channel = await connection.open_channel()
        queue = await user_channel.declare_queue('Inbox_{}'.format(username))
        await queue.bind(exchange, routing_key=username)
        await queue.consume(process_msg)

    # deliver 10 messages to each user
    for username in USERS:
        for msg_idx in range(10):
            msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
            exchange.publish(msg, routing_key=username)

loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
try:
    loop.run_forever()
finally:
    if TASKS:
        loop.run_until_complete(asyncio.wait(TASKS))
farefernandez commented 8 years ago

@Drizzt1991 thanks a lot, it works great :smile: BTW, it seems that opening a channel for each queue is not necessary.

farefernandez commented 8 years ago

@Drizzt1991 a question related to your answer. While the processing code works, the cleanup code gives me the error below if I hit CTRL-C to stop the process. My understanding is that there are some tasks waiting for RabbitMQ I/O that are there and should be stopped before exiting. Do you know how this can be achieved?

Thanks!

Traceback (most recent call last):
  File "/code/testparallel3.py", line 67, in <module>
    loop.run_forever()
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 345, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 1276, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.5/selectors.py", line 441, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Task was destroyed but it is pending!
task: <Task pending coro=<QueueReader._read_next() running at /usr/local/lib/python3.5/site-packages/asynqp/routing.py:106> wait_for=<Future pending cb=[Task._wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<QueueReader._read_next() running at /usr/local/lib/python3.5/site-packages/asynqp/routing.py:106> wait_for=<Future pending cb=[Task._wakeup()]>>
Exception ignored in: <generator object QueueReader._read_next at 0x7f4b08d42308>
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/asynqp/routing.py", line 106, in _read_next
  File "/usr/local/lib/python3.5/asyncio/queues.py", line 170, in get
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
Exception ignored in: <generator object QueueReader._read_next at 0x7f4b08d42b48>
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/asynqp/routing.py", line 106, in _read_next
  File "/usr/local/lib/python3.5/asyncio/queues.py", line 170, in get
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed