robinhood / faust

Python Stream Processing
Other
6.7k stars 538 forks source link

kafka-ConnectionError in test #770

Closed analopes-creditas closed 1 year ago

analopes-creditas commented 1 year ago

Checklist

Steps to reproduce

Tell us what you did to cause something to happen.?

------ MY AGENT --------

@app.agent(source_topic)
async def consumer(stream):
    async for key, value in stream.items():
        key_event = key.id
        value_event = Event(
            record=json.loads(value.after.json),
            action=value.op,
            status=Status(code=None, message=None, error=None),
        )
        await target_topic.send(key=key_event, value=value_event)
        yield value_event

------ MY TEST --------

@mark.asyncio()
@fixture(scope="session")
def event_loop():
    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    yield app.loop
    loop.close()
@fixture(scope="session")
def test_app(event_loop):
    app.finalize()
    app.conf.store = 'memory://'
    app.flow_control.resume()
    return app
@mark.asyncio()
async def test_consumer_agent(
    test_app, stream_key, stream_value, expected_event
):
    """Testing agent consuming events from topic source_topic and sending to topic target_topic"""

    async with consumer.test_context() as agent:
        event = await agent.put(key=stream_key, value=stream_value)
        assert agent.results[event.message.offset] == expected_event

---- Mock Topic ----

@mark.asyncio()
async def test_consumer_agent(
    test_app, mocker, stream_key, stream_value, expected_event
):
    """Testing agent consuming events from topic source_topic and sending to topic target_topic"""

    def mock_load(key, value):
        return expected_event

    mocker.patch(
        "workers.consumer.target_topic.send",
        mock_load,
    )

    async with consumer.test_context() as agent:
        event = await agent.put(key=stream_key, value=stream_value)
        assert agent.results[event.message.offset] == expected_event

Expected behavior

Tell us what you expected to happen.?

I expected to run test without having a kafka instance running

Actual behavior

Tell us what happened instead.?

when my CI processes the tests I get a kafka client error as I don't have a kafka instance on the server yet. when I try to mock the topic it shows TypeError

Full traceback

---- CI ----

FAILED test/test.py::test_consumer_agent - kafka.errors.ConnectionError: ConnectionError: Unable to bootstrap from [('localhost', 9092, <AddressFamily.AF_UNSPEC: 0>)]

---- Mock ----

FAILED test/test.py::test_consumer_agent - TypeError: object dict can't be used in 'await' expression

Versions

analopes-creditas commented 1 year ago

Solve problem:

@mark.asyncio()
async def test_consumer_agent(
    test_app, mocker, stream_key, stream_value, expected_event
):
    """Testing agent consuming events from topic source_topic and sending to topic target_topic"""

    mocker.patch(
        "workers.consumer.target_topic.send",
        return_value=expected_event,
    )

    async with consumer.test_context() as agent:
        event = await agent.put(key=stream_key, value=stream_value)
        assert agent.results[event.message.offset] == expected_event