yoshidan / google-cloud-rust

Google Cloud Client Libraries for Rust.
MIT License
243 stars 87 forks source link

RowIterator+AsyncIterator are not cancellation safe #158

Closed ivankelly closed 1 week ago

ivankelly commented 1 year ago

To repro:

At this point, the iterator should error and we return from the loop. However the client goes into a retry, giving the tick() future a chance to complete. the iterator.next() is cancelled and the error never returned to next_entry_res. It goes into an error state,but this error state always returns None for next() calls. This is particularly bad for change streams, because None indicates you've read everything from the stream, which may not be the case. So this loops forever.

yoshidan commented 1 year ago

Thanks for pointing this out. I am not sure I understand it.

ivankelly commented 1 year ago

The iterator itself doesn't get dropped, but the future returned by iterator.next() can be dropped if iterval.tick complete first. I don't think this is normally a problem because the iterator is mostly cancellation safe. But if you get a error from spanner for a future that is dropped, that never bubbles up, and then the next call to iterator.next returns with a None which looks like a normal partition completion (when using change streams).

I think the problem is in try_recv, which calls self.streaming.message().await which calls poll_next(),

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {                                                                                                                                                                                
        loop {                                                                                                                                                                                                                                                                
            if let State::Error = &self.inner.state {                                                                                                                                                                                                                         
                return Poll::Ready(None);                                                                                                                                                                                                                                     
            }           

If you miss the error that put the inner state into State::Error, you'll get None forever. This may not even be a spanner client issue. It could well be tonic.

yoshidan commented 1 year ago

Thank you for the detailed information. I was able to reproduce the phenomenon. When a future is dropped using tokio::select, if an error is generated by streaming.message() in try_recv, then self.reader.read will remain in Pending state and the process will not be completed. https://github.com/yoshidan/google-cloud-rust/blob/207422caf07dad8c0e9f80ed7c47cb37b21d7716/spanner/src/reader.rs#L232

If try_recv was called in another future, such as when a tick was terminated, streaming.message() would always return None.

yoshidan commented 1 year ago

I added enable_resume option to QueryOptions. If this is set to false and query is executed, the RowIterator will be cancel safe because it will not be automatically resumed.

https://github.com/yoshidan/google-cloud-rust/pull/162

ivankelly commented 1 year ago

👍 good stuff. In our case we're just going to assume nothing is cancellation safe. It's too sharp an edge to be load bearing for correctness. Our final solution used futures::future::select: https://docs.rs/futures/latest/futures/future/fn.select.html. It's ugly as hell but it's safe.

'mainloop: loop {
    let mut data_future = iterator.next();
    'innerloop: loop {
        let tick = timed_flush_check_interval.tick();
        pin_mut!(tick);
        match select(data_future, tick).await {
            Either::Left((data, _)) => {
                let status = process_the_data(data).await?;
                if status == Status::Finished {
                    break 'mainloop;
                };
                break 'innerloop;
            }
            Either::Right((_, future)) => {
                data_future = future;
            }
        }
        things_that_need_to_run_periodically().await?;
    }
}

Thanks for your help on this. I have another bug incoming, but this will come as a PR. Session leakage.