Yeah, I did want to add that, but honestly didn't really know how.

The streaming insert uses the COPY API from postgres for best performance. Unfortunately, that is not in itself compatible with doing upserts - I was already impressed when I managed to talk it into handling default values properly.

So I googled a bit now, and it seems like the solution is to create a temp table, stream the data in there, and then do a merge from that temp table. I thought this would be easy to implement, and it kind of is. The only problem is that it doesn't seem possible to put more than one SQL statement into the COPY operation. As such, it would be three roundtrips to the database:
```scala
override def upsertStreaming(unsaved: ZStream[ZConnection, Throwable, UnitmeasureRow], batchSize: Int = 10000): ZIO[ZConnection, Throwable, Long] = {
  // 1. create a temp table with the same shape as the target table
  val created = sql"create temporary table unitmeasure_TEMP (like production.unitmeasure) on commit drop".execute
  // 2. stream the rows into the temp table via COPY
  val copied = streamingInsert(s"""copy unitmeasure_TEMP("unitmeasurecode", "name", "modifieddate") from stdin""", batchSize, unsaved)(UnitmeasureRow.text)
  // 3. merge from the temp table into the target table, then drop it
  val merged = sql"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
                     select * from unitmeasure_TEMP
                     on conflict ("unitmeasurecode")
                     do update set
                       "name" = EXCLUDED."name",
                       "modifieddate" = EXCLUDED."modifieddate";
                     drop table unitmeasure_TEMP;
                  """.update
  created *> copied *> merged
}
```

It's not too bad I think for the functionality you get, but a little surprising. WDYT?
Quick question though, what do you think about non-streaming variants? Without COPY, just a plain `insert into x values (f1...), (f2...)`? That would work with both insert and upsert. (As an additional option, not a replacement.)
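For illustration, a minimal sketch of the shape suggested above - the helper `plainUpsertSql` and the reuse of the `unitmeasure` columns from the earlier example are assumptions, not part of the library:

```scala
// Hypothetical sketch only: builds the multi-row VALUES form of the upsert.
// Note that the SQL text itself depends on rowCount.
def plainUpsertSql(rowCount: Int): String = {
  val values = List.fill(rowCount)("(?, ?, ?)").mkString(", ")
  s"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
     |values $values
     |on conflict ("unitmeasurecode")
     |do update set
     |  "name" = EXCLUDED."name",
     |  "modifieddate" = EXCLUDED."modifieddate"""".stripMargin
}
```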
Yeah, we could of course add non-streaming variants. In fact they were there before, but I took them out when I added streaming inserts.

There is a tradeoff between the quantity of generated code and how useful it is. For plain inserts I don't think a non-streaming variant has much benefit at all (except that you can return things other than just the number of inserted rows). For upserts I agree it looks more useful: it'll have worse performance, but it's safe with auto-commit sessions.
As a note, the syntax you suggested doesn't work that well. You get rows × cols parameters to the prepared statement, and different numbers of rows will generate different prepared queries, which breaks the prepared-statement cache in pg. A better way is to use batch inserts.
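To make the difference concrete, here is a minimal plain-JDBC sketch of a batch upsert (`conn` and the tuple shape of a row are assumptions for this example): the SQL text stays the same regardless of how many rows are sent, so pg sees one prepared statement every time.

```scala
import java.sql.{Connection, Timestamp}

// Hypothetical sketch: one fixed statement, executed as a JDBC batch.
def batchUpsert(conn: Connection, rows: List[(String, String, Timestamp)]): Array[Int] = {
  val ps = conn.prepareStatement(
    """insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
      |values (?, ?, ?)
      |on conflict ("unitmeasurecode")
      |do update set
      |  "name" = EXCLUDED."name",
      |  "modifieddate" = EXCLUDED."modifieddate"""".stripMargin
  )
  try {
    rows.foreach { case (code, name, modified) =>
      ps.setString(1, code)
      ps.setString(2, name)
      ps.setTimestamp(3, modified)
      ps.addBatch()
    }
    ps.executeBatch() // all parameter sets are sent in one batched execution
  } finally ps.close()
}
```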
I implemented it quickly for doobie; this is what it looks like:

```scala
override def upsertMany(unsaved: List[UnitmeasureRow]): Stream[ConnectionIO, UnitmeasureRow] = {
  Update[UnitmeasureRow](
    s"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
        values (?::bpchar,?::varchar,?::timestamp)
        on conflict ("unitmeasurecode")
        do update set
          "name" = EXCLUDED."name",
          "modifieddate" = EXCLUDED."modifieddate"
        returning "unitmeasurecode", "name", "modifieddate"::text"""
  )(using UnitmeasureRow.write)
    // runs the statement as one JDBC batch and streams back the rows
    // produced by the `returning` clause
    .updateManyWithGeneratedKeys[UnitmeasureRow]("unitmeasurecode", "name", "modifieddate")(unsaved)(using catsStdInstancesForList, UnitmeasureRow.read)
}
```
and we'll need this implicit instance:
```scala
implicit lazy val write: Write[UnitmeasureRow] = new Write[UnitmeasureRow](
  puts = List(
    (UnitmeasureId.put, Nullability.NoNulls),
    (Name.put, Nullability.NoNulls),
    (TypoLocalDateTime.put, Nullability.NoNulls)
  ),
  toList = x => List(x.unitmeasurecode, x.name, x.modifieddate),
  // writes the three columns into a PreparedStatement, starting at index i
  unsafeSet = (ps, i, a) => {
    UnitmeasureId.put.unsafeSetNonNullable(ps, i + 0, a.unitmeasurecode)
    Name.put.unsafeSetNonNullable(ps, i + 1, a.name)
    TypoLocalDateTime.put.unsafeSetNonNullable(ps, i + 2, a.modifieddate)
  },
  // writes the three columns into a ResultSet row for updates
  unsafeUpdate = (rs, i, a) => {
    UnitmeasureId.put.unsafeUpdateNonNullable(rs, i + 0, a.unitmeasurecode)
    Name.put.unsafeUpdateNonNullable(rs, i + 1, a.name)
    TypoLocalDateTime.put.unsafeUpdateNonNullable(rs, i + 2, a.modifieddate)
  }
)
```
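For completeness, a quick usage sketch - `unitmeasureRepo` (an instance of the generated repository) and the transactor `xa` are placeholder names, not part of the generated code:

```scala
import cats.effect.IO
import doobie.Transactor
import doobie.implicits._

// `unitmeasureRepo` is a placeholder for the generated repository instance.
def upsertAll(xa: Transactor[IO], rows: List[UnitmeasureRow]): IO[List[UnitmeasureRow]] =
  unitmeasureRepo
    .upsertMany(rows)   // Stream[ConnectionIO, UnitmeasureRow]
    .compile.toList     // ConnectionIO[List[UnitmeasureRow]]
    .transact(xa)       // run in one transaction, return the upserted rows
```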
> As a note, the syntax you suggested doesn't work that well. You get rows × cols parameters to the prepared statement, and different numbers of rows will generate different prepared queries, which breaks the prepared-statement cache in pg.

TIL! Thank you, that's good to know.
Should be self-explanatory - there's an `insert` and an `insertStreaming`, but only one `upsert`.

So far, pretty impressed with the library! Trying it out here as a possible replacement for my skunk usage: https://github.com/kubukoz/scala.beauty/pull/2