I can send and receive messages on a topic, and I want to write unit tests for the app with pytest. I am having trouble writing a test case for the target topic (bar_topic).
I have two topics, foo_topic and bar_topic.
foo is an agent that receives data from foo_topic and sends it to bar_topic:
    @app.agent(foo_topic)
    async def foo(stream):
        # Iterate over the stream passed to the agent, not over the agent itself.
        async for value in stream:
            print(value)
            # Send the message on to bar_topic (bar_topic is a module-level variable here).
            await bar_topic.send(value=value)
+++++++++++++++++++++++++++++++++++++++++++
I then added a new file, test_example.py:

    import faust
    import pytest
    from example import app, foo, foo_topic, bar_topic, Order

    @pytest.fixture()
    def test_app(event_loop):
        """passing in event_loop helps avoid 'attached to a different loop' error"""
        app.finalize()
        app.conf.store = 'memory://'
        app.flow_control.resume()
        return app
    @pytest.mark.asyncio()
    async def test_foo_topic(test_app):
        async with foo.test_context() as agent:
            data = Order(account_id='1', product_id='2')
            # This sends a message to foo_topic.
            event = await agent.put(key="key1", value=data)
            # Here I want to show the value that arrived on bar_topic.
            # My attempt so far (I don't want to use put(), I just want
            # to fetch the data from bar_topic):
            async with bar_topic.test_context() as agent:
                event = await agent.put(value=data)
            # I want to apply an assertion here on the data bar_topic received, something like:
            assert value.key == 'something'
In test_example.py I am able to send data to foo_topic.
How can I get or show the data that arrives on bar_topic and apply an assertion to it? Any help would be appreciated.
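To make the goal concrete, here is a minimal sketch of the kind of assertion I am after, written with the standard library only so it runs without Faust or a broker. `StubTopic`, `forward`, and `check_forwarding` are hypothetical stand-ins (assumptions, not Faust APIs): `StubTopic` plays the role of bar_topic, and `forward` mirrors what the foo agent does with each value. It swaps `send` for a `unittest.mock.AsyncMock` and asserts on the arguments it was awaited with.

```python
import asyncio
from dataclasses import dataclass
from unittest.mock import AsyncMock, patch


@dataclass
class Order:
    # Stand-in for the faust.Record defined in example.py (assumption).
    account_id: str
    product_id: str


class StubTopic:
    # Hypothetical stand-in for bar_topic; a real test would import
    # bar_topic from example.py instead.
    async def send(self, *, key=None, value=None):
        raise RuntimeError("no broker available in this sketch")


bar_topic = StubTopic()


async def forward(value):
    # Mirrors what the foo agent does with each value it receives.
    await bar_topic.send(value=value)


async def check_forwarding():
    data = Order(account_id='1', product_id='2')
    # Replace send() with an AsyncMock so nothing leaves the process.
    with patch.object(bar_topic, 'send', new_callable=AsyncMock) as mocked_send:
        await forward(data)
        # Assert on what bar_topic would have received.
        mocked_send.assert_awaited_once_with(value=data)
        return mocked_send.await_args.kwargs['value']


received = asyncio.run(check_forwarding())
```

In the real test the same idea would presumably mean patching `send` on the bar_topic imported from example.py before calling `agent.put(...)` inside `foo.test_context()`. I have not confirmed that this works against a real Faust topic.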
For context, I have created a Faust Python app. The app, the Order record, and both topics are defined in example.py:
    import faust

    app = faust.App('example-test-agent-call1', broker='kafka://localhost')

    class Order(faust.Record, serializer='json'):
        account_id: str
        product_id: str

    foo_topic = app.topic('foo', value_type=Order)
    bar_topic = app.topic('bar', value_type=Order)
I am following https://faust.readthedocs.io/en/latest/userguide/testing.html (I am using the same scenario), but don't want to use:
Thanks