faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Add a method to include the whole message when using the take method #616

Closed fonty422 closed 7 months ago

fonty422 commented 7 months ago

A suggestion to cover a need I have: accessing message headers when using the take method. There are two ways to do this:

  1. Add a new option to the take method that lets the user choose between receiving the values or the whole message.
  2. Add a new method called take_events, essentially a copy of the current take method, that adds events to the list rather than values.

The code is effectively the same regardless:

async def take_events(self, max_: int, within: Seconds) -> AsyncIterable[Sequence[EventT]]:
        """
        Buffer n events at a time and yield a list of buffered events.

        Unlike take, each item in the yielded list is the whole event
        (value, key, headers), not just the value.

        Arguments:
            max_: Max number of messages to receive. When more than this
                number of messages are received within the specified number of
                seconds then we flush the buffer immediately.
            within: Timeout for when we give up waiting for another value,
                and process the events we have.
                Warning: If there's no timeout (i.e. `within=None`),
                the agent is likely to stall and block buffered events for an
                unreasonable length of time(!).
        """
        buffer: List[T_co] = []
        events: List[EventT] = []
        buffer_add = buffer.append
        event_add = events.append
        buffer_size = buffer.__len__
        buffer_full = asyncio.Event()
        buffer_consumed = asyncio.Event()
        timeout = want_seconds(within) if within else None
        stream_enable_acks: bool = self.enable_acks

        buffer_consuming: Optional[asyncio.Future] = None

        channel_it = aiter(self.channel)

        async def add_to_buffer(value: T) -> T:
            try:
                nonlocal buffer_consuming
                if buffer_consuming is not None:
                    try:
                        await buffer_consuming
                    finally:
                        buffer_consuming = None
                buffer_add(cast(T_co, value))
                event = self.current_event
                if event is None:
                    raise RuntimeError("Take buffer found current_event is None")
                event_add(event)
                if buffer_size() >= max_:
                    buffer_full.set()
                    buffer_consumed.clear()
                    await self.wait(buffer_consumed)
            except CancelledError:
                raise
            except Exception as exc:
                self.log.exception("Error adding to take buffer: %r", exc)
                await self.crash(exc)
            return value

        self.enable_acks = False

        self.add_processor(add_to_buffer)
        self._enable_passive(cast(ChannelT, channel_it))
        try:
            while not self.should_stop:
                await self.wait_for_stopped(buffer_full, timeout=timeout)
                if buffer:
                    buffer_consuming = self.loop.create_future()
                    try:
                        yield list(events)  # Yield events instead of values

                    finally:
                        buffer.clear()
                        for event in events:
                            await self.ack(event)
                        events.clear()
                        notify(buffer_consuming)
                        buffer_full.clear()
                        buffer_consumed.set()
        finally:
            self.enable_acks = stream_enable_acks
            self._processors.remove(add_to_buffer)

Alternatively, you could add a use_events option to the take method that defaults to False. If it is set to True, the method yields the events; otherwise it yields the buffered values.

There may be a reason this is a bad idea that's beyond my level, so let me know if that's the case.
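
To make the use_events idea concrete, here is a rough standalone sketch in plain asyncio. It is not Faust code: FakeEvent, the queue-based take function and the None end-of-stream sentinel are all made up for illustration, and the timeout is applied per item rather than per batch to keep it short. It only shows how one flag could switch between yielding values and yielding whole events.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an event: a value plus its headers."""
    value: Any
    headers: Dict[str, bytes] = field(default_factory=dict)


async def take(
    queue: "asyncio.Queue",
    max_: int,
    within: float,
    use_events: bool = False,
) -> AsyncIterator[List[Any]]:
    """Batch items from the queue; a None item marks the end of the stream.

    Assumption: this is a simplified illustration, not the Faust implementation.
    """
    batch: List[FakeEvent] = []
    done = False
    while not done:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=within)
        except asyncio.TimeoutError:
            # Nothing arrived in time: flush whatever has been buffered.
            flush = True
        else:
            if event is None:
                done = True
            else:
                batch.append(event)
            flush = done or len(batch) >= max_
        if flush and batch:
            # The only difference between the two modes is what gets yielded.
            yield list(batch) if use_events else [e.value for e in batch]
            batch.clear()


async def demo(use_events: bool) -> List[List[Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(FakeEvent(value=i, headers={"trace": str(i).encode()}))
    queue.put_nowait(None)  # end-of-stream sentinel (illustrative only)
    return [b async for b in take(queue, max_=2, within=0.1, use_events=use_events)]


if __name__ == "__main__":
    print(asyncio.run(demo(use_events=False)))
    for batch in asyncio.run(demo(use_events=True)):
        print([(e.value, e.headers) for e in batch])
```

With use_events=False each batch holds only the values; with use_events=True the headers stay attached to each item, which is what I need.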

fonty422 commented 7 months ago

Ah, it's already there with the take_events function, yeah?