reagento / dishka

Cute DI framework with agreeable API and everything you need
https://dishka.readthedocs.io
Apache License 2.0
358 stars 39 forks source link

Faststream testing problem #221

Closed own3dh4rd closed 3 weeks ago

own3dh4rd commented 3 weeks ago

Hi guys, great library! Could you please explain how to test with FastStream?

Here is a code example that isn't working for me:

Details

```
# Reproduction script: a FastStream Kafka handler that receives a
# dishka-injected dependency, exercised through TestKafkaBroker with the
# dependency replaced by a mock.
import pytest
from unittest.mock import create_autospec
import pytest_asyncio
from pydantic import BaseModel
from faststream import FastStream
from faststream.kafka import KafkaBroker, TestKafkaBroker
from dishka import Provider, provide, Scope, FromDishka, make_async_container
from dishka.integrations.faststream import FastStreamProvider, setup_dishka


# Empty message model used as both the incoming and the outgoing payload.
class Task(BaseModel):
    pass


# The dependency under test; `do` is a no-op coroutine.
class Interactor:
    async def do(self, task: Task):
        pass


# Provider returning a real Interactor. It is not registered in the test
# container below; the mocked provider is used instead.
class InteractorProvider(Provider):
    @provide(scope=Scope.APP)
    async def provide_interactor(self) -> Interactor:
        return Interactor()


# Module-level broker shared by the handler and by the test fixtures.
broker = KafkaBroker()
subscriber = broker.subscriber("in")
publisher = broker.publisher("out")


# Consumes from "in", calls the injected interactor, and returns the task;
# the @publisher decorator is attached to the "out" topic.
@subscriber
@publisher
async def handler(
    task: Task,
    interactor: FromDishka[Interactor]
) -> Task:
    await interactor.do(task)
    return task


# Test double for InteractorProvider: provides an autospec mock.
class MockedInteractorProvider(Provider):
    @provide(scope=Scope.APP)
    async def provide_interactor(self) -> Interactor:
        # NOTE(review): this specs the Interactor class itself; unittest.mock
        # documents `instance=True` for mocking an instance -- confirm which
        # one is intended.
        return create_autospec(Interactor)


# Builds the container from the mocked provider plus FastStreamProvider.
@pytest_asyncio.fixture
def container():
    return make_async_container(
        MockedInteractorProvider(),
        FastStreamProvider(),
    )


# Wires dishka into a FastStream app around the module-level broker and
# yields the test broker while the app is started.
# NOTE(review): the pasted snippet lost its indentation; `yield` and
# `app.stop()` are assumed to sit inside the `async with` block -- confirm.
@pytest_asyncio.fixture
async def test_broker(container):
    app = FastStream(broker)
    setup_dishka(container, app, auto_inject=True, finalize_container=True)
    async with TestKafkaBroker(broker) as br:
        await app.start()
        yield br
        await app.stop()


# Resolves Interactor from the same container fixture so the test can
# assert on the object the handler receives.
@pytest_asyncio.fixture
async def mocked_interactor(container):
    return await container.get(Interactor)


# Publishes one Task to "in" and checks that the handler ran, that the
# publisher was invoked, and that the interactor was called with the task.
@pytest.mark.asyncio
async def test_handle(test_broker, mocked_interactor):
    task = Task()
    # This is the statement the traceback in the report points at.
    await test_broker.publish(task, topic="in")
    handler.mock.assert_called_once()
    publisher.mock.assert_called_once()
    mocked_interactor.do.assert_called_once_with(task)
```

Traceback:

Details

``` FAILED [100%] t.py:71 (test_handle) test_broker = mocked_interactor = @pytest.mark.asyncio async def test_handle(test_broker, mocked_interactor): task = Task() > await test_broker.publish(task, topic="in") mocked_interactor = task = Task() test_broker = t.py:76: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .venv/lib/python3.11/site-packages/faststream/kafka/broker/broker.py:732: in publish return await super().publish( __class__ = correlation_id = '2c4a9990-ed1a-48c7-b612-0f3e0ee05008' headers = None key = None kwargs = {} message = Task() partition = None reply_to = '' self = timestamp_ms = None topic = 'in' .venv/lib/python3.11/site-packages/faststream/broker/core/usecase.py:344: in publish return await publish(msg, **kwargs) kwargs = {'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008', 'headers': None, 'key': None, 'partition': None, 'reply_to': '', 'timestamp_ms': None, 'topic': 'in'} m = msg = Task() producer = publish = functools.partial(>, >) self = .venv/lib/python3.11/site-packages/faststream/broker/middlewares/base.py:115: in publish_scope await self.after_publish(err) args = () call_next = > err = TypeError("object NoneType can't be used in 'await' expression") kwargs = {'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008', 'headers': None, 'key': None, 'partition': None, 'reply_to': '', 'timestamp_ms': None, 'topic': 'in'} msg = Task() self = .venv/lib/python3.11/site-packages/faststream/broker/middlewares/base.py:90: in after_publish raise err err = TypeError("object NoneType can't be used in 'await' expression") self = .venv/lib/python3.11/site-packages/faststream/broker/middlewares/base.py:102: in publish_scope result = await call_next( args = () call_next = > err = TypeError("object NoneType can't be used in 'await' expression") kwargs = {'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008', 'headers': None, 'key': None, 'partition': None, 'reply_to': '', 'timestamp_ms': None, 'topic': 'in'} msg = 
Task() self = .venv/lib/python3.11/site-packages/faststream/kafka/testing.py:114: in publish handle_value = await call_handler( correlation_id = '2c4a9990-ed1a-48c7-b612-0f3e0ee05008' handler = headers = None incoming = ConsumerRecord(topic='in', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-0f3e0ee05008')]) key = None message = Task() partition = None raise_timeout = False reply_to = '' return_value = None rpc = False rpc_timeout = None self = timestamp_ms = None topic = 'in' .venv/lib/python3.11/site-packages/faststream/testing/broker.py:240: in call_handler result = await handler.process_message(message) handler = message = ConsumerRecord(topic='in', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-0f3e0ee05008')]) raise_timeout = False rpc = False rpc_timeout = None .venv/lib/python3.11/site-packages/faststream/broker/subscriber/usecase.py:379: in process_message await p.publish( base_m = cache = {>: KafkaMessage(raw_message=ConsumerRecord(topic='in', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'appl' b'icat' b'ion/' b'json'), ('correlation_id', b'2c4a' b'9990' b'-ed1' b'a-48' b'c7-b' b'612-' b'0f3e' b'0ee0' b'5008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True), >: {}} h = 
<'handler': filter='default_filter'> m = message = KafkaMessage(raw_message=ConsumerRecord(topic='in', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-' b'0f3e0ee05008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True) middleware = middlewares = [] msg = ConsumerRecord(topic='in', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-0f3e0ee05008')]) p = parsing_error = None result_msg = self = stack = .venv/lib/python3.11/site-packages/faststream/kafka/publisher/usecase.py:184: in publish return await call( _extra_middlewares = . 
at 0x1076b32a0> call = functools.partial(>, >) correlation_id = '2c4a9990-ed1a-48c7-b612-0f3e0ee05008' headers = None key = None m = > message = Task() partition = None reply_to = '' self = timestamp_ms = None topic = 'out' .venv/lib/python3.11/site-packages/faststream/broker/middlewares/base.py:115: in publish_scope await self.after_publish(err) args = () call_next = > err = TypeError("object NoneType can't be used in 'await' expression") kwargs = {'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008', 'headers': None, 'key': None, 'partition': None, 'reply_to': '', 'timestamp_ms': None, 'topic': 'out'} msg = Task() self = .venv/lib/python3.11/site-packages/faststream/broker/middlewares/base.py:90: in after_publish raise err err = TypeError("object NoneType can't be used in 'await' expression") self = .venv/lib/python3.11/site-packages/faststream/broker/middlewares/base.py:102: in publish_scope result = await call_next( args = () call_next = > err = TypeError("object NoneType can't be used in 'await' expression") kwargs = {'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008', 'headers': None, 'key': None, 'partition': None, 'reply_to': '', 'timestamp_ms': None, 'topic': 'out'} msg = Task() self = .venv/lib/python3.11/site-packages/faststream/kafka/testing.py:114: in publish handle_value = await call_handler( correlation_id = '2c4a9990-ed1a-48c7-b612-0f3e0ee05008' handler = headers = None incoming = ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-0f3e0ee05008')]) key = None message = Task() partition = None raise_timeout = False reply_to = '' return_value = None rpc = False rpc_timeout = None self = timestamp_ms = None topic = 'out' .venv/lib/python3.11/site-packages/faststream/testing/broker.py:240: in call_handler result = await 
handler.process_message(message) handler = message = ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-0f3e0ee05008')]) raise_timeout = False rpc = False rpc_timeout = None .venv/lib/python3.11/site-packages/faststream/broker/subscriber/usecase.py:365: in process_message await h.call( base_m = cache = {>: {}, >: KafkaMessage(raw_message=ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'appl' b'icat' b'ion/' b'json'), ('correlation_id', b'2c4a' b'9990' b'-ed1' b'a-48' b'c7-b' b'612-' b'0f3e' b'0ee0' b'5008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True)} h = <'publisher_response_subscriber': filter='default_filter'> m = message = KafkaMessage(raw_message=ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-' b'0f3e0ee05008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True) middleware = middlewares = [] msg = ConsumerRecord(topic='out', partition=0, 
offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-0f3e0ee05008')]) parsing_error = None self = stack = .venv/lib/python3.11/site-packages/faststream/broker/subscriber/call_item.py:172: in call raise e _extra_middlewares = . at 0x10787da40> call = functools.partial(>, >) message = KafkaMessage(raw_message=ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-' b'0f3e0ee05008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True) middleware = > self = <'publisher_response_subscriber': filter='default_filter'> .venv/lib/python3.11/site-packages/faststream/broker/subscriber/call_item.py:164: in call result = await call(message) _extra_middlewares = . 
at 0x10787da40> call = functools.partial(>, >) message = KafkaMessage(raw_message=ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-' b'0f3e0ee05008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True) middleware = > self = <'publisher_response_subscriber': filter='default_filter'> .venv/lib/python3.11/site-packages/dishka/integrations/faststream.py:77: in consume_scope await call_next(msg), call_next = > msg = KafkaMessage(raw_message=ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), ('correlation_id', b'2c4a9990-ed1a-48c7-b612-' b'0f3e0ee05008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True) request_container = self = .venv/lib/python3.11/site-packages/faststream/broker/wrapper/call.py:201: in decode_wrapper return await func(msg) func = .publisher_response_subscriber at 0x107752ca0> message = KafkaMessage(raw_message=ConsumerRecord(topic='out', partition=0, offset=0, timestamp=1724246108, timestamp_type=0, key=b'', value=b'{}', checksum=248, serialized_key_size=0, serialized_value_size=2, headers=[('content-type', b'application/json'), 
('correlation_id', b'2c4a9990-ed1a-48c7-b612-' b'0f3e0ee05008')]), body=b'{}', headers={'content-type': 'application/json', 'correlation_id': '2c4a9990-ed1a-48c7-b612-0f3e0ee05008'}, batch_headers=[], path={}, content_type='application/json', reply_to='', message_id='0-1724246108', correlation_id='2c4a9990-ed1a-48c7-b612-0f3e0ee05008', decoded_body={}, processed=False, committed=True) msg = {} params_ln = 1 .venv/lib/python3.11/site-packages/fast_depends/use.py:148: in injected_wrapper r = await real_model.asolve( args = ({},) kwargs = {} overrides = {} real_model = stack = .venv/lib/python3.11/site-packages/fast_depends/core/model.py:533: in asolve response = await run_async(call, *final_args, **final_kwargs) _ = .publisher_response_subscriber at 0x107753420> args = () cache_dependencies = {} call = .publisher_response_subscriber at 0x107753420> cast_gen = custom_to_solve = [] dep_to_solve = [] dependency_overrides = {} final_args = () final_kwargs = {'msg': {}} kwargs = {'msg': {}} nested = False self = stack = tg = .venv/lib/python3.11/site-packages/fast_depends/utils.py:48: in run_async return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs) args = () func = .publisher_response_subscriber at 0x107753420> kwargs = {'msg': {}} .venv/lib/python3.11/site-packages/faststream/utils/functions.py:53: in to_async_wrapper return await call_or_await(func, *args, **kwargs) args = () func = .publisher_response_subscriber at 0x107753600> kwargs = {'msg': {}} .venv/lib/python3.11/site-packages/fast_depends/utils.py:48: in run_async return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs) args = () func = .publisher_response_subscriber at 0x107753600> kwargs = {'msg': {}} _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ args = (), kwargs = {'msg': {}}, solved = {} async def auto_injected_func(*args: P.args, **kwargs: P.kwargs) -> T: container = container_getter(args, kwargs) for param in additional_params: 
kwargs.pop(param.name) solved = { name: await container.get( dep.type_hint, component=dep.component, ) for name, dep in dependencies.items() } > return await func(*args, **kwargs, **solved) E TypeError: object NoneType can't be used in 'await' expression additional_params = () args = () container = container_getter = . at 0x1077534c0> dependencies = {} func = .publisher_response_subscriber at 0x1076719e0> kwargs = {'msg': {}} solved = {} .venv/lib/python3.11/site-packages/dishka/integrations/base.py:192: TypeError ```

Environment: Python 3.11, dishka==1.3.0, faststream==0.5.18

By the way, if I remove the publisher, everything works fine.

It would be great if the documentation included an example of testing with FastStream, similar to the example with FastAPI.

Lancetnik commented 3 weeks ago

@own3dh4rd thank you for the report! I found a FastStream bug. Please wait for the fix, which should be released today.

own3dh4rd commented 3 weeks ago

@Lancetnik Thank you! It now works as expected. Sorry for selecting the wrong repository when creating the issue.