airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.2k stars 106 forks source link

Currently not suitable for work with several brokers #309

Closed Gerleff closed 1 year ago

Gerleff commented 1 year ago

Best expected result:

For the project I'm currently working on, if I wanna use fastkafka at current state, I need to set N containers up for N Kafka brokers, which does not suit well.

davorrunje commented 1 year ago

Hi,

We created the following (yet unimplemented) example of how it could be done. Would that work for you?

from fastkafka import FastKafka
from fastkafka.testing import Tester
from pydantic import BaseModel

kafka_brokers_en = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker for english",
        "port": 9092,
    },
    "staging": {
        "url": ["en-staging-1.airt.ai", "en-staging-2.airt.ai", "en-staging-3.airt.ai"],
        "description": "staging kafka broker for english",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_brokers_it = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker for italian",
        "port": 9091,
    },
    "staging": {
        "url": ["it-staging-1.airt.ai", "it-staging-2.airt.ai", "it-staging-3.airt.ai"],
        "description": "staging kafka broker for italian",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

app = FastKafka(kafka_brokers=kafka_brokers_en)

class Msg():
    value: str

@app.produces(topic="from_english", bootstrap_servers=kafka_brokers_it)
async def to_italian_from_english(msg: Msg) -> Msg:
    it_msg = Msg(value=f"Ciao {msg.value}")
    return it_msg

@app.produces(topic="from_italian", bootstrap_servers=kafka_brokers_en)
async def to_english_from_italian(msg: Msg) -> Msg:
    it_msg = Msg(value=f"Hello {msg.value}")
    return it_msg

@app.consumes(topic="input", bootstrap_servers=kafka_brokers_en)
async def on_english(msg: Msg) -> None:
    await to_italian(msg)

@app.consumes(topic="input", bootstrap_servers=kafka_brokers_it)
async def on_italian(msg: Msg) -> None:
    await to_english(msg)

async with Tester(app) as tester:
    await tester.to_input(Msg(value="John"), bootstrap_servers=kafka_brokers_en)
    await tester.async_mocks.on_from_english().assert_awaited_with(
        Msg(value="Ciao John"), timeout=3
    )

    await tester.to_input(Msg(value="Giovanni"), bootstrap_servers=kafka_brokers_it)
    await tester.async_mocks.on_from_italian().assert_awaited_with(
        Msg(value="Hello Giovanni"), timeout=3
    )
Gerleff commented 1 year ago

Hi

I have en error occurring, if execute fastkafka run ... --kafka-broker staging ...:

Traceback: ... /site-packages/fastkafka/_application/app.py:102 in <dictcomp> │ │ │ 99 │ else: │ │ 100 │ │ retval = KafkaBrokers( │ │ 101 │ │ │ brokers={ │ │ ❱ 102 │ │ │ │ k: KafkaBroker.parse_raw( │ │ 103 │ │ │ │ │ v.json() if hasattr(v, "json") else json.dumps(v) │ │ 104 │ │ │ │ ) │ │ 105 │ │ │ │ for k, v in kafka_brokers.items() │ │ │

...

│ │ .0 = <dict_itemiterator object at 0x1101be7f0> │ │ │ │ k = 'Transaction Kafka' │ │ │ │ v = { │ │ │ │ │ 'url': [ │ │ │ │ │ │ '127.0.0.1:9092', │ │ │ │ │ │ '127.0.0.1:9093', │ │ │ │ │ │ '127.0.0.1:9094' │ │ │ │ │ ], │ │ │ │ │ 'description': 'Transaction kafka broker', │ │ │ │ │ 'port': 9092 │ │ │ │ } │ │ ValidationError: 1 validation error for KafkaBroker url str type expected (type=type_error.str)

Gerleff commented 1 year ago

It would work, actually, if implemented )

My minimal requirement is to execute fastkafka run ... --kafka-broker staging ...

with

kafka_brokers = {'staging': {'url': ['url1', 'url2', 'url3'], ...}}
app = FastKafka(kafka_brokers= kafka_brokers)

and receive messages from each url.

davorrunje commented 1 year ago

Yeah, that was just the sketch of the idea :)

It is scheduled for this sprint, it should be finished by this Friday.