dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

Client disconnected when Kafka 2.6.0 with SSL used. #2160

Status: Open. mpastecki opened this issue 4 years ago.

mpastecki commented 4 years ago

Description

Using the high-level KafkaConsumer or KafkaProducer against a Kafka 2.6.0 broker configured with an SSL listener results in repeated client disconnections. Messages are still produced/consumed eventually.

It looks like the socket is closed after the BrokerConnection callback method invokes the _conn_state_change method.

This only happens when SSL is used; I wasn't able to reproduce it with PLAINTEXT.

I checked with the confluent-kafka Python library, and it seems to work fine.

It doesn't happen with previous Kafka versions.

How to reproduce

Code snippet:

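#! /usr/bin/env python3

import logging

from kafka import KafkaConsumer

# Show kafka-python's DEBUG output (connection state changes, requests, responses)
logging.basicConfig(level=logging.DEBUG)

consumer = KafkaConsumer(bootstrap_servers="localhost:29092",
                         security_protocol="SSL",
                         ssl_keyfile="keyfile.key",
                         ssl_certfile="certfile.pem",
                         ssl_cafile="cafile.crt",
                         ssl_password="ssl_password",
                         ssl_check_hostname=False,
                         auto_offset_reset="earliest",
                         group_id="group_id",
                         consumer_timeout_ms=10000,
                         enable_auto_commit=True)

consumer.subscribe(["3p3r"])

try:
    # Blocks for up to consumer_timeout_ms; raises StopIteration if nothing arrives
    print(next(consumer))
except StopIteration:
    print("No message received within consumer_timeout_ms")
finally:
    consumer.close()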

Client log output:

2020-11-03 13:23:02,124 [client_async] DEBUG Node 2 connected
2020-11-03 13:23:02,131 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,131 [conn] DEBUG <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: reconnect backoff 0.05202506418997614 after 1 failures
2020-11-03 13:23:02,132 [client_async] WARNING Node 2 connection failed -- refreshing metadata
(...)
2020-11-03 13:23:02,441 [conn] DEBUG <BrokerConnection node_id=coordinator-2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]> Response 5 (104.11787033081055 ms): HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
2020-11-03 13:23:02,441 [base] DEBUG Received successful heartbeat response for group e2e-test-group-temp
2020-11-03 13:23:02,442 [conn] INFO <BrokerConnection node_id=2 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2020-11-03 13:23:02,442 [client_async] WARNING Node 2 connection failed -- refreshing metadata
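
The "Socket EVENT_READ without in-flight-requests" message suggests the client closes a connection whenever its socket becomes readable while no request is awaiting a response. The following is a simplified, hypothetical model of that check using only the standard library; ToyConnection and simulate are illustrative names, not kafka-python APIs. The unsolicited bytes stand in for data the broker sends without a matching request; that TLS 1.3 post-handshake messages play this role here is an assumption, not something confirmed in this thread.

```python
import selectors
import socket


class ToyConnection:
    """Hypothetical model: treat readable data with no in-flight request as an error.

    This is NOT kafka-python's implementation, only an illustration of the idea.
    """

    def __init__(self, sock):
        self.sock = sock
        self.in_flight_requests = []  # correlation ids still waiting for a response
        self.closed_reason = None

    def send_request(self, correlation_id, payload):
        self.in_flight_requests.append(correlation_id)
        self.sock.sendall(payload)

    def on_readable(self):
        if not self.in_flight_requests:
            # Bytes arrived that no request asked for: give up on this socket.
            self.closed_reason = "Socket EVENT_READ without in-flight-requests"
            self.sock.close()
            return None
        data = self.sock.recv(4096)
        self.in_flight_requests.pop(0)  # the oldest request is now answered
        return data


def simulate(unsolicited):
    """Return the close reason, or None if the connection survived."""
    client_sock, broker_sock = socket.socketpair()
    conn = ToyConnection(client_sock)
    sel = selectors.DefaultSelector()
    sel.register(client_sock, selectors.EVENT_READ, conn)

    if unsolicited:
        # Stand-in for broker data nobody requested (assumed: a TLS 1.3 session ticket).
        broker_sock.sendall(b"unsolicited")
    else:
        # Normal request/response round trip.
        conn.send_request(1, b"request")
        broker_sock.recv(4096)
        broker_sock.sendall(b"response")

    for key, _events in sel.select(timeout=1.0):
        key.data.on_readable()

    sel.close()
    broker_sock.close()
    if conn.closed_reason is None:
        client_sock.close()
    return conn.closed_reason
```

With this model, simulate(unsolicited=False) returns None because the response matches an in-flight request, while simulate(unsolicited=True) returns the close reason seen in the log.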

Kafka broker errors:

[2020-11-03 12:25:24,686] DEBUG [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=broker-2-fetcher-0, correlationId=1897) and timeout 30000 to node 1: {replica_id=2,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=383646810,session_epoch=1895,topics=[],forgotten_topics_data=[],rack_id=} (org.apache.kafka.clients.NetworkClient)
[2020-11-03 12:25:24,693] DEBUG [SocketServer brokerId=2] Connection with /0:0:0:0:0:0:0:1 disconnected (org.apache.kafka.common.network.Selector)
java.io.IOException: Broken pipe
    at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
    at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:239)
    at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:697)
    at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:738)
    at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:763)
    at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
    at org.apache.kafka.common.record.MultiRecordsSend.writeTo(MultiRecordsSend.java:93)
    at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:425)
    at org.apache.kafka.common.network.Selector.write(Selector.java:648)
    at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:641)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:597)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
    at kafka.network.Processor.poll(SocketServer.scala:913)
    at kafka.network.Processor.run(SocketServer.scala:816)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-03 12:25:24,693] DEBUG [SslTransportLayer channelId=0:0:0:0:0:0:0:1:29092-0:0:0:0:0:0:0:1:38210-28 key=channel=java.nio.channels.SocketChannel[connected local=/0:0:0:0:0:0:0:1:29092 remote=/0:0:0:0:0:0:0:1:38210], selector=sun.nio.ch.EPollSelectorImpl@68dbbefc, interestOps=1, readyOps=0] Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
    at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
    at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:239)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:182)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:934)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:154)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:955)
    at org.apache.kafka.common.network.Selector.close(Selector.java:939)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:629)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
    at kafka.network.Processor.poll(SocketServer.scala:913)
    at kafka.network.Processor.run(SocketServer.scala:816)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-03 12:25:25,117] DEBUG [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Received FETCH response from node 3 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=broker-2-fetcher-0, correlationId=1897): org.apache.kafka.common.requests.FetchResponse@40c845b8 (org.apache.kafka.clients.NetworkClient)
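
If TLS 1.3 is the trigger (an assumption: Kafka 2.6.0 enabled TLSv1.3 by default on Java 11+ brokers via KIP-573, which would fit both "only with SSL" and "not with earlier Kafka versions"), capping the client at TLS 1.2 should make the disconnects disappear. A minimal sketch using Python's ssl module; build_tls12_context is a hypothetical helper, and the file names are the placeholders from the reproduction snippet.

```python
import ssl


def build_tls12_context(cafile=None, certfile=None, keyfile=None,
                        password=None, check_hostname=False):
    """Hypothetical helper: an SSLContext roughly equivalent to the snippet's
    ssl_* options, but never negotiating anything newer than TLS 1.2."""
    # Verifies the broker certificate against cafile (or system CAs if None)
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    ctx.check_hostname = check_hostname  # like ssl_check_hostname=False
    # Assumption under test: TLS 1.3 is what triggers the unexpected reads
    ctx.maximum_version = ssl.TLSVersion.TLSv1_2
    if certfile is not None:
        # Client certificate for mutual TLS, like ssl_certfile/ssl_keyfile/ssl_password
        ctx.load_cert_chain(certfile, keyfile=keyfile, password=password)
    return ctx
```

kafka-python's KafkaConsumer and KafkaProducer accept a pre-built context via the ssl_context parameter, e.g. KafkaConsumer(..., security_protocol="SSL", ssl_context=build_tls12_context("cafile.crt", "certfile.pem", "keyfile.key", "ssl_password")); when ssl_context is given, the other ssl_* options are ignored. The broker-side equivalent would be restricting ssl.enabled.protocols to TLSv1.2.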


Thanks for your help!

jpramos123 commented 2 years ago

I'm facing a similar issue: the client is unable to connect when using SSL.