logstash-plugins / logstash-codec-avro

A logstash codec plugin for decoding and encoding Avro records
Apache License 2.0

undefined method `type_sym' for nil:NilClass #33

Open fonzie1006 opened 4 years ago

fonzie1006 commented 4 years ago

I'm consuming Kafka messages that are Avro-encoded, and the following errors are reported at runtime.

avro-logstash_1  | [2019-12-13T15:38:13,239][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-2, groupId=kafka_avro_dev] Setting newly assigned partitions:
avro-logstash_1  | [2019-12-13T15:38:13,239][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=kafka_avro_dev-3, groupId=kafka_avro_dev] Successfully joined group with generation 46
avro-logstash_1  | [2019-12-13T15:38:13,240][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-3, groupId=kafka_avro_dev] Setting newly assigned partitions:
avro-logstash_1  | [2019-12-13T15:38:13,249][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Setting newly assigned partitions: teacher_roll_call-0
avro-logstash_1  | [2019-12-13T15:38:13,265][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Setting offset for partition teacher_roll_call-0 to the committed offset FetchPosition{offset=102568, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=10.1.64.243:9093 (id: 1 rack: null), epoch=0}}
avro-logstash_1  | [2019-12-13T15:38:34,575][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Member kafka_avro_dev-0-8c8460a0-7e81-48e5-94db-6582900b1bcf sending LeaveGroup request to coordinator 10.1.64.243:9093 (id: 2147483646 rack: null)
avro-logstash_1  | warning: thread "Ruby-0-Thread-15: :1" terminated with exception (report_on_exception is true):
avro-logstash_1  | NoMethodError: undefined method `type_sym' for nil:NilClass
avro-logstash_1  |   match_schemas at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36
avro-logstash_1  |   match_schemas at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240
avro-logstash_1  |       read_data at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257
avro-logstash_1  |      read_union at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355
avro-logstash_1  |       read_data at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286
avro-logstash_1  |            read at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252
avro-logstash_1  |          decode at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77
avro-logstash_1  |   thread_runner at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258
avro-logstash_1  |   thread_runner at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257
avro-logstash_1  | [2019-12-13T15:38:34,599][ERROR][logstash.javapipeline    ] A plugin had an unrecoverable error. Will restart this plugin.
avro-logstash_1  |   Pipeline_id:main
avro-logstash_1  |   Plugin: <LogStash::Inputs::Kafka codec=><LogStash::Codecs::Avro schema_uri=>"/rds-dts-record.avsc", id=>"40a4c414-d851-475f-b435-20dc6aca7f94", enable_metric=>true, tag_on_failure=>false>, auto_offset_reset=>"latest", group_id=>"kafka_avro_dev", topics=>["teacher_roll_call"], consumer_threads=>4, id=>"ed822f74235bdf59b1ee471d9053997e1e7ab1143de365181cfd66cf250243d3", type=>"kafka_avro_dev", bootstrap_servers=>"10.1.64.243:9093", client_id=>"kafka_avro_dev", decorate_events=>true, enable_metric=>true, auto_commit_interval_ms=>"5000", enable_auto_commit=>"true", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI">
avro-logstash_1  |   Error: undefined method `type_sym' for nil:NilClass
avro-logstash_1  |   Exception: NoMethodError
avro-logstash_1  |   Stack: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36:in `match_schemas'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240:in `match_schemas'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257:in `read_data'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355:in `read_union'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286:in `read_data'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252:in `read'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'
avro-logstash_1  | [2019-12-13T15:38:34,599][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `type_sym' for nil:NilClass>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36:in `match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240:in `match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257:in `read_data'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355:in `read_union'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286:in `read_data'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252:in `read'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'"]}
avro-logstash_1  | [2019-12-13T15:38:34,656][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

My avsc file is from here: https://github.com/LioRoger/subscribe_example/blob/master/avro/Record.avsc

My logstash configuration file is as follows:

input {
    kafka {
        bootstrap_servers => "10.1.64.243:9093"
        consumer_threads => 4
        auto_offset_reset => "latest"
        #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArraySerializer"
        #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArraySerializer"
        codec => avro {
            schema_uri => "/rds-dts-record.avsc"
        }
        topics => ["teacher_roll_call"]
        type => "kafka_avro_dev"
        decorate_events => "true"
        client_id => "kafka_avro_dev"
        group_id => "kafka_avro_dev"
    }
}

#filter {
#        json {
#                source => "message"
#        }
#        mutate {
#            add_field => {
#                "kafka" => "%{[@metadata][kafka]}"
#            }
#        }
#}

output {
  if [type] == "kafka_avro_dev" {
    logservice {
      codec => "json"
      endpoint => "cn-xxxxx-xxxxxx.log.aliyuncs.com"
      project => "xmkafka"
      logstore => "dev-dts"
      topic => "ALL"
      source => ""
      access_key_id => "xxxxxxx"
      access_key_secret => "xxxxxxxx"
      max_send_retry => 10
    }
  }
}
netsplit commented 3 years ago

I experienced the same in combination with Confluent's Schema Registry and messages that were encoded in Confluent's Avro wire format. Using https://github.com/revpoint/logstash-codec-avro_schema_registry instead of the default plugin solved this problem for me.
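For reference, a minimal sketch of switching to that codec might look like the following (the endpoint URL is a placeholder, and option names should be checked against that plugin's README):

```
codec => avro_schema_registry {
    endpoint => "http://localhost:8081"
}
```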

bencody commented 2 years ago

I'm using the AWS Glue schema registry, so my Kafka records are serialized with com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer. Trying to process those records with this Logstash Avro codec fails with the same NoMethodError: undefined method type_sym for nil:NilClass error mentioned in this issue.

From what I understand, both the Confluent schema registry serializer and the AWS Glue schema registry serializer don't output just the raw, standard Avro bytes; they prepend some vendor-specific framing bytes as well.

These additional bytes cause this Logstash Avro codec to fail to parse the data.
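As a concrete illustration of the Confluent case: its wire format prepends a 5-byte header (one magic byte, always 0, followed by a 4-byte big-endian schema ID) to the Avro payload. The stock codec reads those header bytes as Avro data, so a union branch index can point past the last branch and `match_schemas` receives nil. The sketch below is not the plugin's code, just a hypothetical helper showing how the header could be peeled off before Avro decoding:

```ruby
# Split a Confluent wire-format record into [schema_id, avro_payload].
# Header layout: byte 0 = magic byte (0), bytes 1..4 = big-endian schema ID.
def split_confluent_frame(bytes)
  magic = bytes.getbyte(0)
  raise "not Confluent wire format (magic=#{magic})" unless magic == 0
  schema_id = bytes.byteslice(1, 4).unpack1('N')  # big-endian uint32
  [schema_id, bytes.byteslice(5, bytes.bytesize - 5)]
end

# Hypothetical framed record: 5-byte header (schema ID 42) + payload bytes.
framed = [0, 0, 0, 0, 42].pack('C5') + "avro-bytes"
id, payload = split_confluent_frame(framed)
```

The AWS Glue header is different (and longer), so the same idea would need a Glue-specific parser.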

I guess adding Confluent- and AWS Glue-specific configuration to this Avro-specific plugin would add bloat. The Elastic Kafka input plugin already has some Confluent schema registry support (https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-kafka.html#plugins-inputs-kafka-schema_registry_url), but I don't see anything for Glue.
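For the Confluent case, a sketch of using that built-in Kafka input support instead of this codec could look like the following (hosts are placeholders; the option requires a sufficiently recent kafka integration, and nothing comparable exists for Glue):

```
input {
    kafka {
        bootstrap_servers => "10.1.64.243:9093"
        topics => ["teacher_roll_call"]
        # The input handles the wire-format header and schema lookup itself,
        # so no separate avro codec is configured here.
        schema_registry_url => "http://localhost:8081"
    }
}
```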