fd4s / vulcan

Functional Avro for Scala
https://fd4s.github.io/vulcan
Apache License 2.0

Got unexpected type byte[], expected type ByteBuffer when deserialising byte array with bytes Codec #594

Open onepintwig opened 4 months ago


Hey!

I discovered this while working with the fs2-kafka-vulcan module: https://github.com/fd4s/fs2-kafka/issues/1330

Using the provided Codec.bytes, I get the following error when trying to deserialize a byte array:

vulcan.AvroException$$anon$1: Error decoding Array[Byte]: Got unexpected type byte[], expected type ByteBuffer
  at vulcan.AvroException$.apply(AvroException.scala:18)
  at vulcan.AvroError$ErrorDecodingType.throwable(AvroError.scala:93)
  at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$extension$2(AvroDeserializer.scala:57)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$3(AvroDeserializer.scala:40)
  at defer @ fs2.kafka.vulcan.AvroSerializer$.$anonfun$create$3(AvroSerializer.scala:38)
  at product$extension @ fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:242)
  at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:208)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:275)
  at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:272)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:277)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$35(KafkaConsumer.scala:317)
  at map @ fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:279)
  at delay @ fs2.kafka.internal.Blocking$$anon$2.apply(Blocking.scala:28)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.pollConsumer$1(KafkaConsumerActor.scala:303)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$12(KafkaConsumerActor.scala:423)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.<init>(KafkaConsumerActor.scala:420)

I believe the decoder normally receives a ByteBuffer. However, there appears to be an optimisation in the schema-registry Avro deserializer that returns the raw bytes when it detects that the schema type is just bytes. This causes an Array[Byte] to reach the codec's decoder, where it does not match any of the available patterns.

I have added an extra case to the codecs where Avro bytes are expected, which handles this and seems to resolve the error. I am happy to raise a PR if this is confirmed to be an issue.
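For illustration, a minimal standalone sketch of the kind of match extension I mean (names and error shape are simplified; this is not vulcan's actual internal decoder, just the pattern):

```scala
import java.nio.ByteBuffer

// Hypothetical standalone version of a bytes decoder: the Avro
// deserializer may hand us either a ByteBuffer (the usual case) or a
// raw Array[Byte] (the schema-registry fast path for a plain bytes
// schema), so we match on both instead of only ByteBuffer.
def decodeBytes(value: Any): Either[String, Array[Byte]] =
  value match {
    case buffer: ByteBuffer =>
      // Copy out the remaining bytes via a duplicate so we do not
      // disturb the original buffer's position.
      val bytes = new Array[Byte](buffer.remaining())
      buffer.duplicate().get(bytes)
      Right(bytes)
    case bytes: Array[Byte] =>
      // Extra case: accept the raw array returned by the optimised path.
      Right(bytes)
    case other =>
      Left(s"Got unexpected type ${other.getClass.getName}, expected type ByteBuffer")
  }
```

With only the first case, an Array[Byte] input would fall through to the error branch, producing exactly the "Got unexpected type byte[], expected type ByteBuffer" message above.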