logstash-plugins / logstash-input-google_pubsub

Logstash input for pulling events from Google Pub/Sub service
Apache License 2.0

Avro Codec Support #54

Open gferreux opened 3 years ago

gferreux commented 3 years ago

It looks like the avro codec does not interact well with this pubsub input.

This config:

    input {
      google_pubsub {
        ...
        include_metadata => true
        codec => avro {
          schema_uri => "/xxx.avsc"
        }
      }
    }

gives this error:

Jun 10, 2021 1:47:00 PM com.google.cloud.pubsub.v1.MessageDispatcher$AckHandler onFailure
WARNING: MessageReceiver failed to processes ack ID: PjA-RVNEUAYWLF1GXXXXXXXXXXXXXXX_NSAoRRIACmd5OANGHgoHXFx1B1EMHnsrZCE8XBUIVk9TdF9zKxWMw-TDMXMtG3Z9ZnZtXBEACkxUfVZSHSW1-cu 
org.jruby.exceptions.ArgumentError: (ArgumentError) negative length -2600952 given  
at org.jruby.ext.stringio.StringIO.read(org/jruby/ext/stringio/StringIO.java:851)      
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.10.2/ 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read_bytes(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read_string(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro- 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read_data(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1. 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read_record(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro- 
at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1809)      
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read_record(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro- 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read_data(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1. 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.avro_minus_1_dot_10_dot_2.lib.avro.io.read(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.10.2/ 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_minus_3_dot_2_dot_4_minus_java.lib.logstash.codecs.avro.decode(/usr/share/ 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_google_pubsub_minus_1_dot_2_dot_1.lib.logstash.inputs.google_pubsub.run(/usr/sh 
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_google_pubsub_minus_1_dot_2_dot_1.lib.logstash.inputs.google_pubsub.receiveMess
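For context on the "negative length" message: Avro encodes string and bytes lengths as zigzag varints, so a payload whose bytes were mangled before decoding can easily produce a negative length, which StringIO#read then rejects. A minimal sketch of that decoding step (an illustration only, not the avro gem's or the plugin's actual code):

```ruby
# Zigzag varint decoding as used by Avro for string/bytes lengths.
# Even decoded values map to non-negative lengths, odd values to
# negative ones -- so corrupted bytes readily yield a negative length.
def read_varint_zigzag(bytes)
  n = 0
  shift = 0
  bytes.each do |b|
    n |= (b & 0x7F) << shift       # accumulate 7 payload bits per byte
    shift += 7
    break if (b & 0x80).zero?      # high bit clear means last byte
  end
  (n >> 1) ^ -(n & 1)              # zigzag decode
end

read_varint_zigzag([0x06])  # => 3  (a 3-byte string follows)
read_varint_zigzag([0x03])  # => -2 (odd raw values decode negative)
```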

Maybe this plugin needs a setting to configure the value deserializer so that raw bytes are passed to the avro codec?

The kafka input plugin has: value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
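If the input really does force the payload through a UTF-8 string before the codec sees it, binary Avro data would get corrupted, since bytes >= 0x80 are not valid standalone UTF-8. A small plain-Ruby illustration of that effect (hypothetical example bytes, not plugin code):

```ruby
# Arbitrary binary payload, standing in for an Avro-encoded message body.
raw = [0x06, 0xC8, 0x01, 0xFF].pack('C*')   # ASCII-8BIT (binary) string

# Lossy conversion to UTF-8: non-ASCII bytes become U+FFFD (3 bytes each),
# so the byte stream the codec receives no longer matches what was
# published, and length prefixes decode to garbage.
utf8 = raw.encode('UTF-8', 'BINARY', invalid: :replace, undef: :replace)

raw.bytes   # => [6, 200, 1, 255]
utf8.bytes  # longer and different -- the Avro framing is destroyed
```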

bquinart commented 1 year ago

We hit a similar problem when trying to use the protobuf codec. The documentation states the input is mostly intended for JSON content. It looks like it does a UTF-8 conversion before feeding the payload to the codec here.

Did anyone figure out how to get this working with a codec that expects the raw bytes?
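A setting like the one gferreux suggests might look like this — note the option name below is purely hypothetical, no such setting exists in the plugin today:

```
input {
  google_pubsub {
    ...
    # hypothetical setting: hand the raw payload bytes to the codec
    # instead of converting the message data to a UTF-8 string first
    payload_format => "binary"
    codec => avro {
      schema_uri => "/xxx.avsc"
    }
  }
}
```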