airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

feature: extend Response API #1587

Closed: Lancetnik closed this 2 months ago

Lancetnik commented 3 months ago

We should define broker-specific Response classes (like KafkaResponse) so that they support all broker features (the Publisher.publish options).

In other words, KafkaResponse should have the following signature:

class KafkaResponse(Response):
    def __init__(
        self,
        body: "SendableMessage",
        *,
        headers: Optional["AnyDict"] = None,
        correlation_id: Optional[str] = None,
        # Kafka specific
        timestamp_ms: Optional[int] = None,
        key: Optional[bytes] = None,
        # ... (other Kafka-specific publish options)
    ) -> None:
        ...

The Response classes of the other brokers should be extended in the same way.
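To make the proposal concrete, here is a minimal, self-contained sketch of how a broker-specific response could carry extra publish options. It does not import FastStream: the `Response` class below is a stand-in, and the `as_publish_kwargs` helper is an assumption made for illustration, not a confirmed part of the library's API. Only the `timestamp_ms` and `key` parameters come from the signature above.

```python
from typing import Any, Dict, Optional


class Response:
    """Stand-in for a generic response: a body plus broker-agnostic options."""

    def __init__(
        self,
        body: Any,
        *,
        headers: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None,
    ) -> None:
        self.body = body
        self.headers = headers or {}
        self.correlation_id = correlation_id

    def as_publish_kwargs(self) -> Dict[str, Any]:
        # Hypothetical helper: the options to forward to a publish call.
        return {"headers": self.headers, "correlation_id": self.correlation_id}


class KafkaResponse(Response):
    """Adds the Kafka-specific options named in the proposal."""

    def __init__(
        self,
        body: Any,
        *,
        headers: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None,
        # Kafka specific
        timestamp_ms: Optional[int] = None,
        key: Optional[bytes] = None,
    ) -> None:
        super().__init__(body, headers=headers, correlation_id=correlation_id)
        self.timestamp_ms = timestamp_ms
        self.key = key

    def as_publish_kwargs(self) -> Dict[str, Any]:
        # Extend the generic options with the Kafka-only ones.
        return {
            **super().as_publish_kwargs(),
            "timestamp_ms": self.timestamp_ms,
            "key": self.key,
        }


response = KafkaResponse(b"payload", key=b"user-1", timestamp_ms=1700000000000)
publish_kwargs = response.as_publish_kwargs()
```

With this shape, each broker's response class only declares the options its own publisher understands, and the generic part stays in the base class.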

Flosckow commented 3 months ago

Do I understand correctly that we need to change every "SpecificBroker".Response class, moving the parameters currently accepted in "SpecificBroker".publisher.usecase into the __init__ method of "SpecificBroker".Response? Also, why is the parameter named body: "SendableMessage" in Response, while the publish methods use message: "SendableMessage"?

Lancetnik commented 3 months ago

@Flosckow yes, you understood everything correctly.

About message vs. body: I used message in the broker.publish and publisher.publish interfaces because users call them directly, and it is an explicit argument name for that usage. In the Response class I used body because a Response is the whole object to publish (with body, headers, and other options).

But your point is valid, and we should think about it again.
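The naming distinction can be illustrated with a small sketch. Everything here is hypothetical and independent of FastStream: `RecordingPublisher` and `publish_response` are made-up names, and the publish call is synchronous for brevity. It only shows that a response's `body` ends up in the `message` argument of a publish call, while the remaining response fields become publish options.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class Response:
    # The whole object to publish: the payload plus its options.
    body: Any
    headers: Dict[str, Any] = field(default_factory=dict)


class RecordingPublisher:
    """Hypothetical publisher that records calls instead of sending them."""

    def __init__(self) -> None:
        self.sent: List[Tuple[Any, Dict[str, Any]]] = []

    def publish(
        self, message: Any, *, headers: Optional[Dict[str, Any]] = None
    ) -> None:
        # `message` is the name a user sees when calling publish directly.
        self.sent.append((message, headers or {}))


def publish_response(publisher: RecordingPublisher, response: Response) -> None:
    # The response's `body` is passed as the publisher's `message`.
    publisher.publish(response.body, headers=response.headers)


publisher = RecordingPublisher()
publisher.publish("direct call")
publish_response(
    publisher, Response(body="from handler", headers={"x-source": "handler"})
)
```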

Flosckow commented 3 months ago

OK, I'll wait for an answer. Can you link this pull request? https://github.com/airtai/faststream/pull/1607