airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
3.15k stars 161 forks source link

Feature: Confluent message consuming refactoring #1904

Open Lancetnik opened 2 weeks ago

Lancetnik commented 2 weeks ago

The current ConfluentConsumer implementation is pretty dirty: https://github.com/airtai/faststream/blob/main/faststream/confluent/client.py#L219

It uses sync_to_async everywhere, which can lead to race conditions and segmentation faults in the underlying C code.

We should refactor it to run the synchronous consumer loop in a separate thread and hand messages over to the asynchronous code through an in-memory queue. I suggest something like this:

import threading
from queue import Queue, Empty, Full
from typing import Optional, Any, List
from types import TracebackType

import anyio
from typing_extensions import Self

from faststream._internal.utils.functions import run_in_threadpool

class EventSource:
    def __init__(self) -> None:
        self.thread = threading.Thread(target=self._put_task)

        self._queue = Queue(maxsize=3)
        self.__stop_signal = threading.Event()

    @property
    def completed(self) -> bool: 
        return self._queue.empty()

    async def get_one(self, timeout: Optional[float] = 3.0) -> Optional[Any]:
        try:
            return await run_in_threadpool(self._queue.get, True, timeout)
        except Empty:
            return None

    async def get_many(
        self,
        timeout: float = 0.1,
        max_records: Optional[int] = 10,
    ) -> List[Any]:
        data = []
        with anyio.move_on_after(delay=timeout):
            while len(data) < max_records:
                if msg := await self.get_one(timeout=timeout):
                    data.append(msg)
        return data

    def start(self) -> None:
        self.thread.start()

    def stop_nowait(self) -> None:
        self.__stop_signal.set()

    def stop(self) -> None:
        self.stop_nowait()
        self.thread.join()

    def __enter__(self) -> Self:
        self.start()
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional[TracebackType] = None,
    ) -> None:
        self.stop()

    def _put_task(self) -> None:
        # TODO: implement messages consuming here
        import time
        import random
        while not self.__stop_signal.is_set():
            print("put msg")
            time.sleep(random.randint(5, 20) / 10)
            self._queue.put("CONFLUENT MESSAGE")

async def main() -> None:
    """Demo: print every batch pulled from an ``EventSource`` until interrupted."""
    with EventSource() as source:
        while True:
            batch = await source.get_many(timeout=3)
            print(len(batch), batch)


if __name__ == "__main__":
    # Guarded so that importing this module does not start the endless demo loop.
    anyio.run(main)