zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0
334 stars 133 forks source link

Consumer not reconnecting on lost session on proxy connectivity #1233

Open fd8s0 opened 2 months ago

fd8s0 commented 2 months ago

This behaviour doesn't seem to apply when not using a kafka proxy. Connecting directly we observe the stream always recomposes itself.

Example code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import zio._
import zio.kafka.consumer.{Consumer, ConsumerSettings, RebalanceListener, Subscription}
import zio.kafka.serde.{Deserializer, Serializer}

object ZioKafkaProxyTest extends ZIOAppDefault {

  private val Props = Map(
    ConsumerConfig.GROUP_ID_CONFIG                 -> "zio-kafka-test",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest"
  )

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
    for {
      consumer <- Consumer.make(
        ConsumerSettings(List("kafka-proxy:9192"))
          .withProperties(Props)
          .withPollTimeout(1.second)
          .withRebalanceListener(RebalanceListener(
            (assigned, _) => ZIO.logInfo(s"assigned: ${assigned.map(_.partition())}"),
            (revoked, _) => ZIO.logInfo(s"revoked: ${revoked.map(_.partition())}"),
            (lost, _) => ZIO.logInfo(s"lost: ${lost.map(_.partition())}"),
          ))
      )
      _ <-
        consumer.plainStream(Subscription.topics("topic"), Deserializer.string, Serializer.string, 2)
          .mapZIO(m => ZIO.logInfo(m.value))
          .runDrain
    } yield ExitCode.success
  }

}

In version 2.3.2 it goes something like this:

timestamp=2024-04-30T16:58:09.273167Z level=INFO thread=#zio-fiber-19 message="assigned: Set(0)" 
proxy stopped
wait for session timeout
proxy start
timestamp=2024-04-30T16:58:40.757980Z level=INFO thread=#zio-fiber-90 message="lost: Set(0)" 
timestamp=2024-04-30T16:58:43.769893Z level=INFO thread=#zio-fiber-97 message="assigned: Set(0)"
stream resumes

In version 2.7.4 it goes:

timestamp=2024-04-30T15:26:46.500919Z level=INFO thread=#zio-fiber-18 message="assigned: Set(0)"
stopped proxy
wait for session timeout
started proxy
timestamp=2024-04-30T15:29:32.603663Z level=INFO thread=#zio-fiber-22 message="lost: Set(0)" 
hangs indefinitely
when stopping:
2024-04-30 16:31:57 ERROR ConsumerCoordinator:1201 - [Consumer clientId=consumer-zio-kafka-test-1, groupId=zio-kafka-test] LeaveGroup request with Generation{generationId=-1, memberId='consumer-zio-kafka-test-1-e2669255-c590-4ac7-9457-a585dfbf3969', protocol='null'} failed with error: The coordinator is not aware of this member.
erikvanoosten commented 2 months ago

I suspect that this is since version 2.7.0, due to this change:

Always end streams in rebalance listener, support lost partitions https://github.com/zio/zio-kafka/pull/1089

What I do not understand is that it hangs indefinitely. If a partition is lost, the stream should be interrupted. This should immediately cause an error at the place where the stream is run.

fd8s0 commented 2 months ago

2.6 resumes the stream, so it must be 2.7+ issue

svroonland commented 2 weeks ago

@fd8s0 Since you seem to be able to reproduce the issue quite well, could you confirm that zio-kafka 2.7.5. fixes this issue?

There are a few upgrades of kafka-clients (apache kafka) versions in between zio-kafka 2.3.2 and 2.7.4: from 3.4.1 to 3.7.0. My suspicion is that besides the issue fixed in #1252 there is also a bug in newer kafka-clients that causes the connection not to be recovered. If we could try zio-kafka 2.7.5 with an older apache-kafka client, we can rule that out.

Would you be willing to try to reproduce the issue with the older kafka-clients by adding this to your dependencies? I'm not 100% sure if you can just override the version like that, but let's try.

"org.apache.kafka" % "kafka-clients" % "3.4.1"

If we could pinpoint the kafka-clients version that introduces the issue, that would be even nicer.

fd8s0 commented 1 week ago

@svroonland seems still broken in 2.7.5 for me

I rolled back kafka-clients version by version, all the way back to 3.4.1 while having zio-kafka 2.7.5 and in no case it works like in zio-kafka 2.6.

If you're having trouble replicating the issue I can try to help. I don't share my exact setup because I'm relying on a zookeeper instance embedded inside hbase and it's a bit offtopic. I'm adding this on a docker compose with the kafka server

kafka-proxy:
    container_name: kafka-proxy
    image: grepplabs/kafka-proxy:0.3.8
    command: server --bootstrap-server-mapping "kafka:9092,kafka-proxy:9192"
    network_mode: "host"

Connect to 9192 port with the client, stop the proxy container for over a minute, then start it again.

svroonland commented 1 week ago

Thanks, I'm able to replicate this behavior now.