fd4s / fs2-kafka

Functional Kafka Streams for Scala
https://fd4s.github.io/fs2-kafka
Apache License 2.0

Cannot deserialize Array[Byte] fields via Avro with Schema Registry #1330

Open onepintwig opened 3 months ago

onepintwig commented 3 months ago

Hello

Firstly, I'm a big fan of the library; it has made interfacing with Kafka a breeze! Apologies if this is the wrong place to raise this and I should be looking at the vulcan repository instead.

I have recently hit an issue when trying to deserialize Array[Byte] keys and values using the Vulcan module with a Schema Registry. When referenced as a field of a nested model, this data type works fine. I'm aware this is a slightly odd interface, but unfortunately it is what we have to work with.

vulcan.AvroException$$anon$1: Error decoding Array[Byte]: Got unexpected type byte[], expected type ByteBuffer
  at vulcan.AvroException$.apply(AvroException.scala:18)
  at vulcan.AvroError$ErrorDecodingType.throwable(AvroError.scala:93)
  at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$extension$2(AvroDeserializer.scala:57)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$3(AvroDeserializer.scala:40)
  at defer @ fs2.kafka.vulcan.AvroSerializer$.$anonfun$create$3(AvroSerializer.scala:38)
  at product$extension @ fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:242)
  at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:208)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:275)
  at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:272)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:277)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$35(KafkaConsumer.scala:317)
  at map @ fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:279)
  at delay @ fs2.kafka.internal.Blocking$$anon$2.apply(Blocking.scala:28)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.pollConsumer$1(KafkaConsumerActor.scala:303)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$12(KafkaConsumerActor.scala:423)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.<init>(KafkaConsumerActor.scala:420)
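
For contrast, the same type decodes fine when it sits inside a record codec rather than being the top-level key or value. A minimal sketch of the nested shape that works for us (the Envelope name, namespace, and field are just placeholders for illustration):

import vulcan.Codec

// Hypothetical wrapper record: an Array[Byte] field inside a record codec
// decodes without problems; only a top-level Array[Byte] key or value fails.
final case class Envelope(payload: Array[Byte])

object Envelope {
  implicit val codec: Codec[Envelope] =
    Codec.record[Envelope](
      name = "Envelope",
      namespace = "com.example"
    ) { field =>
      field("payload", _.payload).map(Envelope(_))
    }
}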

I have reproduced this in a simple test scenario running against the Confluent 7.5.3 Kafka stack via Testcontainers, with:

Scala version: 2.12.18
fs2-kafka version: 3.5.1

// Fails: Array[Byte] key via the Avro serializer/deserializer
"SerDes" should "Serialize and deserialize ByteArray keys" in {

    val as: AvroSettings[IO] = AvroSettings {
      SchemaRegistryClientSettings[IO](schemaRegistryUrl)
    }

    val producerSettings =
      ProducerSettings[IO, Array[Byte], String](
        avroSerializer[Array[Byte]].forKey(as),
        avroSerializer[String].forValue(as)
      )
        .withBootstrapServers(bootstrapServers)

    val consumerSettings =
      ConsumerSettings[IO, Array[Byte], String](
        avroDeserializer[Array[Byte]].forKey(as),
        avroDeserializer[String].forValue(as)
      )
        .withBootstrapServers(bootstrapServers)
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withGroupId(UUID.randomUUID().toString)

    val topic = "inputTopic1"
    val keyBytes = "123".getBytes
    val value = "Test"

    val test = for {
      _ <- KafkaProducer.resource(producerSettings).use { producer =>
        producer.produce(ProducerRecords.one(ProducerRecord(topic, keyBytes, value)))
      }
      _ <- IO.sleep(1.second)
      result <- KafkaConsumer.resource(consumerSettings).use { consumer =>
        consumer.subscribeTo(topic).flatMap { _ =>
          consumer.records.take(1).compile.toList.map(_.head)
        }
      }
    } yield (result.record.key, result.record.value)

    // Compare the key bytes directly: wrapping the Array[Byte] in a tuple would
    // compare it by reference and fail even when decoding succeeds.
    val (decodedKey, decodedValue) = test.unsafeRunSync()
    decodedKey shouldBe keyBytes
    decodedValue shouldBe value

  }

  // Succeeds: String key and value via the Avro serializer/deserializer
  it should "Serialize and deserialize String keys and values" in {

    val as: AvroSettings[IO] = AvroSettings {
      SchemaRegistryClientSettings[IO](schemaRegistryUrl)
    }

    val producerSettings =
      ProducerSettings[IO, String, String](
        avroSerializer[String].forKey(as),
        avroSerializer[String].forValue(as)
      )
        .withBootstrapServers(bootstrapServers)

    val consumerSettings =
      ConsumerSettings[IO, String, String](
        avroDeserializer[String].forKey(as),
        avroDeserializer[String].forValue(as)
      )
        .withBootstrapServers(bootstrapServers)
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withGroupId(UUID.randomUUID().toString)

    val topic = "inputTopic3"
    val key = "123"
    val value = "Test"

    val test = for {
      _ <- KafkaProducer.resource(producerSettings).use { producer =>
        producer.produce(ProducerRecords.one(ProducerRecord(topic, key, value)))
      }
      _ <- IO.sleep(1.second)
      result <- KafkaConsumer.resource(consumerSettings).use { consumer =>
        consumer.subscribeTo(topic).flatMap { _ =>
          consumer.records.take(1).compile.toList.map(_.head)
        }
      }
    } yield (result.record.key, result.record.value)

    test.unsafeRunSync() shouldBe (key, value)

  }
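
For now, one workaround I can see on our side is to not run the key through Avro at all and keep the Avro serializer only for the value. A sketch of that, assuming fs2-kafka's identity serializer/deserializer for Array[Byte] (Serializer.identity / Deserializer.identity) and reusing as and bootstrapServers from the tests above; note the key is then no longer registered with the Schema Registry:

import java.util.UUID
import cats.effect.{IO, Resource}
import fs2.kafka._
import fs2.kafka.vulcan._

// Workaround sketch: pass the raw key bytes straight through and only run the
// value through the Avro serializer/deserializer.
val workaroundProducerSettings =
  ProducerSettings[IO, Array[Byte], String](
    Resource.pure[IO, KeySerializer[IO, Array[Byte]]](Serializer.identity[IO]),
    avroSerializer[String].forValue(as)
  )
    .withBootstrapServers(bootstrapServers)

val workaroundConsumerSettings =
  ConsumerSettings[IO, Array[Byte], String](
    Resource.pure[IO, KeyDeserializer[IO, Array[Byte]]](Deserializer.identity[IO]),
    avroDeserializer[String].forValue(as)
  )
    .withBootstrapServers(bootstrapServers)
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withGroupId(UUID.randomUUID().toString)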
onepintwig commented 3 months ago

I can see that the underlying error is thrown by this block in the vulcan Codec, but what I am struggling to understand is why the data is being read back as an Array[Byte] and not as the ByteBuffer it should have been encoded as.

I have raised an issue there.
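
To illustrate what I think is going on, here is a minimal sketch outside Kafka, assuming vulcan's Codec.encode/Codec.decode helpers and the built-in Codec[Array[Byte]]:

import vulcan.Codec

// Encoding an Array[Byte] with the built-in bytes codec should produce a
// java.nio.ByteBuffer, the Avro runtime representation of the "bytes" type.
val encoded = Codec.encode("123".getBytes)
// e.g. Right(java.nio.HeapByteBuffer[pos=0 lim=3 cap=3])

// Decoding that ByteBuffer back into Array[Byte] succeeds...
val roundTrip = encoded.flatMap(Codec.decode[Array[Byte]](_))

// ...whereas handing the decoder a raw byte[], which appears to be what the
// deserializer receives here, reproduces the error above:
val fails = Codec.decode[Array[Byte]]("123".getBytes)
// Left(Error decoding Array[Byte]: Got unexpected type byte[], expected type ByteBuffer)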