rabbitmq / rabbitmq-stream-rust-client

A client library for RabbitMQ streams
Other
156 stars 15 forks source link

Issue starting the Consumer from a given offset #222

Closed DanielePalaia closed 2 months ago

DanielePalaia commented 2 months ago

Describe the bug

OffsetSpecification::Offset sometimes does not behave correctly:

// Absolute offset the consumer is asked to start from.
let stored_offset: u64 = 45;
// Configure the named consumer first, then attach it to the stream.
let consumer_builder = environment
    .consumer()
    .name("consumer-1")
    .offset(OffsetSpecification::Offset(stored_offset));
let mut consumer = consumer_builder.build(stream).await.unwrap();

The consumer sometimes starts consuming from the wrong offset (e.g. 32 instead of 45).

Reproduction steps

use futures::StreamExt;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::task;

/// Reproduction program for the "consumer starts from the wrong offset" report.
///
/// Creates (or reuses) the `test-stream` stream, starts a consumer at
/// `OffsetSpecification::Offset(45)`, and prints the offset of every delivery.
/// The offset of the first delivery and the offset of the first message whose
/// body contains `"marker"` are recorded; once the consuming task finishes,
/// both are printed.
///
/// # Errors
///
/// Returns an error if the environment cannot be built. Exits the process
/// with status 1 if stream creation is rejected for any reason other than
/// the stream already existing.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use rabbitmq_stream_client::Environment;
    let environment = Environment::builder().build().await?;
    let stream = "test-stream";
    // -1 is the "nothing recorded yet" sentinel for both offsets.
    let first_offset = Arc::new(AtomicI64::new(-1));
    let last_offset = Arc::new(AtomicI64::new(-1));
    let notify_on_close = Arc::new(Notify::new());
    let create_response = environment
        .stream_creator()
        .max_length(ByteCapacity::GB(2))
        .create(stream)
        .await;

    // NOTE(review): creation errors that are not `StreamCreateError::Create`
    // are deliberately left unhandled here, as in the original report.
    if let Err(e) = create_response {
        if let StreamCreateError::Create { stream, status } = e {
            match status {
                // we can ignore this error because the stream already exists
                ResponseCode::StreamAlreadyExists => {}
                err => {
                    println!("Error creating stream: {:?} {:?}", stream, err);
                    std::process::exit(1);
                }
            }
        }
    }

    let stored_offset: u64 = 45;
    let mut consumer = environment
        .consumer()
        .name("consumer-1")
        .offset(OffsetSpecification::Offset(stored_offset))
        .build(stream)
        .await
        .unwrap();

    println!("Started consuming");

    // The originals are moved into the task; these clones stay in `main`.
    let first_cloned_offset = first_offset.clone();
    let last_cloned_offset = last_offset.clone();
    let notify_on_close_cloned = notify_on_close.clone();

    task::spawn(async move {
        while let Some(delivery) = consumer.next().await {
            // Stop on a delivery error instead of panicking, so `main` is
            // still woken up below.
            let d = match delivery {
                Ok(d) => d,
                Err(e) => {
                    println!("Error receiving delivery: {:?}", e);
                    break;
                }
            };

            println!("offset {} ", d.offset());
            // This task is the only writer, so a plain store after the
            // sentinel check is enough to record the first offset once.
            if first_offset.load(Ordering::Relaxed) == -1 {
                println!("First message received");
                first_offset.store(d.offset() as i64, Ordering::Relaxed);
            }

            // A message without a body is simply not a marker.
            let is_marker = d
                .message()
                .data()
                .map_or(false, |body| String::from_utf8_lossy(body).contains("marker"));
            if is_marker {
                last_offset.store(d.offset() as i64, Ordering::Relaxed);
                let handle = consumer.handle();
                _ = handle.close().await;
                break;
            }
        }
        // Always wake `main`, whichever way the loop ended; otherwise it
        // would wait on `notified()` forever.
        notify_on_close_cloned.notify_one();
    });

    notify_on_close.notified().await;

    if first_cloned_offset.load(Ordering::Relaxed) != -1 {
        println!(
            "Done consuming first_offset: {:?} last_offset: {:?}  ",
            first_cloned_offset, last_cloned_offset
        );
    }

    Ok(())
}

Expected behavior

Always start from the given offset

Additional context

No response

wolf4ood commented 2 months ago

After some quick digging, I think message skipping is not yet implemented in this client when the OffsetSpecification is Offset, unlike in other clients (see here in the Go one).

A similar approach should be implemented in the Rust driver when handling the delivery message here

DanielePalaia commented 2 months ago

Thanks @wolf4ood I'll have a look