airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.37k stars 118 forks source link

Feature: Features for the RPC mod #1228

Open maxim-f1 opened 7 months ago

maxim-f1 commented 7 months ago

Two cases of rpc mods for producer.

1) If None is returned from the consume function, I would like the publish function to return None instead of an empty byte string.

2) I would like to be able to put pydantic model on publish with rpc=True so that the RPC response is automatically validated.

# ------------- SERVICE 1 -------------

from faststream.rabbit import RabbitRouter, RabbitBroker, RabbitQueue

broker = RabbitBroker()
service_1 = RabbitRouter()
broker.include_router(service_1)

class DTO(BaseModel):
    user_id: UUID | None

    queue: ClassVar[RabbitQueue] = RabbitQueue('v1.get', auto_delete=True)

class UserDTO(BaseModel):
    id: UUID
    name: str

@service_1.subscriber(
    queue=DTO.queue
)
async def handler(body: DTO) -> UserDTO | None:
    if body.user_id is None:
        return None
    return UserDTO(id=body.user_id, name='name')

# ------------- SERVICE 2 -------------

from faststream.rabbit import RabbitBroker, RabbitQueue

class DTO(BaseModel):
    user_id: UUID | None

    queue: ClassVar[RabbitQueue] = RabbitQueue('v1.get', auto_delete=True)

class UserDTO(BaseModel):
    id: UUID
    name: str

broker = RabbitBroker()

async def send():
    # 1 CASE
    message = DTO(user_id=None)
    result = await broker.publish(message, message.queue, rpc=True)
    # result => b'' 
    # but I want to:
    # result => None

    # 2 CASE
    message = DTO(user_id=uuid4())
    result = await broker.publish(message, message.queue, rpc=True)
    # result => {'id': *some UUID*, 'name': 'name'}
    # but I want to:
    # result = await broker.publish(message, message.queue, rpc=True, response_model=UserDTO)
    # result => UserDTO(id=*some UUID*, name='name')
Lancetnik commented 1 month ago

I think, we should implement it via Generic: broker.publish[int](..., rpc=True)

Or even with broker.publish[RabbitMessage](..., rpc=True) to return raw message with all headers and other params

Lancetnik commented 3 weeks ago

Well, the concept changed. I think, we will make decoder lazy in 0.6.0, so u will be able to serialize RPC response to any time right from bytes (also, we should merge #1649 first)