airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.2k stars 106 forks source link

Provide proxy/router to support module-distributed application #321

Closed Gerleff closed 1 year ago

Gerleff commented 1 year ago

Currently using

class FastKafkaRouter:
    """Proxy-app for FastKafka."""

    def __init__(self):
        self._consumers_store = {}
        self._producers_store = {}

    consumes = FastKafka.consumes
    produces = FastKafka.produces

    def transfer_handlers_to(self, app: FastKafka):
        """Transfer handlers into FastKafka application."""
        app._consumers_store.update(self._consumers_store)
        app._producers_store.update(self._producers_store)
        for attr, value in vars(self).items():
            if attr not in ("_consumers_store", "_producers_store"):
                setattr(app, attr, value)

# module <consumers/topic_handlers.py>
router = FastKafkaRouter()

@router.consumes(topic=...)
async def on_input(msg):
    ...

# module <fastkafka_app.py>
from consumers.topic_handlers import router

app = FastKafka(...)
router.transfer_handlers_to(app)

That helps support such distributed architecture.

Снимок экрана 2023-05-24 в 15 46 38
Gerleff commented 1 year ago

Updated example

Gerleff commented 1 year ago

[UPD] Works well with consumers, but doesn't work with producers in provided example.

Also, is it possible to use FastKafka.produces from FastAPI without launching FastKafka? (mentioned here, because root of both questions is the same, as I see)

davorrunje commented 1 year ago

You can start FastKafka using FastAPI and we have a new convenience method and a guide on how to do it in the upcoming release (RC is planned for tomorrow):

https://fastkafka.airt.ai/docs/next/guides/Guide_32_Using_fastapi_to_run_fastkafka_application

Would that be an option for you?

Gerleff commented 1 year ago

At the end, I just used

@lru_cache(maxsize=None)
def collect_all_kafka_handlers(app_name="kafka_app", root_path="."):
    """Find all uses of kafka_app to provide all producers and consumers without import conflicts."""
    counter = 0
    start = time()
    for _, module_name, _ in pkgutil.walk_packages(path=[root_path]):
        if "domain" in module_name or "application" in module_name:  # or another skip condition 
            continue
        module = importlib.import_module(module_name)
        if app_name in module.__dict__:
            print(f"Found {app_name} at {module_name}")
            counter += 1
    print(f"Found {counter} approaches! It took {round(time() - start, 2)} seconds to collect.")

This function walks among all the modules, which are in folder, presented at root_path, and import them. So all the FastKafka.produces and FastKafka.consumes are called

Gerleff commented 1 year ago

You can start FastKafka using FastAPI and we have a new convenience method and a guide on how to do it in the upcoming release (RC is planned for tomorrow):

https://fastkafka.airt.ai/docs/next/guides/Guide_32_Using_fastapi_to_run_fastkafka_application

Would that be an option for you?

For FastAPI instance with kafka producer functionality - yes! Thanks a lot!

davorrunje commented 1 year ago

Closing for now due to FastAPI alternative.