tikv / client-rust

Rust Client for TiKV.
Apache License 2.0

transaction: `Transaction` may hang when dropped. #428

Closed pingyu closed 1 year ago

pingyu commented 1 year ago

In a long-running program that uses client-rust to send requests to TiKV servers, there is a small chance that the program hangs.

The hang appears to occur while dropping the Transaction object. After adding logs, we can see the "dropping transaction" message, but the enclosing code block never exits.

To confirm the issue, I wrote a snippet that makes the problem reproducible (see here).

    async fn txn_task() {
        let pd_client = Arc::new(MockPdClient::default());
        let options = TransactionOptions::new_pessimistic().read_only();
        let txn = Transaction::new(Timestamp::default(), pd_client, options);
        txn.check_allow_operation().await.unwrap();
        drop(txn);
    }

    #[tokio::test]
    async fn test_transaction_drop_st() -> Result<(), io::Error> {
        txn_task().await;
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_transaction_drop_mt() -> Result<(), io::Error> {
        const CONCURRENCY: usize = 10;
        let mut handles = Vec::with_capacity(CONCURRENCY);
        for _ in 0..CONCURRENCY {
            handles.push(tokio::spawn(txn_task()));
        }

        for handle in handles {
            handle.await.unwrap();
        }
        Ok(())
    }

There is also a small change to Transaction::drop (the tokio::time::sleep(Duration::from_millis(100)).await); without it, the problem cannot be reproduced. The sleep simulates the case where the lock cannot be acquired immediately, which would happen when, e.g., the heartbeat_task holds the lock.

    fn drop(&mut self) {
        debug!("dropping transaction");
        if std::thread::panicking() {
            return;
        }
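        // block_on blocks the current thread until the async write lock is acquired.
        let mut status = futures::executor::block_on(async {
            // Added for reproduction only: simulates a lock that is not immediately available.
            tokio::time::sleep(Duration::from_millis(100)).await;
            self.status.write().await
        });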
        if *status == TransactionStatus::Active {
            match self.options.check_level {
                CheckLevel::Panic => {
                    panic!("Dropping an active transaction. Consider commit or rollback it.")
                }
                CheckLevel::Warn => {
                    warn!("Dropping an active transaction. Consider commit or rollback it.")
                }

                CheckLevel::None => {}
            }
        }
        *status = TransactionStatus::Dropped;
    }

With this change, both tests hang reliably:

❯ cargo test test_transaction_drop_mt -- --nocapture
    Finished test [unoptimized + debuginfo] target(s) in 0.14s
     Running unittests src/lib.rs (target/debug/deps/tikv_client-802bb59de2ea8dfe)

running 1 test
test transaction::transaction::tests::test_transaction_drop_mt has been running for over 60 seconds
^C
❯
❯ cargo test test_transaction_drop_st -- --nocapture
   Compiling tikv-client v0.2.0 (/data/nvme0n1/home/pingyu/workspace/client-rust)
    Finished test [unoptimized + debuginfo] target(s) in 10.50s
     Running unittests src/lib.rs (target/debug/deps/tikv_client-802bb59de2ea8dfe)

running 1 test
test transaction::transaction::tests::test_transaction_drop_st has been running for over 60 seconds
^C

This appears to be a known issue with calling futures::executor::block_on from within an async context. See https://github.com/tokio-rs/tokio/issues/2376.

pingyu commented 1 year ago

To address this issue, I'm trying to change Transaction::status from RwLock to AtomicU8 and to transition the status atomically with compare_exchange_weak.
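
A rough sketch of what that could look like, using only the standard library. This is not the actual patch: the `Status` wrapper, the `transit` and `mark_dropped` helpers, and the numeric constants are all hypothetical names made up for illustration, and the real TransactionStatus has more variants than shown here. The point is only that a status held in an AtomicU8 can be changed without awaiting a lock, so drop would no longer need block_on.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Hypothetical numeric encoding of the status (assumption, not the crate's real values).
const ACTIVE: u8 = 0;
const DROPPED: u8 = 1;
const COMMITTED: u8 = 2;

// Hypothetical wrapper standing in for the `status` field of `Transaction`.
struct Status(AtomicU8);

impl Status {
    fn new(initial: u8) -> Self {
        Status(AtomicU8::new(initial))
    }

    // Atomically move from `from` to `to`.
    // Returns Ok(()) on success, or Err(actual) if the status was not `from`.
    fn transit(&self, from: u8, to: u8) -> Result<(), u8> {
        loop {
            match self
                .0
                .compare_exchange_weak(from, to, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return Ok(()),
                // compare_exchange_weak may fail spuriously even though the
                // value still equals `from`; in that case simply retry.
                Err(actual) if actual == from => continue,
                Err(actual) => return Err(actual),
            }
        }
    }

    // What a non-blocking `drop` could do: unconditionally store DROPPED
    // and return the previous status so the caller can warn or panic.
    fn mark_dropped(&self) -> u8 {
        self.0.swap(DROPPED, Ordering::AcqRel)
    }
}
```

With this shape, drop could call something like `mark_dropped()` and compare the returned value with `ACTIVE` to decide whether to warn or panic, without ever waiting on an async lock. The retry loop is needed because compare_exchange_weak is allowed to fail spuriously; compare_exchange would avoid the loop at a possible small cost on some platforms.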