confluentinc / librdkafka

The Apache Kafka C/C++ library

Message timeouts on slow networks #1958

Closed yorksen closed 5 years ago

yorksen commented 6 years ago

Description

I have a Kafka cluster with 3 nodes deployed on the same VM (32 GB RAM, 2.50 GHz x 8 CPU, Ubuntu 14.04.4 LTS). When I run a test program with 8 threads, each thread producing 100k messages (300 bytes per message), I get the error messages below:

%3|1534731468.117|FAIL|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: 8 request(s) timed out: disconnect
%3|1534731468.118|ERROR|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: 8 request(s) timed out: disconnect
%3|1534731468.380|FAIL|rdkafka#producer-1| [thrd:ssl://localhost:9092/bootstrap]: ssl://localhost:9092/0: 5 request(s) timed out: disconnect
%3|1534731468.380|ERROR|rdkafka#producer-1| [thrd:ssl://localhost:9092/bootstrap]: ssl://localhost:9092/0: 5 request(s) timed out: disconnect

And these warnings from the Kafka broker console (with SSL enabled):

[2018-08-21 09:47:55,404] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:209)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:172)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:746)
    at org.apache.kafka.common.network.Selector.close(Selector.java:734)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at kafka.network.Processor.poll(SocketServer.scala:628)
    at kafka.network.Processor.run(SocketServer.scala:545)
    at java.lang.Thread.run(Thread.java:748)
......

[2018-08-21 09:47:58,082] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:209)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:172)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:746)
    at org.apache.kafka.common.network.Selector.close(Selector.java:734)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at kafka.network.Processor.poll(SocketServer.scala:628)
    at kafka.network.Processor.run(SocketServer.scala:545)
    at java.lang.Thread.run(Thread.java:748)

It occurs occasionally even in plaintext mode (i.e. sometimes everything is fine), and more frequently when SSL is enabled. My guess is that it happens only when the host machine of my VM is busy, because the VM is deployed in a private cloud and the host may be shared with other tenants at the same time.

The rdkafka producer properties are:

static mq_option g_producer_options[] = {
    {"queue.buffering.max.messages", "10000000"},
    {"acks", "0"},
    {"linger.ms", "0"},
    {"compression.codec", "none"},
    {"reconnect.backoff.jitter.ms", "1"},
    {"socket.keepalive.enable", "true"},
    {"message.timeout.ms", "300000"},
    {"session.timeout.ms", "300000"},
    //{"debug", "protocol"},
};

With "debug = protocol" enabled, I get the log below:

%7|1534729805.193|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent partial ProduceRequest (v3, 933368+16384/996724 bytes, CorrId 446)
%7|1534729805.193|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent partial ProduceRequest (v3, 949752+16384/996724 bytes, CorrId 446)
%7|1534729805.193|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent partial ProduceRequest (v3, 966136+16384/996724 bytes, CorrId 446)
%7|1534729805.194|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent ProduceRequest (v3, 996724 bytes @ 982520, CorrId 446)
%7|1534729805.271|RECV|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Received ProduceResponse (v3, 66 bytes, CorrId 421, rtt 756.14ms)
%7|1534729805.274|RECV|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Received ProduceResponse (v3, 66 bytes, CorrId 422, rtt 753.36ms)
%7|1534729805.303|RECV|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Received ProduceResponse (v3, 66 bytes, CorrId 423, rtt 754.00ms)
%7|1534729805.306|RECV|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Received ProduceResponse (v3, 66 bytes, CorrId 424, rtt 728.51ms)
%7|1534729805.367|BROKERFAIL|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ### ssl://localhost:9093/1: failed: err: Local: Timed out: (errno: Connection timed out)
%3|1534729805.367|FAIL|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: 22 request(s) timed out: disconnect
%3|1534729805.367|ERROR|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: 22 request(s) timed out: disconnect
%7|1534729805.367|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9094/bootstrap]: ssl://localhost:9094/2: Sent MetadataRequest (v2, 281 bytes @ 0, CorrId 352)
%7|1534729805.405|RECV|rdkafka#producer-1| [thrd:ssl://localhost:9094/bootstrap]: ssl://localhost:9094/2: Received MetadataResponse (v2, 619 bytes, CorrId 352, rtt 37.94ms)
%7|1534729805.515|CONNECTED|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Connected (#17)
%7|1534729805.515|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 447)
%7|1534729805.516|RECV|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Received ApiVersionResponse (v0, 264 bytes, CorrId 447, rtt 0.28ms)
%7|1534729805.524|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent partial ProduceRequest (v3, 0+51185/999824 bytes, CorrId 448)
%7|1534729805.524|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent partial ProduceRequest (v3, 51185+18600/999824 bytes, CorrId 448)
%7|1534729805.524|SEND|rdkafka#producer-1| [thrd:ssl://localhost:9093/bootstrap]: ssl://localhost:9093/1: Sent partial ProduceRequest (v3, 69785+27900/999824 bytes, CorrId 448)

I don't know which property I should set to prevent the connection from timing out.

The rdkafka version is 0.11.5, the Kafka version is 2.11-1.1.0, and the JDK version is 1.8.0_171.

How to reproduce

1. A Kafka cluster with 3 nodes deployed on the same VM.
2. Run a test program on the VM above.
3. The test program has 8 threads and each thread produces 100k (100,000) messages, each message 300 bytes.
4. Version of the rdkafka C library is 0.11.5.
5. Version of Kafka is 2.11-1.1.0.
6. Version of the JDK is 1.8.0_171.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

yorksen commented 6 years ago

When the delivery report callback is set:

rd_kafka_conf_set_dr_msg_cb(conf, mq_connection_deliverlog);

static void mq_connection_deliverlog(rd_kafka_t *kafka, const rd_kafka_message_t *message, void *opaque)
{
    if (!message) {
        MQ_ERROR("param message is null");
        return;
    }

    if (RD_KAFKA_RESP_ERR_NO_ERROR == message->err)
    {
        return;
    }

    MQ_ERROR("failed to deliver message,len:%lu,offset:%"PRId64",topic:%s,err:%s\n", message->len,
            message->offset, rd_kafka_topic_name(message->rkt), rd_kafka_err2str(message->err));

    return;
}

I get the error log below when the error occurs:

Aug 20 09:47:01 SZX1000449934 multi-thread: [KFKCLI ERROR] src:source/mq_connection.c,line:46 failed to deliver message,len:300,offset:-1001,topic:test_topic2,err:Local: Message timed out

edenhill commented 6 years ago

Your producer is producing messages faster than the broker or network can handle, resulting in connection timeout. Try setting max.in.flight to 1, 3 or some other low value to slow down the producer.
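In the option-table style used in the original question, that suggestion would look roughly like the sketch below. This is not an authoritative config: only the max.in.flight entry is new, and the other entries are abbreviated from the question's g_producer_options table.

```c
/* Sketch only: max.in.flight caps the number of in-flight requests per
 * broker connection; a low value throttles the producer so a slow
 * broker/network is not overwhelmed. */
static mq_option g_producer_options[] = {
    {"queue.buffering.max.messages", "10000000"},
    {"acks", "0"},
    {"max.in.flight", "1"},          /* new: slow the producer down */
    {"message.timeout.ms", "300000"},
};
```

Note that max.in.flight=1 also preserves strict ordering on retries, at the cost of throughput.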

yorksen commented 6 years ago

With max.in.flight = 1 set on the producer, I get the same error messages as before:

%3|1535105518.876|FAIL|rdkafka#producer-1| [thrd:172.120.88.25:9092/bootstrap]: 172.120.88.25:9092/1: 3 request(s) timed out: disconnect
%3|1535105518.876|ERROR|rdkafka#producer-1| [thrd:172.120.88.25:9092/bootstrap]: 172.120.88.25:9092/1: 3 request(s) timed out: disconnect

But this time I made the consumer test program record how many messages it received, and I found that when the error occurs, the number of messages the consumer receives is larger than the number the producer produced:

producer test program prints: produce 500000 datas cost: 384 ms

consumer test program prints: end consuming topic test1,products num 503225,cost time:2612 ms

I don't know what happened behind the scenes; it seems messages are resent when the error occurs. What I care about most is whether messages keep their order when they are resent.

edenhill commented 6 years ago

You may get duplicates if retries>0, since the producer can't know if a timed-out ProduceRequest was handled by the broker or not.

The upcoming Idempotent Producer (part of EOS) will improve on situations like this.
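For later readers: once the idempotent producer shipped (librdkafka v1.0.0 and later, against sufficiently new brokers), enabling it is a single property. A hedged sketch in the question's option-table style; the exact prerequisites are documented in librdkafka's CONFIGURATION.md:

```c
/* Sketch only: enable.idempotence makes the broker deduplicate
 * retried ProduceRequests, removing the duplicates described above.
 * It constrains other settings (e.g. acks must be all). */
static mq_option g_producer_options_eos[] = {
    {"enable.idempotence", "true"},
};
```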

yorksen commented 6 years ago

Thanks, I feel it is a problem with my network. And what is EOS? Is it a new librdkafka feature that will be delivered in the future?

rolandyoung commented 6 years ago

EOS = Exactly Once Semantics. See issue #1308

pdeole commented 3 years ago

Hi @edenhill @yorksen, I am facing the same issue during performance testing of my service. The service is essentially a Kafka consumer that processes records and then inserts them into a DB. To test it, I use a JMeter script that produces events onto the Kafka topic. Occasionally I see the error below in the JMeter logs and the event is never consumed by the service, which means it was not produced by the script. Can you suggest how to resolve this?

WARN o.a.k.c.n.SslTransportLayer: Failed to send SSL Close message
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:1.8.0_232]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[?:1.8.0_232]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[?:1.8.0_232]
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[?:1.8.0_232]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) ~[?:1.8.0_232]
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:212) ~[kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:170) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:717) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.common.network.Selector.close(Selector.java:708) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:500) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) [kafka-clients-1.0.0-cp1.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:168) [kafka-clients-1.0.0-cp1.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]

edenhill commented 3 years ago

That's the Java client, not librdkafka.

pdeole commented 3 years ago

Correct... but it is the same issue, so can you suggest any way to resolve it?

yorksen commented 3 years ago

Oh, that was a very long time ago; as far as I remember, I used the "Idempotent Producer" feature to reduce the errors above.