logstash-plugins / logstash-codec-protobuf

Codec plugin for parsing Protobuf messages
Apache License 2.0

deserialize byte array field #50

Open duculete opened 4 years ago

duculete commented 4 years ago

I have this proto definition:

```proto
message DataSet {
  string field1 = 1;
  bytes field2 = 2;
  bytes field3 = 3;
  int32 filed4 = 4;
}

message TopicData {
  string schemaversion = 1;
  DataSet dt1 = 2;
  DataSet dt2 = 3;
  DataSet dt3 = 4;
  DataSet dt4 = 5;
}
```

field2 and field3 are byte arrays and contain encrypted data.

My plan is to use the protobuf codec to read the data from Kafka, then use a ruby filter to decrypt it. But it seems that field2 and field3 are passed to the ruby script (in the filter) as strings, while I need them as [byte]:

For example: field2: "02h{jA�C��x\u0013t��<�/" field3: "4yP'j^<�\u0017�>4T킁F=ݯ��M�U�}"

How can I pass event.get([d4][field3]) as [byte] to a Logstash filter?
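(Not from the thread, just a sketch of one common workaround.) In Ruby, a "byte array" is normally carried in a String whose encoding is BINARY (ASCII-8BIT); a protobuf `bytes` field that shows up as a garbled-looking string usually still holds the original bytes, only with a different encoding label. A minimal, hypothetical helper for a ruby filter might look like this (`as_bytes` and the sample string are my own, not from the plugin):

```ruby
# Hypothetical sketch: treat a Logstash event field as raw bytes.
# Forcing the encoding to BINARY does not change the underlying
# bytes; it only relabels the string so cipher/byte-level code can
# consume it safely.

def as_bytes(value)
  value.dup.force_encoding(Encoding::BINARY)
end

# Stand-in for something like event.get("[dt4][field3]"):
ciphertext = "02h{jA".dup
raw = as_bytes(ciphertext)

raw.encoding # => Encoding::BINARY (ASCII-8BIT)
raw.bytes    # => the integer byte values, e.g. [48, 50, 104, ...]
```

Inside a ruby filter this would be applied to the field value fetched via `event.get` before handing it to the decryption routine.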

xo4n commented 4 years ago

What settings do you use in key_deserializer_class and value_deserializer_class in your kafka input config? Could you post it here?

duculete commented 4 years ago
```
key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
```

subhdeep commented 2 years ago

I'm facing a similar issue: the bytes field of a proto message is not getting parsed properly. Any idea how to fix this? Below is my kafka input plugin config.

```
kafka {
    bootstrap_servers => "${KAFKA_BROKERS_LIST}"
    topics => ["raw-data"]
    group_id => "ls-data"
    client_id => "ls-data"
    max_poll_records => "100"
    max_poll_interval_ms => "600000"
    request_timeout_ms => "605000"
    fetch_max_bytes => "5000060"
    max_partition_fetch_bytes => "5000060"
    security_protocol => "${KAFKA_SECURITY_PROTOCOL}"
    decorate_events => true
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => protobuf {
        class_name => "pipe.DataCollectorResponse"
        class_file => '/usr/share/logstash/protos/compiled/pipe_data_pb.rb'
        protobuf_version => 3
    }
}
```
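
(Editor's sketch, not from the thread.) Once the field is relabeled as binary, the decryption step in the ruby filter is plain Ruby OpenSSL. The cipher, key, and IV below are entirely made up for illustration; the real values depend on how the producer encrypted field2/field3:

```ruby
# Hypothetical sketch: decrypting a protobuf `bytes` field with
# OpenSSL in a ruby filter. AES-256-CBC and the key material here
# are assumptions, not something the thread specifies.
require "openssl"

def decrypt_aes256cbc(ciphertext, key, iv)
  cipher = OpenSSL::Cipher.new("aes-256-cbc")
  cipher.decrypt
  cipher.key = key
  cipher.iv = iv
  # Relabel the event field as raw bytes before decrypting.
  cipher.update(ciphertext.dup.force_encoding(Encoding::BINARY)) + cipher.final
end

# Round-trip demonstration with made-up key material:
key = "k" * 32
iv  = "i" * 16
enc = OpenSSL::Cipher.new("aes-256-cbc")
enc.encrypt
enc.key = key
enc.iv = iv
ciphertext = enc.update("secret payload") + enc.final

decrypt_aes256cbc(ciphertext, key, iv) # => "secret payload"
```

In a pipeline this would be called from the ruby filter's event hook, e.g. on the value of `event.get("[dt4][field3]")`, with the plaintext written back via `event.set`.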