logstash-plugins / logstash-codec-avro

A logstash codec plugin for decoding and encoding Avro records
Apache License 2.0

Binary encoding for output data #5

Closed: oruen closed this issue 7 years ago

oruen commented 9 years ago

Hello,

I'm trying to use this plugin with logstash-output-kafka, and I've realized that the messages I send cannot be decoded because they arrive in an invalid format. It turns out that logstash-output-kafka uses StringEncoder by default, so the messages are sent as strings when they should be sent as binary data. To send them as binary, I switched the serializer_class setting of logstash-output-kafka to kafka.serializer.DefaultEncoder. However, now I'm facing a different error (see below): DefaultEncoder accepts an Array[Byte], while the message it receives is a String.
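Avro's binary encoding produces raw bytes rather than text, which is why a string-based encoder can damage the payload. The Ruby sketch below is illustrative only: `zigzag_varint` is a hypothetical helper that follows the Avro specification's zigzag/varint encoding for `long` values, and `String#scrub` merely stands in for a lossy conversion of binary data to UTF-8 text. The exact conversion performed by JRuby and Kafka's StringEncoder may differ.

```ruby
# Hypothetical helper: encode an Avro `long` as the Avro spec describes,
# i.e. zigzag-map the signed value, then emit it as a base-128 varint
# (least significant 7-bit group first).
def zigzag_varint(n)
  z = (n << 1) ^ (n >> 63) # zigzag: 0 -> 0, -1 -> 1, 1 -> 2, ...
  bytes = []
  while z >= 0x80
    bytes << ((z & 0x7F) | 0x80) # low 7 bits, with the continuation bit set
    z >>= 7
  end
  bytes << z
  bytes.pack('C*') # a binary (ASCII-8BIT) Ruby String
end

payload = zigzag_varint(64)
p payload.bytes # => [128, 1]

# Treating the same bytes as UTF-8 text: 0x80 is not valid UTF-8 on its own.
as_text = payload.dup.force_encoding(Encoding::UTF_8)
p as_text.valid_encoding? # => false

# A lossy text conversion replaces the invalid byte with U+FFFD (EF BF BD),
# so the bytes that would reach Kafka no longer match the Avro record.
p as_text.scrub.bytes # => [239, 191, 189, 1]
```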

There are three different places where this behavior could be fixed; we have to choose one and apply the patch there.

I think this plugin is the right place for the patch, so I'm going to send a PR shortly.

```
jruby-1.7.19 :145 > producer.send_msg("playbacks", nil, string)
Java::JavaLang::ClassCastException: java.lang.String cannot be cast to [B
    from kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    from kafka.serializer.DefaultEncoder.toBytes(kafka/serializer/Encoder.scala:34)
    from kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
    from kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(kafka/producer/async/DefaultEventHandler.scala:130)
    from kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
    from kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(kafka/producer/async/DefaultEventHandler.scala:125)
    from scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    from scala.collection.IndexedSeqOptimized$class.foreach(scala/collection/IndexedSeqOptimized.scala:33)
    from scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    from scala.collection.mutable.WrappedArray.foreach(scala/collection/mutable/WrappedArray.scala:34)
    from kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
    from kafka.producer.async.DefaultEventHandler.serialize(kafka/producer/async/DefaultEventHandler.scala:125)
    from kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
    from kafka.producer.async.DefaultEventHandler.handle(kafka/producer/async/DefaultEventHandler.scala:52)
    from kafka.producer.Producer.send(Producer.scala:77)
    from kafka.producer.Producer.send(kafka/producer/Producer.scala:77)
    from kafka.javaapi.producer.Producer.send(Producer.scala:33)
    from kafka.javaapi.producer.Producer.send(kafka/javaapi/producer/Producer.scala:33)
    from java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:483)
    from org.jruby.RubyMethod.call(org/jruby/RubyMethod.java:120)
    from RUBY.send_msg(/Users/oruen/.rvm/gems/jruby-1.7.19/gems/jruby-kafka-1.4.0-java/lib/jruby-kafka/producer.rb:70)
    from RUBY.evaluate((irb):145)
    from org.jruby.RubyKernel.eval(org/jruby/RubyKernel.java:1107)
    from org.jruby.RubyKernel.loop(org/jruby/RubyKernel.java:1507)
    from org.jruby.RubyKernel.catch(org/jruby/RubyKernel.java:1270)
    from org.jruby.RubyKernel.catch(org/jruby/RubyKernel.java:1270)
```
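In the trace, `[B` is the JVM's internal name for `byte[]`: Kafka's DefaultEncoder simply returns its `Array[Byte]` input, so the producer must be handed bytes rather than a `java.lang.String`. A minimal sketch of that conversion, assuming a hypothetical `to_kafka_payload` helper (not part of logstash-codec-avro, logstash-output-kafka, or jruby-kafka), might look like this. On JRuby it relies on `String#to_java_bytes`; on other Rubies it falls back to an Array of Integers purely so the example runs.

```ruby
# Hypothetical helper: turn the codec's encoded output into raw bytes before it
# reaches a producer configured with serializer_class kafka.serializer.DefaultEncoder.
def to_kafka_payload(encoded)
  # Copy and re-tag as binary so no character transcoding is applied.
  binary = encoded.dup.force_encoding(Encoding::BINARY)
  if RUBY_ENGINE == 'jruby'
    binary.to_java_bytes # Java byte[] ("[B"), the type DefaultEncoder expects
  else
    binary.bytes # non-JRuby stand-in: an Array of Integers in 0..255
  end
end

avro_bytes = [0x80, 0x01].pack('C*')
p to_kafka_payload(avro_bytes) # on MRI: [128, 1]
```

Note that Java bytes are signed, so on JRuby the first element would appear as -128 rather than 128; the underlying bit pattern is the same.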