zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0
341 stars 137 forks source link

BindExceptions when testing multi-module projects using ZIO Kafka Testkit #987

Open mattdavpir opened 1 year ago

mattdavpir commented 1 year ago

I've got a project using the ZIO Kafka Testkit that is sometimes failing with BindExceptions when starting the embedded Kafka from the testkit. I think that this is because the tests are running concurrently and each module's tests have their own refs that are being incremented independently, leading to them both trying to bind the same ports.

Would it be possible to add something to handle ports not being available when starting the embedded Kafka (maybe even just incrementing the ports and trying again)?

guizmaii commented 1 year ago

Would it be possible to add something to handle ports not being available when starting the embedded Kafka (maybe even just incrementing the ports and trying again)?

There's already such a mechanism implemented but it might not work perfectly Are you using the ZIOSpecWithKafka trait? (https://zio.dev/zio-kafka/writing-tests#ziospecwithkafka-trait)

mattdavpir commented 1 year ago

Hey, thanks for the response!

I am using the ZIOSpecWithKafka trait, I had a brief poke around and did see that it was incrementing a Ref but I think multiple modules running tests concurrently could still conflict with each other, e.g. they both create a Ref(Ports(6001, 7001)), both do a getAndUpdate to set the next value to Ports(6002, 7002), but when the second tries to start up on 6001 and 7001 neither port will be free. Similarly I think if I had a separate process constantly bound to 6001 and 7001 I think that the first embedded startup would fail, whereas I guess ideally (for me, at least) it would then automatically try again with different ports.

Very possible I'm missing something obvious here, though!

guizmaii commented 1 year ago

I don't remember the code (which I implemented several months ago 😅) Are we catching the BindExceptions and retrying with the next possible ports?

erikvanoosten commented 1 year ago

@mattdavpir Yes, you are absolutely right. This is definitely a limitation of how the ports are allocated.

Usually you run a single embedded kafka for all tests. Is that an option for you as well?

erikvanoosten commented 1 year ago

Are we catching the BindExceptions and retrying with the next possible ports?

No we don't. That would be a solution though.

guizmaii commented 1 year ago

The fix is kinda trivial: You catch the BindException and retry with the next port. @mattdavpir Do you feel like giving it a try? 🙂

mattdavpir commented 1 year ago

Usually you run a single embedded kafka for all tests. Is that an option for you as well

I've got a couple of tests split across a couple of different sbt modules so I don't think so, but for now I can just explicitly disable the parallel execution of the tests to avoid the conflict.

Do you feel like giving it a try? 🙂

Happy to do some when I get around to my next "personal dev time" slot next week, but I've only just started tinkering around with ZIO so I don't know how idiomatic it would be (but hopefully it's simple enough not to have too much scope to go wrong).

erikvanoosten commented 1 year ago

Something like this in zio-kafka-testkit/src/main/scala/zio/kafka/testkit/Kafka.scala maybe:

  private def embeddedWithBrokerProps(
    presetProps: Ports => Map[String, String],
    customProps: Ports => Map[String, String]
  ): ZLayer[Any, Throwable, Kafka] =
    ZLayer.scoped {
      def makeEmbeddedKafka(retriesLeft: Int): ZIO[Scope, EmbeddedKafkaStartException, EmbeddedKafkaService] = {
        for {
          ports <- nextPorts
          brokerProps = presetProps(ports) ++ customProps(ports) // custom is after to allow overriding
          embeddedKafkaConfig = EmbeddedKafkaConfig(
            ports.kafkaPort,
            ports.zookeeperPort,
            brokerProps
          )
          kafka <- ZIO.acquireRelease(
            ZIO
              .attemptBlocking(EmbeddedKafkaService(EmbeddedKafka.start()(embeddedKafkaConfig)))
              .catchSome {
                case _: BindException if retriesLeft > 0 => makeEmbeddedKafka(retriesLeft - 1)
              }
              .catchNonFatalOrDie { e =>
                ZIO.fail(EmbeddedKafkaStartException("Failed to start embedded Kafka", e))
              }
          )(_.stop())
        } yield kafka
      }

      makeEmbeddedKafka(3)
    }

Not tested!

mishto commented 2 months ago

@guizmaii I am encountering this issue as well. Do you mind if I give it a try?