oyvindberg / typo

Typed PostgreSQL integration for Scala. Hopes to avoid typos
https://oyvindberg.github.io/typo/
MIT License

Streaming inserts #65

Closed oyvindberg closed 11 months ago

oyvindberg commented 12 months ago

We now get the following methods in the repos:

anorm

  def insertStreaming(unsaved: Iterator[PersonRow], batchSize: Int)(implicit c: Connection): Long
  /* NOTE: this functionality requires PostgreSQL 16 or later! */
  def insertUnsavedStreaming(unsaved: Iterator[PersonRowUnsaved], batchSize: Int)(implicit c: Connection): Long

doobie

  def insertStreaming(unsaved: Stream[ConnectionIO, PersonRow], batchSize: Int): ConnectionIO[Long]
  /* NOTE: this functionality requires PostgreSQL 16 or later! */
  def insertUnsavedStreaming(unsaved: Stream[ConnectionIO, PersonRowUnsaved], batchSize: Int): ConnectionIO[Long]

zio

  def insertStreaming(unsaved: ZStream[ZConnection, Throwable, PersonRow], batchSize: Int): ZIO[ZConnection, Throwable, Long]
  /* NOTE: this functionality requires PostgreSQL 16 or later! */
  def insertUnsavedStreaming(unsaved: ZStream[ZConnection, Throwable, PersonRowUnsaved], batchSize: Int): ZIO[ZConnection, Throwable, Long]

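The shape is the same across all three backends: consume the incoming rows in chunks of `batchSize` and send each chunk to PostgreSQL. As a library-free illustration, here is a minimal sketch of that batching loop over a plain `Iterator` (`PersonRow` and the `insertBatch` callback are hypothetical stand-ins; the generated repos stream each batch via COPY rather than a callback):

```scala
// Minimal, library-free sketch of the batching pattern behind insertStreaming.
// `PersonRow` and `insertBatch` are hypothetical stand-ins; the generated
// code streams each batch to PostgreSQL via COPY instead of calling back.
final case class PersonRow(one: Long, two: Option[String], name: Option[String])

def insertInBatches(
    unsaved: Iterator[PersonRow],
    batchSize: Int
)(insertBatch: Seq[PersonRow] => Int): Long =
  unsaved
    .grouped(batchSize) // split the incoming rows into chunks of at most batchSize
    .foldLeft(0L)((acc, batch) => acc + insertBatch(batch.toSeq)) // sum affected row counts
```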
Also get Text instances for all three libraries:

  implicit lazy val text: Text[PersonRow] = Text.instance[PersonRow]{ (row, sb) =>
    Text.longInstance.unsafeEncode(row.one, sb)
    sb.append(Text.DELIMETER)
    Text.option(Text.stringInstance).unsafeEncode(row.two, sb)
    sb.append(Text.DELIMETER)
    Text.option(Text.stringInstance).unsafeEncode(row.name, sb)
  }

The Text type class is used as-is from doobie, while typo will generate the type class itself for anorm and zio, since those libraries don't have this functionality.
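For a feel of what such an instance produces: PostgreSQL's COPY text format is tab-delimited, with NULL rendered as `\N`. A stand-alone sketch of the same encoding (hypothetical names; the real Text instances also escape special characters such as tabs, newlines and backslashes inside values):

```scala
// Hedged sketch of the encoding a Text instance performs: one row in
// PostgreSQL COPY text format (tab delimiter, NULL rendered as \N).
// `PersonRow` is a hypothetical stand-in, and this sketch skips the
// escaping of special characters that a real encoder must do.
final case class PersonRow(one: Long, two: Option[String], name: Option[String])

def encodeRow(row: PersonRow): String = {
  def opt(o: Option[String]): String = o.getOrElse("\\N") // COPY's NULL marker
  List(row.one.toString, opt(row.two), opt(row.name)).mkString("\t")
}
```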

oyvindberg commented 12 months ago

@sbrunk it actually works for doobie now!

todo:

sbrunk commented 12 months ago

Fantastic. Hopefully I'll be able to give it a run tomorrow on my project COPYing lots of vectors into my test DB.

As for testing different PG versions: perhaps we could use a matrix to run and generate code against different versions. Perhaps also put the Docker build job into that matrix, with caching and the base image as a parameter depending on the PG version. Then depend on it for running the tests + PG container.

oyvindberg commented 12 months ago

> Fantastic. Hopefully I'll be able to give it a run tomorrow on my project COPYing lots of vectors into my test DB.

let me know if you come across any problems! :)

> As for testing different PG versions: perhaps we could use a matrix to run and generate code against different versions. Perhaps also put the Docker build job into that matrix, with caching and the base image as a parameter depending on the PG version. Then depend on it for running the tests + PG container.

I think it's a great idea! The only complications I see are:

1) We'll need to produce two Docker images.
2) The adventureworks test database currently generates code for the full schema of a given PostgreSQL release, and a test reads rows from all of those tables as a quick-and-dirty integration test. This will not work across versions, so we need to do something: either disable the test for the other PostgreSQL version, or maybe delete it altogether. It's likely better to extend what is currently ArrayTest to cover everything we support.
3) We need to figure out the lowest PG release that passes all the typo tests.
4) In any case we'll need to detect whether we're running PG < 16 for the streaming insert test.
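A matrix along those lines might look like this hypothetical GitHub Actions fragment (job names, Dockerfile arguments and the version list are illustrative, not taken from this repo):

```yaml
# Hypothetical sketch of a PG-version matrix; names and versions are illustrative.
jobs:
  test:
    strategy:
      matrix:
        postgres-version: [15, 16]
    runs-on: ubuntu-latest
    env:
      PG_VERSION: ${{ matrix.postgres-version }}
    steps:
      - uses: actions/checkout@v4
      - name: Build test database image for this PG version
        run: docker build --build-arg PG_BASE=postgres:${{ matrix.postgres-version }} -t typo-db .
      - name: Run tests against the matching container
        run: sbt test
```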

Do you have any bandwidth towards creating a new Docker image, setting up the matrix, or maybe threading through the PG version as an environment variable @sbrunk?

sbrunk commented 12 months ago

> Do you have any bandwidth towards creating a new Docker image, setting up the matrix, or maybe threading through the PG version as an environment variable @sbrunk?

I'll give it a try @oyvindberg

sbrunk commented 11 months ago

Didn't get to try it yesterday, but I'm on it now. With the latest changes, the generated insertStreaming now takes a stream, i.e.

insertStreaming(unsaved: Stream[ConnectionIO, MyRow], batchSize: Int) = ...

But I'm struggling to create a Stream[ConnectionIO, MyRow] from a generic Stream[IO, MyRow]. Any idea how this can be done?

sbrunk commented 11 months ago

My workaround for now is to create a new pure stream from each chunk so I can insert as before:

// ...
.parEvalMapUnordered(8) { chunk =>
  CleanedContentRepoImpl()
    .insertUnsavedStreaming(Stream.chunk(chunk), 512)
    .transact(xa)
}

oyvindberg commented 11 months ago

You can translate the stream using WeakAsync. I can hack out some code tomorrow for the docs. This is probably the most difficult part of doobie.
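For reference, the WeakAsync approach looks roughly like this (untested sketch; `personRepo` and `xa` are assumed to be in scope, and doobie's `WeakAsync.liftK` supplies the `IO ~> ConnectionIO` natural transformation that `translate` needs):

```scala
import cats.effect.IO
import doobie._
import doobie.implicits._
import fs2.Stream

// Untested sketch: lift a Stream[IO, PersonRow] into Stream[ConnectionIO, PersonRow]
// via WeakAsync, then feed it to the generated repo method.
def streamInsert(rows: Stream[IO, PersonRow], xa: Transactor[IO]): IO[Long] =
  WeakAsync.liftK[IO, ConnectionIO].use { fk =>
    personRepo.insertStreaming(rows.translate(fk), batchSize = 512).transact(xa)
  }
```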

sbrunk commented 11 months ago

That'd be great, thanks! I think I was on the right track, as I actually found these SO answers about translate and WeakAsync, but gave up early when I didn't get it to work immediately 😁

oyvindberg commented 11 months ago

here you go @sbrunk dbf78116