alm0ra / mockafka-py

Mockafka-py is a Python library designed for in-memory mocking of Kafka.[aiokafka - confluence-kafka-python]
https://mockafka-py.readthedocs.io
MIT License
47 stars 11 forks source link

Basic use case examples not working: `fixture 'message' not found` #175

Open th0ger opened 3 weeks ago

th0ger commented 3 weeks ago

Describe the bug Following the basic use cases in the README does not work. pytest returns fixture 'message' not found. Tested with poetry and pip.

To Reproduce

poetry init
poetry add mockafka-py
poetry shell
### test.py ###
from mockafka import produce, consume

@produce(topic='test', key='test_key', value='test_value', partition=4)
@consume(topics=['test'])
def test_produce_and_consume_decorator(message):
    """
    This test showcases the usage of both @produce and @consume decorators in a single test case.
    It produces a message to the 'test' topic and then consumes it to perform further logic.
    # Notice you may get message None
    """
    # Your test logic for processing the consumed message here

    if not message:
        return

    pass
$ pytest test.py

=================================================================================================================================================== test session starts ====================================================================================================================================================
platform linux -- Python 3.11.6, pytest-8.3.3, pluggy-1.5.0
rootdir: 
configfile: pyproject.toml
plugins: asyncio-0.23.8, cov-5.0.0
asyncio: mode=Mode.STRICT
collected 1 item

test.py E                                                                                                                                                                                                                                                                                                            [100%]

========================================================================================================================================================== ERRORS ==========================================================================================================================================================
___________________________________________________________________________________________________________________________________ ERROR at setup of test_produce_and_consume_decorator ___________________________________________________________________________________________________________________________________
file test.py, line 4
  @produce(topic='test', key='test_key', value='test_value', partition=4)
  @consume(topics=['test'])
  def test_produce_and_consume_decorator(message):
E       fixture 'message' not found
>       available fixtures: _session_event_loop, cache, capfd, capfdbinary, caplog, capsys, capsysbinary, cov, doctest_namespace, event_loop, event_loop_policy, monkeypatch, no_cover, pytestconfig, record_property, record_testsuite_property, record_xml_attribute, recwarn, test.py::<event_loop>, tmp_path, tmp_path_factory, tmpdir, tmpdir_factory, unused_tcp_port, unused_tcp_port_factory, unused_udp_port, unused_udp_port_factory
>       use 'pytest --fixtures [testpath]' for help on them.

Expected behavior Test passes successfully.

Desktop (please complete the following information):

Nothooon commented 3 weeks ago

Same here with:

You can work arround it by using the FakeConsumer and FakeProducer classes.

In my case, I needed to pass a Kafka Message to the function I'm testing, so I added this function in my tests file:

import pytest
import json

from confluent_kafka import Message
from mockafka import FakeConsumer, FakeProducer, setup_kafka

def get_fake_kafka_message(payload: dict) -> Message:
    consumer = FakeConsumer()
    producer = FakeProducer()

    consumer.subscribe(['test.topic'])
    producer.produce(
        key='test_key',
        value=json.dumps(payload),
        topic='test.topic',
        partition=0
    )

    message = consumer.poll()

    return message

Then, in my test:

@setup_kafka(topics=[{"topic": "test.topic", "partition": 1}])
def test_run_creation_message():
    my_payload = {...}
    message = get_fake_kafka_message(my_payload)

   # Rest of my test

It's a bit annoying to have to jump through so many hoops, but at least we can work arround it while waiting for a fix to this bug :/

andreaturli commented 2 days ago

is it possible to use a FakeProducer and a real Consumer in a test with this approach?

    @asetup_kafka(topics=[{'topic': 'test_topic', 'partition': 16}], clean=True)
    @aproduce(topic='test_topic', value='test_value', key='test_key', partition=0)
    async def test_produce_with_decorator(self):
        consumer = AIOKafkaConsumer('test_topic', bootstrap_servers='localhost:9092')
        await consumer.start()
        consumer.subscribe(['test_topic'])
        message = await consumer.getone()

        assert message.key == b'test_key'
        assert message.value == b'test_value'

This doesn't seem to work as

>           raise KafkaConnectionError(f"No connection to node with id {node_id}")
E           aiokafka.errors.KafkaConnectionError: KafkaConnectionError: No connection to node with id 1