logstash-plugins / logstash-input-kafka

Kafka input for Logstash
Apache License 2.0

Add support to read Avro messages encoded with Confluent's KafkaAvroSerializer #37

Open Kargreat opened 9 years ago

Kargreat commented 9 years ago

I have Avro messages encoded with KafkaAvroSerializer, and I am not able to read them into Elasticsearch using the Kafka input plugin.

I also added the Kafka Avro decoder to the classpath (the entire set of serializer jars along with their dependencies), but I got an error saying the plugin was not found.

Are you familiar with this error? Do you have any thoughts on how to work around it?

joekiller commented 9 years ago

@Kargreat Have you tried using the avro codec described in the following link? https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md

Also check out https://github.com/logstash-plugins/logstash-codec-avro/blob/master/lib/logstash/codecs/avro.rb#L13-L17 to see how the schema is communicated.
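
(For context, wiring the plain avro codec into the kafka input looks roughly like this; a minimal sketch where the Zookeeper address, topic name, and schema path are placeholders, following the shape of the codec's DEVELOPER.md example:)

    input {
      kafka {
        zk_connect => "localhost:2181"
        topic_id => "my_topic"
        codec => avro { schema_uri => "/path/to/schema.avsc" }
      }
    }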

Kargreat commented 9 years ago

@joekiller: I did read those posts and tried using logstash-codec-avro in my input plugin, but that codec only handles the plain Avro binary decoder.

I also went through their repositories, but they are different: we need to enhance the consumer with a different decoder class, which corresponds to your jruby-kafka consumer.

Please let me know if you have any queries or need clarification.

joekiller commented 9 years ago

Okay, I'll try to give this a more detailed look.

talevy commented 9 years ago

@Kargreat is this the serializer? https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java

This is not a regular Avro-serialized message.

Kargreat commented 9 years ago

@talevy: That is correct! That is why we need a consumer with a different decoder (the one matching the KafkaAvroSerializer). The format is documented at http://docs.confluent.io/1.0/schema-registry/docs/serializer-formatter.html, and the KafkaAvroDecoder itself is at https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java.

@joekiller: the links I have pasted should give you more information.

talevy commented 9 years ago

@Kargreat cool! That is what I thought. This is a special serialization format in which only a subset of the message is the Avro-encoded content. A new codec implementing it is needed; this is beyond what the existing avro codec can support.
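
For reference, the wire format those docs describe is a one-byte magic marker, a four-byte big-endian schema ID, and then the Avro payload. A minimal Ruby sketch of the decode side, assuming the writer's schema has already been fetched from the registry (a real implementation would resolve the schema ID against the registry):

    require 'avro'
    require 'stringio'

    # Confluent wire format: byte 0 is a magic marker (always 0), bytes 1-4
    # are the schema ID as a big-endian integer, and the rest is the
    # Avro-encoded record.
    def decode_confluent(bytes, writers_schema)
      io = StringIO.new(bytes)
      magic = io.read(1).unpack('C').first
      raise "unexpected magic byte #{magic}" unless magic == 0
      schema_id = io.read(4).unpack('N').first  # registry lookup key (unused here)
      reader = Avro::IO::DatumReader.new(writers_schema)
      reader.read(Avro::IO::BinaryDecoder.new(io))
    end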

Kargreat commented 9 years ago

@Tal: Will it be possible to enhance the codec to read values using the other decoder?

talevy commented 9 years ago

It is technically a separate serialization format, so it would warrant a separate codec plugin.

ilene-gorman commented 9 years ago

If you have a custom deserializer, isn't that what the decoder_class setting of the kafka input is for?

When I set my decoder class to the Java class we use, I get an error that the class cannot be found, regardless of where I put it: the Logstash main directory, the lib directory, or the bin directory. I've also put the jar file that contains the class in all of those locations.

Can you please tell me how, after I've defined that class, to use it in the Logstash environment? What I'm trying to accomplish is to read Avro data from Kafka, through a customized deserializer in Logstash, into Elasticsearch.

Thanks for your help.

suyograo commented 9 years ago

In addition to the config changes, you'd need to add your custom serializer to the classpath and start LS with it:

input {
  kafka {
    decoder_class => "com.mycomp.etl.avro.schema.kafka.RawLog2KafkaSerializer"
    ...
  }
}

Logstash environment:

CLASSPATH=$CLASSPATH:/path/to/serializer.jar
bin/logstash -f <config_path>

I'll add this information to the docs.

@ilene-gorman since you posted this on https://discuss.elastic.co/t/provide-custom-kafka-message-decoder-to-logstash/1780/3, let's follow up there.

joekiller commented 8 years ago

Finally RTFM'd this (http://docs.confluent.io/1.0/schema-registry/docs/serializer-formatter.html) and it seems pretty trivial to slip into jruby-kafka. I think I'll just pack Avro with the gem.

Ignore this; see below.

joekiller commented 8 years ago

Okay, reading this more, I think @talevy is correct that it just needs a codec. The input runs the raw message through the codec prior to any mutations:

      @codec.decode("#{message_and_metadata.message}") do |event|
        decorate(event)
        if @decorate_events
          event['kafka'] = {'msg_size' => message_and_metadata.message.size,
                            'topic' => message_and_metadata.topic,
                            'consumer_group' => @group_id,
                            'partition' => message_and_metadata.partition,
                            'key' => message_and_metadata.key}
        end
        output_queue << event

The kafka plugin just delivers raw binary by default via the decoder_class setting:

  # The serializer class for messages. The default decoder takes a byte[] and returns the same byte[]
  config :decoder_class, :validate => :string, :default => 'kafka.serializer.DefaultDecoder'

So if I make a codec that performs the Kafka Avro decoding, we should be good to go.
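
A hypothetical skeleton of such a codec might look like the following; the plugin name, the endpoint option, and the decode_confluent_payload helper are all illustrative, not an existing implementation:

    require "logstash/codecs/base"

    class LogStash::Codecs::ConfluentAvro < LogStash::Codecs::Base
      config_name "confluent_avro"

      # URL of the Confluent schema registry used to resolve schema IDs
      config :endpoint, :validate => :string, :required => true

      def decode(data)
        # Strip the magic byte and schema ID, look up the writer's schema in
        # the registry, then decode the remaining bytes as plain Avro
        # (see the decode_confluent sketch earlier in this thread).
        yield LogStash::Event.new(decode_confluent_payload(data))
      end
    end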

talevy commented 8 years ago

hey everyone, here is a community plugin for the confluent serialization format: https://github.com/revpoint/logstash-codec-avro_schema_registry

Kargreat commented 8 years ago

That is so cool and thanks a lot!

juliuszc commented 8 years ago

The newest changes in the plugin broke https://github.com/revpoint/logstash-codec-avro_schema_registry with this error:

An unexpected error occurred! {:error=>#<ArgumentError: negative length -290656 given>, :backtrace=>[
  "org/jruby/ext/stringio/StringIO.java:829:in `read'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:106:in `read'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:93:in `read_bytes'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:99:in `read_string'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:299:in `read_data'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:384:in `read_record'",
  "org/jruby/RubyArray.java:1613:in `each'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:382:in `read_record'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:310:in `read_data'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/avro-1.8.0/lib/avro/io.rb:275:in `read'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/local_gems/607c8166/logstash-codec-avro_schema_registry-0.9.0/lib/logstash/codecs/avro_schema_registry.rb:83:in `decode'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.0.beta7/lib/logstash/inputs/kafka.rb:160:in `thread_runner'",
  "file:/C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/jruby/lib/jruby.jar!/jruby/java/java_ext/java.lang.rb:12:in `each'",
  "C:/Users/jchoinski/Downloads/logstash-5.0.0-alpha2/logstash-5.0.0-alpha2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-3.0.0.beta7/lib/logstash/inputs/kafka.rb:159:in `thread_runner'"
], :level=>:fatal}

How can I test what is being passed to the codec? That seems to be where it is breaking. Thanks, Juliusz

talevy commented 8 years ago

Thanks for the update @juliuszc. Personally, I haven't tried this plugin recently, so I am not sure exactly what the issue is. I will try this out next week and see what's up.

juliuszc commented 8 years ago

It turns out that the plugin was working fine and the two cooperate well together. I didn't realize that the default deserializer was changed in the new version, so I had to switch from the string deserializer to the byte array deserializer. Sorry for the erroneous complaint.
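
For anyone hitting the same symptom: the fix amounts to forcing the byte array deserializer so the codec receives raw bytes. A sketch using the 5.x-era option names, with placeholder broker and topic values:

    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["my_topic"]
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      }
    }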

joekiller commented 8 years ago

@juliuszc thanks for the update and solution!

stecino commented 8 years ago

Hi, I was wondering if this plugin will work with Logstash 2.2.

I built the gem and installed it. One thing I was wondering: is this the correct syntax?

codec => { avro_schema_registry => { endpoint => "/home/t/logstash-2.2.2/etc/Payment.avsc" } }

talevy commented 8 years ago

@stecino is something not working with that syntax?

bjornhjelle commented 7 years ago

@stecino and @talevy I think perhaps endpoint should be the URL of the schema registry service where the Avro schema has been uploaded. Ref: http://www.rubydoc.info/gems/logstash-codec-avro_schema_registry/0.9.0/LogStash/Codecs/AvroSchemaRegistry
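
Under that reading, the codec configuration would look something like this sketch (the URL is a placeholder; 8081 is the registry's conventional port):

    codec => avro_schema_registry {
      endpoint => "http://localhost:8081"
    }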

CBR09 commented 7 years ago

Hi @talevy, @ph and @ryananguiano, I wonder why the encoder isn't implemented ("This codec currently does not encode. This might be added later"). Is there any technical issue behind that? My team really needs an encoder; could you guide me on how to implement it? Thanks in advance.

Kargreat commented 7 years ago

The easiest way is to write directly using the index API after decoding separately!

CBR09 commented 7 years ago

Thanks @Kargreat. Sorry, I don't understand; could you please guide me in more detail?

ryananguiano commented 7 years ago

@juliuszc the package has been updated for Logstash 5.

tichomir commented 7 years ago

Any chance that logstash-codec-avro_schema_registry will also be able to encode soon? Or do you know if somebody has tried that already?

talevy commented 7 years ago

@tichomir that project is not maintained by the Elastic organization. It does not seem like the encoder is implemented yet.

cgiraldo commented 7 years ago

@tichomir @CBR09, I have modified revpoint's logstash-codec-avro_schema_registry to add a basic encoder:

https://github.com/revpoint/logstash-codec-avro_schema_registry/pull/4

ryananguiano commented 7 years ago

Just updated the encode method. Still needs to be tested.

https://github.com/revpoint/logstash-codec-avro_schema_registry/pull/5
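
For the curious, encoding is essentially the decode path in reverse: write the magic byte and schema ID, then the Avro payload. A minimal Ruby sketch, not the PR's actual code, assuming the schema ID has already been registered with the registry:

    require 'avro'
    require 'stringio'

    def encode_confluent(record, schema, schema_id)
      io = StringIO.new
      io.write([0].pack('C'))          # magic byte
      io.write([schema_id].pack('N'))  # 4-byte big-endian schema ID
      writer = Avro::IO::DatumWriter.new(schema)
      writer.write(record, Avro::IO::BinaryEncoder.new(io))
      io.string
    end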

alirezasafi commented 3 years ago

> hey everyone, here is a community plugin for the confluent serialization format: https://github.com/revpoint/logstash-codec-avro_schema_registry

Thanks a lot. Do you know how to install this plugin when we use Docker and docker-compose?

DenisOgr commented 2 years ago

> hey everyone, here is a community plugin for the confluent serialization format: https://github.com/revpoint/logstash-codec-avro_schema_registry
>
> thanks a lot. do you know how to install this plugin when we use docker and docker-compose?

Just run the commands from the plugin documentation in your Dockerfile.
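
A minimal sketch of that approach, assuming the official Logstash image (the tag is a placeholder):

    FROM docker.elastic.co/logstash/logstash:7.17.0
    RUN bin/logstash-plugin install logstash-codec-avro_schema_registry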