kurtbuilds / ormlite

An ORM in Rust for developers that love SQL.
https://crates.io/crates/ormlite
MIT License
216 stars 11 forks source link

Upsert Support #46

Closed cloud303-cholden closed 4 months ago

cloud303-cholden commented 5 months ago

I am looking to have upsert support for Postgres. I would especially be curious whether or not support could be added to update all columns on conflict (see the Column::iter() portion of the sea-query example below). My use case is that I have models with many columns, which means writing the upsert query by hand is pretty tedious, but all I care about is having the latest version of the data. I would love to contribute if you haven't already started working on upsert per the roadmap.

Example:

use table_a::Column;

table_a::Entity::insert(row)
    .on_conflict(
        sea_query::OnConflict::column(Column::Id)
            .update_columns(Column::iter())
            .to_owned(),
    )
    .exec(&db)
    .await
    .unwrap();
kurtbuilds commented 5 months ago

Yes. This isn't built in ormlite, but using the TableMeta methods it provides combined with sqlmo, the underlying query building lib, makes this quite easy.

I built a similar function for an ETL project, so I'll paste the primary function for that here. Please modify it, and if you have a suggestion or outright PR for how to include it in the library directly, that contribution is welcome.

It additionally adds support for, here what I called __sa_deleted_at and __sa_updated_at columns to store dleteed at and updated at times. Without that, the code can actually be somewhat simpler/shorter. I'll also leave the comments in.

Let me know if any of it needs further explanation or you have any additional questions.

use ormlite::model::TableMeta;
use ormlite::postgres::{PgArguments, PgConnection};
use ormlite::{query_as, query_with};
use sqlmo::query::OnConflict;
use sqlmo::ToSql;
use std::future::IntoFuture;
use std::time::Duration;
use tracing::{error_span, warn, Instrument};

// This would be a cool optimization to do for the SQL insertion in this code:
// https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit
fn build_insert_sql<T: TableMeta>(schema_name: &str) -> String {
    use sqlmo::query::Expr;
    let dialect = sqlmo::Dialect::Postgres;
    let update_conditional: Vec<Expr> = T::table_columns()
        .into_iter()
        .map(|&c| {
            let left = Expr::table_column(T::table_name(), c);
            let right = Expr::table_column("excluded", c);
            Expr::not_distinct_from(left, right)
        })
        .collect();
    let on_conflict_update_value = Expr::case(
        sqlmo::query::Case::new_when(
            Expr::new_and(update_conditional),
            Expr::table_column(T::table_name(), "__sa_updated_at"),
        )
        .els("excluded.__sa_updated_at"),
    );

    // It's easier to read as SQL, so here's an example. Basically this compiles down to the following:
    // ```sql
    // INSERT INTO "users" ("id", "name", "email", "__sa_updated_at") VALUES
    // (1, Kurt, test@example.com, NOW())
    // ON CONFLICT ("id") DO UPDATE SET
    // "name" = EXCLUDED."name",
    // "email" = EXCLUDED."email",
    // "__sa_updated_at" = CASE WHEN
    // ("users"."id" IS NOT DISTINCT FROM "excluded"."id" AND
    // "users"."name" IS NOT DISTINCT FROM "excluded"."name" AND
    // "users"."email" IS NOT DISTINCT FROM "excluded"."email")
    // THEN "users"."__sa_updated_at"
    // ELSE excluded.__sa_updated_at END
    // ```
    sqlmo::Insert::new(T::table_name())
        .schema(schema_name)
        .columns(T::table_columns())
        .column("__sa_updated_at")
        .column("__sa_deleted_at")
        .values(sqlmo::query::Values::new_value(
            sqlmo::query::Value::new()
                .placeholders(T::table_columns().len(), dialect)
                .column("NOW()")
                .column("NULL"),
        ))
        .on_conflict(
            OnConflict::do_update_on_pkey(T::primary_key().unwrap())
                .alternate_value("__sa_updated_at", on_conflict_update_value),
        )
        .to_sql(dialect)
}

fn build_delete_sql<T: TableMeta>(schema_name: &str) -> String {
    format!(
        r#"UPDATE "{}"."{}"
    SET __sa_deleted_at = NOW()
    WHERE NOT id = ANY($1)
    AND __sa_deleted_at IS NOT NULL
    RETURNING id
    "#,
        schema_name,
        T::table_name()
    )
}
cloud303-cholden commented 4 months ago

This is great. Thanks for the detailed example. If I'm not mistaken, what I'm after is something like OnConflict::update_values(T::table_columns())? And would you be opposed to a PR adding upsert behind the postgres feature? Definitely not ideal, but that's what I'm most familiar/concerned with at this time.

kurtbuilds commented 4 months ago

That all sounds great. PR is welcome.

You might want to double check if using that method would update the primary key. It should be a no-op since pkey should define the conflict, but I don’t know if that’s the only scenario that can cause a conflict, so it feels smarter to err toward what’s safer and more semantically correct.

Agree it’s less than ideal to add a method gated on a feature, but it’s almost always auto generated anyway, and adding a trait only for Postgres feels highly over engineered.

It’s probably important to have a docstring that emphasize the SQL it compiles into, and the fact that it’s only supported in Postgres. Presumably if multiple features are turned on, those implementations will just panic with unimplemented.

Happy to answer any questions you encounter if you tackle this.

cloud303-cholden commented 4 months ago

I've been doing some experimentation, and it appears upsert is already supported via a re-export of OnConflict from sqlmo, and on_conflict() already exists from Insertion. See the example below. Maybe this issue can be closed?

use ormlite::{
    model::*,
    query_builder::OnConflict,
};
use sqlx::postgres::PgPoolOptions;

#[derive(Debug, Default, Model)]
pub struct Users {
    #[ormlite(primary_key)]
    pub id:   String,
    pub name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let database_url = std::env::var("DATABASE_URL")
        .expect("`DATABASE_URL` must be set");
    let query_pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&database_url)
        .await?;

    Users {
        id: String::from("user_1"),
        ..default::Default()
    }
        .insert(&db)
        .on_conflict(OnConflict::do_update_on_pkey("id"))
        .await
        .unwrap();

    Ok(())
}
cloud303-cholden commented 4 months ago

And if you agree, I would be happy to add some documentation for this functionality.

kurtbuilds commented 4 months ago

Was just going to say! I'd welcome it if you added a section to the README docs for other folks who encounter the same issue. Sorry I forgot about OnConflict::do_update_on_pkey.