airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.41k stars 123 forks source link

Feature: Unify the middleware interface between Broker and subscriber/publisher #1646

Open andreaimprovised opened 1 month ago

andreaimprovised commented 1 month ago

Is your feature request related to a problem? Please describe.

Yes.

The API protocol/types for middleware dare different for Broker/Router instantiation vs. subscriber/publisher instantiation.

These are the current types in faststream/broker/types.py:

BrokerMiddleware: TypeAlias = Callable[[Optional[MsgType]], BaseMiddleware]
SubscriberMiddleware: TypeAlias = Callable[
    [AsyncFuncAny, MsgType],
    MsgType,
]

class PublisherMiddleware(Protocol):
    """Publisher middleware interface."""

    def __call__(
        self,
        call_next: AsyncFunc,
        *__args: Any,
        **__kwargs: Any,
    ) -> Any: ...

The subscriber/publisher middleware API is specific to the consume_scope and publish_scope signatures.

This is a bit surprising. It makes faststream.broker.middleware.BaseMiddleware only work for brokers. As a result, the middleware is not really portable.

Describe the solution you'd like

Make the subscriber/publisher protocols the same as the broker/router protocols.

Or, perhaps, allow both? Or at least document on the website that there's a difference.

Feature code example This is what I wish were possible.

from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream.broker.middleware import BaseMiddleware

class CustomMiddleware1(BaseMiddleware):
    pass

class CustomMiddleware2(BaseMiddleware):
    pass

broker = KafkaBroker("localhost:9092", middlewares=[CustomMiddleware1])

app = FastStream(broker)

@broker.subscribe("in-topic", middlewares=[CustomMiddleware2])
async def handle(body: str):
    print(str)

Describe alternatives you've considered

There is an adapter strategy, but it's a little extra effort and more indirection.

from faststream.broker.types import BrokerMiddleware

class SubscriberMiddlewareAdapter:
    def __init__(self, middleware: BrokerMiddleware):
        self.middleware = middleware

    async def __call__(
        self, call_next: AsyncFuncAny, msg: KafkaMessage
    ) -> KafkaMessage:
        async with self.middleware(msg.raw_message) as m:
            return await m.consume_scope(call_next, msg)

Additional context Include any other relevant context or screenshots related to the feature request.

Lancetnik commented 1 month ago

Thank you for the suggestion! I am still designing some breaking changes to release in 0.6.0 and subscriber & publisher middlewares rework is one of them. I'll contact you here, when we have some final view about middlewares.