faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Why doesn't Event.forward default to the timestamp of the event? #427

Open cowboygneox opened 1 year ago

cowboygneox commented 1 year ago

Event.forward is defined here: https://github.com/faust-streaming/faust/blob/master/faust/events.py#L167

It is used by Stream.group_by: https://github.com/faust-streaming/faust/blob/master/faust/streams.py#L890

When using group_by, the original timestamp of the event is lost. For the windowing I'm experimenting with, losing this timestamp defeats the purpose of group_by: I either have to build the topic with the correct partitioning key up front (which I may not always have control over) or effectively reimplement what group_by is supposed to do.
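To make the windowing problem concrete, here is a minimal sketch in plain Python (no Faust involved). It assumes tumbling windows keyed by the start of the interval, and it assumes the forwarded message ends up with a later timestamp assigned at repartition time. The function name `window_start` and the example values are hypothetical.

```python
def window_start(timestamp: float, size: float) -> float:
    """Return the start of the tumbling window that contains `timestamp`."""
    return timestamp - (timestamp % size)


SIZE = 60.0  # hypothetical 60-second tumbling window

event_time = 1_000_030.0    # timestamp carried by the original message
forward_time = 1_000_095.0  # assumed timestamp assigned when repartitioning

original_window = window_start(event_time, SIZE)
forwarded_window = window_start(forward_time, SIZE)

# The same event is counted in a different window once its timestamp changes.
print(original_window, forwarded_window)
```

With these values the event moves from the window starting at 1000020.0 to the one starting at 1000080.0, which is why aggregates over the repartitioned stream no longer line up with event time.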

Is this intended behavior? Can I make a PR that will change Event.forward to default to Event.message.timestamp if timestamp is not provided?
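Roughly, the default I have in mind would behave like the sketch below. This is not Faust's actual code: `ToyMessage`, `ToyEvent`, and the list used as a channel are hypothetical stand-ins that only illustrate falling back to the message timestamp when none is passed.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class ToyMessage:
    """Hypothetical stand-in for a consumed message."""
    key: Any
    value: Any
    timestamp: float


class ToyEvent:
    """Hypothetical stand-in for an event wrapping a message."""

    def __init__(self, message: ToyMessage) -> None:
        self.message = message

    async def forward(
        self,
        channel: List[Tuple[Any, Any, float]],
        key: Any = None,
        timestamp: Optional[float] = None,
    ) -> None:
        # Proposed default: reuse the original message timestamp
        # unless the caller passes one explicitly.
        if timestamp is None:
            timestamp = self.message.timestamp
        new_key = self.message.key if key is None else key
        channel.append((new_key, self.message.value, timestamp))


async def demo() -> List[Tuple[Any, Any, float]]:
    channel: List[Tuple[Any, Any, float]] = []
    event = ToyEvent(ToyMessage(key="a", value=1, timestamp=1_000_030.0))
    await event.forward(channel, key="b")                 # falls back to message timestamp
    await event.forward(channel, key="b", timestamp=5.0)  # explicit timestamp wins
    return channel


forwarded = asyncio.run(demo())
print(forwarded)
```

The check uses `is None` rather than truthiness so that an explicit timestamp of 0.0 would still be respected.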

Thanks!

cowboygneox commented 1 year ago

Alternatively, we could explicitly pass event.message.timestamp in the repartition function inside Stream.group_by. That would fix the immediate problem with a smaller blast radius than changing the default in Event.forward.

So I'm suggesting something like:

        async def repartition(value: T) -> T:
            event = self.current_event
            if event is None:
                raise RuntimeError("Cannot repartition stream with non-topic channel")
            new_key = await format_key(key, value)
            # Proposed change: pass the original message timestamp explicitly
            # so the forwarded event keeps its event time.
            await event.forward(channel, key=new_key, timestamp=event.message.timestamp)
            return value