If the future can be dropped by `tokio::select!` or similar while querying a change stream, the resume process that runs after an error in `stream.message()` stays in the Pending state forever. Set `enable_resume=false` in `QueryOptions` to return the error immediately without performing the resume process.

async fn query(client: Client) {
    let mut tx = client.single().await.unwrap();
    // Disable resume so an error is returned immediately instead of being retried.
    let option = QueryOptions {
        enable_resume: false,
        ..Default::default()
    };
    let mut stmt = Statement::new("SELECT ChangeRecord FROM READ_UserItemChangeStream (
        start_timestamp => @now,
        end_timestamp => NULL,
        partition_token => {},
        heartbeat_milliseconds => 10000
    )");
    stmt.add_param("now", &OffsetDateTime::now_utc());
    let mut rows = tx.query_with_option(stmt, option).await.unwrap();
    let mut tick = tokio::time::interval(tokio::time::Duration::from_millis(100));
    loop {
        tokio::select! {
            _ = tick.tick() => {
                // run task
            },
            // This future is dropped whenever the tick branch completes first.
            maybe = rows.next() => {
                let row = maybe.unwrap().unwrap();
            }
        }
    }
}

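With `enable_resume: false`, errors reach the caller directly, so the double `unwrap()` above would panic on the first stream error. A minimal sketch of handling the three outcomes explicitly is shown below. It assumes `rows.next()` resolves to a `Result<Option<Row>, E>`, as the double `unwrap()` suggests; `Next` and `classify` are hypothetical names introduced for illustration and are not part of the library.

```rust
use std::fmt::Display;

/// Hypothetical helper type: what the polling loop should do after one
/// call to `rows.next()`.
#[derive(Debug, PartialEq)]
enum Next<T> {
    /// A row arrived and should be processed.
    Row(T),
    /// The stream ended normally.
    Finished,
    /// The error was returned immediately (no resume was attempted);
    /// the caller decides whether to issue the query again.
    Failed(String),
}

/// Hypothetical helper: map the assumed `Result<Option<T>, E>` shape of
/// `rows.next()` onto an explicit decision instead of unwrapping.
fn classify<T, E: Display>(result: Result<Option<T>, E>) -> Next<T> {
    match result {
        Ok(Some(row)) => Next::Row(row),
        Ok(None) => Next::Finished,
        Err(e) => Next::Failed(e.to_string()),
    }
}
```

Inside the `tokio::select!` branch, `match classify(maybe)` could then break out of the loop on `Finished` and re-create the query on `Failed`, rather than panicking.
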
This PR will fix #158.