Open anmironov opened 1 year ago
@anmironov
If I understand correctly you are running Kafka 2.6.1
(which JDK are you using to run it?) and with logstash-integration-kafka
10.11.0
the TLS connection is successful while with 10.12.0
not.
10.11.0
ships Kafka client 2.5.1
10.12.0
ships Kafka client 2.8.1
Please could try to connect with TLS with Kafka console clients, for both Kafka client versions, and check that with the same TLS settings one success and the other fails?
After downloading and unpackig the Kafka distributions, for example:
wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.13-2.5.1.tgz
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
create a client.properties
containing:
security.protocol=SSL
ssl.truststore.location=/path/to/clienttruststore.jks
ssl.truststore.password=changeit
then try with the console consumer clients:
kafka_2.13-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic "tls_test_topic" --from-beginning --group logstash --consumer.config "/path/to//client.properties"
kafka_2.13-2.8.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic "tls_test_topic" --from-beginning --group logstash --consumer.config "/path/to//client.properties"
It's important the for the command line you use the same JVM used by Logstash and all the security settings are the same used inside the Logstash config.
I've tried with the following steps and wasn't able to reproduce:
Download generate broker keystore and client's truststore:
echo "Generating broker key store"
keytool -genkey -alias kafka_broker -keyalg RSA -keystore kafka_broker.jks -keypass changeit -storepass changeit -validity 365 -keysize 2048 -dname "CN=localhost, OU=John Doe, O=Acme Inc, L=Unknown, ST=Unknown, C=IT"
echo "Exporting broker certificate"
keytool -exportcert -rfc -keystore kafka_broker.jks -storepass changeit -alias kafka_broker -file broker_certificate.pem
echo "Creating client's truststore and importing schema registry's certificate"
keytool -import -trustcacerts -file broker_certificate.pem -keypass changeit -storepass changeit -keystore clienttruststore.jks -noprompt
Downloaded Kafka 2.6.2
with
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.13-2.6.2.tgz
tar zxf kafka_2.13-2.6.2.tgz
Edit broker's kafka_2.13-2.6.2/config/server.properties
adding the following:
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.keystore.location=/path/to/kafka_broker.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
Launch the broker with:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
Checked TLS producer and consumer connectivity with:
kafka_2.13-2.8.1/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic "logstash_integration_andsel_test" --producer.config "/path/to/client.properties"
kafka_2.13-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic "logstash_integration_andsel_test" --from-beginning --group logstash --consumer.config "/path/to/client.properties"
Used the pipeline
input {
kafka {
bootstrap_servers => ["localhost:9093"]
topics => ["logstash_integration_andsel_test"]
group_id => "logstash"
consumer_threads => 2
security_protocol => "SSL"
ssl_truststore_location => "/path/to/clienttruststore.jks"
ssl_truststore_password => "changeit"
ssl_truststore_type => "jks"
}
}
output {
stdout {
codec => rubydebug
}
}
Raised up the heap space in /tmp/logstash-8.2.3/config/jvm.options
Verified with Kafka plugin 10.10.0
on Logstash 8.2.3
/tmp/logstash-8.2.3/bin/logstash-plugin list --verbose logstash-input-kafka
/tmp/logstash-8.2.3/bin/logstash -f `pwd`/test_tls_pipeline.conf
Once connects, try with the previous kafka_2.13-2.8.1/bin/kafka-console-producer.sh
to send some data.
Updates the logstash-integration-kafka
with:
> /tmp/logstash-8.2.3/bin/logstash-plugin update logstash-integration-kafka
Updated logstash-integration-kafka 10.10.0 to 10.12.0
and testing again with the above pipeline and console producer and messages flows correctly from producer down to Logstash Kafka input.
@anmironov if the test above works for you but you have a reproducer of the problem:
-Djavax.net.debug=ssl
Logstash information:
LS upgraded 7.17=>8.6. Kafka-clients used by Logstash (2.8) and Kafka broker (2.6.2) are version aligned.
[2023-01-30T16:20:11,010][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][e293a3fd8f07b3dae7da3b4e432b1be2cfe83f1cbe584d82c1641c319f8af5d4] [Consumer clientId=logstash-0, groupId=elk-consumer-group] FindCoordinator request hit fatal exception org.apache.kafka.common.errors.SslAuthenticationException: Failed to process post-handshake messages Caused by: [javax.net](http://javax.net/).ssl.SSLException: Tag mismatch!
JVM :
error
``` [2023-02-03T09:16:48,924][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][des_activity][6f070654d30e815d2be7967fa85898f0ee8c6cadd5cdf1060acf195dac29e275] [Consumer clientId=logstash-0, groupId=pre-elk-consumer-group] FindCoordinator request failed due to {} org.apache.kafka.common.errors.SslAuthenticationException: Failed to process post-handshake messages Caused by: javax.net.ssl.SSLException: Tag mismatch! at sun.security.ssl.Alert.createSSLException(Alert.java:133) ~[?:?] at sun.security.ssl.TransportContext.fatal(TransportContext.java:371) ~[?:?] at sun.security.ssl.TransportContext.fatal(TransportContext.java:314) ~[?:?] at sun.security.ssl.TransportContext.fatal(TransportContext.java:309) ~[?:?] at sun.security.ssl.SSLTransport.decode(SSLTransport.java:123) ~[?:?] at sun.security.ssl.SSLEngineImpl.decode(SSLEngineImpl.java:736) ~[?:?] at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:691) ~[?:?] at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:506) ~[?:?] at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:482) ~[?:?] at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:679) ~[?:?] at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:567) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1232) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1165) [kafka-clients-2.8.1.jar:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?] at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:457) [jruby.jar:?] at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:318) [jruby.jar:?] at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:42) [jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:372) [jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:175) [jruby.jar:?] at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:316) [jruby.jar:?] at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) [jruby.jar:?] at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:86) [jruby.jar:?] at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) [jruby.jar:?] at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) [jruby.jar:?] at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:218) [jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:372) [jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:175) [jruby.jar:?] at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:316) [jruby.jar:?] at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) [jruby.jar:?] at org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116) [jruby.jar:?] at org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136) [jruby.jar:?] at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66) [jruby.jar:?] at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) [jruby.jar:?] at org.jruby.runtime.Block.call(Block.java:143) [jruby.jar:?] at org.jruby.RubyProc.call(RubyProc.java:309) [jruby.jar:?] at org.jruby.javasupport.Java$ProcToInterface.callProc(Java.java:1232) [jruby.jar:?] at org.jruby.javasupport.Java$ProcToInterface.access$300(Java.java:1209) [jruby.jar:?] at org.jruby.javasupport.Java$ProcToInterface$ConcreteMethod.call(Java.java:1270) [jruby.jar:?] at org.jruby.gen.InterfaceImpl412716035.run(org/jruby/gen/InterfaceImpl412716035.gen:13) [jruby.jar:?] at java.lang.Thread.run(Thread.java:833) [?:?] Caused by: javax.crypto.AEADBadTagException: Tag mismatch! at com.sun.crypto.provider.GaloisCounterMode$GCMDecrypt.doFinal(GaloisCounterMode.java:1486) ~[?:?] at com.sun.crypto.provider.GaloisCounterMode.engineDoFinal(GaloisCounterMode.java:447) ~[?:?] at javax.crypto.Cipher.doFinal(Cipher.java:2500) ~[?:?] at sun.security.ssl.SSLCipher$T13GcmReadCipherGenerator$GcmReadCipher.decrypt(SSLCipher.java:1933) ~[?:?] at sun.security.ssl.SSLEngineInputRecord.decodeInputRecord(SSLEngineInputRecord.java:239) ~[?:?] at sun.security.ssl.SSLEngineInputRecord.decode(SSLEngineInputRecord.java:196) ~[?:?] at sun.security.ssl.SSLEngineInputRecord.decode(SSLEngineInputRecord.java:159) ~[?:?] at sun.security.ssl.SSLTransport.decode(SSLTransport.java:111) ~[?:?] at sun.security.ssl.SSLEngineImpl.decode(SSLEngineImpl.java:736) ~[?:?] at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:691) ~[?:?] at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:506) ~[?:?] at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:482) ~[?:?] at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:679) ~[?:?] at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:567) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) [kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1232) ~[kafka-clients-2.8.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1165) ~[kafka-clients-2.8.1.jar:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?] at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:457) ~[jruby.jar:?] at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:318) ~[jruby.jar:?] at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:42) ~[jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:372) ~[jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:175) ~[jruby.jar:?] at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:316) ~[jruby.jar:?] at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?] at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:86) ~[jruby.jar:?] at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?] at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?] at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:218) ~[jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:372) ~[jruby.jar:?] at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:175) ~[jruby.jar:?] at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:316) ~[jruby.jar:?] at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?] at org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116) ~[jruby.jar:?] at org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136) ~[jruby.jar:?] at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66) ~[jruby.jar:?] at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) ~[jruby.jar:?] at org.jruby.runtime.Block.call(Block.java:143) ~[jruby.jar:?] at org.jruby.RubyProc.call(RubyProc.java:309) ~[jruby.jar:?] at org.jruby.javasupport.Java$ProcToInterface.callProc(Java.java:1232) ~[jruby.jar:?] at org.jruby.javasupport.Java$ProcToInterface.access$300(Java.java:1209) ~[jruby.jar:?] at org.jruby.javasupport.Java$ProcToInterface$ConcreteMethod.call(Java.java:1270) ~[jruby.jar:?] at org.jruby.gen.InterfaceImpl412716035.run(org/jruby/gen/InterfaceImpl412716035.gen:13) ~[jruby.jar:?] at java.lang.Thread.run(Thread.java:833) ~[?:?] ```cc: @kaisecheng