confluentinc / librdkafka

The Apache Kafka C/C++ library

rd_kafka_producev: random message loss under CPU stress #1961

Closed alorbach closed 5 years ago

alorbach commented 6 years ago

Description

We use rd_kafka_producev in the rsyslog output plugin omkafka to produce Kafka messages and submit them to Kafka servers. In our testbench setup, we use a single Kafka/ZooKeeper server instance with a single partition. If we put the machine under high CPU stress (using "stress -c 4"), librdkafka or Kafka appears to be losing random messages every once in a while.

The strange thing is that the delivery callback set by rd_kafka_conf_set_dr_msg_cb is actually called for the message that is being lost. So from a technical point of view, librdkafka successfully submitted the message to Kafka.

We use a Kafka consumer (the rsyslog plugin imkafka) to read all messages we produced with omkafka earlier.

I have added logfiles from a sample run (including server logs). log-omkafka is the producer log; log-imkafka is the consumer log.

The missing message is "msgnum:00022364". If you search for it in log-omkafka, you will find the delivery success. In log-imkafka you will only find 00022363 and 00022365; 00022364 is missing.


Attached logs: logs.zip, kafkalogs.zip

rolandyoung commented 6 years ago

Given that the kafka broker has acknowledged the message, this seems more likely to be a problem with the broker or the consumer than with the producer.

Have you checked (with kafkacat, for instance) whether the missing message was persisted by the broker? If not, this appears to be a broker problem. If it did get persisted, then it would be a problem between the broker and the consumer.

alorbach commented 6 years ago

I will look into using the kafkacat tool. Do you have the proper command parameters ready to do such a check?

edenhill commented 6 years ago

@alorbach The following prints partition, offset and value for all messages in all partitions of a topic:

    kafkacat -b <broker> -C -o beginning -t <topic> -f '%p@%o: %s\n'

edenhill commented 6 years ago

You might want to use request.required.acks=all on the producer to make sure all replicas have written the message to the log before returning an ack to the producer.
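As a configuration sketch: in librdkafka this is the topic-level producer property request.required.acks (alias acks). Assuming the setting is passed straight through to librdkafka, the fragment would look like:

```
# wait for all in-sync replicas to acknowledge a write
# before the produce request is considered successful
request.required.acks=all
```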

rolandyoung commented 6 years ago

Looking at your logs, I see that your imkafka process seems to have skipped reading messages from partition [0] of the topic between offsets 11300 (msgnum:00022361) and 11388 (msgnum:00022443).

You may notice that between these two points in the log, there are a number of messages relating to partition static [0]. This one, in particular, looks problematic:

5045.221659421:7f50b8d1a700: imkafka.c: imkafka: kafka log message [7,OFFSET]: [thrd:main]: static [0]: offset reset (at offset 11301) to BEGINNING: Broker: Offset out of range: Broker: Offset out of range

Examining the logs in more detail may provide a clue as to what is going on, but this definitely looks like a broker or consumer-side problem.

alorbach commented 6 years ago

I am trying to work with kafkacat, but I cannot get more than about 30000 messages out of Kafka. I am producing 100000 messages, and I see in the debug log that all 100000 messages have been produced.

kafkacat only returns the last ~30k messages. Any idea?

Edit: I believe this is actually the reason why my messages are lost from time to time.

edenhill commented 6 years ago

What messages are missing from the kafkacat consumer? Is it a range? Is it for a specific partition?

alorbach commented 6 years ago

A range from approx 1 to 70000 ... the last 30000 messages are appearing. Used:

    kafkacat -b localhost:29092 -e -C -o beginning -t static -f '%p@%o:%k:%s'

Output looks like attached logfile rsyslog_omkafka.sh_34861.out.log

alorbach commented 6 years ago

I changed my kafka instance configuration to the basic sample I found here: https://github.com/kafka-dev/kafka/blob/master/config/server.properties

Once I get above ~62k messages into Kafka, I only get about 20-30k messages out of kafkacat. If I stay below 62k messages, kafkacat gets all the messages.

kafkacat output is:

    kafkacat -b localhost:29092 -e -C -o beginning -t static -f '%s'
    % Reached end of topic static [0] at offset 31176
    % Reached end of topic static [1] at offset 30824: exiting

So I see proper offsets here. But kafkacat just returns no data; is it more a problem of kafkacat or of the Kafka server?

alorbach commented 6 years ago

I have some new information! This issue seems to happen only when I am using the new rd_kafka_producev API to produce Kafka messages. When I use the old rd_kafka_produce API (which kafkacat also uses), I get all 100k messages back from the Kafka server.

edenhill commented 6 years ago

@alorbach Can you share your code calling producev() ?

alorbach commented 6 years ago

  msg_kafka_response = rd_kafka_producev(pData->rk,
                  RD_KAFKA_V_RKT(rkt),
                  RD_KAFKA_V_PARTITION(partition),
                  RD_KAFKA_V_VALUE(msg, strlen((char*)msg)),
                  RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                  RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp),
                  RD_KAFKA_V_END);

Old API Call is used like this:

  msg_enqueue_status = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                  msg, strlen((char*)msg),
                  pData->key,
                  pData->key == NULL ? 0 : strlen((char*)pData->key),
                  NULL);

This is basically what I am using. Partition is set to RD_KAFKA_PARTITION_UA

edenhill commented 6 years ago

How are you handling the case where msg_kafka_response is set to an error? Could you try using RD_KAFKA_V_TOPIC("topic-name") instead of rkt and see if it changes anything?

You're not passing a key to the producev() call, only to produce(); not sure if that is intentional.

alorbach commented 6 years ago

Maybe it's easier to share the original code here: https://github.com/rsyslog/rsyslog/blob/master/plugins/omkafka/omkafka.c#L646

A key is used when configured, the sample I cut out was the branch not using the KEY.

I will add RD_KAFKA_V_TOPIC to the producev and see what happens in the test.

edenhill commented 6 years ago

Thank you.

re key: you can pass a NULL, 0 key to RD_KAFKA_V_KEY() to avoid the if.

edenhill commented 6 years ago

I changed my kafka instance configuration to the basic sample I found here: https://github.com/kafka-dev/kafka/blob/master/config/server.properties

nit: That config is 7 years old, might want to use something fresher, such as: https://github.com/apache/kafka/blob/trunk/config/server.properties

alorbach commented 6 years ago

Ok I will add RD_KAFKA_V_KEY(NULL, 0), if no key is specified.

Using RD_KAFKA_V_TOPIC() instead of RD_KAFKA_V_RKT() did not change anything.

alorbach commented 6 years ago

Kafka Server config I am using: https://github.com/rsyslog/rsyslog/blob/master/tests/testsuites/kafka-server.properties

edenhill commented 6 years ago

You're calling flush() with a timeout of 1000ms, which is too short to wait for all outstanding messages to be delivered:

5073.478003939:main thread    : omkafka.c: omkafka: kafka delivery SUCCESS on msg ' msgnum:00100000:'
5073.478007570:main thread    : omkafka.c: omkafka: onDestroyflushed remaining '248' messages to kafka topic 'static'
5073.478009941:main thread    : omkafka.c: omkafka: onDestroy kafka outqueue length: 0, callbacks called 0
5073.478012230:main thread    : omkafka.c: omkafka: closing topic static

First, I suggest increasing the timeout. Conceptually it should match message.timeout.ms, to allow messages to be delivered within the configured constraints, but at shutdown that might be too long; the recommendation is still at least 10s.

Secondly, there seems to be very little batching of requests, one message is sent per request which has quite substantial overhead. You can increase performance by increasing the batch accumulation time: set linger.ms to something like 10 or 50 ms.
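The two suggestions above can be sketched as a librdkafka producer config fragment (the property names are standard librdkafka settings; the values here are illustrative, not taken from the thread):

```
# accumulate messages for up to 50 ms so multiple messages share one produce request
linger.ms=50
# per-message delivery timeout, retries included; the flush() timeout at shutdown
# should be on the same order of magnitude (at least ~10 s)
message.timeout.ms=300000
```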

edenhill commented 6 years ago

There are also a bunch of failed messages in the log:

egrep -o 'delivery (FAIL|SUCCESS)' ~/Hämtningar/log-omkafka  | sort | uniq -c
    624 delivery FAIL
 100000 delivery SUCCESS

I suggest printing the rkmessage->err when delivery fails, even if you re-enqueue them for later sending.

alorbach commented 6 years ago

I have compared server.log between the old and new producer API. The only difference is at the end: when using rd_kafka_producev, these log lines at the end of the Kafka server log show that it deleted the missing entries.

[2018-09-04 12:08:29,139] INFO [Log partition=8zvbo9xa-0, dir=/home/al/git/rsyslog/tests/.dep_wrk/kafka/kafka-logs] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log)
[2018-09-04 12:08:29,150] INFO [ProducerStateManager partition=8zvbo9xa-0] Writing producer snapshot at offset 55143 (kafka.log.ProducerStateManager)
[2018-09-04 12:08:29,151] INFO [Log partition=8zvbo9xa-0, dir=/home/al/git/rsyslog/tests/.dep_wrk/kafka/kafka-logs] Rolled new log segment at offset 55143 in 5 ms. (kafka.log.Log)
[2018-09-04 12:08:29,152] INFO [Log partition=8zvbo9xa-0, dir=/home/al/git/rsyslog/tests/.dep_wrk/kafka/kafka-logs] Scheduling log segment [baseOffset 0, size 4023970] for deletion. (kafka.log.Log)
[2018-09-04 12:08:29,153] INFO [Log partition=8zvbo9xa-0, dir=/home/al/git/rsyslog/tests/.dep_wrk/kafka/kafka-logs] Incrementing log start offset to 55143 (kafka.log.Log)
[2018-09-04 12:08:29,157] INFO Cleared earliest 0 entries from epoch cache based on passed offset 55143 leaving 1 in EpochFile for partition 8zvbo9xa-0 (kafka.server.epoch.LeaderEpochFileCache)

edenhill commented 6 years ago

Ah, yes: if you pass a message timestamp that is older than currtime - retention.ms, it is highly likely that your messages will be deleted at the next log retention run by the broker. You seem to be doing exactly that when resending a message: https://github.com/rsyslog/rsyslog/blob/master/plugins/omkafka/omkafka.c#L649

That line should probably be * 1000.

But you can leave the timestamp at 0 in that case to make the producer automatically insert the current time.

alorbach commented 6 years ago

Good catch! Thanks, that was indeed a little bug.

Finally I know why my records are deleted. In the test it runs into the "ttMsgTimestamp = atoi((char*)msgTimestamp)" case, and the milliseconds are added to the timestamp.

When I comment out the RD_KAFKA_V_TIMESTAMP part, the test suddenly works. So something is wrong with my timestamp calculation. Debug shows timestamp=1519866000 and ttMsgTimestamp=1519866000000.

But logs still get deleted by retention checker.

edenhill commented 6 years ago

When you are retrying a message you pass time as seconds, not milliseconds: https://github.com/rsyslog/rsyslog/blob/master/plugins/omkafka/omkafka.c#L649

$ grep producev.timestamp log-omkafka  | grep -v '000$' | head
5051.324309225:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845051
5051.326596371:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845051
5051.326640246:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845051
5051.328647942:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845051
5051.328684986:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845051
5053.378513193:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845053
5053.378669626:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845053
5053.883760057:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845053
5053.883783908:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845053
5053.883802992:main Q:Reg/w0  : omkafka.c: omkafka: rd_kafka_producev timestamp=(null)/1534845053

alorbach commented 6 years ago

Yes, this is the bug you found earlier above. I have fixed that in my test branch. But the problem persists.

0483.487997496:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000
0483.488093867:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000
0483.488195048:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000

edenhill commented 6 years ago

Is the broker still saying it is deleting segments?

alorbach commented 6 years ago

It's deleting entries when "log.retention.hours" is below ~4400 in the Kafka server configuration.

With log.retention.hours=4400 logs are deleted:

[2018-09-04 14:00:12,146] INFO Created log for partition zbp5iG90-0 in /home/al/git/rsyslog/tests/.dep_wrk/kafka/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> 104857600, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 10000, segment.ms -> 604800000, segment.bytes -> 104857600, retention.ms -> 15840000000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 104857600, flush.messages -> 10000}. (kafka.log.LogManager)

With log.retention.hours=4500 no logs are deleted:

[2018-09-04 14:01:41,705] INFO Created log for partition IpkGd3og-0 in /home/al/git/rsyslog/tests/.dep_wrk/kafka/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> 104857600, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 10000, segment.ms -> 604800000, segment.bytes -> 104857600, retention.ms -> 16200000000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 104857600, flush.messages -> 10000}. (kafka.log.LogManager)

alorbach commented 6 years ago

But technically, can you confirm that my timestamps are correct now, as shown below?

3906.724258635:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000
3906.724457330:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000
3906.724657294:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000
3906.724838562:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000
3906.725036284:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000
3906.725250509:main Q:Reg/w0 : omkafka.c: omkafka: rd_kafka_producev timestamp=1519866000/1519866000000

edenhill commented 5 years ago

$ LC_ALL=C date -d @1519866000
Thu Mar  1 02:00:00 CET 2018

Looks ok. Do note that message timestamps are in milliseconds since epoch, not seconds though.