use std::time::Duration;
use futures_util::StreamExt;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
use google_cloud_gax::grpc::Status;
async fn run(config: ClientConfig) -> Result<(), Status> {
// Creating Client, Topic and Subscription...
let client = Client::new(config).await.unwrap();
let subscription = client.subscription("test-subscription");
// Read the messages as a stream
let mut stream = subscription.subscribe(None).await.unwrap();
let cancellable = stream.cancellable();
let task = tokio::spawn(async move {
// None if the stream is cancelled
while let Some(message) = stream.next().await {
message.ack().await.unwrap();
}
});
tokio::time::sleep(Duration::from_secs(60)).await;
cancellable.cancel();
let _ = task.await;
Ok(())
}
Added
MessageStream#cancellable()
to gracefully finish subscribing to messages. https://github.com/yoshidan/google-cloud-rust/issues/266