Closed damjankuznar closed 4 months ago
As you say, when a message remains in the queue at the time of MessageStream
drop
, it is not nacked and waits for redelivery.
Therefore, we are currently considering the following interfaces
MessageStream#dispose
This will perform a nack on all messages present in the queue when disposing.let token = tokio_util::sync::CancellationToke::new();
let child_token = token.child_token();
let task = tokio::spawn(async move {
let config = SubscribeConfig::default().with_cancellable_by(child_token);
let mut stream = subscription.subscribe(Some(config)).await.unwrap();
// None when the token is cancelled
while let Some(message) = stream.next().await {
...
}
stream.dispose().await
});
token.cancel();
task.await();
or
let token = tokio_util::sync::CancellationToke::new();
let child_token = token.child_token();
let task = tokio::spawn(async move {
let mut stream = subscription.subscribe(None).await.unwrap();
while let Some(message) = tokio::select! {
msg = stream.next() => msg,
_ = child_token.cancelled() => None
} {
...
}
stream.dispose().await
});
token.cancel();
task.await();
Both examples seem more or less the same, so not sure what's the difference supposed to be. In any case, this looks exactly what I'm looking for, so :+1: from me.
thank you @yoshidan
Currently the "prefetched/buffered" messages that are in the MessageStream queue are silently dropped when MessageStream is dropped which causes the messages to be retried only after
stream_ack_deadline_seconds
passes and message is retried by PubSub.This doesn't work well in low latency scenarios when a process needs to be restarted, e.g. deploying a new app version.
Ideally, we would have an option to:
which would support cases where an app graceful shutdown is important.