Lancetnik / FastDepends

FastDepends - FastAPI Dependency Injection system extracted from FastAPI and cleared of all HTTP logic. Async and sync modes are both supported.
https://lancetnik.github.io/FastDepends/
MIT License
254 stars 9 forks source link

Weird dependency injection issue #13

Closed chrisgoddard closed 10 months ago

chrisgoddard commented 10 months ago

Hey there! Me again šŸ˜µā€šŸ’« (btw - big fan of this and the FastStream project!)

Running into this issue in the context of FastStream.

The following code works as expected:

import os
import typing
from redis import Redis, ConnectionPool
from pydantic import BaseModel, Field
from faststream import FastStream, Depends, ContextRepo, Context
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    Logger,
    KafkaBroker as BrokerAnnotation,
)

KAFKA_HOST = os.environ.get('KAFKA_HOST', 'localhost')
KAFKA_PORT = os.environ.get('KAFKA_PORT', '19092')

broker = KafkaBroker(f'{KAFKA_HOST}:{KAFKA_PORT}')
app = FastStream(broker)

class SimpleDependency(BaseModel):
    id: int

def simple_dependency():
    return SimpleDependency(id=1)

class SimpleMessage(BaseModel):
    message: str

class ComplexMessage(SimpleMessage):
    id: int = Field(..., gt=0)
    message: str = Field(..., min_length=1)

class NestedMessage(BaseModel):
    message: SimpleMessage
    id: int

def inject_redis_client() -> Redis:
    return Redis(
        connection_pool=ConnectionPool(
            host=os.environ.get('REDIS_HOST', 'localhost'),
            port=int(os.environ.get('REDIS_PORT', '6379')),
            db=int(os.environ.get('REDIS_DB', '0'))
        )
    )

@broker.subscriber('testing-a')
async def testing_a(
    message: ComplexMessage,
    logger: Logger,
    redis: Redis = Depends(inject_redis_client),
    dep: SimpleDependency = Depends(simple_dependency),
):
    await broker.publish(
        NestedMessage(
            message=message,
            id=dep.id
        ),
        'testing-b'
    )

@broker.subscriber('testing-b')
async def testing_b(
    message: NestedMessage,
    logger: Logger,
    dep: SimpleDependency = Depends(simple_dependency),
):
    logger.info(message)
    logger.info(dep)

@app.after_startup
async def after_startup(
    context: ContextRepo,
    logger: Logger,
):
    for i in range(0, 10):
        await broker.publish(ComplexMessage(id=i+1, message=f'hello {i}'), 'testing-a')

testing_a correctly receives message: ComplexMessage and the redis client instance is available.

However, if change my inject_redis_client function to accept an argument, for instance:

def inject_redis_client(
    redis_db: typing.Optional[int] = None
) -> Redis:
    return Redis(
        connection_pool=ConnectionPool(
            host=os.environ.get('REDIS_HOST', 'localhost'),
            port=int(os.environ.get('REDIS_PORT', '6379')),
            db=redis_db or int(os.environ.get('REDIS_DB', '0'))
        )
    )

Then when I run the app I get the following error:


2023-09-22 10:28:19,730 INFO     - testing-a | 449-169540 - Received
2023-09-22 10:28:19,731 ERROR    - testing-a | 449-169540 - ValidationError: 1 validation error for testing_a
message
  Input should be a valid dictionary or instance of ComplexMessage [type=model_type, input_value='hello 9', input_type=str]
    For further information visit https://errors.pydantic.dev/2.3/v/model_type
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py", line 550, in log_wrapper
    r = await func(message)
        ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/kafka/broker.py", line 240, in process_wrapper
    r = await self._execute_handler(func, message)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py", line 487, in _execute_handler
    return await func(message)
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py", line 415, in decode_wrapper
    return await func(**msg)
           ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/fast_depends/use.py", line 135, in injected_wrapper
    r = await real_model.asolve(
        ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/fast_depends/core/model.py", line 396, in asolve
    final_args, final_kwargs = cast_gen.send(kwargs)
                               ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/fast_depends/core/model.py", line 211, in _solve
    casted_model = self.model(**solved_kw)
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/pydantic/main.py", line 165, in __init__
    __pydantic_self__.__pydantic_validator__.validate_python(data, self_instance=__pydantic_self__)
pydantic_core._pydantic_core.ValidationError: 1 validation error for testing_a
message
  Input should be a valid dictionary or instance of ComplexMessage [type=model_type, input_value='hello 9', input_type=str]
    For further information visit https://errors.pydantic.dev/2.3/v/model_type
^C2023-09-22 10:28:21,580 INFO     - FastStream app shutting down...
2023-09-22 10:28:21,581 INFO     - FastStream app shut down gracefully.

In this context, I'm not passing a redis_db argument to the testing_a function - but that shouldn't matter as it's optional.

Even if I update the function to set a value:


@broker.subscriber('testing-a')
async def testing_a(
    message: ComplexMessage,
    logger: Logger,
    redis_db: int = 1,
    redis: Redis = Depends(inject_redis_client),
    dep: SimpleDependency = Depends(simple_dependency),
):
    await broker.publish(
        NestedMessage(
            message=message,
            id=dep.id
        ),
        'testing-b'
    )

I get the same issue.

I might not 100% be understanding how you're supposed to pass arguments to dependency injection functions but nothing I try seems to make a difference.

Lancetnik commented 10 months ago

@chrisgoddard hello again and thanks for your pleasure feedback about projects

Depends are parsing incoming functional (not dafault) arguments to use some of them as an own incoming arguments too. To support parametrize dependencies you can use them like this

from fast_depends import Depends, inject

def parametrize_dependency(argument: int):
    def real_dependency(function_argument: int):
        return argument + function_argument
    return real_dependency

@inject
def call(d = Depends(parametrize_dependency(1))):
    return d

assert call(1) == 2

or use a callable class

class ParamDep:
    def __init__(self, param):
        self.param = param

    def __call__(self, function_argument):
        return function_argument + self.param

Also, Depends can be nested in each other (as a Context too), so you can use this feature as well

Lancetnik commented 10 months ago

Also, thanks for being with the Propan project since it start Can you give us any feedback about its usage?