Open AugustNagro opened 1 month ago
This MR #47 looks good, @guizmaii. My thought is that we can merge this (once you add a simple test or two), and then iterate until it's ready for release.
Here's an idea I have, and would like your feedback.
One thing I like about Magnum is the typesafe transaction management. For example, if you have
def makePayment(sum: Long)(using DbTx): Unit =
  val newBalance = readAndUpdateSomeTables(sum)
  sendInvoiceOverBlockingHttp(newBalance)
Then `makePayment` is guaranteed to be executed in a transactional context; if `sendInvoiceOverBlockingHttp` fails because the third-party service is offline, we know that the database changes will be rolled back.
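To make the rollback guarantee concrete, here is a minimal, self-contained sketch. It does not use Magnum: `FakeDb`, this `DbTx`, and this `transact` are hypothetical in-memory stand-ins that only mimic the shape of the API (a context function that receives the transaction and commits only when the body returns normally).

```scala
import scala.collection.mutable

// Hypothetical stand-ins, not Magnum's real types: an in-memory "database"
// whose transaction stages writes and applies them only on commit.
final class FakeDb:
  val balances: mutable.Map[String, Long] = mutable.Map("alice" -> 100L)

final class DbTx(db: FakeDb):
  private val pending = mutable.Map.empty[String, Long]
  def read(key: String): Long = pending.getOrElse(key, db.balances(key))
  def write(key: String, value: Long): Unit = pending(key) = value
  def commit(): Unit = db.balances ++= pending

// Runs `body` with a transaction in scope; commits only if it returns normally.
def transact[A](db: FakeDb)(body: DbTx ?=> A): A =
  val tx = DbTx(db)
  val result = body(using tx) // an exception here skips the commit
  tx.commit()
  result

def readAndUpdateSomeTables(sum: Long)(using tx: DbTx): Long =
  val newBalance = tx.read("alice") - sum
  tx.write("alice", newBalance)
  newBalance

// Simulates the third-party service being offline.
def sendInvoiceOverBlockingHttp(newBalance: Long): Unit =
  throw RuntimeException("invoice service offline")

// Cannot be called without a DbTx in scope: the compiler enforces it.
def makePayment(sum: Long)(using DbTx): Unit =
  val newBalance = readAndUpdateSomeTables(sum)
  sendInvoiceOverBlockingHttp(newBalance)

@main def demo(): Unit =
  val db = FakeDb()
  val outcome = scala.util.Try(transact(db)(makePayment(30)))
  // prints "failed=true, balance=100": the staged write was never committed
  println(s"failed=${outcome.isFailure}, balance=${db.balances("alice")}")
```

Calling `makePayment(30)` outside of `transact` does not compile, because no `DbTx` is available; that is the type-level guarantee being discussed.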
It doesn't seem possible to get the same behavior with the ZIO `transact` version:
def transact[A](transactor: Transactor)(q: DbTx ?=> A)(implicit
trace: Trace
): ZIO[Any, Throwable, A] = ???
The example would look like:
def readAndUpdateSomeTables(sum: Long)(using DbTx): Long = ???
def sendInvoiceOverZIOHttp(newBalance: Long): RIO[Http, Unit] = ???
def makePayment(sum: Long): RIO[Http, Unit] =
  for
    newBalance <- transact(ds):
      readAndUpdateSomeTables(sum)
    _ <- sendInvoiceOverZIOHttp(newBalance)
  yield ()
And since the invoice is sent outside the transactional scope, it won't work: a failure there can no longer roll back the database changes.
So here's the idea: what if we put the `DbTx` context in the ZIO `R` type? Then the signature of `transact` would look something like
def transact[A](transactor: Transactor)(zio: => RIO[DbTx, A])(using Trace): Task[A] = ???
And we could then implement ZIO versions of the key classes like `Frag`, `Query`, etc. For example:
open class ZIORepo[EC, E, ID](using defaults: RepoDefaults[EC, E, ID]) extends ZIOImmutableRepo[E, ID]:
  def insert(entityCreator: EC): RIO[DbCon, Unit] = ??? // wrap defaults.insert in ZIO
  ...
Users would import from `com.augustnagro.magnum.zio.{...}` instead.
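One way such an environment-based `transact` could be shaped is sketched below. Everything here is an assumption for illustration: `FakeDb` and this `DbTx` are in-memory stand-ins rather than Magnum types, the `Trace` parameter is omitted, the ZIO version in the dependency line is a guess, and a real JDBC version would also need explicit rollback and connection cleanup.

```scala
//> using scala 3
//> using dep dev.zio::zio:2.1.9

import zio.*
import scala.collection.mutable

// Hypothetical in-memory stand-ins, not Magnum's API.
final class FakeDb:
  val rows: mutable.Map[String, Long] = mutable.Map("alice" -> 100L)

final class DbTx(db: FakeDb):
  private val staged = mutable.Map.empty[String, Long]
  def read(k: String): Long = staged.getOrElse(k, db.rows(k))
  def write(k: String, v: Long): Unit = staged(k) = v
  def commit(): Unit = db.rows ++= staged

// Opens a transaction, provides it as the ZIO environment, and commits
// only if the whole effect succeeds (the fake "rolls back" by not committing).
def transact[A](db: FakeDb)(effect: => RIO[DbTx, A]): Task[A] =
  ZIO.attempt(DbTx(db)).flatMap { tx =>
    effect.provideEnvironment(ZEnvironment(tx)) <* ZIO.attempt(tx.commit())
  }

def readAndUpdateSomeTables(sum: Long): RIO[DbTx, Long] =
  ZIO.service[DbTx].flatMap { tx =>
    ZIO.attempt {
      val newBalance = tx.read("alice") - sum
      tx.write("alice", newBalance)
      newBalance
    }
  }

// Simulates the third-party service being offline.
def sendInvoiceOverZIOHttp(newBalance: Long): Task[Unit] =
  ZIO.fail(RuntimeException("invoice service offline"))

// The DbTx requirement shows up in the R type, so ZIO effects can be
// interleaved with database calls inside one transaction.
def makePayment(sum: Long): RIO[DbTx, Unit] =
  readAndUpdateSomeTables(sum).flatMap(sendInvoiceOverZIOHttp)

object Demo extends ZIOAppDefault:
  val db = FakeDb()
  def run =
    transact(db)(makePayment(30)).either.flatMap { result =>
      Console.printLine(s"failed=${result.isLeft}, balance=${db.rows("alice")}")
    }
```

With this shape, the failing invoice call sits inside the transactional scope, so the staged balance update is discarded along with it.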
Hmm, I'm sorry, I'm not sure I understand the problem, @AugustNagro 🤔
The current `zio.transact` does ensure that it is run in a transaction, thanks to the `DbTx` parameter in `q: DbTx ?=> A`, no? 🤔
Here's how we organise our code at work (nothing innovative, just typical hexagonal architecture AFAIK):
`core` sbt module declaring services. The app logic lives in these services. In this `core` module, we also declare the repositories and the methods we need in the services:
// In file: modules/core/src/main/scala/my/app/core/domain/UserEntity.scala
package my.app.core.domain
import java.util.UUID
final case class UserEntity(id: UUID, username: String)
// In file: modules/core/src/main/scala/my/app/core/services/MyService.scala
package my.app.core.services
import my.app.core.domain.UserEntity
import my.app.core.repositories.UserRepo
import zio.*
trait MyService {
  def createNewUser(username: String): Task[UserEntity]
}
object MyService {
  def live: ZLayer[UserRepo & NotificationService, Nothing, MyService] = ZLayer.derived[MyServiceLive]
}
final class MyServiceLive(userRepo: UserRepo, notificationService: NotificationService) extends MyService {
  override def createNewUser(username: String): Task[UserEntity] =
    for {
      user <- userRepo.insert(username)
      _    <- notificationService.notifyNewUser(user) // hypothetical notification service
    } yield user
}
// In file: modules/core/src/main/scala/my/app/core/repositories/UserRepo.scala
package my.app.core.repositories
import my.app.core.domain.UserEntity
import zio.Task
trait UserRepo {
  def insert(username: String): Task[UserEntity]
}
`db` module implementing the `core` repositories:
// In file: modules/db/src/main/scala/my/app/db/UserRepoLive.scala
package my.app.db
import my.app.core.repositories.UserRepo
import my.app.db.magnum.MagnumUserRepo
final class UserRepoLive(ds: DataSource, magnumUserRepo: MagnumUserRepo) extends UserRepo {
  override def insert(username: String): Task[UserEntity] =
    transact(ds) { tx ?=>
      magnumUserRepo.insertReturning(UserCreator(username))
    }.map(UserDB.toCore)
}
object UserRepoLive {
  def layer: ZLayer[DataSource & MagnumUserRepo, Nothing, UserRepo] = ...
}
// In file: modules/db/src/main/scala/my/app/db/magnum/MagnumUserRepo.scala
package my.app.db.magnum
import com.augustnagro.magnum.Repo
final class MagnumUserRepo extends Repo[UserCreator, UserDB, UUID]
object MagnumUserRepo {
  def live: ULayer[MagnumUserRepo] = ZLayer.succeed(new MagnumUserRepo)
}
This way, we don't leak DB details outside of the repository implementations.
I think I don't understand what you mean by:
> And since `sendInvoiceOverBlockingHttp` doesn't happen in the transactional scope it won't work.
Do you mean that people use a DB transaction to wrap something that sends an invoice? 🤔
and fail the transaction if the invoice sending fails? 🤔
I'm not sure I'd code this like this.
I'd have a retry mechanism in the `sendInvoiceOverBlockingHttp` mechanism, and it wouldn't be correlated to the DB transaction (I can be wrong of course) 🤔
> what if we put the DbTx context in the ZIO R type? Then the signature of transact would look something like
> def transact[A](transactor: Transactor)(zio: => RIO[DbTx, A])(using Trace): Task[A] = ???
I think it's equivalent to what I suggest but more complex and less performant.
I don't see why we'd need a `ZIORepo`, since things already work with the current Repo mechanism 🤔
It would probably make things more complex when we have complex SQL operations to write.
The fact that Magnum Repo is impure makes things easier to write:
```scala
import scala.annotation.nowarn
import scala.util.control.ControlThrowable

object AlreadyExistsException extends ControlThrowable
type AlreadyExistsException = AlreadyExistsException.type

// hypothetical complex process to insert a new user
// The code is dumb, it's just to try to give a complex process example
def insertUser(username: String): Task[UserEntity] =
  transact(ds) { tx ?=>
    sql"""SELECT pg_advisory_xact_lock(100)""".query[Any].run(): @nowarn("msg=unused value")
    val exists = magnumUserRepo.existsByUsername(username)
    if (exists) throw AlreadyExistsException
    magnumUserRepo.insertReturning(UserCreator(username))
  }
```
It feels like if we introduce a `ZIORepo`, we'd need to wrap each of these operations into a `ZIO`, which makes things more complex and less performant, no? 🤔
// same code but with `ZIORepo`
import scala.annotation.nowarn
import scala.util.control.ControlThrowable
object AlreadyExistsException extends ControlThrowable
type AlreadyExistsException = AlreadyExistsException.type
// hypothetical complex process to insert a new user
// The code is dumb, it's just to try to give a complex process example
def insertUser(username: String): Task[UserEntity] =
  transact(ds) {
    for {
      _      <- ZIO.attempt(sql"""SELECT pg_advisory_xact_lock(100)""".query[Any].run(): @nowarn("msg=unused value"))
      exists <- zioMagnumUserRepo.existsByUsername(username)
      _      <- if (exists) ZIO.fail(AlreadyExistsException) else ZIO.unit
      user   <- zioMagnumUserRepo.insertReturning(UserCreator(username))
    } yield user
  }
Note that the `ZIO.attempt` usage above is probably incorrect (🤔), as it doesn't wrap the query in `ZIO[DbTx, Throwable, ?]`, so it seems we'd also need to provide something to wrap any DB stuff into a `ZIO[DbTx, Throwable, ?]`.
(Note that I initially implemented that in my project but later realized I wasn't using it. I was just using `transact` and `connect`.)
I am not a Magnum user; I'm only here because @guizmaii posted about this on Twitter 😁
I am quite sure that running the blocking HTTP call inside the transaction is considered an anti-pattern, since an open connection is required for an open transaction, and it therefore blocks a connection from the connection pool for a long time. An exception might be a one-DB-per-user SQLite setup.
So the database writes should come last, after the other work is done.
In that case, you can aggregate all writes into one `transact` and you don't need to leak the DB types.
> Do you mean that people use a DB transaction to wrap something that sends an invoice? 🤔
I wouldn't get hung up on the example having IO. It's a question of where the transactional boundaries of an application lie. If you're creating a database transaction in every repository method, like
class UserRepoLive(ds: DataSource) extends UserRepo:
  override def updateBalance(user: Long, change: Long): Task[Unit] =
    transact(ds) { tx ?=>
      // the WHERE clause assumes the key column is named `id`
      sql"UPDATE user SET balance = balance + $change WHERE id = $user".update.run()
    }
That's too fine-grained and WILL cause bugs. For example, if you have an endpoint that calls `updateBalance` on two users, you will get the double-spend problem.
I argue that transactions should NOT be created in repositories; it is more common to create them at the controller or service level, where they span multiple methods and services. Chances are, you'll want to interleave some methods that return ZIOs between the database calls. And that is impossible with your current MR.
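As a rough illustration of why the boundary matters, the sketch below compares one transaction per repository call with one transaction around the whole transfer. It is not Magnum code: `FakeDb`, `DbTx`, `transact`, and both `transfer…` functions are hypothetical in-memory stand-ins, and the failure shown (a half-applied transfer) is just one of the ways overly fine-grained transactions can go wrong.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins (not Magnum's API): writes go to a staged
// copy and only reach `balances` when the transaction body returns normally.
final class FakeDb(initial: (Long, Long)*):
  val balances: mutable.Map[Long, Long] = mutable.Map(initial*)

final class DbTx(val staged: mutable.Map[Long, Long])

def transact[A](db: FakeDb)(body: DbTx ?=> A): A =
  val tx = DbTx(db.balances.clone())
  val result = body(using tx) // throwing here discards the staged copy
  db.balances.clear()
  db.balances ++= tx.staged
  result

// Repository method: requires a transaction but does not open one itself.
def updateBalance(user: Long, change: Long)(using tx: DbTx): Unit =
  val updated = tx.staged(user) + change
  if updated < 0 then throw IllegalStateException(s"insufficient funds for user $user")
  tx.staged(user) = updated

// Fine-grained: each call commits on its own, so a failure in the second
// call leaves the first one applied.
def transferPerCall(db: FakeDb, from: Long, to: Long, amount: Long): Unit =
  transact(db)(updateBalance(to, amount))
  transact(db)(updateBalance(from, -amount))

// Service-level boundary: both updates commit together or not at all.
def transferAtomic(db: FakeDb, from: Long, to: Long, amount: Long): Unit =
  transact(db) {
    updateBalance(to, amount)
    updateBalance(from, -amount)
  }

@main def boundaries(): Unit =
  val a = FakeDb(1L -> 50L, 2L -> 0L)
  scala.util.Try(transferPerCall(a, from = 1L, to = 2L, amount = 80L))
  println(s"per call: ${a.balances(1L)}, ${a.balances(2L)}") // per call: 50, 80

  val b = FakeDb(1L -> 50L, 2L -> 0L)
  scala.util.Try(transferAtomic(b, from = 1L, to = 2L, amount = 80L))
  println(s"atomic: ${b.balances(1L)}, ${b.balances(2L)}") // atomic: 50, 0
```

In the per-call version user 2 is credited even though the debit from user 1 failed; with the service-level boundary neither change is applied.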
In Spring, the common pattern is that your REST controller calls a service method entrypoint which is annotated with `@Transactional`, and which then protects the whole call stack. The mechanism is not typesafe and the transaction boundary may be too wide, but it does protect you from bugs.
> I'd have a retry mechanism in the `sendInvoiceOverBlockingHttp` mechanism, and it wouldn't be correlated to the DB transaction (I can be wrong of course) 🤔
What do you do when retries are exhausted, or your service goes down while retrying? It's a lot of work to handle these cases correctly.
A good solution is to design your services like @987Nabil says, saving your database updates until the very end. My proposal does not prevent this design; it's orthogonal.
Unfortunately, this pattern is often inconvenient or impossible, and the large pre-existing ZIO projects I've worked on did not follow it. It requires discipline and architectural foresight. I don't want to neglect the majority use case and tell people to just refactor their application 😆
> It feels like if we introduce a ZIORepo, we'd need to wrap each of these operations into a ZIO, which makes things more complex and less performant, no? 🤔
I'm confident the performance impact of wrapping the calls in a ZIO will be 0, since database IO dominates. In terms of complexity, the example would look like this (from a User-code perspective):
import com.augustnagro.magnum.zio.{ZioRepo, sql}
class UserRepo extends ZioRepo[UserCreator, UserEntity, Long]:
  def existsByUsername(userName: String): RIO[DbCon, Boolean] =
    sql"SELECT 1 FROM user WHERE user_name = $userName".query[Int].run().map(_.nonEmpty)
class UserService(userRepo: UserRepo):
  def insertUser(username: String): RIO[DbTx, UserEntity] =
    for
      _ <- sql"""SELECT pg_advisory_xact_lock(100)""".query[Any].run()
      exists <- userRepo.existsByUsername(username)
      _ <- if (exists) ZIO.fail(AlreadyExistsException) else ZIO.unit
      user <- userRepo.insertReturning(UserCreator(username))
    yield user
class UserController(ds: DataSource, userService: UserService):
  def postUser(userName: String): Task[UserEntity] =
    transact(ds)(userService.insertUser(userName))
One final thing to consider is that ZIO docs validate this approach of putting database transactions in the ZIO Environment:
"In a database application, a service may be defined only to operate in the context of a larger database transaction. In such a case, the transaction could be stored in the environment"
While I understand your reasoning, @AugustNagro, I am not sure about the conclusion.
My counter proposal:
def transact[A](transactor: Transactor)(q: DbTx ?=> A)(implicit
  trace: Trace
): ZIO[Any, Throwable, A] = ???
def transactZIO[R, E <: Throwable, A](transactor: Transactor)(q: DbTx ?=> ZIO[R, E, A])(implicit
  trace: Trace
): ZIO[R, E, A] = ???
Then `transactZIO` can be used for the "bad code" style, but it is easier to encourage, for example in the docs, the good non-leaky style @guizmaii is aiming for.
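A rough sketch of how the `transactZIO` variant might behave is shown below. It rests on several assumptions: `FakeDb` and this `DbTx` are in-memory stand-ins rather than Magnum types, the `Trace` parameter is left out, the ZIO version in the dependency line is a guess, and the error type is fixed to `Throwable` here (unlike the proposed `E <: Throwable`) because opening or committing the fake transaction is itself wrapped in `ZIO.attempt`.

```scala
//> using scala 3
//> using dep dev.zio::zio:2.1.9

import zio.*
import scala.collection.mutable

// Hypothetical in-memory stand-ins, not Magnum's API.
final class FakeDb:
  val rows: mutable.Map[String, Long] = mutable.Map("alice" -> 100L)

final class DbTx(db: FakeDb):
  private val staged = mutable.Map.empty[String, Long]
  def read(k: String): Long = staged.getOrElse(k, db.rows(k))
  def write(k: String, v: Long): Unit = staged(k) = v
  def commit(): Unit = db.rows ++= staged

// The transaction is passed as a context parameter (not via the ZIO
// environment) and committed only if the returned effect succeeds.
def transactZIO[R, A](db: FakeDb)(q: DbTx ?=> ZIO[R, Throwable, A]): ZIO[R, Throwable, A] =
  ZIO.attempt(DbTx(db)).flatMap(tx => q(using tx) <* ZIO.attempt(tx.commit()))

// Direct-style repository call that needs a DbTx in scope.
def readAndUpdateSomeTables(sum: Long)(using tx: DbTx): Long =
  val newBalance = tx.read("alice") - sum
  tx.write("alice", newBalance)
  newBalance

// Simulates the third-party service being offline.
def sendInvoiceOverZIOHttp(newBalance: Long): Task[Unit] =
  ZIO.fail(RuntimeException("invoice service offline"))

def makePayment(sum: Long)(using DbTx): Task[Unit] =
  for
    // the direct-style call has to be lifted into ZIO by hand
    newBalance <- ZIO.attempt(readAndUpdateSomeTables(sum))
    _          <- sendInvoiceOverZIOHttp(newBalance)
  yield ()

object Demo extends ZIOAppDefault:
  val db = FakeDb()
  def run =
    transactZIO(db)(makePayment(30)).either.flatMap { result =>
      Console.printLine(s"failed=${result.isLeft}, balance=${db.rows("alice")}")
    }
```

The existing direct-style repository code is reused unchanged; the cost is that each such call inside a ZIO pipeline needs an explicit `ZIO.attempt`.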
btw, this
> I'm confident the performance impact of wrapping the calls in a ZIO will be 0, since database IO dominates
is only true if the db is accessed via network. There is a recent push for SQLite in prod, for example in the Rails community, which is then disk and not network IO. So it might become significant.
Designing based on bad user code does not sound right.
I think my earlier example having network IO has really muddied the waters, and I'm sorry about that.
Still, I don't think it's wrong to let users define their own transactional boundaries. That's what the current Magnum API lets you do already.
Your `transactZIO` counter-proposal is good, @987Nabil; the downside is that users would have to wrap `.run()` calls in `ZIO.attempt`. I feel like going all-in with ZIO types will result in more beautiful code, and feel more 'ZIO-native', vs a direct-style library with a bolted-on ZIO adapter.
However, I don't discount the suggestion and I definitely see your side to the argument.
I think the best thing to do will be to try out both styles and see which one feels better.
> Having the env resolved so often does not seem like a good idea.
> is only true, if the db is accessed via network. There is a recent push for SQLite in prod, for example in the rails community, which is then disk and not network IO. So it might become significant.
The performance argument here still doesn't sway me. A few allocations or env lookups per tx are not going to have an impact, even if you're using in-memory H2. I mean, just look at Doobie: people are not complaining that every method ends up returning `ConnectionIO`.
FYI: I filed https://github.com/zio/zio/issues/9276
Issue to track work done on the ZIO module. For example by @guizmaii in https://github.com/AugustNagro/magnum/pull/47