sewenew / redis-plus-plus

Redis client written in C++
Apache License 2.0
1.64k stars 351 forks source link

[QUESTION]: Continuously listen to Redis stream using xreadgroup #339

Closed sukhiwrring closed 2 years ago

sukhiwrring commented 2 years ago

I am trying write a code for a consumergroup to continuously listen to the server and reads whenever a new message is added to the stream. The BLOCK parameter would block only until message is unavailable. It is required to send XREADGROUP command again in order to listen to the last added message or a new message. How can consumergroup be made online/active all the time and make the program to not end and keep listening to the stream?

krishna116 commented 2 years ago

I think any operation can block forever if you like, for example:

while(ping(redis server is not dead))
{
    //your_block_operation();
    //sleep(some time);
}

thanks.

sukhiwrring commented 2 years ago

The purpose here is to keep listening for the added messages over the streams. Whenever a message comes in, XREADGROUP should process that message and then instead of exit, it should wait for the next message and this goes on.

krishna116 commented 2 years ago

I think it is user's responsibility to make it block forever or not, because the API design may be more flexible to most users.

You can see the official document has the same idea as I said before. Follow code come from https://redis.io/commands/xreadgroup

WHILE true
    entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message.id,message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END

thanks.

sewenew commented 2 years ago

@sukhiwrring You need to run XREADGROUP in a loop as @krishna116 suggested. Redis::xreadgroup is a low level API, and works the same as XREADGROUP command.

If you need to make it process that message and then instead of exit, it should wait for the next message and this goes on, so far, you have to do it in application code.

Regards

sukhiwrring commented 2 years ago

@sewenew @krishna116 Thanks for your suggestion. I am getting the expected output by using loop.