logstash-plugins / logstash-codec-protobuf

Codec plugin for parsing Protobuf messages
Apache License 2.0
26 stars 16 forks source link

Couldn't decode protobuf: org.logstash.MissingConverterException: Missing Converter handling for full class name #42

Closed vaisov closed 4 years ago

vaisov commented 5 years ago

I'm getting this error for every message consumed from Kafka. I enabled debug logging in Logstash, but apart from the error below nothing else is logged. Tried with Logstash 6.4.1 and 7.0.0.

Pipeline starts fine:

[2019-04-24T14:29:19,968][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"processed-entries", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, :thread=>"#<Thread:0x71f49406@/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:37 run>"}
[2019-04-24T14:29:19,971][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"processed-entries"}

Error:

[2019-04-24T14:29:27,428][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[].
[2019-04-24T14:29:27,456][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[].
[2019-04-24T14:29:27,488][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[].
[2019-04-24T14:29:27,528][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[].
[2019-04-24T14:29:27,546][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[].

Here is the setup we have:

Pipeline:

# Kafka input: consumes the "processed-entries" topic and hands every record
# value to the protobuf codec, which decodes it as a protobuf 3
# ReservationEntry message.
input {
  kafka {
    bootstrap_servers => 'kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092,kafka-3.kafka:9092,kafka-4.kafka:9092,kafka-5.kafka:9092'
    topics => ["processed-entries"]
    client_id => "logstash_pipe3"
    group_id => "logstash-pre"
    # NOTE(review): the resolution reported at the end of this thread is that
    # removing key_deserializer_class made the MissingConverterException for
    # byte[] disappear. Presumably the raw byte[] record key gets attached to
    # the event because decorate_events is enabled — confirm against the kafka
    # input plugin before relying on this explanation.
    key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    # Record values are passed on as raw bytes so the codec can parse them.
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => protobuf
    {
      # Fully qualified protobuf name of the message type to decode.
      class_name => "bk.sk.pk.reservations.proto.types.v0.ReservationEntry"
      # Ruby file generated from the .proto schema that defines class_name.
      include_path => "/usr/share/logstash/proto-res-schemas/ReservationEntry_pb.rb"
      protobuf_version => 3
    }
    decorate_events => "true"
    # Start from the oldest available offset when the group has no committed offset.
    auto_offset_reset => "earliest"
  }
}
# Derives the field "index_name" (used by the elasticsearch output) from the
# decoded message's creationTimestamp.
filter {
  # NOTE(review): copy and gsub live in two separate mutate blocks, presumably
  # so that the copy is guaranteed to happen before the substitution — confirm
  # against the mutate filter's documented processing order before merging them.
  mutate {
    # Work on a copy so the original creationTimestamp stays untouched.
    copy => { "creationTimestamp" => "index_name" }
  }
  mutate {
    # Keep only the leading "yyyy-MM" part of the timestamp string.
    gsub => [ "index_name", "^(\d{4}-\d{2}).*", "\1" ]
  }
}
# Emits every event twice: as JSON on stdout and into Elasticsearch.
output {
  stdout {
    codec => json
  }
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    http_compression => "true"
    # The index suffix is the "index_name" field computed in the filter section.
    index => "processed-entries-%{index_name}"
  }
}

Proto schema:

syntax = "proto3";
package bk.sk.pk.reservations.proto.types.v0;

option java_multiple_files = true;

// Top-level reservation record. This is the message type named by class_name
// in the Logstash codec configuration; the pipeline's filter section reads
// creationTimestamp to build the Elasticsearch index name.
message ReservationEntry {
    string id = 1;
    string internalAccountId = 2;
    ReservationState state = 3;
    InstructedAmount instructedAmount = 4;
    Lifetime lifetime = 5;
    Requestor requestor = 6;
    Description description = 7;
    bool forceMarker = 8;
    string creationTimestamp = 9; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX
}

// Requestor details embedded in ReservationEntry (field 6). All three fields
// are plain strings; their exact meaning is not documented in this schema.
message Requestor {
    string productCode = 1;

    string systemCode = 2;

    string init = 3;
}

// State of a reservation. RESERVED is the zero value, so in proto3 it is also
// what a reader sees when the state field was never set.
enum ReservationState {
    RESERVED = 0;
    CANCELED = 1;
    CONSUMED = 2;
    EXPIRED = 3;
}

// Start and end of a reservation, both carried as ISO timestamp strings
// rather than as numeric or well-known timestamp types.
message Lifetime {
    string startDateTime = 1; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX

    string endDateTime = 2; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX
}

// Monetary amount: a DecimalNumber value together with its currency code.
message InstructedAmount {
    DecimalNumber amount = 1;

    // 3 characters long currency code
    string currency = 2;
}

// Decimal number represented as an integer plus a scale.
// NOTE(review): presumably value = unscaledValue * 10^(-scale), as in
// java.math.BigDecimal — confirm with the producer of these messages.
message DecimalNumber {
    int64 unscaledValue = 1;
    int32 scale = 2;
}

// Two text fields describing the reservation (ReservationEntry field 7).
message Description {
    string text1 = 1;

    string text2 = 2;
}

Converted Ruby schema:

require 'google/protobuf'

# Registers the v0 reservation schema — six messages and one enum — in the
# shared generated descriptor pool. Field names, types and field numbers
# mirror the proto3 definition of package bk.sk.pk.reservations.proto.types.v0;
# message- and enum-typed fields refer to their target by fully qualified
# protobuf name.
Google::Protobuf::DescriptorPool.generated_pool.build do
  add_message "bk.sk.pk.reservations.proto.types.v0.ReservationEntry" do
    optional :id, :string, 1
    optional :internalAccountId, :string, 2
    optional :state, :enum, 3, "bk.sk.pk.reservations.proto.types.v0.ReservationState"
    optional :instructedAmount, :message, 4, "bk.sk.pk.reservations.proto.types.v0.InstructedAmount"
    optional :lifetime, :message, 5, "bk.sk.pk.reservations.proto.types.v0.Lifetime"
    optional :requestor, :message, 6, "bk.sk.pk.reservations.proto.types.v0.Requestor"
    optional :description, :message, 7, "bk.sk.pk.reservations.proto.types.v0.Description"
    optional :forceMarker, :bool, 8
    optional :creationTimestamp, :string, 9
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.Requestor" do
    optional :productCode, :string, 1
    optional :systemCode, :string, 2
    optional :init, :string, 3
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.Lifetime" do
    optional :startDateTime, :string, 1
    optional :endDateTime, :string, 2
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.InstructedAmount" do
    optional :amount, :message, 1, "bk.sk.pk.reservations.proto.types.v0.DecimalNumber"
    optional :currency, :string, 2
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.DecimalNumber" do
    optional :unscaledValue, :int64, 1
    optional :scale, :int32, 2
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.Description" do
    optional :text1, :string, 1
    optional :text2, :string, 2
  end
  add_enum "bk.sk.pk.reservations.proto.types.v0.ReservationState" do
    value :RESERVED, 0
    value :CANCELED, 1
    value :CONSUMED, 2
    value :EXPIRED, 3
  end
end

# Ruby namespace mirroring the protobuf package
# bk.sk.pk.reservations.proto.types.v0. Each constant is obtained by looking
# up the fully qualified protobuf name in the generated descriptor pool:
# message types resolve to a class (msgclass), the enum to a module
# (enummodule).
module Bk
  module Sk
    module Pk
      module Reservations
        module Proto
          module Types
            module V0
              ReservationEntry = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.ReservationEntry").msgclass
              Requestor = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Requestor").msgclass
              Lifetime = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Lifetime").msgclass
              InstructedAmount = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.InstructedAmount").msgclass
              DecimalNumber = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.DecimalNumber").msgclass
              Description = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Description").msgclass
              ReservationState = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.ReservationState").enummodule
            end
          end
        end
      end
    end
  end
end
vaisov commented 5 years ago

@IngaFeick did you have any chance to look into this issue?

IngaFeick commented 5 years ago

@vaisov sorry, didn't have time yet. I'm going to try to reproduce this asap.

IngaFeick commented 5 years ago

@vaisov I have not been able to reproduce the issue yet. Can you send me one or two examples which trigger this?

vaisov commented 5 years ago

Meanwhile our proto schema has changed a bit, so to show you a couple of examples I had to reconfigure the pipeline. Now the plugin even fails with a fatal error (shown below). I have also pasted a couple of messages that are already present in the topic; they might be the cause.

Error:

  Pipeline_id:processed-entries-proto
  Plugin: <LogStash::Inputs::Kafka value_deserializer_class=>"org.apache.kafka.common.serialization.ByteArrayDeserializer", codec=><LogStash::Codecs::Protobuf protobuf_version=>3, id=>"26c95eff-7e3a-4f6f-bc62-ba9eb93e7def", include_path=>["/usr/share/logstash/proto-s
chemas/CopReservationEntryEvent_pb.rb"], class_name=>"bk.sk.pk.reservations.proto.types.cop.v1.CopReservationEntryEvent", enable_metric=>true, stop_on_error=>false, pb3_encoder_autoconvert_types=>true>, auto_offset_reset=>"earliest", group_id=>"logstash-pre-proto", t
opics=>["processed-entries"], key_deserializer_class=>"org.apache.kafka.common.serialization.ByteArrayDeserializer", id=>"689a8f15f5031273237f514bc80f35fc478eeb95e419fb4be544fe019ff2c374", bootstrap_servers=>"kafka-0.kafka.core:9092,kafka-1.kafka.core:$
092,kafka-2.kafka.core:9092,kafka-3.kafka.core:9092,kafka-4.kafka.core:9092,kafka-5.kafka.core:9092", client_id=>"logstash_pre_proto", decorate_events=>true, enable_metric=>true, auto_commit_interval_ms=>"5000", consumer_threads=>1, enable_auto_commit=>"true", poll_time$
ut_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI">
  Error: Missing Converter handling for full class name=[B, simple name=byte[]
  Exception: Java::OrgLogstash::MissingConverterException
  Stack: org.logstash.Valuefier.fallbackConvert(Valuefier.java:98)
org.logstash.Valuefier.convert(Valuefier.java:76)
org.logstash.Valuefier.lambda$static$3(Valuefier.java:52)
org.logstash.Valuefier.convert(Valuefier.java:74)
org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:99)
org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)
org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:360)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:201)
org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:326)
org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)
org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:128)
org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)
org.jruby.runtime.IRBlockBody.doYield(IRBlockBody.java:187)
org.jruby.runtime.BlockBody.yield(BlockBody.java:116)
org.jruby.runtime.Block.yield(Block.java:165)
org.jruby.ir.runtime.IRRuntimeHelpers.yield(IRRuntimeHelpers.java:478)
org.jruby.ir.targets.YieldSite.yield(YieldSite.java:105)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_protobuf_minus_1_dot_2_dot_2.lib.logstash.codecs.protobuf.RUBY$method$decode$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-protobuf-1.2.2/lib/logstash/codecs/prot$
buf.rb:217)
org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)
org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)
org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:350)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:180)
org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:187)
org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:338)
org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)
org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:128)
org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)
org.jruby.runtime.IRBlockBody.doYield(IRBlockBody.java:187)
org.jruby.runtime.BlockBody.yield(BlockBody.java:116)
org.jruby.runtime.Block.yield(Block.java:165)
org.jruby.javasupport.ext.JavaLang$Iterable.each(JavaLang.java:99)
org.jruby.javasupport.ext.JavaLang$Iterable$INVOKER$s$0$0$each.call(JavaLang$Iterable$INVOKER$s$0$0$each.gen)
org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroBlock.call(JavaMethod.java:555)
org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:80)
org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:89)
org.jruby.ir.instructions.CallBase.interpret(CallBase.java:537)
org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:362)
org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)
org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:128)
org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)
org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:79)
org.jruby.runtime.Block.call(Block.java:124)
org.jruby.RubyProc.call(RubyProc.java:295)
org.jruby.RubyProc.call(RubyProc.java:274)
org.jruby.RubyProc.call(RubyProc.java:270)
org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)
java.lang.Thread.run(Thread.java:748)
[2019-07-17T14:16:47,962][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[], :backtrace=>["org.logstash.Valuefier.fallbackConvert(Valuefi$
r.java:98)", "org.logstash.Valuefier.convert(Valuefier.java:76)", "org.logstash.Valuefier.lambda$static$3(Valuefier.java:52)", "org.logstash.Valuefier.convert(Valuefier.java:74)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:$
9)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:360)", "org.jruby.runtime.callsite.Cac
hingCallSite.call(CachingCallSite.java:201)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:326)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.Interpreter.INTE
RPRET_BLOCK(Interpreter.java:128)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)", "org.jruby.runtime.IRBlockBody.doYield(IRBlockBody.java:187)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:116)", "org.jruby.runtime.Block.y
ield(Block.java:165)", "org.jruby.ir.runtime.IRRuntimeHelpers.yield(IRRuntimeHelpers.java:478)", "org.jruby.ir.targets.YieldSite.yield(YieldSite.java:105)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_protobuf_minus_1_dot_2_dot_
2.lib.logstash.codecs.protobuf.RUBY$method$decode$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-protobuf-1.2.2/lib/logstash/codecs/protobuf.rb:217)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)", "org.jruby.i
nternal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:350)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:180)", "org.jruby.runtime.callsite.Cach
ingCallSite.callIter(CachingCallSite.java:187)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:338)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.Interpreter.I
NTERPRET_BLOCK(Interpreter.java:128)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)", "org.jruby.runtime.IRBlockBody.doYield(IRBlockBody.java:187)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:116)", "org.jruby.runtime.Bloc
k.yield(Block.java:165)", "org.jruby.javasupport.ext.JavaLang$Iterable.each(JavaLang.java:99)", "org.jruby.javasupport.ext.JavaLang$Iterable$INVOKER$s$0$0$each.call(JavaLang$Iterable$INVOKER$s$0$0$each.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroB
lock.call(JavaMethod.java:555)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:80)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:89)", "org.jruby.ir.instructions.CallBase.interpret(CallBase.java:537)", "org.jruby.ir.i
nterpreter.InterpreterEngine.processCall(InterpreterEngine.java:362)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:128)", "org.jruby.runtime.MixedM
odeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:79)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:295)", "org.jruby.RubyProc.call(RubyProc.java:274)", "org.jru
by.RubyProc.call(RubyProc.java:270)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.lang.Thread.run(Thread.java:748)"]}
[2019-07-17T14:16:48,017][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

Messages which are present in the topic:

q
2019-05-20T08:01:00.744ZS"2019-05-17T13:08:33.814Z*     test_SBIT2      test_SBID:      test_BRIDB      test_SBRNJ      test_LOIDPN
7874589782792477686$2bc7a5f6-9a6c-454c-8ceb-3d4d48cc4524"
       ͡DKK*:
019-05-17T15:08:33.814000Z019-05-17T15:08:33.814000Z2!
        test_PDBT       test_SYBT       test_INIT:

test_TEXT1
test_TEXT2J019-05-17T14:08:33.814000Z

g
2019-05-20T08:43:55.809ZH"2018-09-03T22:31:35.547584624ZcmMIUNAF2
ofrqFBiUzOBAFUqqniB8663P

16264722327
5269535372"

܃sDKK*:
017-02-10T14:20:06.685798Z017-06-06T23:19:29.642617Z2
DKXDYeaX:?
, cZh96|Q%(kGwYvr|/bBY%R?p<Bu]np]+|ac$0(gmMf(w {gi4&{*Wd!Q@!J018-09-03T22:31:35.547584Z

g
2019-05-20T08:43:55.919ZH"2019-11-05T00:05:42.093052569ZXWZYQlgT2
2WQEiaGmxRcFppOe0HB6747P

63788172800
3911602074"
6DKK*:
017-01-26T15:58:01.422214Z017-02-03T03:48:29.966907Z2
pezxiZjMYso:F
$(HHvE_EYJgFK/Hd-8)h O|PBC'J@P+sMOPT3`k{^,i%t18HzzF VkPVD)cM4#!pvuJ019-11-04T23:05:42.093052Z

j
2019-05-20T09:01:30.900ZH"2017-01-04T17:28:06.450281340ZLqrI78Mx2
YR1pcGplPi7bK2JqxUB4039JZP

58739343613
1572349332"
DKK*:
017-01-21T08:11:48.380226Z017-07-10T00:34:33.919286Z2
lLIAuTXRH:

*Xa0.moVJ017-01-04T16:28:06.450281Z

j
2019-05-20T09:01:31.366ZH"2019-03-16T16:13:03.429251312ZtUpUajIg2
UCCEKR12Q1L7wtl06IB1171JBP

10776643750
7160424078"

DKK*:
017-01-11T17:58:04.294871Z017-10-22T13:07:00.136853Z2
eapxdaPXcY:o
A_v>B7wU_Et7uf\;B`1X=d]@IQX$U"=D;jSn")KUr(2A9v,\/17-y+mSkxMV|Ixa+Q*UKR<BcR|nO_ji2Ogr0?Q`p{RV4VGFeKQw>*XNpC<$XJ019-03-16T15:13:03.429251Z

I can send the updated schema and the most recent messages if you are not able to reproduce the issue.

vaisov commented 5 years ago

@IngaFeick were you able to reproduce this issue?

IngaFeick commented 5 years ago

@vaisov can you give me an unencoded message please? I need to know the exact values which are causing this. Also of interest: which version of the codec are you using and what language/lib are you writing the messages into Kafka with?

vaisov commented 5 years ago
meta {
  inputSource: "H"
  inputUpdatedOn: "2017-11-15T14:51:42.942855446Z"
  updatingProgram: "pOooNuJD"
  terminalId: "34u5GADsdX"
  userId: "nQHvCsef"
  updatingOffice: "7120"
  locationId: "D"
  optsTimezone: 20000
}
data {
  reservationId: "94842045307"
  eventId: "94842045307O20171115155142942855"
  internalAccountId: "5076139206"
  state: RESERVED
  instructedAmount {
    amount {
      unscaledValue: 658000
      scale: 2
    }
    cy: "DKK"
  }
  lifetime {
    startTimestamp: "2020-01-08T02:57:45.684530Z"
    endTimestamp: "2020-01-09T22:32:05.413258Z"
  }
  requestor {
    productCode: "Llq"
    systemCode: "C"
    init: "NOS"
  }
  description {
    text1: "-m6TCc((eee9H6G^&"
    text2: "LMiO2jdFjt&7!.@Mp{uizu_+S?3,u=}W]9Nk\'m-#WE2MD*IE"
  }
  forceMarker: true
  createdOn: "2017-11-15T13:51:42.942855Z"
}
IngaFeick commented 5 years ago

@vaisov thx

IngaFeick commented 5 years ago

@vaisov How are you generating the messages exactly?

IngaFeick commented 5 years ago

@vaisov I see neither eventId nor reservationId in the protobuf definition. Also, instructedAmount.cy is called instructedAmount.currency in the PB def. Can you please send the new protobuf file? Thanks

vaisov commented 5 years ago

New protobuf file:

syntax = "proto3";
package bk.sk.pk.reservations.proto.types.v1;

option java_multiple_files = true;

// Envelope for one reservation event: event metadata plus the reservation data.
// NOTE(review): this file declares package bk.sk.pk.reservations.proto.types.v1,
// whereas the codec configuration in the plugin dump earlier in this thread uses
// class_name "bk.sk.pk.reservations.proto.types.cop.v1.CopReservationEntryEvent"
// — confirm which package the deployed schema actually uses.
message CopReservationEntryEvent {
    CopReservationEntryEventMeta meta = 1;
    ReservationEntryEventData data = 2;
}

// Metadata carried with every event (source, update time, program, terminal,
// user, office, location, timezone). Field semantics are not documented in
// this schema; in the sample messages of this thread inputUpdatedOn holds an
// ISO timestamp string. NOTE(review): the unit of optsTimezone is unknown —
// confirm with the producer.
message CopReservationEntryEventMeta {
    string inputSource = 1;
    string inputUpdatedOn = 2;
    string updatingProgram = 3;
    string terminalId = 4;
    string userId = 5;
    string updatingOffice = 6;
    string locationId = 7;
    int32 optsTimezone = 8;
}

// Reservation payload of an event. Compared with the v0 ReservationEntry, id
// is replaced by reservationId and eventId, creationTimestamp is renamed to
// createdOn, and the field numbers differ — so the two messages are not
// wire-compatible.
message ReservationEntryEventData {
    string reservationId = 1;
    string eventId = 2;
    string internalAccountId = 3;
    ReservationState state = 4;
    InstructedAmount instructedAmount = 5;
    Lifetime lifetime = 6;
    Requestor requestor = 7;
    Description description = 8;
    bool forceMarker = 9;
    string createdOn = 10;
}

// State of a reservation. UNDEFINED occupies the zero value, so an unset
// state field reads as UNDEFINED; the remaining values are numbered one higher
// than in the v0 enum.
enum ReservationState {
    UNDEFINED = 0;
    RESERVED = 1;
    CANCELED = 2;
    CONSUMED = 3;
    EXPIRED = 4;
}

// Monetary amount: a DecimalNumber value together with its currency
// ("DKK" in the sample messages of this thread).
message InstructedAmount {
    DecimalNumber amount = 1;
    string currency = 2;
}

// Start and end of a reservation as timestamp strings; the v0 schema named
// these fields startDateTime and endDateTime.
message Lifetime {
    string startTimestamp = 1;
    string endTimestamp = 2;
}

// Requestor details embedded in ReservationEntryEventData (field 7). All three
// fields are plain strings; their exact meaning is not documented here.
message Requestor {
    string productCode = 1;
    string systemCode = 2;
    string init = 3;
}

// Two text fields describing the reservation (ReservationEntryEventData field 8).
message Description {
    string text1 = 1;
    string text2 = 2;
}

// Decimal number represented as an integer plus a scale.
// NOTE(review): presumably value = unscaledValue * 10^(-scale), as in
// java.math.BigDecimal — confirm with the producer of these messages.
message DecimalNumber {
    int64 unscaledValue = 1;
    int32 scale = 2;
}
vaisov commented 5 years ago

New example entry:

meta {
  inputSource: "H"
  inputUpdatedOn: "2017-09-30T19:07:13.526246165Z"
  updatingProgram: "8Tbm8nG4"
  terminalId: "4afBzTXVAb"
  userId: "njxMLX9F"
  updatingOffice: "2312"
  locationId: "R"
  optsTimezone: 20000
}
data {
  reservationId: "34953061251"
  eventId: "34953061251D20170930210713526246"
  internalAccountId: "7177832327"
  state: CONSUMED
  instructedAmount {
    amount {
      unscaledValue: 8086666892446918
      scale: 2
    }
    currency: "DKK"
  }
  lifetime {
    startTimestamp: "2018-01-02T07:22:51.625071Z"
    endTimestamp: "2018-01-03T15:28:24.516277Z"
  }
  requestor {
    productCode: "GLU"
    systemCode: "znJUI"
    init: "tX"
  }
  description {
    text1: "\'e]"
    text2: "n"
  }
  forceMarker: true
  createdOn: "2017-09-30T19:07:13.526246Z"
}
rimvydas-pranciulis commented 5 years ago

We fixed this error by removing the key_deserializer_class setting (it was set to org.apache.kafka.common.serialization.ByteArrayDeserializer). We don't actually use the key value, and it contained some Avro message bytes inherited from an upstream topic via Kafka Streams. Without key_deserializer_class everything works fine.