typelevel / doobie

Functional JDBC layer for Scala.
MIT License

Transaction only committed after stream is drained #2132

Open wb14123 opened 1 week ago

wb14123 commented 1 week ago

When using doobie with an fs2 stream and a transaction, the transaction is only committed after the stream is fully drained, including any downstream operations.

For example:

val q = quote {
   ...
}
stream(q).transact(xa).map { _ =>
  doThings() // some time consuming operation
}

The transaction is only committed (and the connection returned to the pool) after doThings has finished.
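In plain doobie terms (without the Quill-style quote above), the shape is roughly the following; the query, table, and doThings body are illustrative stand-ins:

import cats.effect.IO
import doobie._
import doobie.implicits._

// Illustrative stand-in for the slow downstream work.
def doThings(row: String): IO[Unit] = IO.unit

def run(xa: Transactor[IO]): IO[Unit] =
  sql"select name from country".query[String].stream
    .transact(xa)      // Stream[IO, String]
    .evalMap(doThings) // slow downstream work
    .compile
    .drain             // transaction commits only once all of this completes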

Is this expected behavior?

jatcwang commented 1 week ago

What version of doobie are you using? I believe that issue has been fixed for a while now, looking at the history: https://github.com/typelevel/doobie/commit/c6daae8e22a0da43de861b2e2c619e7897350be3

Can you try to replicate the issue for me with 1.0.0-RC5?

wb14123 commented 1 week ago

I'm using 1.0.0-RC4, which should already include that commit. I tried 1.0.0-RC5 but it's still the same. I'm using fs2 3.9.3, btw.

jatcwang commented 6 days ago

Oh that's no good. Can you provide a minimal reproduction for this issue?

wb14123 commented 6 days ago

Sure, I created a unit test. Just run sbt "project hikari" "test:testOnly *PGConcurrentSuite" on my branch: https://github.com/wb14123/doobie/tree/stream-leak.
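The test is along these lines (a sketch of the idea only, not the exact code on the branch; the single-connection pool and ~2s checkout timeout are inferred from the error output below):

import cats.effect.IO
import com.zaxxer.hikari.HikariConfig
import doobie._
import doobie.implicits._
import doobie.hikari.HikariTransactor

// Pool with a single connection and a short checkout timeout.
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql:world")
config.setUsername("postgres")
config.setMaximumPoolSize(1)
config.setConnectionTimeout(2000)

val test: IO[Unit] =
  HikariTransactor.fromHikariConfig[IO](config).use { xa =>
    sql"select name from country".query[String].stream
      .transact(xa)
      // If the first connection is only released after the stream (including
      // this evalMap) completes, this second query times out waiting for it.
      .evalMap(_ => sql"select 42".query[Int].unique.transact(xa))
      .compile
      .drain
  }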

Output:

doobie.postgres.PGConcurrentSuite:                                                            
==> X doobie.postgres.PGConcurrentSuite.Connection returned before stream is drained  3.803s java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 2001ms (total=1, active=1, idle=0, waiting=0)
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:686)     
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:179)                                                                                                                  
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:144)                                                                                                                  
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:127)            
    at doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$14(transactor.scala:333)        
    at delay @ doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$13(transactor.scala:333)                                                                             
    at fromAutoCloseable @ doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$13(transactor.scala:333)                                                                 
    at fromAutoCloseable @ doobie.util.transactor$Transactor$FromDataSourceUnapplied.$anonfun$apply$13(transactor.scala:333)                                                                 
    at use @ fs2.Compiler$Target.compile(Compiler.scala:158)                   
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)       
    at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)         
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)                            
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                              
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                     
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                                                                                                                             
    at flatMap @ fs2.Pull$.goEval$1(Pull.scala:1089)                                                                                                                                         
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)                                                                                                                                   
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                              
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)                              
[error] Failed: Total 1, Failed 1, Errors 0, Passed 0                                         
[error] Failed tests:                                                                         
[error]         doobie.postgres.PGConcurrentSuite                                                                                                                                            
[error] (Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 18 s, completed Nov 11, 2024, 7:37:38 PM  

wb14123 commented 5 days ago

I'm not familiar with how scopes work in fs2 streams. Is it supposed to close the resource early? In my test:

import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

val res1 = Resource.make(IO(println("res1 acquire")))(_ => IO(println("res1 close")))
fs2.Stream.resource(res1)
  .scope
  .evalMap(_ => IO(println("start sleep")) >> IO.sleep(10.seconds) >> IO(println("sleep finished")))
  .compile.drain.unsafeRunSync()

The output is:

res1 acquire
start sleep
sleep finished
res1 close

So the resource waits for downstream operations to finish before it is released?

jatcwang commented 5 days ago

Thanks for the minimized example. That is a bit surprising indeed.

Streams close their resources when they're exhausted, so one way I know to get the behaviour you want is:

import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

val res1 = Resource.make(IO(println("res1 acquire")))(_ => IO(println("res1 close")))
(
  Stream.resource(res1) ++
    Stream.eval(IO(println("start sleep")) >> IO.sleep(1.second) >> IO(println("sleep finished")))
)
  .compile.drain.unsafeRunSync()

Output:

res1 acquire
res1 close
start sleep
sleep finished

https://scastie.scala-lang.org/gnqt4MwzRTGssAAkQJ3KCg

But that's the extent of what I remember without digging deeper (the fs2 docs don't seem to offer any guidance on this). This definitely needs fixing on doobie's end if possible.

wb14123 commented 5 days ago

Yeah, my current workaround is to just convert the stream into a list, but that essentially makes streaming queries useless. My use cases are mostly sending a result stream to a client in a web app, something like this:

val resultStream = ... // a stream of query results from doobie
resultStream.evalMap(elem => sendToClient(elem))...

I thought this would help performance since it's like a data pipe: sending to the client starts before all the results are fetched from the db. But if the commit is blocked on sendToClient, a slow network connection on the client side will keep the transaction from committing and the connection from returning to the pool, which is very bad. (The sendToClient is not always explicit. For example, I'm using fs2-grpc and http4s, which can take a stream as the response; I guess they do something similar internally to the example above.)
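With http4s, for instance, that pattern looks roughly like this (a hypothetical route; the query and path are illustrative):

import cats.effect.IO
import org.http4s._
import org.http4s.dsl.io._
import doobie._
import doobie.implicits._

// The response body streams rows straight from the database, so a slow
// client keeps the transaction (and its pooled connection) open for as
// long as it takes to send the whole body.
def routes(xa: Transactor[IO]): HttpRoutes[IO] = HttpRoutes.of[IO] {
  case GET -> Root / "names" =>
    val body = sql"select name from country".query[String].stream
      .transact(xa)
      .intersperse("\n")
      .through(fs2.text.utf8.encode)
    Ok(body)
}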

I think lots of people use doobie like this, especially with the Typelevel stack (fs2-grpc, http4s). Hopefully it can be fixed so I don't need to replace all the streams in my code with an IO of a list. Do you think this is something that needs to be reported to the fs2 library?

jatcwang commented 5 days ago

I agree. I don't think there's a bug in fs2 but it'd be good to have some documentation on resource scopes.

Trawling through the fs2 chat history (e.g. https://discord.com/channels/632277896739946517/632310980449402880/966754369930420294) may help with understanding what's going on and what needs to be done to fix it.

wb14123 commented 5 days ago

Yeah, I think on fs2's side it's reasonable to close the resource only after the stream operations are done, because it doesn't know whether the resource is still needed downstream.

I think what can be done is to introduce a buffer in between, something like this:

resultStream -> bufferStream -> downstreamOps

If downstreamOps is slower than resultStream, the buffer fills up so that resultStream can be drained. Once resultStream is done, the transaction is committed and the connection is returned, while downstream can still slowly pull results from the buffer.

I'm not really sure how to implement it with fs2 yet, but it should be possible.
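One way to sketch that idea with fs2 primitives (the helper name and sizing are illustrative, not the eventual PR code):

import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream

// Drain `source` into a bounded queue from a background fiber, so its
// finalizers (commit, connection release) run as soon as it is exhausted,
// while the consumer dequeues at its own pace. If downstream is slower
// and the buffer fills up, the source is back-pressured again, so the
// buffer must be big enough to absorb the remaining results.
def buffered[A](source: Stream[IO, A], bufferSize: Int): Stream[IO, A] =
  Stream.eval(Queue.bounded[IO, Option[A]](bufferSize)).flatMap { q =>
    val produce = source.evalMap(a => q.offer(Some(a))) ++ Stream.eval(q.offer(None))
    Stream.fromQueueNoneTerminated(q).concurrently(produce)
  }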

wb14123 commented 4 days ago

@jatcwang I created a prototype of this approach in https://github.com/typelevel/doobie/pull/2137. It adds a new transBuffer method that accepts a buffer size param. Do you think it's worth making this approach the default (changing the existing behavior instead of adding a new method)? The buffer size could be moved to a global config somewhere.

jatcwang commented 4 days ago

Thanks for the PR! Yes, thinking about it, I agree that's the only logical way this can work, due to fs2 Stream being pull-based. I do wonder whether fs2 has an operator for this kind of "buffering". I see various buffer* methods but I'm not sure whether they run the stream's finalizer eagerly when it is exhausted.

wb14123 commented 3 days ago

From the doc and example here, it seems the buffer method waits for N elements before emitting them downstream. That's not the behavior we want here.
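A small illustration of that reading of buffer (the numbers are arbitrary):

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// buffer(3) pulls three elements at a time from upstream before emitting
// them, so emission is gated on filling the buffer; it doesn't let the
// upstream run ahead of, and finish before, a slow consumer.
Stream.range(0, 6)
  .covary[IO]
  .evalTap(i => IO.println(s"pulled $i"))
  .buffer(3)
  .evalMap(i => IO.println(s"consumed $i"))
  .compile.drain.unsafeRunSync()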

jatcwang commented 3 days ago

There is groupWithin(..).unchunks but we'll need to test that it has the desired behaviour.
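For reference, its shape is something like this (a small sketch; the numbers are arbitrary):

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

// groupWithin(n, d) emits a chunk once n elements have accumulated or the
// timeout d has elapsed, whichever comes first; unchunks flattens the
// chunks back into single elements.
Stream.range(0, 10)
  .covary[IO]
  .groupWithin(4, 100.millis)
  .unchunks
  .evalMap(i => IO.println(i))
  .compile.drain.unsafeRunSync()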

wb14123 commented 2 days ago

I think groupWithin is basically the same as buffer: both block emission until N elements have been put into the buffer. Here we don't want the buffer to block emission; we want it to store elements when downstream operations are slow, so the query result can drain.

wb14123 commented 1 day ago

@jatcwang let me know whether you think this is the right approach, and where to put the buffer size configuration. I can then refine the PR to make it the default stream behavior.

jatcwang commented 1 day ago

@wb14123 I think the idea is sound, thanks! I'm checking with the fs2 folks too in case we're reinventing the wheel or there's a better way :) https://discord.com/channels/632277896739946517/632310980449402880/1307638686468407319

jatcwang commented 23 hours ago

@wb14123 Seems like prefetchN implements the same logic we have here. In doobie's current stream implementation we emit a chunk per fetch from JDBC (determined by fetchSize).
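Concretely, the change would look something like this (a sketch; the query and consumer are illustrative):

import cats.effect.IO
import doobie._
import doobie.implicits._

// Illustrative slow consumer, as in the web-app example above.
def sendToClient(row: String): IO[Unit] = IO.unit

// prefetchN(n) runs the upstream in a background fiber that may run up to
// n chunks ahead of the consumer; once the final chunk is buffered the
// upstream completes, so the transaction can commit without waiting for
// the consumer to finish.
def run(xa: Transactor[IO]): IO[Unit] =
  sql"select name from country".query[String].stream
    .transact(xa)
    .prefetchN(1)
    .evalMap(sendToClient)
    .compile
    .drain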

Assuming prefetchN does exactly what we want here, I think we can:

- Modify transact to use prefetchN(1)
- Add transactNoPrefetch with the old transact logic
- Add transactPrefetchN?

wb14123 commented 17 hours ago

Looking at the code, yeah, it seems to implement basically the same logic. That's cool!

> Modify transact to use prefetchN(1)

Yeah this default behavior makes sense to me.

> Add transactNoPrefetch with the old transact logic
> Add transactPrefetchN?

I think these two are a little redundant. It's also kind of unclear what n means in prefetchN, so maybe just provide transactNoPrefetch, and users can apply their own prefetchN if they want?
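Under that proposal, usage would look roughly like this (transactNoPrefetch is the proposed method from this thread and doesn't exist yet):

import cats.effect.IO
import doobie._
import doobie.implicits._

def example(xa: Transactor[IO]) = {
  val rows = sql"select name from country".query[String].stream

  rows.transact(xa)                        // proposed default: prefetchN(1) built in
  rows.transactNoPrefetch(xa)              // hypothetical: the old behavior
  rows.transactNoPrefetch(xa).prefetchN(8) // hypothetical: user-chosen lookahead
}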

wb14123 commented 16 hours ago

I updated the PR. Maybe we can continue the discussion there.