zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0
337 stars 138 forks source link

Unable to produce with zio 2.1.8 (or 2.1.9) #1333

Closed gaeljw closed 1 week ago

gaeljw commented 2 weeks ago

(This might be an issue with ZIO core itself but I'm opening it here as it's visible through zio-kafka in my case)

When upgrading from zio 2.1.7 to 2.1.8 (or 2.1.9), the following code stops working.

Code

import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.producer.RecordMetadata
import org.scalatest.matchers.must.Matchers
import org.scalatest.wordspec.AnyWordSpec
import zio.Unsafe.unsafe
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde
import zio.{ZIO, ZLayer}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

class MyKafkaProducer(kafkaServers: Seq[String]) {

  private val producer: ZLayer[Any, Throwable, Producer] = ZLayer.scoped(Producer.make(ProducerSettings(kafkaServers.toList)))

  private val runtime: zio.Runtime[Any] = zio.Runtime.default

  private def runAsFuture[A](z: ZIO[Any, Throwable, A]): Future[A] = {
    unsafe { implicit u =>
      runtime.unsafe.runToFuture(z)
    }
  }

  private def produce(topic: String, key: String, value: String): ZIO[Producer, Throwable, RecordMetadata] = {
    Producer.produce[Any, String, String](topic = topic, key = key, value = value, keySerializer = Serde.string, valueSerializer = Serde.string)
  }

  def sendMessage(key: String, message: String): Future[Unit] = {
    runAsFuture {
      produce("my-topic", key, message).provide(producer).orDieWith(ex => new RuntimeException("Error when sending Kafka message", ex)).ignore
    }
  }

}

class MyProducerTest extends AnyWordSpec with Matchers with EmbeddedKafka {

  private val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) // Use random port available

  "KafkaProducer" should {
    "send some messages" in {
      withRunningKafkaOnFoundPort(kafkaConfig) { implicit actualConfig =>
        val producer = new MyKafkaProducer(Seq(s"localhost:${actualConfig.kafkaPort}"))
        Await.result(producer.sendMessage("some-key", "An awesome message to send"), 30.seconds)
      }
    }
  }

}

Versions

"dev.zio" %% "zio-kafka" % "2.8.2",
"org.apache.kafka" % "kafka-clients" % "3.7.0",
"io.github.embeddedkafka" %% "embedded-kafka" % "3.7.0" % Test,

Results

With zio 2.1.8 (or 2.1.9): Future timed out after [30 seconds] java.util.concurrent.TimeoutException: Future timed out after [30 seconds] (on the Await.result).

With zio 2.1.7: the test succeeds (in about 5 seconds on my laptop, all included).

erikvanoosten commented 2 weeks ago

Since this is very likely to be a zio problem, could you please open the issue at https://github.com/zio/zio ?

svroonland commented 2 weeks ago

Thanks for taking the time to report this. This does not ring a bell and I don't immediately have a change in 2.1.8 that I suspect (based on https://github.com/zio/zio/releases/tag/v2.1.8). I tried to reproduce it (using zio test instead of scalatest), but the test succeeds. That is with zio version 2.1.9

Could you perhaps enable logging for org.apache.kafka (using logback) and see if there's anything of interest there that could explain anything?

And, taking a wild guess here, are you using ARM perhaps? There's a note in the 2.1.8 release notes about that.

gaeljw commented 2 weeks ago

Since this is very likely to be a zio problem, could you please open the issue at https://github.com/zio/zio ?

@erikvanoosten Sure, discussion moved to https://github.com/zio/zio/issues/9222 then.

gaeljw commented 1 week ago

For the record, it's related to https://github.com/zio/zio/issues/9208.

Need to explicitly set the zio-streams version to the same as zio to workaround it.

svroonland commented 1 week ago

Wow, I would not have guessed that 😅 Good to hear your issue is fixed