scylladb / python-driver

ScyllaDB Python Driver, originally DataStax Python Driver for Apache Cassandra
https://python-driver.docs.scylladb.com
Apache License 2.0

Python Driver fails to update existing `PreparedStatement`s after table alter #346

Open kbr-scylla opened 1 month ago

kbr-scylla commented 1 month ago

The following test written for Scylla's test.py framework fails:

import logging
import pytest
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)

@pytest.mark.asyncio
async def test_alter_prepared(manager: ManagerClient) -> None:
    s1 = await manager.server_add()
    cql, _ = await manager.get_ready_cql([s1])

    logger.info("ks")
    await cql.run_async("create keyspace ks with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    await cql.run_async("create table ks.t (pk int primary key, x int)")

    stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    await cql.run_async(stmt, [0, 0])

    await cql.run_async("alter table ks.t alter x type blob")
    #stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    #await cql.run_async(stmt, [0, 0])
    await cql.run_async(stmt, [b'', 0])

with:

E                   TypeError: Received an argument of invalid type for column "x". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'bytes'>; (required argument is not an integer)

In this case (I investigated), the Python driver does not attempt a statement reprepare at all. This is problem number 1.

If we uncomment:

    await cql.run_async(stmt, [0, 0])

(the second-to-last statement), this triggers a reprepare (which I checked by adding some logs to the driver), because the driver sends a request and gets a "query not prepared" response from Scylla. This statement passes. However, the following statement:

    await cql.run_async(stmt, [b'', 0])

still fails, even after repreparation. This is problem number 2.

If we uncomment the explicit

    stmt = cql.prepare("update ks.t set x = ? where pk = ?")

(after the alter), then the last statement passes (the one which sends [b'', 0]), whether or not we uncomment the previous statement (the one which sends [0, 0]).
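
The explicit re-prepare described above could be wrapped in a small helper on the application side. The sketch below is only an illustration, not driver code: `execute_with_reprepare`, the `cache` dict and `_StubSession` are hypothetical names, and the helper assumes nothing beyond a session object with `prepare(query)` and `execute(stmt, params)` methods, where `execute` raises `TypeError` on a bind-time type mismatch (as in the traceback above).

```python
QUERY = "update ks.t set x = ? where pk = ?"


def execute_with_reprepare(session, query, params, cache):
    """Execute `query`, preparing it again once if binding raises TypeError.

    `cache` maps query strings to prepared statements (hypothetical helper).
    """
    stmt = cache.get(query)
    if stmt is None:
        stmt = cache[query] = session.prepare(query)
    try:
        return session.execute(stmt, params)
    except TypeError:
        # Possibly stale bind metadata: prepare again and retry exactly once.
        # A genuinely wrong argument type will simply raise again here.
        stmt = cache[query] = session.prepare(query)
        return session.execute(stmt, params)


class _StubSession:
    """Stand-in used only to exercise the helper without a cluster."""

    def __init__(self):
        self.x_type = int  # current server-side type of column x
        self.prepare_calls = 0

    def prepare(self, query):
        self.prepare_calls += 1
        # The "prepared statement" remembers the type seen at prepare time.
        return {"query": query, "x_type": self.x_type}

    def execute(self, stmt, params):
        if not isinstance(params[0], stmt["x_type"]):
            raise TypeError("stale bind metadata")
        return params


session = _StubSession()
cache = {}
execute_with_reprepare(session, QUERY, [0, 0], cache)
session.x_type = bytes  # models: alter table ks.t alter x type blob
result = execute_with_reprepare(session, QUERY, [b"", 0], cache)
```

This only covers the case where the stale metadata produces an exception. It would not help in the UDT scenario described later in this comment, where nothing fails and a field is silently dropped.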

I found an easy way to solve problem number 2, by adjusting the reprepare code to update column_metadata inside the PreparedStatement object (following the logic used when PreparedStatement is first created by prepare: column_metadata is set to the response bind_metadata):

diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 5f2669c0..aaff3432 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -4963,6 +4963,7 @@ class ResponseFuture(object):
                             )
                         ))
                     self.prepared_statement.result_metadata = response.column_metadata
+                    self.prepared_statement.column_metadata = response.bind_metadata
                     new_metadata_id = response.result_metadata_id
                     if new_metadata_id is not None:
                         self.prepared_statement.result_metadata_id = new_metadata_id

(this is in _execute_after_prepare function in cassandra/cluster.py).

(BTW. the names are weird, yes)

However, I don't know how to solve problem 1. There, the driver does not even attempt a reprepare, because it fails at the serialization stage, before sending the request, due to the outdated column_metadata that it apparently uses for serialization.
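
Problem 1 can be modelled without a cluster. The sketch below is a toy model under stated assumptions, not the driver's implementation: `FakePreparedStatement` and `FakeServer` are hypothetical classes, and the only behaviour borrowed from the report is that argument types are checked against the metadata captured at prepare time before any request is sent.

```python
from dataclasses import dataclass


@dataclass
class FakePreparedStatement:
    """Hypothetical stand-in for a prepared statement (not the real class)."""

    query: str
    column_metadata: list  # (column name, expected Python type) pairs

    def bind(self, values):
        # Client-side check: runs before any network request is made.
        for (name, expected), value in zip(self.column_metadata, values):
            if not isinstance(value, expected):
                raise TypeError(
                    f'invalid type for column "{name}": '
                    f"expected {expected.__name__}, got {type(value).__name__}"
                )
        return tuple(values)


class FakeServer:
    """Toy server that counts how many requests actually reach it."""

    def __init__(self):
        self.schema = {"x": int, "pk": int}
        self.requests_received = 0

    def prepare(self, query):
        meta = [("x", self.schema["x"]), ("pk", self.schema["pk"])]
        return FakePreparedStatement(query, meta)

    def execute(self, stmt, values):
        bound = stmt.bind(values)  # may raise before the server is contacted
        self.requests_received += 1
        return bound


server = FakeServer()
stmt = server.prepare("update ks.t set x = ? where pk = ?")
server.execute(stmt, [0, 0])
server.schema["x"] = bytes  # models: alter table ks.t alter x type blob

try:
    server.execute(stmt, [b"", 0])
    outcome = "sent"
except TypeError:
    outcome = "rejected client-side"

requests_after_failure = server.requests_received
```

In this model the server never sees the second request, so it has no chance to answer "query not prepared" and trigger a reprepare. That matches the observation that no reprepare is attempted.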


The problems have different symptoms (although the root causes are the same) if, instead of changing the type of a column using alter table ks.t alter x type ..., we introduce a user-defined type and then add a field to this type. Consider this test:

import logging
import pytest
from dataclasses import dataclass
from test.pylib.manager_client import ManagerClient

logger = logging.getLogger(__name__)

@dataclass
class Typ1:
    a: int

@dataclass
class Typ2:
    a: int
    b: int

@pytest.mark.asyncio
async def test_alter_prepared(manager: ManagerClient) -> None:
    s1 = await manager.server_add()
    cql, _ = await manager.get_ready_cql([s1])

    logger.info("ks")
    await cql.run_async("create keyspace ks with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    await cql.run_async("create type ks.typ (a int)")
    await cql.run_async("create table ks.t (pk int primary key, x frozen<typ>)")

    stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    await cql.run_async(stmt, [Typ1(0), 0])

    await cql.run_async("alter type ks.typ add b int")
    #stmt = cql.prepare("update ks.t set x = ? where pk = ?")
    await cql.run_async(stmt, [Typ2(0, 0), 1])
    await cql.run_async(stmt, [Typ2(0, 0), 2])

    rs = await cql.run_async("select pk, x from ks.t")
    logger.info(list(rs))

There are no failures. However, the result is: [Row(pk=1, x=typ(a=0, b=None)), Row(pk=0, x=typ(a=0, b=None)), Row(pk=2, x=typ(a=0, b=None))]

So b is None for pk=1 and pk=2, even though we bound Typ2(0, 0) when updating those keys.
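
One plausible explanation, stated here as an assumption rather than a confirmed reading of the driver code, is that the UDT value is serialized field by field according to the field list stored in the stale metadata, so attributes not in that list are never read. A minimal sketch of that behaviour, with a hypothetical `serialize_udt` helper:

```python
from dataclasses import dataclass


@dataclass
class Typ2:
    a: int
    b: int


def serialize_udt(value, field_names):
    """Hypothetical serializer: reads only the fields named in the metadata."""
    return {name: getattr(value, name, None) for name in field_names}


# Field list captured before "alter type ks.typ add b int".
stale = serialize_udt(Typ2(0, 0), ["a"])
# Field list as it would look after preparing the statement again.
fresh = serialize_udt(Typ2(0, 0), ["a", "b"])
```

With the stale field list, b never reaches the wire and no error is raised, which would be consistent with the rows above showing b=None.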

After my column_metadata fix, the result changes to: [Row(pk=1, x=typ(a=0, b=None)), Row(pk=0, x=typ(a=0, b=None)), Row(pk=2, x=typ(a=0, b=0))]

So for pk=1 the result is still None, even though this statement causes a repreparation. In theory, the driver should resend the query with the updated column_metadata, so b should be set for pk=1 too.

But for pk=2 the result is correct -- apparently this time the updated column_metadata helped.

Uncommenting the explicit prepare also helps.


So is this a bug?

Well, at least the Rust driver provides a much better user experience. This (executed against a pre-existing single-node cluster):

use anyhow::Result;
use scylla::{SessionBuilder};
use std::env;

use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = SessionBuilder::new() .known_node(uri) .build() .await?;
    let session = Arc::new(session);

    session.query("DROP KEYSPACE IF EXISTS ks", &[],).await?;
    session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}", &[],).await?;
    session.query("CREATE TABLE ks.t ( a int primary key, b int)", &[],).await?;

    let insert = Arc::new(session.prepare("UPDATE ks.t SET b = ?  WHERE a = ?  ").await?);

    if let Err(e) = session.execute(&insert, (0 as i32, 0 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    session.query("ALTER TABLE ks.t alter b type blob", &[],).await?;

    if let Err(e) = session.execute(&insert, (Vec::<u8>::new(), 1 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    std::process::exit(0);
}

works out of the box, and gives

cqlsh> select * from ks.t;

 a | b
---+------------
 1 |         0x
 0 | 0x00000000

(2 rows)

Looking at the code, the Rust driver apparently has no structure corresponding to the Python driver's column_metadata / bind_metadata for use in serialization; instead, it seems to use the types that come with the tuple. The first statement after the ALTER reprepares, but does not seem to update the insert object, which is apparently not necessary for this driver.

Similarly the experience is better with UDTs:

use anyhow::Result;
use scylla::{SessionBuilder};
use std::env;

use std::sync::Arc;

use scylla::macros::{FromUserType, IntoUserType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = SessionBuilder::new() .known_node(uri) .build() .await?;
    let session = Arc::new(session);

    session.query("DROP KEYSPACE IF EXISTS ks", &[],).await?;
    session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}", &[],).await?;
    session.query("CREATE TYPE ks.typ (a int,); ", &[],).await?;
    session.query("CREATE TABLE ks.t ( a int primary key, x frozen<typ>)", &[],).await?;

    #[derive(Debug, IntoUserType, FromUserType, Clone)]
    struct Typ1 {
        a: i32,
    }

    #[derive(Debug, IntoUserType, FromUserType, Clone)]
    struct Typ2 {
        a: i32,
        b: i32,
    }

    let insert = Arc::new(session.prepare("UPDATE ks.t SET x = ?  WHERE a = ?  ").await?);

    if let Err(e) = session.execute(&insert, (&Typ1{a: 0}, 0 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    session.query("ALTER type ks.typ add b int", &[],).await?;

    if let Err(e) = session.execute(&insert, (&Typ2{a: 0, b: 0}, 1 as i32)).await {
        println!("{}", e);
        std::process::exit(1);
    }

    std::process::exit(0);
}

result:

cqlsh> select * from ks.t;

 a | x
---+-----------------
 1 |    {a: 0, b: 0}
 0 | {a: 0, b: null}

(2 rows)

cc @Lorak-mmk

Lorak-mmk commented 1 month ago

First, I think you are using quite an old Rust Driver version. The current version is 0.13, and 0.11, released in December, introduced new serialization traits that perform type checking on the client side. With the current version, your first Rust example fails with:

 Serializing values failed: SerializationError: Failed to serialize query arguments (alloc::vec::Vec<u8>, i32): failed to serialize column b: SerializationError: Failed to type check Rust type alloc::vec::Vec<u8> against CQL type Int: expected one of the CQL types: [Blob]

and second (after slight modifications to make it work with current version) fails with:

Serializing values failed: SerializationError: Failed to serialize query arguments (&testrow::main::{{closure}}::Typ2, i32): failed to serialize column x: SerializationError: Failed to type check Rust type testrow::main::{{closure}}::Typ2 against CQL type UserDefinedType { type_name: "typ", keyspace: "ks", field_types: [("a", Int)] }: the field b is missing in the Rust data but is required by the CQL UDT type

which btw looks like a problem with the error message: it says that field b is missing in the Rust data, but it is actually missing in the CQL UDT, so it's the other way around from what the message says.

Now to address the issue: to the best of my knowledge, what you described is how all the drivers work. Metadata in a prepared statement is constant, and the user needs to update it by preparing the statement again. I see some reasons for that:

We could discuss doing it in a different way if you want, but I suspect it would require significant, possibly backwards-incompatible, changes in the driver.

Regarding your proposed fix: a prepared statement is a shared object that can be used concurrently in multiple queries. It's not hard to imagine a scenario where the driver sends a query to node A and updates the metadata, then concurrently sends a query to node B, which has an older schema, and updates the metadata back, preventing you from sending the statement with the new data type.
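
The scenario can be sketched as a deterministic toy model. Everything here is hypothetical (the `SharedStatement` class, the `node_schema` mapping and `reprepare_on` are not driver code); it only models the proposed fix as "overwrite the shared metadata with whatever the responding node reports".

```python
class SharedStatement:
    """Hypothetical shared prepared statement holding mutable bind metadata."""

    def __init__(self, column_metadata):
        self.column_metadata = column_metadata


# Assumed per-node view of column x's type while a schema change propagates.
node_schema = {"A": "blob", "B": "int"}


def reprepare_on(node, stmt):
    # Models the proposed fix: the node's answer replaces the shared metadata.
    stmt.column_metadata = {"x": node_schema[node]}


stmt = SharedStatement({"x": "int"})
history = []
for node in ["A", "B"]:  # A already has the new schema, B does not yet
    reprepare_on(node, stmt)
    history.append((node, stmt.column_metadata["x"]))
```

After both reprepares the shared statement is back to the old type, so a subsequent bind with the new type would be rejected on the client side again.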

kbr-scylla commented 3 weeks ago

> If you add a column then you need to modify your queries to make use of it. If you change type of column then you need to change type of what you use in queries.

That would mean disruption in availability.

Adding column: obviously you don't have to change the application, you can keep omitting this column from INSERT or UPDATE, until you are ready to deploy new version of your app. But this takes time. Schema change can be done ahead of time.

Changing column type to a compatible one, in particular, adding new field to a user-defined-type: same argument.

> Schema change is not an atomic event iiuc and I don't see how exactly should the driver deal with different nodes having temporarily different schema versions.

The driver has a pool of connections. A given shard has a single schema version at a given time. A given connection acts according to that version.