zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Why are all the low-level (Java) Kafka AdminClient calls wrapped in `effectBlocking`? #492

Open guizmaii opened 2 years ago

guizmaii commented 2 years ago

AFAIK, the Kafka AdminClient uses Java futures (`KafkaFuture` implements `java.util.concurrent.Future` and has been based on `CompletionStage` since Kafka 3.1 or 3.2). So why are all the calls made with this client wrapped in an `effectBlocking`?

Random example taken from the zio-kafka code:

    /**
     * List offset for the specified partitions.
     */
    override def listOffsets(
      topicPartitionOffsets: Map[TopicPartition, OffsetSpec],
      options: Option[ListOffsetsOptions] = None
    ): Task[Map[TopicPartition, ListOffsetsResultInfo]] = {
      val asJava = topicPartitionOffsets.bimap(_.asJava, _.asJava).asJava
      fromKafkaFuture {
        blocking.effectBlocking(
          options
            .fold(adminClient.listOffsets(asJava))(opts => adminClient.listOffsets(asJava, opts.asJava))
            .all()
        )
      }
    }.map(_.asScala.toMap.bimap(TopicPartition(_), ListOffsetsResultInfo(_)))

guizmaii commented 2 years ago

Is it because of this `synchronized`?

[image]

🤔

svroonland commented 1 year ago

@guizmaii What shall we do with this one-year-old issue?

guizmaii commented 1 year ago

@svroonland I'm still waiting for an answer, or at least to understand why we need this and whether we should keep these usages of blocking. I'd prefer to leave it open, if that's OK with you.

svroonland commented 1 year ago

I suppose it was originally written that way as a precaution, probably after some actual experience of these calls blocking in some circumstances.
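
For illustration, here is a minimal sketch of how a call that returns a future can still block the calling thread before the future even exists, for example when the method takes a lock first. `FakeAdminClient` and `listSomething` are hypothetical stand-ins and not the Kafka AdminClient API, and the 200 ms sleep only simulates synchronous work done under the lock. Whether the real client behaves this way in any given call is exactly the open question in this thread.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit}

// Hypothetical stand-in for an admin client; NOT the Kafka AdminClient API.
final class FakeAdminClient {
  private val lock = new Object

  // Returns a future, but first takes a lock and does synchronous work
  // on the calling thread, so the call itself can block.
  def listSomething(): CompletableFuture[String] = lock.synchronized {
    Thread.sleep(200) // simulated synchronous work under the lock (assumption)
    CompletableFuture.supplyAsync(() => "done")
  }
}

object BlockingBeforeFuture {
  // Runs `call` and reports how many milliseconds the caller thread
  // spent inside it before it returned.
  def timeToReturnMs[A](call: => A): (A, Long) = {
    val start  = System.nanoTime()
    val result = call
    (result, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
  }

  def main(args: Array[String]): Unit = {
    val client            = new FakeAdminClient
    val (future, elapsed) = timeToReturnMs(client.listSomething())
    println(s"call returned a future after $elapsed ms")
    println(s"future result: ${future.get(1, TimeUnit.SECONDS)}")
  }
}
```

If the time spent before the future is returned is not negligible, running the call itself on a blocking thread pool (as `effectBlocking` does) keeps it off the main executor. If it is negligible, the wrapper is only a precaution.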