logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins

Confluent Avro using HTTPS + user/password not working #101

Closed: m-a-leclercq closed this issue 1 year ago

m-a-leclercq commented 3 years ago

Logstash information:

  1. Logstash version: 7.13.3
  2. Logstash installation source: Docker
  3. How is Logstash being run: Docker
  4. How was the Logstash Plugin installed: Upgraded to latest stable using logstash-plugin update logstash-integration-kafka (v10.8.1)

JVM (e.g. java -version): Bundled JDK

OS version (uname -a if on a Unix-like system): 3.10.0-1160.24.1.el7.x86_64 #1 SMP Thu Mar 25 21:21:56 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior: Connecting to the schema registry over HTTPS with auth does not work. Switching to HTTP with auth works, however.

Steps to reproduce:

input {
  kafka {
        bootstrap_servers => "mybroker.kafka:9094"
        group_id => "my_group"
        client_id => "my_client"
        topics => "mytopic"
        schema_registry_url => "https://my.registry:8081/"
        schema_registry_key => "myuser"
        schema_registry_secret => "mypassword"
        codec => "json"
        decorate_events => true
        auto_offset_reset => "earliest"
        jaas_path => "/path/to/kafka-jaas-input.conf"
        sasl_mechanism => "SCRAM-SHA-512"
        security_protocol => "SASL_SSL"
        ssl_truststore_location => "/path/to/certs/my.truststore.jks"
        ssl_truststore_password => "REDACTED"
        key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
}
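
For reference, the jaas_path above points at a Kafka client JAAS configuration. Its contents are not part of this report, but a minimal sketch for SCRAM-SHA-512 (username/password are placeholders) would look like:

KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="myuser"
  password="REDACTED";
};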

Provide logs (if relevant): Using the plugin version bundled with 7.13.3:

Pipeline error {:pipeline_id=>"main", :exception=>#<LogStash::ConfigurationError: Schema registry service doesn't respond, error: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target>

Using the latest (10.8.1) version of this plugin, HTTPS + authentication:

[2021-07-15T13:17:49,790][ERROR][logstash.inputs.kafka ] Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MYTOPIC-0 at offset 23815. If needed, please seek past the record to continue consumption., :cause=>org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 7}

From the errors I had with 7.11.1, I highly suspect Logstash does not pass the auth header (see this SO article for more info).

Using the latest (10.8.1) version of this plugin, HTTP (HAProxy for SSL termination) + authentication: it works! HAProxy log showing the 200 HTTP code:

Jul 15 16:34:47 localhost haproxy[3142]: :40080 [15/Jul/2021:16:34:47.332] registry servers/srv1 0/0/65/24/89 200 5327 - - ---- 3/3/0/0/0 0/0 "GET /schemas/ids/7?fetchMaxId=false HTTP/1.1"

My HAProxy config for this:

frontend registry
    mode http
    bind :80
    default_backend servers

backend servers
    mode http
    server srv1 my.registry.intres:8081 ssl verify none

I can't test with an HTTPS frontend right now, as generating a cert in my org takes a bit of time. I will update with results if I can.
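
As a side note, the GET that shows up in the HAProxy log above can be replayed by hand to check the proxy path independently of Logstash (a sketch; host and credentials are placeholders):

curl -u myuser:mypassword "http://my.haproxy.host/schemas/ids/7?fetchMaxId=false"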

m-a-leclercq commented 3 years ago

Related issue that fixed the basic auth : https://github.com/logstash-plugins/logstash-integration-kafka/pull/94

It seems like this fix does not apply when HTTPS is used.

robbavey commented 3 years ago

@m-a-leclercq Thanks for updating this issue.

Can you do a couple of things for me:

  1. Send full (redacted) logs for the failures you are seeing when using HTTPS on the latest release.
  2. Add the following lines to your config/jvm.options file:

  -Dssl.truststore.location => "/path/to/certs/my.truststore.jks"
  -Dssl.truststore.password => "REDACTED"

and let me know if you see any change of behaviour?

m-a-leclercq commented 3 years ago

Hi @robbavey ,

So I used the following syntax for the jvm.options file, because I suppose you mixed Logstash/Ruby notation with jvm.options notation while copy-pasting my example above 😄

-Dssl.truststore.location="/path/to/certs/my.truststore.jks"
-Dssl.truststore.password="REDACTED"

Adding that didn't produce more info in the docker logs, so I also set log.level: "debug" and got the following:

[2021-07-21T09:17:15,594][ERROR][logstash.inputs.kafka    ] Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MYTOPIC-1 at offset 24251. If needed, please seek past the record to continue consumption., :cause=>org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 7}
[2021-07-21T09:17:15,725][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2021-07-21T09:17:15,726][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2021-07-21T09:17:16,594][DEBUG][io.confluent.kafka.schemaregistry.client.rest.RestService] Sending GET with input null to https://my.registry:8081/schemas/ids/7?fetchMaxId=false
[2021-07-21T09:17:16,596][DEBUG][io.confluent.kafka.schemaregistry.client.rest.RestService] Sending GET with input null to https://my.registry:8081/schemas/ids/7?fetchMaxId=false
[2021-07-21T09:17:16,606][DEBUG][io.confluent.kafka.schemaregistry.client.rest.RestService] Sending GET with input null to https://my.registry:8081/schemas/ids/7?fetchMaxId=false
[2021-07-21T09:17:16,643][ERROR][logstash.inputs.kafka    ] Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MYTOPIC-2 at offset 24386. If needed, please seek past the record to continue consumption., :cause=>org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 7}

Is this what you were looking for, or did I miss something?

This is with docker.elastic.co/logstash/logstash:7.13.3 and

bash-4.2$ logstash-plugin list --verbose | grep kafka
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
logstash-integration-kafka (10.8.1)
 ├── logstash-input-kafka
 └── logstash-output-kafka

robbavey commented 3 years ago

@m-a-leclercq Apologies! Nice guess, but I also missed another part of the properties that I wanted you to try... They should read:

-Djavax.net.ssl.truststore.password="REDACTED"
-Djavax.net.ssl.truststore.location="/path/to/certs/my.truststore.jks"

If this still doesn't work, can you also add

-Djavax.net.debug=ssl

Warning - this is very chatty. If you do that, can you paste any lines from the resulting debug log that begin with something like:

javax.net.ssl|ERROR|

Please redact any sensitive information if any appears.

m-a-leclercq commented 3 years ago

Hi @robbavey. That was an interesting dive into Java network debugging!

I needed the -Djavax.net.debug=ssl flag to get this output; the Logstash log level, however, didn't need to be debug.

Here is the log I believe will be of interest to you:

javax.net.ssl|DEBUG|30|kafka-input-worker-my_client_id-1|2021-07-22 13:34:08.100 UTC|Utilities.java:73|the previous server name in SNI (type=host_name (0), value=my.schema.registry) was replaced with (type=host_name (0), value=my.schema.registry)
javax.net.ssl|ERROR|30|kafka-input-worker-my_client_id-1|2021-07-22 13:34:08.127 UTC|TransportContext.java:341|Fatal (CERTIFICATE_UNKNOWN): PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target (
"throwable" : {
  sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
        at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
        at java.base/sun.security.validator.Validator.validate(Validator.java:264)
        at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:638)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369)
        at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
        at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
        at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:421)
        at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:182)
        at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:171)
        at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1418)
        at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1324)
        at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:440)
        at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:411)
        at java.base/sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:567)
        at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:197)
        at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1592)
        at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
        at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
        at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:659)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:641)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:273)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:97)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:76)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
        at jdk.internal.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:456)
        at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:317)
        at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:42)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_8_dot_1_minus_java.lib.logstash.inputs.kafka.RUBY$method$do_poll$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.8.1-java/lib/logstash/inputs/kafka.rb:328)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_8_dot_1_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$1(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.8.1-java/lib/logstash/inputs/kafka.rb:313)
        at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)
        at org.jruby.runtime.Block.call(Block.java:139)
        at org.jruby.RubyProc.call(RubyProc.java:318)
        at org.jruby.javasupport.Java$ProcToInterface.callProc(Java.java:1136)
        at org.jruby.javasupport.Java$ProcToInterface.access$300(Java.java:1113)
        at org.jruby.javasupport.Java$ProcToInterface$ConcreteMethod.call(Java.java:1174)
        at org.jruby.gen.InterfaceImpl1624388649.run(org/jruby/gen/InterfaceImpl1624388649.gen:13)
        at java.base/java.lang.Thread.run(Thread.java:829)
  Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
        at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
        at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
        at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
        ... 62 more}

)
javax.net.ssl|DEBUG|30|kafka-input-worker-my_client_id-1|2021-07-22 13:34:08.127 UTC|SSLSocketImpl.java:1638|close the underlying socket
javax.net.ssl|DEBUG|30|kafka-input-worker-my_client_id-1|2021-07-22 13:34:08.127 UTC|SSLSocketImpl.java:1657|close the SSL connection (initiative)

I have a bunch of these, followed by the classic Error retrieving Avro unknown schema for id.

While I have redacted both hostnames appearing in the SNI replacement line, I can assure you that the initial and supposedly replaced hostnames are exactly the same.

Maybe there's something funky going on with DNS in my org...

Edit: I know for a fact the truststore contains the correct CA to be able to trust the schema registry. The team that maintains that Kafka + schema registry setup also provides a pure Java implementation that uses the specific truststore I'm using.

robbavey commented 3 years ago

@m-a-leclercq Thanks, that helps. That confirms that this is a certificate issue, rather than a continuation of the basic auth issue that was resolved by #94.

EDIT: To clarify - I think this is an issue with how we are handling certs with respect to the schema registry, rather than a cert issue on your end.

m-a-leclercq commented 3 years ago

Thank you for your precious time and help Rob.

As for my use case, we have decided to forgo Logstash in favor of a pure Java implementation, as the Kafka/Avro owners will soon start encrypting some of the fields inside Kafka messages, and that would add another layer of complexity that I don't believe Logstash could handle.

Should we do anything about the plugin's documentation? I think if someone could reproduce my error, we could confirm it is indeed a certificate issue and amend the docs to inform potential users that the current implementation does not support HTTPS.

Debugging all of this has been an ordeal I do not wish on anyone else, and if we could make sure this is indeed the problem and clarify the documentation, I think it would be worth it.

This would require spawning an Avro schema registry with HTTPS enabled and a self-signed cert, I imagine. Maybe I could help with that?

Thanks.

robbavey commented 3 years ago

@m-a-leclercq Your patience and help here are much appreciated, and any help you would be willing to provide going forward would be gratefully received.

To clarify - my suspicion here is that we are not setting some values when configuring the Kafka consumer - namely the set of properties prefixed with schema.registry.ssl. (such as schema.registry.ssl.truststore.location and schema.registry.ssl.keystore.location) referred to in this help section on the Confluent site (the client config is a little way down). I would have expected the "legacy" -Djavax.net.ssl... settings to work in a similar way, but my experience locally was similar to yours.
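
For reference, a sketch of the registry TLS client properties in question, reusing the truststore from earlier in this thread (property names as documented by Confluent; whether and how the plugin forwards them to the registry client is exactly what is in doubt here):

schema.registry.ssl.truststore.location=/path/to/certs/my.truststore.jks
schema.registry.ssl.truststore.password=REDACTED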

drmaas commented 3 years ago

I am seeing the same issue with SSL errors when attempting to connect to a schema registry with a self-signed cert. This previously worked with self-signed certs in version 7.9.3 using the third-party avro_schema_registry codec plugin: https://github.com/revpoint/logstash-codec-avro_schema_registry
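
For comparison, a minimal sketch of that codec's configuration (option name per the codec's README; hosts are placeholders):

input {
  kafka {
    bootstrap_servers => "mybroker.kafka:9094"
    topics => ["mytopic"]
    codec => avro_schema_registry {
      endpoint => "https://my.registry:8081"
    }
  }
}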

andsel commented 1 year ago

Note for the future: the correct properties are

javax.net.ssl.trustStorePassword
javax.net.ssl.trustStore

and not

javax.net.ssl.truststore.password
javax.net.ssl.truststore.location
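
In jvm.options flag form, reusing the paths from earlier in this thread, that would be:

-Djavax.net.ssl.trustStore="/path/to/certs/my.truststore.jks"
-Djavax.net.ssl.trustStorePassword="REDACTED"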