airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.36k stars 118 forks

Feature: Replay messages #1618

Open limx0 opened 1 month ago

limx0 commented 1 month ago

Is your feature request related to a problem? Please describe.
When my process starts and subscribes, I would like to be able to replay the messages stored on a persistent queue. If my process restarts, I would like the option to replay some or all of the messages on a given topic/queue.

Describe the solution you'd like
Add a replay or start_timestamp keyword argument to the subscriber that allows replaying messages stored in the backend on startup.
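For the Redis case, a start_timestamp could in principle be translated into a stream ID, because auto-generated Redis stream IDs have the form `<millisecondsTime>-<sequenceNumber>`. The helper below is only a sketch of that translation; the name `stream_id_from_timestamp` is hypothetical and is not part of FastStream or redis-py.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def stream_id_from_timestamp(ts: datetime) -> str:
    """Build a Redis stream ID "<ms>-0" from a timezone-aware datetime.

    Hypothetical helper for illustration only.
    """
    if ts.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division avoids float rounding on sub-millisecond values.
    ms = (ts - EPOCH) // timedelta(milliseconds=1)
    return f"{ms}-0"


start_id = stream_id_from_timestamp(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(start_id)
```

Note that XREAD returns only entries with an ID strictly greater than the one supplied, so an entry whose ID is exactly `<ms>-0` would be skipped when reading from this ID.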

Feature code example
To help others understand the proposed feature, illustrate it with a FastStream code example:

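from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)

# `replay=True` is the proposed option; it is not an existing FastStream argument
@broker.subscriber(stream="mytopic", replay=True)
async def handle(msg):
    # With replay enabled, the first message received is the first message in the Redis stream
    print(msg)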

Describe alternatives you've considered
I thought this would already have been discussed or implemented, given that the majority of backends support this functionality (RabbitMQ, Redis, NATS, Kafka), but I can't find anything in the docs or issues.

Additional context
Apologies if I've somehow missed something here!

Lancetnik commented 1 month ago

Hi! I am fairly sure you can already implement this behavior by playing with the various stream options (last_id at least). I'll try to dig into it to help you.
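To make the last_id idea concrete, here is a small in-memory model of how a start ID selects which stored entries are delivered. It mimics the XREAD rule in Redis (entries with an ID strictly greater than the given one are returned, and `$` means "only entries added from now on"). The function names are hypothetical, and the assumption that FastStream's last_id option is passed through as this start ID has not been verified here.

```python
from typing import Dict, List, Tuple

Entry = Tuple[str, Dict[str, str]]  # (stream ID, fields)


def parse_id(raw: str) -> Tuple[int, int]:
    """Split "<ms>-<seq>" into integers; a bare "<ms>" is treated as seq 0."""
    ms, _, seq = raw.partition("-")
    return int(ms), int(seq or 0)


def read_after(entries: List[Entry], last_id: str) -> List[Entry]:
    """Return the stored entries a reader starting at last_id would see."""
    if last_id == "$":
        # "$" skips everything already stored; only future entries arrive.
        return []
    start = parse_id(last_id)
    return [e for e in entries if parse_id(e[0]) > start]


stored = [("1-0", {"n": "1"}), ("2-0", {"n": "2"}), ("3-0", {"n": "3"})]
print(len(read_after(stored, "0")))    # full replay
print(len(read_after(stored, "$")))    # no replay
print(len(read_after(stored, "2-0")))  # partial replay
```

In this model, "replay everything" corresponds to starting from ID `0`, and "replay from a point in time" corresponds to starting from an ID built from that timestamp.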

Btw, have you tried using a persistent subscriber (it saves the read position between restarts)? https://faststream.airt.ai/0.5/redis/streams/groups/
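A rough model of why a consumer group survives restarts: the group's last-delivered ID is kept by Redis itself, not by the consuming process, so a restarted consumer resumes after the last entry the group received. The `ToyGroup` class below is a hypothetical in-memory stand-in for that server-side state and does not talk to Redis or use FastStream.

```python
from typing import List, Tuple

Entry = Tuple[str, str]  # (stream ID, payload)


def _key(entry_id: str) -> Tuple[int, int]:
    """Turn "<ms>-<seq>" into a comparable tuple of integers."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


class ToyGroup:
    """Toy stand-in for the per-group position that Redis stores."""

    def __init__(self, start_id: str = "0") -> None:
        # Starting at "0" means the group begins at the start of the stream.
        self.last_delivered = _key(start_id)

    def read_new(self, stream: List[Entry]) -> List[Entry]:
        """Deliver entries the group has not seen yet, then advance."""
        fresh = [e for e in stream if _key(e[0]) > self.last_delivered]
        if fresh:
            self.last_delivered = _key(fresh[-1][0])
        return fresh


stream = [("1-0", "a"), ("2-0", "b")]
group = ToyGroup()
first_run = group.read_new(stream)   # process runs, then stops
stream.append(("3-0", "c"))          # a message arrives while it is down
second_run = group.read_new(stream)  # restarted process, same group state
print(first_run, second_run)
```

This gives "continue where I left off" semantics rather than a replay: entries the group already received are not delivered again unless the group's position is reset.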

Lancetnik commented 3 weeks ago

@kumaranvpl you have more expertise in Redis. Can you please take a quick look and decide whether this makes sense: should we add a new API for it, or is it already covered by existing options? Just for the backlog.