logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins
Apache License 2.0

Error in retrieval of schema registry #63

Closed andsel closed 3 years ago

andsel commented 3 years ago

filter { }

output { stdout { codec => rubydebug } }

- Steps to Reproduce:
  - run kafka and schema registry with https://github.com/logstash-plugins/logstash-integration-kafka/blob/v10.6.0/kafka_test_setup.sh
  - set up a schema:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"record\",\"name\":\"Customer\", \"namespace\": \"com.example\", \"fields\": [{\"name\":\"first_name\",\"type\": {\"type\":\"string\", \"avro.java.string\":\"String\"},\"doc\":\"First Name of Customer\"}, {\"name\":\"last_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Last Name of Customer\"}, {\"name\":\"age\",\"type\":\"int\",\"doc\": \"Age at the time of registration\"},{\"name\":\"height\",\"type\":\"float\",\"doc\":\"Height at the time of registration in cm\"},{\"name\":\"weight\",\"type\":\"float\",\"doc\":\"Weight at the time of registration in kg\"},{\"name\":\"automated_email\",\"type\":\"boolean\",\"doc\":\"Field indicating if the user is enrolled in marketing emails\",\"default\":true}],\"version\":\"1\"}"}' "http://localhost:8081/subjects/customer-avro-value/versions"


  - install the integration plugin on Logstash
  - start Logstash with the previous config file. It fails to start with:

[2020-11-29T20:16:26,060][DEBUG][logstash.javapipeline ][main] Pipeline terminated by worker error {:pipeline_id=>"main", :exception=>#, :backtrace=>["json/ext/Parser.java:173:in `initialize'", "json/ext/Parser.java:150:in `new'", "/tmp/7.9.2/libexec/vendor/bundle/jruby/2.5.0/gems/json-1.8.6-java/lib/json/common.rb:155:in `parse'", "/tmp/7.9.2/libexec/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.6.0-java/lib/logstash/plugin_mixins/common.rb:61:in `check_for_schema_registry_connectivity_and_subjects'", "/tmp/7.9.2/libexec/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.6.0-java/lib/logstash/plugin_mixins/common.rb:32:in `check_schema_registry_parameters'", "/tmp/7.9.2/libexec/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.6.0-java/lib/logstash/inputs/kafka.rb:249:in `register'", "/tmp/7.9.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:226:in `block in register_plugins'", "org/jruby/RubyArray.java:1809:in `each'", "/tmp/7.9.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:225:in `register_plugins'", "/tmp/7.9.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:359:in `start_inputs'", "/tmp/7.9.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:309:in `start_workers'", "/tmp/7.9.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:183:in `run'", "/tmp/7.9.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:134:in `block in start'"], "pipeline.sources"=>["/tmp/pipeline.conf"], :thread=>"#"}
[2020-11-29T20:16:26,087][ERROR][logstash.agent ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create, action_result: false", :backtrace=>nil}
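
For the schema registration step in the reproduction, the double-escaped curl payload can be easier to follow when built programmatically. The following is only a sketch in Ruby using the standard library: `registration_payload` and `register_schema` are hypothetical helper names, and the schema is trimmed to two simplified fields for brevity. The endpoint, subject name, and content type are taken from the curl command.

```ruby
require 'json'
require 'net/http'
require 'uri'

# Simplified subset of the Customer schema from the curl command
# (assumption: two fields are enough to illustrate the payload shape).
CUSTOMER_SCHEMA = {
  'type' => 'record',
  'name' => 'Customer',
  'namespace' => 'com.example',
  'fields' => [
    { 'name' => 'first_name', 'type' => 'string', 'doc' => 'First Name of Customer' },
    { 'name' => 'age', 'type' => 'int', 'doc' => 'Age at the time of registration' }
  ]
}.freeze

# The registry expects the Avro schema serialized as a JSON *string*
# nested inside a JSON object, hence the two levels of JSON.generate.
def registration_payload(schema)
  JSON.generate('schema' => JSON.generate(schema))
end

# Hypothetical helper: POSTs the payload to /subjects/<subject>/versions.
def register_schema(registry_url, subject, schema)
  uri = URI.join(registry_url, "/subjects/#{subject}/versions")
  request = Net::HTTP::Post.new(uri, 'Content-Type' => 'application/vnd.schemaregistry.v1+json')
  request.body = registration_payload(schema)
  Net::HTTP.start(uri.host, uri.port) { |http| http.request(request) }
end

# Usage (needs a running schema registry, e.g. from kafka_test_setup.sh):
# response = register_schema('http://localhost:8081', 'customer-avro-value', CUSTOMER_SCHEMA)
# puts response.code, response.body
```

Serializing the schema twice is what produces the backslash-escaped quotes seen in the curl `--data` argument.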



The reason is related to https://github.com/logstash-plugins/logstash-integration-kafka/blob/v10.6.0/lib/logstash/plugin_mixins/common.rb#L48-L55. When the Faraday connection is customized, the adapter has to be reassigned with `conn.adapter Faraday.default_adapter`; otherwise Faraday doesn't send the request and `response.body` is nil, so the JSON parsing in `check_for_schema_registry_connectivity_and_subjects` fails with the error above.
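
A minimal Ruby sketch of the adapter reassignment described here, not the plugin's actual code. The helper names `build_registry_client` and `registered_subjects`, the `EmptyRegistryResponse` error, and the `proxy:` keyword are assumptions made for illustration. The nil-body guard is an extra precaution added in this sketch so that a missing response surfaces as a descriptive error rather than a JSON parser failure.

```ruby
require 'json'

# Hypothetical error class used by this sketch only.
class EmptyRegistryResponse < StandardError; end

# Builds a customized Faraday connection. Because a block is passed,
# the adapter is set explicitly as the last step of the block.
def build_registry_client(url, proxy: nil)
  require 'faraday' # gem; loaded lazily so the parsing helper works without it
  Faraday.new(url) do |conn|
    conn.proxy = proxy if proxy
    conn.adapter Faraday.default_adapter
  end
end

# Fetches the registered subjects from any client that responds to
# get(path) and returns an object with a body.
def registered_subjects(client)
  body = client.get('/subjects').body
  if body.nil? || body.empty?
    raise EmptyRegistryResponse, 'schema registry returned no body; was an adapter configured?'
  end
  JSON.parse(body)
end

# Usage (needs the faraday gem and a running schema registry):
# client = build_registry_client('http://localhost:8081')
# puts registered_subjects(client).inspect
```

Keeping the fetch helper duck-typed means it can be exercised with a stub client, independent of the Faraday version installed.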

Related to:
- #51
- https://github.com/logstash-plugins/logstash-input-kafka/pull/239#issuecomment-734330093

Some references for this Faraday behavior:
- https://github.com/lostisland/faraday/issues/121
- https://blog.thecodewhisperer.com/permalink/losing-time-to-faraday

andsel commented 3 years ago

This has been fixed in version 10.7.0 of logstash-integration-kafka.