zio / zio-amqp

ZIO-based AMQP client for Scala
Apache License 2.0
34 stars 20 forks source link

[question] How to maintain a global connection? #423

Open sxh-lsc opened 8 months ago

sxh-lsc commented 8 months ago

I’m using the ZIO-HTTP framework, and I found that the ZIO-AMQP Connection must be in scope to be effective. Suppose I want a unique global connection after the program starts, and then use this connection to create and destroy some channels in some scope. What should I do?

Currently, I’m using ZLayer.scopedto get a ZLayer[Any, Throwable, Connection], but this will create a new connection every time I provide this layer.

I may still be a bit confused about the use of scope and layer, so is there any suggestion for the scenario I presented?

Adriani277 commented 6 months ago

Apologies for the late reply. It seems like notification was turned off for me. Are you able to provide a simple example of what you currently have?

From my understanding here, you want to create a single connection and reuse it across your code. This should be as simple as providing the connection layer from your main or highest possible level in your code.

Zlayer allows you to provide dependencies to your logic using the ZIO env variable, the scoped variant makes it so that once the provided layer is out of scope, it will terminate said dependency

Adriani277 commented 6 months ago

It seems like the issue you have is that you are providing the rabbit layer multiple times where you should do it only once

sxh-lsc commented 6 months ago

Tku for your reply. Currently, my code is like below: I have a connLayer like:

val connLayer: ZLayer[Any, Throwable, Connection] = ZLayer.scoped {
    for {
      config <- ZIO.config(RabbitMqConfig.config)

      amqpConfig <- ZIO.attempt(
        new AMQPConfig(
          user = config.user,
          password = config.password,
          vhost = config.vhost,
          heartbeatInterval = AMQPConfig.default.heartbeatInterval,
          ssl = AMQPConfig.default.ssl,
          host = config.host,
          port = config.port,
          connectionTimeout = 60.seconds
        )
      )
      connection <- Amqp.connect(amqpConfig)
    } yield connection
  }

I have a consume fiber like :

  def consume: ZIO[Any, Throwable, Unit] = ZIO.scoped {
    for {
      channel <- AmqpService.getChannel
      config <- ZIO.config(RabbitMqConfig.config)
      _ <- channel.basicQos(1, global = false)
      _ <- channel
        .consume(queue = QueueName(config.importWave3Queue), consumerTag = ConsumerTag(consumerTag))
        .retry(Schedule.fibonacci(1.second) && Schedule.recurs(10))
        .mapZIO(record => doConsume(record, channel))
        .runDrain
    } yield ()
  }.provide(
    Scope.default,
    AmqpConnection.connLayer
  )

And when I start my app, I will do

    val initJob = for {
      _ <- xxxx.consume.retry(Schedule.forever).forkDaemon
     ....
    } yield ()

I think I make a mistake about the connection scope, so it will connect every time when getting the messages