airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.52k stars 128 forks source link

Random failure in CI #532

Closed davorrunje closed 1 year ago

davorrunje commented 1 year ago

We have random failures in CI, e.g. this one failed for test (3.8, pydantic-v1)

..............ss........................................................ [ 12%]
........................................................................ [ 24%]
....................................F
=================================== FAILURES ===================================
________________________ TestConsume.test_consume_batch ________________________

self = <tests.brokers.kafka.test_consume.TestConsume object at 0x7f1cd8fc2730>
queue = '103c2da9-a01e-4122-b6db-df3e91dba54e'
broker = <faststream.kafka.broker.KafkaBroker object at 0x7f1cd66a2d60>

    @pytest.mark.asyncio
    async def test_consume_batch(self, queue: str, broker: KafkaBroker):
        msgs_queue = asyncio.Queue(maxsize=1)

        @broker.subscriber(queue, batch=True)
        async def handler(msg):
            await msgs_queue.put(msg)

        async with broker:
            await broker.start()

            await broker.publish_batch(1, "hi", topic=queue)

            result, _ = await asyncio.wait(
                (asyncio.create_task(msgs_queue.get()),),
                timeout=3,
            )

>       assert [{1, "hi"}] == [set(r.result()) for r in result]
E       AssertionError: assert [{1, 'hi'}] == []
E         Left contains one more item: {1, 'hi'}
E         Full diff:
E         - []
E         + [{1, 'hi'}]

tests/brokers/kafka/test_consume.py:29: AssertionError
----------------------------- Captured stdout call -----------------------------
2023-09-05 [14](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:15):01:56,029 INFO     - 103c2da9-a01e-4122-b6db-df3e91dba54e |            - `Handler` waiting for messages
------------------------------ Captured log call -------------------------------
ERROR    aiokafka.cluster:cluster.py:90 Topic 103c2da9-a01e-4122-b6db-df3e91dba54e not found in cluster metadata
=========================== short test summary info ============================
FAILED tests/brokers/kafka/test_consume.py::TestConsume::test_consume_batch - AssertionError: assert [{1, 'hi'}] == []
  Left contains one more item: {1, 'hi'}
  Full diff:
  - []
  + [{1, 'hi'}]
!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!
1 failed, 178 passed, 2 skipped in 18.07s
..............ss........................................................ [ 12%]
........................................................................ [ 24%]
......................................F
=================================== FAILURES ===================================
______________________ TestRabbitRouter.test_double_real _______________________

self = <tests.brokers.kafka.test_fastapi.TestRabbitRouter object at 0x7ff61073ff40>
mock = <Mock id='140694771886832'>
queue = '4a84a7f3-c42f-434a-965f-b8082a504e61'
event = <asyncio.locks.Event object at 0x7ff60deccfa0 [set]>

    async def test_double_real(self, mock: Mock, queue: str, event: asyncio.Event):
        event2 = asyncio.Event()
        router = self.router_class()

        @router.subscriber(queue)
        @router.subscriber(queue + "2")
        async def hello(msg):
            if event.is_set():
                event2.set()
            else:
                event.set()
            mock()

        async with router.broker:
            await router.broker.start()
            await asyncio.wait(
                (
                    asyncio.create_task(router.broker.publish("hi", queue)),
                    asyncio.create_task(router.broker.publish("hi", queue + "2")),
                    asyncio.create_task(event.wait()),
                    asyncio.create_task(event2.wait()),
                ),
                timeout=3,
            )

        assert event.is_set()
>       assert event2.is_set()
E       AssertionError

tests/brokers/base/fastapi.py:69: AssertionError
----------------------------- Captured stdout call -----------------------------
2023-09-05 14:02:[15](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:16),084 INFO     - 4a84a7f3-c42f-434a-965f-b8082a504e612 |            - `Hello` waiting for messages
2023-09-05 14:02:15,202 INFO     - 4a84a7f3-c42f-434a-965f-b8082a504e61  |            - `Hello` waiting for messages
2023-09-05 14:02:15,331 INFO     - 4a84a7f3-c42f-434a-965f-b8082a504e612 | 0-[16](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:17)939225 - Received
2023-09-05 14:02:15,332 INFO     - 4a84a7f3-c42f-434a-965f-b8082a504e612 | 0-16939225 - Processed
------------------------------ Captured log call -------------------------------
ERROR    aiokafka.cluster:cluster.py:90 Topic 4a84a7f3-c42f-434a-965f-b8082a504e612 not found in cluster metadata
ERROR    aiokafka.cluster:cluster.py:90 Topic 4a84a7f3-c42f-434a-965f-b8082a504e61 not found in cluster metadata
=========================== short test summary info ============================
FAILED tests/brokers/kafka/test_fastapi.py::TestRabbitRouter::test_double_real - AssertionError
!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!
1 failed, 180 passed, 2 skipped in 18.28s
..............ss........................................................ [ 12%]
........................................................................ [ 24%]
........................................................................ [ 37%]
...................F
=================================== FAILURES ===================================
_______ TestRouter.test_local_middleware_not_shared_between_subscribers ________

self = <tests.brokers.kafka.test_router.TestRouter object at 0x7fc6fd3bdf10>
queue = '7b27e29a-4d54-4e15-9316-db5fce8a30a5'
mock = <Mock id='140492604865888'>
raw_broker = <faststream.kafka.broker.KafkaBroker object at 0x7fc6fbc83df0>

    async def test_local_middleware_not_shared_between_subscribers(
        self, queue: str, mock: Mock, raw_broker
    ):
        event1 = asyncio.Event()
        event2 = asyncio.Event()

        class mid(BaseMiddleware):
            async def on_receive(self):
                mock.start(self.msg)
                return await super().on_receive()

            async def after_processed(self, exc_type, exc_val, exec_tb):
                mock.end()
                return await super().after_processed(exc_type, exc_val, exec_tb)

        broker = self.broker_class()

        @broker.subscriber(queue)
        @broker.subscriber(queue + "1", middlewares=(mid,))
        async def handler(m):
            if event1.is_set():
                event2.set()
            else:
                event1.set()
            mock()
            return ""

        broker = self.patch_broker(raw_broker, broker)

        async with broker:
            await broker.start()
            await asyncio.wait(
                (
                    asyncio.create_task(broker.publish("", queue)),
                    asyncio.create_task(broker.publish("", queue + "1")),
                    asyncio.create_task(event1.wait()),
                    asyncio.create_task(event2.wait()),
                ),
                timeout=3,
            )

        assert event1.is_set()
>       assert event2.is_set()
E       AssertionError

tests/brokers/base/middlewares.py:101: AssertionError
----------------------------- Captured stdout call -----------------------------
[20](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:21)23-09-05 14:02:41,848 INFO     - 7b27e29a-4d54-4e15-9316-db5fce8a30a51 |            - `Handler` waiting for messages
2023-09-05 14:02:41,967 INFO     - 7b27e29a-4d54-4e15-9316-db5fce8a30a5  |            - `Handler` waiting for messages
2023-09-05 14:02:42,093 INFO     - 7b27e29a-4d54-4e15-9316-db5fce8a30a51 | 0-16939[22](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:23)5 - Received
20[23](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:24)-09-05 14:02:42,094 INFO     - 7b27e29a-4d54-4e15-9316-db5fce8a30a51 | 0-169392[25](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:26) - Processed
------------------------------ Captured log call -------------------------------
ERROR    aiokafka.cluster:cluster.py:90 Topic 7b[27](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:28)e[29](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:30)a-4d54-4e15-9316-db5fce8a[30](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:31)a51 not found in cluster metadata
ERROR    aiokafka.cluster:cluster.py:90 Topic 7b27e29a-4d54-4e15-9[31](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:32)6-db5fce8a30a5 not found in cluster metadata
=========================== short test summary info ============================
FAILED tests/brokers/kafka/test_router.py::TestRouter::test_local_middleware_not_shared_between_subscribers - AssertionError
!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!
1 failed, 2[33](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:34) passed, 2 skipped in 25.[85](https://github.com/airtai/fastkafka/actions/runs/6085650665/job/16510248152#step:10:86)s
Lancetnik commented 1 year ago

This is due to the fact that we are using the real containers in our CI: lost packages, broken connetction, etc The only one solution - retries We should use a real brokers to test our project (cuz peoples will use it with the real one in their cases), so failures will happens

davorrunje commented 1 year ago

Maybe we should add up to 3 retries on the tests that fail frequently. As far as I can tell, there are only a few of those.