
Logstash Kafka Input throwing error when using schema_registry_url #12

Open shah-smit opened 3 years ago

shah-smit commented 3 years ago

logstash.conf.v1_5 file:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["customer-avro"]
    schema_registry_url => "http://127.0.0.1:8081"
  }
}

filter { }

output {
   stdout {  }
}

When running

bin/logstash -f /usr/local/etc/logstash/logstash.conf.v1_5

I am getting the error below:

Using JAVA_HOME defined java: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home
WARNING, using JAVA_HOME while Logstash distribution comes with a bundled JDK
Sending Logstash logs to /Users/Smit/Documents/Dev/ELK/logstash-7.10.0/logs which is now configured via log4j2.properties
[2020-11-26T23:01:50,352][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.10.0", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc Java HotSpot(TM) 64-Bit Server VM 25.65-b01 on 1.8.0_65-b17 +indy +jit [darwin-x86_64]"}
[2020-11-26T23:01:50,605][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-11-26T23:01:52,524][INFO ][org.reflections.Reflections] Reflections took 37 ms to scan 1 urls, producing 23 keys and 47 values
[2020-11-26T23:01:54,069][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/usr/local/etc/logstash/logstash.conf.v1_5"], :thread=>"#<Thread:0x48e1ef48 run>"}
[2020-11-26T23:01:55,118][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>1.03}
WARNING: No adapter was configured for this request
[2020-11-26T23:01:55,537][ERROR][logstash.javapipeline    ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<TypeError: no implicit conversion of nil into String>, :backtrace=>["json/ext/Parser.java:173:in `initialize'", "json/ext/Parser.java:150:in `new'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/vendor/bundle/jruby/2.5.0/gems/json-1.8.6-java/lib/json/common.rb:155:in `parse'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.6.0-java/lib/logstash/plugin_mixins/common.rb:61:in `check_for_schema_registry_connectivity_and_subjects'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.6.0-java/lib/logstash/plugin_mixins/common.rb:32:in `check_schema_registry_parameters'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.6.0-java/lib/logstash/inputs/kafka.rb:249:in `register'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/logstash-core/lib/logstash/java_pipeline.rb:228:in `block in register_plugins'", "org/jruby/RubyArray.java:1809:in `each'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/logstash-core/lib/logstash/java_pipeline.rb:227:in `register_plugins'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/logstash-core/lib/logstash/java_pipeline.rb:386:in `start_inputs'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/logstash-core/lib/logstash/java_pipeline.rb:311:in `start_workers'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/logstash-core/lib/logstash/java_pipeline.rb:185:in `run'", "/Users/Smit/Documents/Dev/ELK/logstash-7.10.0/logstash-core/lib/logstash/java_pipeline.rb:137:in `block in start'"], "pipeline.sources"=>["/usr/local/etc/logstash/logstash.conf.v1_5"], :thread=>"#<Thread:0x48e1ef48 run>"}
[2020-11-26T23:01:55,546][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2020-11-26T23:01:55,578][ERROR][logstash.agent           ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
[2020-11-26T23:01:55,843][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-11-26T23:02:00,832][INFO ][logstash.runner          ] Logstash shut down.
[2020-11-26T23:02:00,849][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
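
For what it's worth, the backtrace points at check_for_schema_registry_connectivity_and_subjects in logstash-integration-kafka 10.6.0's common.rb, where the body of a GET /subjects response is handed to JSON.parse. TypeError: no implicit conversion of nil into String is exactly what the json gem raises when that body is nil, which would tie in with the WARNING: No adapter was configured for this request line just above the error. A minimal sketch of that failure mode (the nil body is my assumption about what an adapter-less Faraday client returns):

require 'json'

# If the HTTP client yields no response body, the plugin's startup check dies here:
response_body = nil          # assumed: no Faraday adapter => no body
JSON.parse(response_body)    # => TypeError: no implicit conversion of nil into String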

For reference, here is how to list the subjects registered in the schema registry:

curl --silent -X GET http://localhost:8081/subjects/ | jq .

Output:

[
  "tweet-avro-value",
  "test-avro-value",
  "customer-avro-value"
]

And to fetch the latest version of a particular subject's schema:

curl --silent -X GET http://localhost:8081/subjects/customer-avro-value/versions/latest | jq .
{
  "subject": "customer-avro-value",
  "version": 1,
  "id": 22,
  "schema": "{\"type\":\"record\",\"name\":\"Customer\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"first_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"First Name of Customer\"},{\"name\":\"last_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Last Name of Customer\"},{\"name\":\"age\",\"type\":\"int\",\"doc\":\"Age at the time of registration\"},{\"name\":\"height\",\"type\":\"float\",\"doc\":\"Height at the time of registration in cm\"},{\"name\":\"weight\",\"type\":\"float\",\"doc\":\"Weight at the time of registration in kg\"},{\"name\":\"automated_email\",\"type\":\"boolean\",\"doc\":\"Field indicating if the user is enrolled in marketing emails\",\"default\":true}],\"version\":\"1\"}"
}
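
Worth noting: the plugin derives the expected subject for each topic as "<topic>-value", so for the customer-avro topic it looks for customer-avro-value, which the listing above shows is registered. A one-liner mirroring that check:

# The plugin expects one "<topic>-value" subject per topic:
expected = ["customer-avro"].map { |t| "#{t}-value" }
registered = ["tweet-avro-value", "test-avro-value", "customer-avro-value"]
puts expected - registered   # => [] , so the subject lookup itself should succeed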
shah-smit commented 3 years ago

Issue is tracked here: https://github.com/logstash-plugins/logstash-input-kafka/pull/239#issuecomment-734330093

shah-smit commented 3 years ago

https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#schema-registry-onprem-tutorial

shah-smit commented 3 years ago

I have tried extracting the failing plugin code into a standalone script, but run on its own it does not throw any error:

require 'faraday'
require 'json'

# Standalone re-creation of the plugin's check_for_schema_registry_connectivity_and_subjects
def test_schema_registry
  client = Faraday.new("http://127.0.0.1:8081") do |conn|
    puts conn
  end
  begin
    response = client.get('/subjects')
    registered_subjects = JSON.parse response.body
    puts registered_subjects
    expected_subjects = ["customer-avro"].map { |t| "#{t}-value" }
    puts expected_subjects
    if (expected_subjects & registered_subjects).size != expected_subjects.size
      undefined_topic_subjects = expected_subjects - registered_subjects
      puts "ERROR: subjects not registered: #{undefined_topic_subjects}"
    end
  rescue Faraday::Error => e
    # The plugin raises LogStash::ConfigurationError here; a plain RuntimeError
    # keeps this script runnable outside of Logstash.
    raise "Schema registry service doesn't respond, error: #{e.message}"
  end
end

test_schema_registry
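
One visible difference between this standalone run and the run inside Logstash is that the script never prints WARNING: No adapter was configured for this request; outside Logstash, Faraday falls back to its default Net::HTTP adapter. If that is the root cause, pinning the adapter explicitly may be worth trying (a sketch only, assuming the plugin's client can be built the same way):

require 'faraday'

# Hypothetical workaround: register the HTTP adapter explicitly instead of
# relying on Faraday's default adapter being configured.
client = Faraday.new("http://127.0.0.1:8081") do |conn|
  conn.adapter :net_http
end

puts client.get('/subjects').body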
shah-smit commented 3 years ago

Java producer used to publish the Avro messages:

import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaAvroJavaProducerV1Demo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        Producer<String, Customer> producer = new KafkaProducer<String, Customer>(properties);

        String topic = "customer-avro";

        // copied from avro examples
        Customer customer = Customer.newBuilder()
                .setAge(3)
                .setAutomatedEmail(true)
                .setFirstName("Smit")
                .setLastName("Shah")
                .setHeight(181f)
                .setWeight(100f)
                .build();

        ProducerRecord<String, Customer> producerRecord = new ProducerRecord<String, Customer>(
                topic, customer
        );

        System.out.println(customer);
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.flush();
        producer.close();

    }
}
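
To confirm the producer really is writing Avro records to the topic, independently of Logstash, Confluent's console consumer can replay them (flags as I remember them, so treat this as a sketch):

kafka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --topic customer-avro --property schema.registry.url=http://127.0.0.1:8081 --from-beginning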