launchbadge / sqlx

🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, and SQLite.
Apache License 2.0

Transaction commit raises CockroachDB error #933

Open imalsogreg opened 3 years ago

imalsogreg commented 3 years ago

Hello! I've been using sqlx to talk to CockroachDB databases and recently started upgrading from sqlx-0.3.5 to sqlx-0.4.2. After the upgrade, many tests began to fail with:

PgDatabaseError {
  severity: Error, code: "0A000",
  message: "unimplemented: multiple active portals not supported",
  detail: None,
  hint: Some("You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/40195/v20.1"),
  position: None, 
  where: None, 
  schema: None, 
  table: None, 
  column: None, 
  data_type: None, 
  constraint: None, 
  file: Some("distsql_running.go"), 
  line: Some(775), 
  routine: Some("init") 
}

I came up with a minimal reproducing test case that does a SELECT 1. When this query is executed within a transaction, it fails. But if I run the query directly against the postgres connection pool, it succeeds.

    use sqlx::postgres::PgPoolOptions;

    #[tokio::test]
    async fn test_sqlx_transaction() -> Result<(), sqlx::Error> {
        let pool = PgPoolOptions::new()
            .max_connections(5)
            .connect("postgres://root@localhost:26257/api_server")
            .await?;
        let mut txn = pool.begin().await.expect("should get txn");

        let row: (i64,) = sqlx::query_as("SELECT $1")
            .bind::<i64>(1)
            .fetch_one(&mut txn)
            .await
            .expect("query_as error");

        txn.commit().await.expect("txn should commit");

        println!("row.0: {}", row.0);
        Ok(())
    }

Both cases succeed for sqlx-0.3.5.

I was told in the CockroachDB community slack that my database driver is trying to have multiple active result sets (portals) concurrently on the same connection. I'm not sure where to go from there. Thanks for any ideas! I'm happy to work on a patch, given some clues, if you have ideas about what is happening.

imalsogreg commented 3 years ago

We discovered that fetch_all within a transaction does not exhibit the bug. The bugged behavior only appears when using fetch_one or fetch_optional, and only within a transaction.

While trying to debug, I was confused by the way that stream writing and message-awaiting depend on the state of the StatementCache: https://github.com/launchbadge/sqlx/blob/master/sqlx-core/src/postgres/connection/executor.rs#L180-L188. insert returns the id of a newly evicted cache line (if there is one). The code then writes Close for that evicted cache line, and it waits for a completion signal from the server only if a cache line was evicted. This doesn't seem like desirable behavior though. Do I have the right idea?
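
To make the question concrete, here is a toy model of the flow as described in this comment. It is not sqlx's actual code: `ToyStatementCache`, `prepare`, and the log strings are made-up names for illustration, and the eviction policy here is plain insertion order, which may differ from the real cache.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a prepared-statement cache (hypothetical, not sqlx's type).
struct ToyStatementCache {
    capacity: usize,
    // Front = oldest entry; each entry is (SQL text, statement id).
    entries: VecDeque<(String, u32)>,
}

impl ToyStatementCache {
    fn new(capacity: usize) -> Self {
        Self { capacity, entries: VecDeque::new() }
    }

    /// Inserts a statement and returns the id of the evicted entry, if any.
    fn insert(&mut self, sql: &str, id: u32) -> Option<u32> {
        let evicted = if self.entries.len() >= self.capacity {
            self.entries.pop_front().map(|(_, old_id)| old_id)
        } else {
            None
        };
        self.entries.push_back((sql.to_string(), id));
        evicted
    }
}

/// Records the messages that would be written for one prepare call.
/// The strings only label the steps; they are not real protocol frames.
fn prepare(cache: &mut ToyStatementCache, sql: &str, id: u32, log: &mut Vec<String>) {
    log.push(format!("Parse({})", id));
    if let Some(old) = cache.insert(sql, id) {
        // Only when something was evicted: close it and wait for the reply.
        log.push(format!("Close({})", old));
        log.push(format!("await CloseComplete({})", old));
    }
}
```

With a capacity of 1, the first `prepare` call logs only a `Parse` step, while the second also logs the `Close` and the wait. That is the asymmetry the question is about: whether the connection waits depends on whether the cache happened to be full.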

lacasaprivata2 commented 3 years ago

++++1

sergeyshaykhullin commented 2 years ago

Any existing workaround?

imalsogreg commented 2 years ago

@sergeyshaykhullin I've been using the following code. It lets you write .fetch_one_shim(&mut txn) where you would have written .fetch_one(&mut txn), and .fetch_optional_shim(&mut txn) where you would have written .fetch_optional(&mut txn).

The implementation calls .fetch_all(&mut txn), so it's inefficient if your SQL query returns more than one row.
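
Stripped of the sqlx types, the row-selection logic of the shim is small enough to check without a database. The sketch below is only an illustration: `ShimError`, `first_optional`, and `first_one` are hypothetical names, with `ShimError::RowNotFound` standing in for `sqlx::Error::RowNotFound`, and the `Vec` argument standing in for the result of `fetch_all`.

```rust
/// Hypothetical stand-in for `sqlx::Error::RowNotFound`.
#[derive(Debug, PartialEq)]
enum ShimError {
    RowNotFound,
}

/// Same idea as `fetch_optional_shim`: keep the first row, drop the rest.
fn first_optional<T>(rows: Vec<T>) -> Option<T> {
    rows.into_iter().next()
}

/// Same idea as `fetch_one_shim`: an empty result set becomes `RowNotFound`.
fn first_one<T>(rows: Vec<T>) -> Result<T, ShimError> {
    first_optional(rows).ok_or(ShimError::RowNotFound)
}
```

Every row is still fetched and decoded before all but the first are discarded, which is where the inefficiency comes from when the query has no `LIMIT 1`.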

use async_trait::async_trait;
use sqlx::postgres::{PgArguments, PgRow};
use sqlx::{Executor, FromRow, Postgres};

#[async_trait]
/// A trait for adding shim methods to sqlx::QueryAs instances.
/// This is a shim to be used while we work on an upstream fix for sqlx.
/// See https://github.com/launchbadge/sqlx/issues/933 and
/// https://github.com/cockroachdb/cockroach/issues/40195 for more details.
pub trait FetchShim<O> {
    /// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
    /// This will be less efficient than `sqlx::fetch_one`, if the query does not
    /// limit its result rows to 1
    async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>;

    /// Like `sqlx::fetch_optional`, except that internally it calls `sqlx::fetch_all`.
    /// This will be less efficient than `sqlx::fetch_optional` if the query does not
    /// limit its result rows to 1.
    async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>;
}

#[async_trait]
impl<'q, O> FetchShim<O> for sqlx::query::QueryAs<'q, sqlx::Postgres, O, PgArguments> {
    async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>,
    {
        let maybe_rows = self.fetch_all(query).await;
        match maybe_rows {
            Ok(rows) => match rows.into_iter().next() {
                Some(x) => Ok(x),
                None => Err(sqlx::error::Error::RowNotFound),
            },
            Err(y) => Err(y),
        }
    }

    async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
        E: 'e + Executor<'c, Database = Postgres>,
    {
        let maybe_rows = self.fetch_all(query).await;
        match maybe_rows {
            Ok(rows) => Ok(rows.into_iter().next()),
            Err(y) => Err(y),
        }
    }
}

Lohann commented 1 year ago

Thx @imalsogreg, it worked! Btw, here is a more generic implementation that also covers Map and works with the query_as! macro:

use async_trait::async_trait;
use sqlx::{Database, Error, Executor, FromRow};

#[async_trait]
/// A trait for adding shim methods to sqlx::QueryAs instances.
/// This is a shim to be used while we work on an upstream fix for sqlx.
/// See https://github.com/launchbadge/sqlx/issues/933 and
/// https://github.com/cockroachdb/cockroach/issues/40195 for more details.
pub trait FetchShim<DB, O>
where
    DB: Database,
{
    /// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
    /// This will be less efficient than `sqlx::fetch_one`, if the query does not
    /// limit its result rows to 1
    async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
        E: 'e + Executor<'c, Database = DB>;

    /// Like `sqlx::fetch_optional`, except that internally it calls `sqlx::fetch_all`.
    /// This will be less efficient than `sqlx::fetch_optional` if the query does not
    /// limit its result rows to 1.
    async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
    where
        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
        E: 'e + Executor<'c, Database = DB>;
}

#[async_trait]
impl<'q, DB, O, A> FetchShim<DB, O> for sqlx::query::QueryAs<'q, DB, O, A>
where
    DB: sqlx::Database,
    A: 'q + sqlx::IntoArguments<'q, DB>,
{
    async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
    where
        DB: sqlx::Database,
        O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
        E: 'e + sqlx::Executor<'c, Database = DB>,
    {
        self.fetch_optional_shim(query).await?.ok_or(Error::RowNotFound)
    }

    async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
    where
        O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
        E: 'e + sqlx::Executor<'c, Database = DB>,
    {
        Ok(self.fetch_all(query).await?.into_iter().next())
    }
}

#[async_trait]
impl<'q, DB, F, O, A> FetchShim<DB, O> for sqlx::query::Map<'q, DB, F, A>
where
    DB: sqlx::Database,
    F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
    O: Send + Unpin,
    A: 'q + Send + sqlx::IntoArguments<'q, DB>,
{
    async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
    where
        O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
        E: 'e + sqlx::Executor<'c, Database = DB>,
    {
        self.fetch_optional_shim(query).await?.ok_or(Error::RowNotFound)
    }

    async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
    where
        O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
        E: 'e + sqlx::Executor<'c, Database = DB>,
    {
        Ok(self.fetch_all(query).await?.into_iter().next())
    }
}

abonander commented 1 year ago

I believe this is a bug in CockroachDB, not SQLx. We never use anything but the unnamed portal, which should be replaced every time we send a Bind message or closed at the end of a transaction.
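
For readers unfamiliar with portals, the unnamed-portal rules referred to here can be sketched as a toy model. This is neither sqlx nor CockroachDB code: `ToySession` and its methods are hypothetical names, and the model only reflects the rules in the PostgreSQL protocol documentation (a Bind to the unnamed portal replaces it, a named portal must be closed before it is re-bound, and portals are destroyed when the transaction ends).

```rust
use std::collections::HashMap;

/// Toy model of a server session's portal table (hypothetical).
#[derive(Default)]
struct ToySession {
    // Portal name -> bound SQL text. The empty string is the unnamed portal.
    portals: HashMap<String, String>,
}

impl ToySession {
    /// Handles a Bind message. Binding the unnamed portal silently replaces
    /// it; re-binding a named portal that still exists is an error.
    fn bind(&mut self, portal: &str, sql: &str) -> Result<(), String> {
        if !portal.is_empty() && self.portals.contains_key(portal) {
            return Err(format!("portal \"{}\" already exists", portal));
        }
        self.portals.insert(portal.to_string(), sql.to_string());
        Ok(())
    }

    /// Ends the transaction (commit or rollback), destroying all portals.
    fn end_transaction(&mut self) {
        self.portals.clear();
    }

    /// Number of portals currently alive in this session.
    fn active_portals(&self) -> usize {
        self.portals.len()
    }
}
```

Under these rules, a client that only ever binds the unnamed portal has at most one portal alive at a time, which is the basis for the argument that the error should not be triggered by the driver.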

Trying to find the error message in the source didn't yield any code results, but did give me this commit: https://github.com/cockroachdb/cockroach/commit/0ce44436cc16b9e8a9a1f7815f85e8b8b42fe274

So it appears that this will likely be fixed in the next release of CockroachDB.