grepplabs / kafka-proxy

Proxy connections to Kafka cluster. Connect through SOCKS Proxy, HTTP Proxy or to cluster running in Kubernetes.
Apache License 2.0

Writing a message fails and Kafka reports EOFException #73

Open chaoxxx opened 3 years ago

chaoxxx commented 3 years ago

Hi, I have a Kafka (v2.5.0) cluster with 3 brokers. I started kafka-proxy (v0.2.8) on another machine and sent a message with kafka-console-producer. The message is not written to the topic, and Kafka throws a java.io.EOFException.

kafka-console-producer

./kafka-console-producer.sh --broker-list proxyip:32400,proxyip:32401,proxyip:32402 --topic test
>123

kafka-console-producer log

[2020-11-15 05:41:34,272] WARN [Producer clientId=console-producer] Connection to node 3 (/0.0.0.0:32402) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,371] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,472] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,573] WARN [Producer clientId=console-producer] Connection to node 3 (/0.0.0.0:32402) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,675] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,776] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,876] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,977] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:35,078] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
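
Every warning above names `0.0.0.0` as the node address, which suggests the producer is dialing the address returned in the cluster metadata rather than `proxyip`. A minimal Python sketch for tallying the node and address pairs in such a log follows; the helper name `count_unreachable_nodes` and the regular expression are illustrative assumptions, not part of Kafka or kafka-proxy.

```python
import re
from collections import Counter

# Captures the node id and address from NetworkClient connection warnings.
NODE_RE = re.compile(
    r"Connection to node (-?\d+) \(([^)]*)\) could not be established"
)


def count_unreachable_nodes(log_text):
    """Count failed-connection warnings per (node_id, address) pair."""
    counts = Counter()
    for line in log_text.splitlines():
        match = NODE_RE.search(line)
        if match:
            counts[(int(match.group(1)), match.group(2))] += 1
    return counts


# Two lines copied from the producer log above.
sample = (
    "[2020-11-15 05:41:34,272] WARN [Producer clientId=console-producer] "
    "Connection to node 3 (/0.0.0.0:32402) could not be established. "
    "Broker may not be available. (org.apache.kafka.clients.NetworkClient)\n"
    "[2020-11-15 05:41:34,371] WARN [Producer clientId=console-producer] "
    "Connection to node 1 (/0.0.0.0:32400) could not be established. "
    "Broker may not be available. (org.apache.kafka.clients.NetworkClient)\n"
)

for (node, address), n in sorted(count_unreachable_nodes(sample).items()):
    print(node, address, n)
```

If every address in the tally is a wildcard such as `0.0.0.0`, the advertised address is probably not reachable from the client machine.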

kafka-proxy

kafka-proxy server \
--bootstrap-server-mapping "broker1:9093,0.0.0.0:32400" \
--bootstrap-server-mapping "broker2:9093,0.0.0.0:32401" \
--bootstrap-server-mapping "broker3:9093,0.0.0.0:32402" \
--sasl-enable \
--sasl-username "admin" \
--sasl-password "admin" \
--log-level debug
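
For reference, each `--bootstrap-server-mapping` value is a comma-separated list of addresses. The Python sketch below splits such a value into broker, listener, and advertised addresses. It assumes that an optional third field carries the advertised address and that the advertised address falls back to the listener address when the field is omitted, which is consistent with the `advertised=0.0.0.0:...` entries in the proxy log. The function `parse_mapping` and the value `proxyip:32400` are hypothetical illustrations, not kafka-proxy code.

```python
from typing import NamedTuple


class Mapping(NamedTuple):
    broker: str      # address of the real Kafka broker
    listener: str    # address the proxy binds to
    advertised: str  # address handed back to clients in metadata


def parse_mapping(value: str) -> Mapping:
    """Split 'broker,listener[,advertised]' into its parts (assumed format)."""
    parts = [part.strip() for part in value.split(",")]
    if len(parts) not in (2, 3) or not all(parts):
        raise ValueError(f"expected 2 or 3 non-empty addresses, got: {value!r}")
    broker, listener = parts[0], parts[1]
    # Assumption: without a third field, clients are told the listener address.
    advertised = parts[2] if len(parts) == 3 else listener
    return Mapping(broker, listener, advertised)


# Mapping as used in the command above: clients would be told 0.0.0.0:32400.
print(parse_mapping("broker1:9093,0.0.0.0:32400"))

# Hypothetical variant with an explicit advertised address.
print(parse_mapping("broker1:9093,0.0.0.0:32400,proxyip:32400"))
```

Under these assumptions, binding to `0.0.0.0` without an explicit advertised address would leave clients with an address they cannot connect to.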

kafka-proxy log

INFO[2020-11-15T05:41:32-05:00] New connection for broker3:9093        
DEBU[2020-11-15T05:41:32-05:00] Sending SaslHandshakeRequest mechanism: PLAIN  version: 0 
DEBU[2020-11-15T05:41:32-05:00] Successful SASL handshake. Available mechanisms: [PLAIN] 
DEBU[2020-11-15T05:41:32-05:00] Sending authentication opaque packets, mechanism PLAIN 
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 18, version 3, length 52   
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 18, version 3, length 348 
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 3, version 9, length 32    
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 3, version 9, length 112  
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker1:9093, listener=0.0.0.0:32401, advertised=0.0.0.0:32401 
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker2:9093, listener=0.0.0.0:32402, advertised=0.0.0.0:32402 
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker3:9093, listener=0.0.0.0:32400, advertised=0.0.0.0:32400 
INFO[2020-11-15T05:41:37-05:00] Client closed local connection on 192.168.4.60:32400 from broker3:40750 (broker3:9093)
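
The `key` numbers in the debug lines are Kafka protocol API keys: 18 is ApiVersions and 3 is Metadata. In the excerpt above the client therefore completes an ApiVersions and a Metadata exchange through the proxy and then closes the connection, with no Produce request (key 0) shown. A small Python sketch for decoding such lines follows; the function name `describe_proxy_line` and the output format are illustrative assumptions, and the lookup table covers only a handful of API keys.

```python
import re

# A small subset of Kafka protocol API keys.
KAFKA_API_KEYS = {
    0: "Produce",
    1: "Fetch",
    3: "Metadata",
    17: "SaslHandshake",
    18: "ApiVersions",
    36: "SaslAuthenticate",
}

LINE_RE = re.compile(
    r"Kafka (request|response) key (\d+), version (\d+), length (\d+)"
)


def describe_proxy_line(line):
    """Return a readable summary of a proxy debug line, or None if it does not match."""
    match = LINE_RE.search(line)
    if match is None:
        return None
    direction, key, version, length = match.groups()
    name = KAFKA_API_KEYS.get(int(key), f"key {key}")
    return f"{name} {direction} v{version}, {length} bytes"


print(describe_proxy_line(
    "DEBU[2020-11-15T05:41:33-05:00] Kafka request key 18, version 3, length 52"
))
print(describe_proxy_line(
    "DEBU[2020-11-15T05:41:33-05:00] Kafka response key 3, version 9, length 112"
))
```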

kafka log

[2020-11-15 05:34:46,830] DEBUG Processor 3 listening to new connection from /proxyip:54714 (kafka.network.Processor)
[2020-11-15 05:34:46,830] DEBUG Accepted connection from /proxyip:54714 on /broker1:9093 and assigned it to processor 3, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
[2020-11-15 05:34:46,830] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Using SASL mechanism 'PLAIN' provided by client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to AUTHENTICATE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG Authentication complete; session max lifetime from broker config=0 ms, no credential expiration; no session expiration, sending 0 ms to client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to COMPLETE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG [SocketServer brokerId=1] Successfully authenticated with /proxyip (org.apache.kafka.common.network.Selector)
[2020-11-15 05:34:46,854] DEBUG [SocketServer brokerId=1] Connection with /proxyip disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:97)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
        at kafka.network.Processor.poll(SocketServer.scala:861)
        at kafka.network.Processor.run(SocketServer.scala:760)
        at java.lang.Thread.run(Thread.java:748)

Thanks for any help.

everesio commented 3 years ago

Hi @chaoxxx, could you provide your client and server/broker configuration? Kind regards,