scylladb / scylla-rust-driver

Async CQL driver for Rust, optimized for ScyllaDB!
Apache License 2.0

How to get batch LWT statement result? #998

Open fortunado opened 4 months ago

fortunado commented 4 months ago

Hi! I'm running 3 statements in a batch and I'd like to know whether the batch was applied or not. Please advise.

Thanks.

wprzytula commented 4 months ago

You can know it based on Result obtained from batch().await call:

fortunado commented 4 months ago

Thank you for such a quick response, but it is not 100% clear to me. Could you please point me to the actual error code returned when every statement in the batch is correct and executed, but the batch is not applied (the IF condition of one of the statements evaluated to false, so the whole batch is not applied)?

When I experiment in the db console, such a batch returns "ok":

[2024-05-13 19:00:33] 1 row retrieved starting from 1 in 31 ms (execution: 17 ms, fetching: 14 ms)

The first column of the returned row contains the status "applied: false", so this looks like a result with rows, not an error. Can it be retrieved somehow, maybe with maybe_first_row_typed::<???>(), in order to check the statement status in the first column?

wprzytula commented 4 months ago

@Lorak-mmk any ideas?

fortunado commented 4 months ago

Well, I wrote a minimal repro of my case, and yes: a batch query returns a QueryResult that looks like this:

OK2 QueryResult { rows: Some([Row { columns: [Some(Boolean(true)), None, None, None] }]), warnings: [], tracing_id: None, paging_state: None, col_specs: [ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "[applied]", typ: Boolean }, ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "account_d", typ: Int }, ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "updated_at", typ: Timestamp }, ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "value", typ: BigInt }], serialized_size: 21 }

I then read the status of every statement in the batch like this:

// Find the index of the "[applied]" status column, then print it for every returned row.
let (applied_idx, _) = query_result.get_column_spec("[applied]").expect("no [applied] column found");
for (i, row) in query_result.rows.expect("no rows found").into_iter().enumerate() {
    println!("row #{}: applied = {:?}", i + 1, row.columns[applied_idx]);
}

output:

row #1: applied = Some(Boolean(true))

I can't say this is very convenient, and the hardcoded "[applied]" column name does not seem elegant, but it works.
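One way to make the lookup above less ad hoc is to keep the column name in a single constant and wrap the two steps (find the column, read the boolean) in small helpers. The following is only a minimal sketch: `Value`, `Row`, `APPLIED_COLUMN`, `applied_index` and `row_applied` are hypothetical stand-ins that mimic the shape of the Debug output above, not the driver's own types or API.

```rust
// Stand-in types mimicking the shape of the Debug output above.
// ASSUMPTION: these are NOT the driver's types; swap in the real ones when adapting.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Boolean(bool),
    Int(i32),
}

pub struct Row {
    pub columns: Vec<Option<Value>>,
}

/// Name of the status column seen in the LWT result above.
pub const APPLIED_COLUMN: &str = "[applied]";

/// Returns the index of the `[applied]` column, if the result has one.
pub fn applied_index(column_names: &[&str]) -> Option<usize> {
    column_names.iter().position(|name| *name == APPLIED_COLUMN)
}

/// Reads the `[applied]` flag of one row.
/// Returns `None` if the cell is out of range, null, or not a boolean.
pub fn row_applied(row: &Row, idx: usize) -> Option<bool> {
    match row.columns.get(idx)? {
        Some(Value::Boolean(b)) => Some(*b),
        _ => None,
    }
}
```

With the real driver, the column names would come from the result's column specs and the cells from each returned row; the helpers only centralise the name and avoid indexing panics.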

Maybe you could add one more example showing how to work with batches? Full code:

use scylla::{Session, SessionBuilder};
use scylla::batch::Batch;
use scylla::transport::errors::QueryError;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create connection
    let uri = "127.0.0.1:9042".to_string();
    println!("Connecting to {} ...", uri);

    let session: Session = SessionBuilder::new()
        .known_node(uri)
        .user("cassandra", "cassandra")
        .schema_agreement_interval(std::time::Duration::from_secs(1)) // check every second for schema agreement if not agreed first check
        .build()
        .await?;

    let schema_version = session.await_schema_agreement().await?;

    println!("Schema version: {}", schema_version);

    session.query("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;

    match session.await_schema_agreement().await {
        Ok(_schema_version) => println!("Schema is in agreement in time"),
        Err(QueryError::RequestTimeout(_)) => println!("Schema is NOT in agreement in time"),
        Err(err) => panic!("Query error {}", err),
    };

    session
        .query("DROP TABLE IF EXISTS examples_ks.cf", &[])
        .await?;

    session
        .query("CREATE TABLE IF NOT EXISTS examples_ks.cf (account_d INT, updated_at TIMESTAMP, value BIGINT, PRIMARY KEY ( (account_d) ))", &[])
        .await?;

    session.await_schema_agreement().await?;
    println!("OK1");
    // add row
    let mut batch: Batch = Default::default();
    batch.append_statement("UPDATE examples_ks.cf SET value = 1, updated_at = currentTimestamp() WHERE account_d = 20 IF updated_at = null AND value = null;");
    let prepared: Batch = session.prepare_batch(&batch).await?;
    // The batch holds one statement, so pass exactly one (empty) value list.
    let res = session.batch(&prepared, ((),)).await?;
    println!("OK2 {:?}", res);

    // Find the index of the "[applied]" status column, then print it for every returned row.
    let (applied_idx, _) = res.get_column_spec("[applied]").expect("no [applied] column found");
    for (i, row) in res.rows.expect("no rows found").into_iter().enumerate() {
        println!("row #{}: applied = {:?}", i + 1, row.columns[applied_idx]);
    }

    println!("OK3");
    Ok(())
}

Lorak-mmk commented 4 months ago

Did you try with more than one statement in the batch? If memory serves me right, there will always be one row in the response, which indicates whether the batch was applied or not. Your code looks like it assumes there will be one row per statement in the batch.

Regarding the docs / examples, this is a question about LWT, not batches. We can extend the LWT docs; I see we have nothing about the [applied] column in docs/source/queries/lwt.md. I'll leave this issue open so we don't forget to do this.

fortunado commented 4 months ago

Quick test with two statements in a batch:

    let mut batch: Batch = Default::default();
    batch.append_statement("UPDATE examples_ks.cf SET value = 1, updated_at = '2024-05-13 15:08:59.152' WHERE account_d = 20 IF updated_at = null AND value = null;");
    batch.append_statement("UPDATE examples_ks.cf SET value = 2, updated_at = '2024-05-13 16:08:59.152' WHERE account_d = 20 IF updated_at = '2024-05-13 15:08:59.152' AND value = 1;");
    let prepared: Batch = session.prepare_batch(&batch).await?;
    let res = session.batch(&prepared, ((), ())).await?;
    println!("OK2 {:?}", res);

    let (value_idx, _) = res.get_column_spec("[applied]").expect("No value column found");
    for (i, row) in res.rows.expect("no rows found").into_iter().enumerate() {
        println!("row #{}: applied = {:?}", i + 1, row.columns[value_idx]);
    }

Output:

OK2 QueryResult { rows: Some([Row { columns: [Some(Boolean(false)), None, None, None] }, Row { columns: [Some(Boolean(false)), None, None, None] }]), warnings: [], tracing_id: None, paging_state: None, col_specs: [ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "[applied]", typ: Boolean }, ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "account_d", typ: Int }, ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "updated_at", typ: Timestamp }, ColumnSpec { table_spec: TableSpec { ks_name: "examples_ks", table_name: "cf" }, name: "value", typ: BigInt }], serialized_size: 38 }
row #1: applied = Some(Boolean(false))
row #2: applied = Some(Boolean(false))

So we get one row per statement, but I actually expected only one row in the query result: the first statement was not applied, so the batch should have stopped there because of the all-or-nothing rule.

EDIT: or maybe I'm wrong and the second statement didn't even run because the first statement failed. In that case, checking the applied state of the first row alone would be enough to get the status of the whole batch.
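Since the thread does not settle whether the first row alone is authoritative, a conservative option is to treat the batch as applied only when every returned row says so. This is a minimal sketch under that assumption; `BatchOutcome` and `batch_outcome` are hypothetical names, and the input is simply the per-row `[applied]` flags already extracted from the result (`None` for a null or non-boolean cell).

```rust
/// Outcome of a conditional batch, derived from the per-row `[applied]` flags.
#[derive(Debug, PartialEq)]
pub enum BatchOutcome {
    Applied,
    NotApplied,
    /// No rows at all, or at least one row without a boolean flag.
    Unknown,
}

/// ASSUMPTION: the batch counts as applied only if *every* row reports `true`.
/// If all rows always carry the same flag, this is equivalent to checking the first row.
pub fn batch_outcome(applied_flags: &[Option<bool>]) -> BatchOutcome {
    if applied_flags.is_empty() || applied_flags.iter().any(|flag| flag.is_none()) {
        return BatchOutcome::Unknown;
    }
    if applied_flags.iter().all(|flag| *flag == Some(true)) {
        BatchOutcome::Applied
    } else {
        BatchOutcome::NotApplied
    }
}
```

For the two-statement output above, the flags would be `[Some(false), Some(false)]`, which this sketch maps to `NotApplied`.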

mykaul commented 4 months ago

@fortunado - I encourage you to ask questions over at https://forum.scylladb.com/ (if you feel there's a bug in the driver, though, this issue tracker is the right place to discuss it)