oyvindberg / typo

Typed PostgreSQL integration for Scala. Hopes to avoid typos.
https://oyvindberg.github.io/typo/
MIT License
99 stars 9 forks source link

Add `upsertStreaming` and `upsertBatch` (fix #115) #117

Closed oyvindberg closed 1 month ago

oyvindberg commented 2 months ago

Let's give it a shot; it looks very useful. Example of generated code:

anorm

  /** Upserts `unsaved` into `production.unitmeasure` using a single anorm `BatchSql`.
    *
    * Rows that conflict on `unitmeasurecode` have `name` and `modifieddate` overwritten with the incoming values.
    * An empty input short-circuits to `Nil`, because the batch is assembled from a first parameter set plus the
    * remaining ones.
    *
    * @param unsaved rows to insert or update
    * @param c       implicit JDBC connection in scope for the execution
    * @return the rows produced by the statement's `returning` clause, parsed with `UnitmeasureRow.rowParser(1)`
    */
  override def upsertBatch(unsaved: Iterable[UnitmeasureRow])(implicit c: Connection): List[UnitmeasureRow] = {
    // One set of named parameters per row, matching the {placeholders} in the statement below.
    def bind(r: UnitmeasureRow): List[NamedParameter] = {
      val code = NamedParameter("unitmeasurecode", ParameterValue(r.unitmeasurecode, null, UnitmeasureId.toStatement))
      val name = NamedParameter("name", ParameterValue(r.name, null, Name.toStatement))
      val modified = NamedParameter("modifieddate", ParameterValue(r.modifieddate, null, TypoLocalDateTime.toStatement))
      List(code, name, modified)
    }
    unsaved.toList.map(bind) match {
      case first :: others =>
        val batch = BatchSql(
          s"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
                values ({unitmeasurecode}::bpchar, {name}::varchar, {modifieddate}::timestamp)
                on conflict ("unitmeasurecode")
                do update set
                  "name" = EXCLUDED."name",
                  "modifieddate" = EXCLUDED."modifieddate"
                returning "unitmeasurecode", "name", "modifieddate"::text
             """,
          first,
          others*
        )
        new anorm.adventureworks.ExecuteReturningSyntax.Ops(batch).executeReturning(UnitmeasureRow.rowParser(1).*)
      case Nil => Nil
    }
  }
  /* NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements */
  /** Upserts a stream of rows into `production.unitmeasure` by staging them in a temporary table.
    *
    * Steps, in order: create `unitmeasure_TEMP` (declared `on commit drop`), `copy` the incoming rows into it via
    * `streamingInsert`, then insert from the temp table into the target with an `on conflict ("unitmeasurecode")`
    * update, followed by an explicit drop of the temp table.
    *
    * @param unsaved   rows to stage and upsert
    * @param batchSize forwarded to `streamingInsert`
    * @param c         implicit JDBC connection, also passed explicitly to `streamingInsert`
    * @return the value of `executeUpdate()` on the final insert-and-drop statement (presumably the affected row
    *         count; confirm against the JDBC driver's handling of multi-statement strings)
    */
  override def upsertStreaming(unsaved: Iterator[UnitmeasureRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
    val createTemp = SQL"create temporary table unitmeasure_TEMP (like production.unitmeasure) on commit drop"
    val copyIntoTemp = s"""copy unitmeasure_TEMP("unitmeasurecode", "name", "modifieddate") from stdin"""
    val mergeAndDrop = SQL"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
          select * from unitmeasure_TEMP
          on conflict ("unitmeasurecode")
          do update set
            "name" = EXCLUDED."name",
            "modifieddate" = EXCLUDED."modifieddate"
          ;
          drop table unitmeasure_TEMP;"""

    // Results of the first two steps are intentionally discarded.
    createTemp.execute(): @nowarn
    streamingInsert(copyIntoTemp, batchSize, unsaved)(UnitmeasureRow.text, c): @nowarn
    mergeAndDrop.executeUpdate()
  }

doobie

  /** Upserts `unsaved` into `production.unitmeasure` through doobie's batch update with generated keys.
    *
    * Rows that conflict on `unitmeasurecode` have `name` and `modifieddate` overwritten with the incoming values.
    *
    * @param unsaved rows to insert or update
    * @return a stream of rows read back with `UnitmeasureRow.read` from the columns
    *         `unitmeasurecode`, `name` and `modifieddate`
    */
  override def upsertBatch(unsaved: List[UnitmeasureRow]): Stream[ConnectionIO, UnitmeasureRow] = {
    val upsertSql =
      s"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
          values (?::bpchar,?::varchar,?::timestamp)
          on conflict ("unitmeasurecode")
          do update set
            "name" = EXCLUDED."name",
            "modifieddate" = EXCLUDED."modifieddate"
          returning "unitmeasurecode", "name", "modifieddate"::text"""
    val statement = Update[UnitmeasureRow](upsertSql)(using UnitmeasureRow.write)
    statement.updateManyWithGeneratedKeys[UnitmeasureRow]("unitmeasurecode", "name", "modifieddate")(unsaved)(
      using catsStdInstancesForList,
      UnitmeasureRow.read
    )
  }
  /* NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements */
  /** Upserts a stream of rows into `production.unitmeasure` by staging them in a temporary table.
    *
    * Steps, in order: create `unitmeasure_TEMP` (declared `on commit drop`), `copy` the incoming rows into it via
    * `FragmentOps.copyIn`, then insert from the temp table into the target with an
    * `on conflict ("unitmeasurecode")` update, followed by an explicit drop of the temp table.
    *
    * @param unsaved   rows to stage and upsert
    * @param batchSize forwarded to `copyIn`
    * @return the result of `update.run` on the final insert-and-drop statement; results of the first two steps
    *         are discarded
    */
  override def upsertStreaming(unsaved: Stream[ConnectionIO, UnitmeasureRow], batchSize: Int = 10000): ConnectionIO[Int] = {
    val createTemp = sql"create temporary table unitmeasure_TEMP (like production.unitmeasure) on commit drop"
    val copyIntoTemp = sql"""copy unitmeasure_TEMP("unitmeasurecode", "name", "modifieddate") from stdin"""
    val mergeAndDrop = sql"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
                   select * from unitmeasure_TEMP
                   on conflict ("unitmeasurecode")
                   do update set
                     "name" = EXCLUDED."name",
                     "modifieddate" = EXCLUDED."modifieddate"
                   ;
                   drop table unitmeasure_TEMP;"""

    // Each later step is built only once the previous one has run, as in a for-comprehension.
    createTemp.update.run.flatMap { _ =>
      new FragmentOps(copyIntoTemp).copyIn(unsaved, batchSize)(using UnitmeasureRow.text).flatMap { _ =>
        mergeAndDrop.update.run
      }
    }
  }

zio-jdbc

  // Not implementable for zio-jdbc: upsertBatch

  /* NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements */
  /** Upserts a stream of rows into `production.unitmeasure` by staging them in a temporary table.
    *
    * Steps, in order: create `unitmeasure_TEMP` (declared `on commit drop`), `copy` the incoming rows into it via
    * `streamingInsert`, then insert from the temp table into the target with an `on conflict ("unitmeasurecode")`
    * update, followed by an explicit drop of the temp table.
    *
    * @param unsaved   rows to stage and upsert
    * @param batchSize forwarded to `streamingInsert`
    * @return the result of `update` on the final insert-and-drop statement; results of the first two steps are
    *         discarded
    */
  override def upsertStreaming(unsaved: ZStream[ZConnection, Throwable, UnitmeasureRow], batchSize: Int = 10000): ZIO[ZConnection, Throwable, Long] = {
    val createTemp = sql"create temporary table unitmeasure_TEMP (like production.unitmeasure) on commit drop".execute
    val copyIntoTemp = streamingInsert(s"""copy unitmeasure_TEMP("unitmeasurecode", "name", "modifieddate") from stdin""", batchSize, unsaved)(UnitmeasureRow.text)
    val mergeAndDrop = sql"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
                       select * from unitmeasure_TEMP
                       on conflict ("unitmeasurecode")
                       do update set
                         "name" = EXCLUDED."name",
                         "modifieddate" = EXCLUDED."modifieddate"
                       ;
                       drop table unitmeasure_TEMP;""".update

    // Run the three effects strictly in sequence, keeping only the last result.
    for {
      _ <- createTemp
      _ <- copyIntoTemp
      affected <- mergeAndDrop
    } yield affected
  }