airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
3.13k stars 160 forks source link

Feature: Publisher support for async iterator #1304

Open johanjk opened 8 months ago

johanjk commented 8 months ago

I have a yielding generator that produces msgs into a topic.

I wish the following syntax was valid:

import asyncio
import random
from contextlib import asynccontextmanager
from typing import AsyncIterator

from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.publisher("number_of_the_day")
async def num() -> AsyncIterator[int]:
    while True:
        yield random.randint(0, 10)
        await asyncio.sleep(86400)

@asynccontextmanager
async def lifespan(context: ContextRepo):
    asyncio.create_task(num())
    yield

async def main():
    await FastStream(broker, lifespan=lifespan).run()

if __name__ == '__main__':
    asyncio.run(main())

Current workaround

import functools

def pub(publisher):
    def decorator(func):
        @functools.wraps(func)
        async def inner(*args, **kwargs):
            async for x in func(*args, **kwargs):
                await publisher.publish(x)
        return inner
    return decorator
# [...]
@pub(broker.publisher("number_of_the_day"))
async def num() -> AsyncIterator[int]:
    while True:
        yield random.randint(0, 10)
        await asyncio.sleep(86400)
# [...]
Lancetnik commented 8 months ago

Seems like a good idea, but you can't use @broker.publisher(...) without @broker.subscriber(...). So, the valid case should looks like

@broker.subscriber("in")
@broker.publisher("out")
async def handler(...):
    yield from iterbale_interactor(...)

It should publishes N messages at 1 consumer call

johanjk commented 8 months ago

I would in fact also like publisher to be usable standalone.

In my example I am scheduling it in the background using asyncio.create_task as part of lifespan with the help of my "workaround" pub decorator.

Lancetnik commented 8 months ago

You can use publisher.publish interface alone I don't like an idea to create publish side-effect on original function call, so all interfaces can be used only in explicit way: @broker.publisher(...) only with @broker.subscriber(...), publisher.publish(...) - in other cases