revpoint / logstash-codec-avro_schema_registry

A logstash codec plugin for decoding and encoding Avro records

NoMethodError: undefined method `to_java_bytes' #15

Open cameronkerrnz opened 6 years ago

cameronkerrnz commented 6 years ago

Hello, I'm doing a bit of development with this codec (thank you for making it available, much appreciated). I've struck an issue that may be due to a change in Logstash.

With Logstash 6.3 (I haven't tried other versions), I get the following stack trace:

Jun 18 12:47:02 node-1 logstash[11790]: [FATAL] 2018-06-18 00:47:02.731 [LogStash::Runner] runner - An unexpected error occurred! {:error=>#<NoMethodError: undefined method `to_java_bytes' for #<#<Class:0x427befaa>:0x3ad546be>
Jun 18 12:47:02 node-1 logstash[11790]: Did you mean?  to_java_object>, :backtrace=>[
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:199:in `block in register'", 
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb:251:in `encode'", 
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:221:in `block in multi_receive'", 
"org/jruby/RubyArray.java:1734:in `each'", 
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:219:in `multi_receive'", 
"org/logstash/config/ir/compiler/OutputStrategyExt.java:109:in `multi_receive'", 
"org/logstash/config/ir/compiler/OutputDelegatorExt.java:156:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:475:in `block in output_batch'", 
"org/jruby/RubyHash.java:1343:in `each'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:474:in `output_batch'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:426:in `worker_loop'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:384:in `block in start_workers'"]}
Jun 18 12:47:03 node-1 logstash[11790]: [ERROR] 2018-06-18 00:47:03.076 [LogStash::Runner] Logstash - java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

This appears to occur because of the following code in avro_schema_registry.rb

/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb

    if @binary_encoded
       @on_event.call(event, buffer.string.to_java_bytes)   # <--- line 251

This calls a to_java_bytes method on the buffered string, which then clashes with the following code in kafka.rb

(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb)

    elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
      @codec.on_event do |event, data|
        write_to_kafka(event, data.to_java_bytes)   # <--- line 199
      end

So data here is presumably already a Java byte array, which is why JRuby complains that it doesn't have a to_java_bytes method.
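The clash can be sketched in plain Ruby. This is a simulation: FakeJavaBytes is a hypothetical stand-in for the Java byte[] that JRuby's real String#to_java_bytes returns, which has no Ruby to_java_bytes method of its own.

```ruby
# Simulation of the JRuby behaviour behind this error. In JRuby,
# String#to_java_bytes returns a Java byte[]; FakeJavaBytes plays
# that role here so this runs under plain Ruby.
class FakeJavaBytes
  def initialize(str)
    @bytes = str.bytes
  end
end

class String
  def to_java_bytes          # stand-in for the JRuby core extension
    FakeJavaBytes.new(self)
  end
end

data = "avro payload".to_java_bytes   # codec side (avro_schema_registry.rb)

begin
  data.to_java_bytes                  # output side (kafka.rb) converts again
rescue NoMethodError => e
  puts e.class                        # NoMethodError, as in the backtrace
end
```

Passing the plain Ruby String through, as in the change described next, leaves the single conversion to kafka.rb.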

Changing avro_schema_registry.rb to pass in buffer.string instead of buffer.string.to_java_bytes fixes this problem, and I get the results I expect.

    if @binary_encoded
       #@on_event.call(event, buffer.string.to_java_bytes)
       @on_event.call(event, buffer.string)
    else
       @on_event.call(event, Base64.strict_encode64(buffer.string))
    end

Disclaimer: I am doing some custom development on your plugin to support my own use case (it's not in any public repo at this time, too early). In my use case, I have an Avro schema that I use to encapsulate the event in a Logstash reception tier. I have various fields that are used in a backbone format, and the event goes into a 'message' field (which could be JSON, some binary payload, etc.). That said, the same behaviour occurs when I use avro_schema_registry.rb as per your current HEAD.

Possibly related to https://github.com/logstash-plugins/logstash-output-kafka/issues/123

Here is my configuration and testing, after applying the fix.

My Logstash configuration (input and output):

input {
    tcp {
        host => "0.0.0.0"
        port => 5140
        mode => "server"
        codec => "json_lines"
    }
}

output {
  kafka {
    topic_id => "wapiti_backbone_sandpit"
    compression_type => "snappy"

    codec => avro_schema_registry {
      endpoint => "http://127.0.0.1:8081"
      subject_name => "wapiti_backbone_submitted-value"
      schema_version => "3"
      register_schema => false
      binary_encoded => true
    }

    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
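For reference, with binary_encoded => true and the ByteArraySerializer, each record value on the topic follows the Confluent wire format that kafka-avro-console-consumer expects: a zero magic byte, the 4-byte big-endian schema ID assigned by the registry, then the Avro binary body. A minimal plain-Ruby sketch of that framing (the schema ID 42 is a made-up example):

```ruby
MAGIC_BYTE = 0

# Frame an Avro-encoded body in the Confluent wire format:
# 1 magic byte + 4-byte big-endian schema id + Avro binary payload.
def confluent_frame(schema_id, avro_bytes)
  [MAGIC_BYTE, schema_id].pack('CN') + avro_bytes
end

framed = confluent_frame(42, "avro-body".b)
puts framed.bytesize   # 5-byte header + 9-byte body = 14
```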

Current version of the schema (I will change 'message' to bytes soon).

{
  "name": "wapiti_backbone",
  "namespace": "local.wapiti",
  "doc":"A log event on a Wapiti Backbone topic",
  "type": "record",

  "fields": [
    {
      "name":"submission_time",
      "type":"long",
      "logicalType":"timestamp-millis",
      "doc":"When first seen at log submission. ms since Unix epoch"
    },
    {
      "name":"submitted_from",
      "type":"string",
      "doc":"Source Hostname or IP at entry of log submission"
    },
    {
      "name":"originating_host",
      "type":"string",
      "doc":"Hostname of the host the log was first created on"
    },
    {
      "name":"vertical",
      "type":"string",
      "doc":"Business application, eg. 'corporate-website'"
    },
    {
      "name":"environment",
      "type":"string",
      "doc":"Business environment, eg. 'dev' or 'prod'"
    },
    {
      "name":"processing_key",
      "type":"string",
      "doc":"Processing key, eg. 'apache-httpd-access-combined'"
    },
    {
      "name":"message_format",
      "type":"string",
      "doc":"format of data contained in 'message' field, eg. 'json', 'syslog', ..."
    },
    {
      "name":"message",
      "type":"string",
      "doc":"The encapsulated message payload."
    }
  ]
}
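An event matching this schema is just a hash with one entry per field; a minimal sketch of the encapsulation described above (all values are placeholders, standing in for what the reception tier would fill in):

```ruby
require 'json'
require 'time'

# Placeholder values; in the real pipeline these are set by the
# reception tier before the codec Avro-encodes the record.
record = {
  'submission_time'  => (Time.now.to_f * 1000).to_i,  # ms since Unix epoch
  'submitted_from'   => 'SUBMITTED_FROM',
  'originating_host' => 'ORIGINATING_HOST',
  'vertical'         => 'VERTICAL',
  'environment'      => 'ENVIRONMENT',
  'processing_key'   => 'PROCESSING_KEY',
  'message_format'   => 'json',
  'message'          => JSON.generate('breath_in' => Time.now.utc.iso8601)
}

puts record['message_format']   # json
```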

Test input:

[vagrant@node-1 ~]$ echo "{\"breath_in\": \"$(date --iso-8601=ns)\"}" | nc 127.0.0.1 5140

Output when viewed using kafka-avro-console-consumer:

{"submission_time":0,"submitted_from":"SUBMITTED_FROM","originating_host":"ORIGINATING_HOST","vertical":"VERTICAL","environment":"ENVIRONMENT","processing_key":"PROCESSING_KEY","message_format":"json","message":"{\"host\":\"node-1\",\"@timestamp\":\"2018-06-18T01:27:59.221Z\",\"breath_in\":\"2018-06-18T13:27:59,081679157+1200\",\"@version\":\"1\",\"port\":48710}"}

cgiraldo commented 6 years ago

Check #17

4smirnovsn commented 4 years ago

I have the same problem; the codec doesn't work in output: org.jruby.exceptions.NoMethodError: (NoMethodError) undefined method 'to_java_bytes' for #<#:0x5a4d48a6>, and messages don't get sent to Kafka.

sleighzy commented 4 years ago

Just ran into this now as well.

@cgiraldo, I see your PR has been open for a long time now. Anything we can do to get it merged in and released?

sleighzy commented 4 years ago

Thanks for the patch @cgiraldo , I built and installed this and my system is running perfectly again.

JeroenvanKnippenberg commented 4 years ago

Same problem here. I use logstash-7.3.2.

@cgiraldo @sleighzy is there a way to build / install a fix for this issue?

ryananguiano commented 4 years ago

Fixed in 1.2.0

geraintwjones commented 3 years ago

There isn't a 1.2.0!

cgiraldo commented 3 years ago

There is not a 1.2.0 release in github, but there is in rubygems:

https://rubygems.org/gems/logstash-codec-avro_schema_registry/versions/1.2.0

geraintwjones commented 3 years ago

Thanks @cgiraldo. We'd like to make changes to support Apicurio, so where's the 1.2.0 source code?

cgiraldo commented 3 years ago

I think it is the master branch, @ryananguiano?

choykalun commented 3 years ago

I have the latest 1.2.0 release. I am encountering the same error in Logstash when I have binary_encoded => true:

[2021-09-13T14:34:33,680][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(NoMethodError) undefined method `to_java_bytes' for #<#<Class:0x38aa6237>:0x32adac90>", :exception=>Java::OrgJrubyExceptions::NoMethodError

geraintwjones commented 3 years ago

@cgiraldo @ryananguiano Regarding the previous comment by @choykalun: he built his gem from the latest master branch and did not download it from rubygems. Version 1.2.0 on rubygems was built from the latest master, right?