mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

Best Practice with Pytest ? #514

Open dgarros opened 1 year ago

dgarros commented 1 year ago

I was wondering if there are some resources available (doc or library) to help get started with aio-pika and pytest ?

I'm specifically looking for a clean solution to mock a RPC call, something similar to pytest-http which make it really easy to mock a response to a request ahead of time (example below)

To be precise, I would like to test a FastAPI endpoint that is making some RPC calls and I would like to mock the response to these calls as much as possible.

import httpx
from pytest_httpx import HTTPXMock

def test_json(httpx_mock: HTTPXMock):
    httpx_mock.add_response(json=[{"key1": "value1", "key2": "value2"}])

    with httpx.Client() as client:
        assert client.get("https://test_url").json() == [{"key1": "value1", "key2": "value2"}]

thanks

maciossek commented 1 year ago

our unit tests run through a locally running DB and rabbitMQ. Not sure if this is of any help to you, but also in Github we spin up a rabbitMQ Service run the tests and shut it down. In order to make this work there was no way around having to add a await asyncio.sleep(1) there:

async def test_subscribe():
    # that one subsripes to "test"
    rabbitmq_connection = await subscribe_topic()
    exchange = rabbitmq_connection._exchange

    message_payload = [{"uuid": "00000000-1337-0000-0000-000000000001", "data": "test"}]
    message = Message(body=json.dumps(message_payload).encode(), content_type="application/json")
    await exchange.publish(message, routing_key="test")
    await asyncio.sleep(1)

    # We check if a Db entry has been updated so sth like:
    assert get_from_db() == "DONE"
Lancetnik commented 1 year ago

@dgarros you can see an example RabbitMQ-based services testing with Propan framework here

Underground it uses a quote simple code: building IncomingMessage object directly from code and call handler function with builded object like a regular function. You can find sources here. This way you can test aio-pika services too.