Closed ghostPM closed 6 years ago
When I use logstash-codec-protobuf (plugin version 1.0.3) to decode messages received from Kafka, I get the following error:
[2017-12-28T18:27:48,741][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: #<ProtocolBuffers::DecodeError: invalid message>.
I'm not sure whether the plugin supports protobuf version 2.x, since I use protoc version 2.6.1.
I use ruby-protocol-buffers, which is recommended by logstash-codec-protobuf. My IDL and generated rb files are shown below:
idl:
    option java_package = "com.hiido.mobileyy.type.count.proto";

    message MessageList {
        required int32 message_num = 1;
        repeated bytes messages = 2;
        required int32 parse_type = 3;
    }
generated rb file:
    #!/usr/bin/env ruby
    # Generated by the protocol buffer compiler. DO NOT EDIT!

    require 'protocol_buffers'

    # forward declarations
    class MessageList < ::ProtocolBuffers::Message; end

    class MessageList < ::ProtocolBuffers::Message
      set_fully_qualified_name "MessageList"

      required :int32, :message_num, 1
      repeated :bytes, :messages, 2
      required :int32, :parse_type, 3
    end
My Logstash config file looks like this:
    input {
      kafka {
        topics => ["mytopic"]
        bootstrap_servers => "127.0.0.1:9092"
        codec => protobuf {
          class_name => "MessageList"
          include_path => ['D:/temp/logstash/stream_message_list.pb.rb']
        }
      }
    }

    output {
      stdout {}
    }
I have resolved it. I had forgotten to set the kafka input's value_deserializer_class property to org.apache.kafka.common.serialization.ByteArrayDeserializer, which is needed so that the protobuf payload is passed to the codec as raw bytes.
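For anyone who runs into the same warning, here is a sketch of what the corrected input section would probably look like. It is the config from the original post with only the value_deserializer_class line added; the topic, broker address, and include_path are the values from that post and would need adjusting for another setup. I have not re-tested this exact file, so treat it as an illustration rather than a verified config.

```
input {
  kafka {
    topics => ["mytopic"]
    bootstrap_servers => "127.0.0.1:9092"
    # Added line: hand the message value to the codec as a byte array
    # instead of letting the input deserialize it as a string.
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => protobuf {
      class_name => "MessageList"
      include_path => ['D:/temp/logstash/stream_message_list.pb.rb']
    }
  }
}

output {
  stdout {}
}
```

Without that setting the binary payload appears to be treated as text before it reaches the codec, which would explain the "invalid message" decode error even though the .proto and the generated rb file are fine.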
Thank you