fd4s / fs2-kafka

Functional Kafka Streams for Scala
https://fd4s.github.io/fs2-kafka
Apache License 2.0

Zipping finite KafkaConsumer-based streams doesn't behave as expected (first stream gets recycled after first 1000 elements) #1293

Open ppodlovics opened 8 months ago

ppodlovics commented 8 months ago

Hello,

I've run into an issue with finite fs2-kafka streams and zipping. If I take more than 1000 elements from two default-configured streams and then zip them, the first stream's elements get recycled after the 1000th one, starting again from the 0th. The code snippet below makes this clearer.

Scala version: 2.12.18, fs2-kafka version: 3.1.0

import java.nio.charset.StandardCharsets

import cats.effect.IO
import fs2.kafka._

{
      // kafkaHostPort is assumed to be provided by the surrounding test setup
      val bootstrapServers = kafkaHostPort
      val topic1Name = "test-topic-1"
      val topic2Name = "test-topic-2"

      val producerSettings: ProducerSettings[IO, String, String] =
        ProducerSettings(
          keySerializer = Serializer.string[IO](StandardCharsets.UTF_8),
          valueSerializer = Serializer.string[IO](StandardCharsets.UTF_8),
        )
          .withBootstrapServers(bootstrapServers)
          .withEnableIdempotence(true)
          .withRetries(3)

      val consumerSettings: ConsumerSettings[IO, String, String] =
        ConsumerSettings(
          Deserializer.string[IO],
          Deserializer.string[IO],
        )
          .withAutoOffsetReset(AutoOffsetReset.Earliest)
          .withBootstrapServers(bootstrapServers)
          .withGroupId("test-group")

      val records = (0 to 1010).flatMap { i =>
        Vector(
          ProducerRecord(topic1Name, s"k${i}", s"v${i}"),
          ProducerRecord(topic2Name, s"k${i}", s"v${i}"),
        )
      }.toVector
      val producerRecords = ProducerRecords(records)

      KafkaProducer.resource(producerSettings).use { producer =>
        for {
          _ <- producer.produce(producerRecords)
          topic1Stream = KafkaConsumer
            .stream(consumerSettings)
            .subscribeTo(topic1Name)
            .records
            .take(1005)
          topic2Stream = KafkaConsumer
            .stream(consumerSettings)
            .subscribeTo(topic2Name)
            .records
            .take(1005)
          result <- (topic1Stream zip topic2Stream)
            .evalTap { case (lhs, rhs) => IO.println(lhs) >> IO.println(rhs) }
            .compile
            .toVector
        } yield {
          result.foreach { case (lhs, rhs) =>
            assert(lhs.record.key == rhs.record.key && lhs.record.value == rhs.record.value)
          }
        }
      }
    }

This code fails on the assert with "k[]0" did not equal "k[100]0", i.e. "k0" did not equal "k1000". I added some logging to see more clearly what is happening, and it shows the following:

CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 997, key = k997, value = v997, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 998, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 997, key = k997, value = v997, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 998, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 998, key = k998, value = v998, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 999, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 998, key = k998, value = v998, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 999, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 999, key = k999, value = v999, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 1000, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 999, key = k999, value = v999, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1000, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 0, key = k0, value = v0, timestamp = Timestamp(createTime = 1706178880476), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 1, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1000, key = k1000, value = v1000, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1001, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 1, key = k1, value = v1, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 2, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1001, key = k1001, value = v1001, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1002, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 2, key = k2, value = v2, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 3, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1002, key = k1002, value = v1002, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1003, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 3, key = k3, value = v3, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 4, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1003, key = k1003, value = v1003, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1004, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 4, key = k4, value = v4, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 5, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1004, key = k1004, value = v1004, timestamp = Timestamp(createTime = 1706178880599), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1005, test-group))

Could you help me resolve this?

P.S.: this other issue might be related: https://github.com/fd4s/fs2-kafka/issues/1292

aartigao commented 8 months ago

What happens if, instead of zipping, you concatenate (++) the streams? Does the second topic go back to 0 too?
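
For reference, a minimal sketch of that concatenation variant (not from the original comment), reusing topic1Stream and topic2Stream exactly as defined in the reproduction above and replacing the zipped part of the for-comprehension:

          // Hypothetical replacement for the zip in the reproduction: consume
          // topic 1 to completion, then topic 2, printing every record so it
          // is visible whether topic 2 also wraps back to offset 0 after the
          // 1000th element.
          result <- (topic1Stream ++ topic2Stream)
            .evalTap(record => IO.println(record))
            .compile
            .toVector

The assertion in the yield block would need to be dropped or adapted, since the concatenated stream emits single records rather than pairs.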