The under-replication combined with broker failover seems to be the issue here, not the producer. The log message shows the log being truncated.
Are you saying that when brokers enter an under-replicated state, ordering cannot be guaranteed in any way? Could a future implementation of idempotence in librdkafka fix our problem?
Under-replication means messages could not be written to all desired replicas. If you have a broker failover or leadership change in such a scenario, and the new leader was one of the replicas that did not get those messages, then there will be message loss.
You should set min.insync.replicas appropriately and disable unclean leader election, and you should be fine.
Taken from server.properties:
min.insync.replicas=2
unclean.leader.election.enable=false
So the configuration is exactly as you suggested. Unfortunately it does not seem to be enough to avoid unordered messages. Do you have any other advice?
What could be happening is something like this: a produce request is still outstanding (in flight) to the old partition leader when leadership changes, and the producer starts sending newer messages to the new leader before that outstanding request has been dealt with, so the older messages can end up behind the newer ones.
Ordering is thus not maintained.
The idempotent producer solves this by draining all outstanding requests for the old broker before sending new ones.
It would be very valuable if you could try to reproduce this on idempotence branch.
But don't set enable.idempotence=true
and don't set acks, retries, or max.in.flight, as they will be adjusted automatically.
I've run the test with the library built from the idempotence branch (but with enable.idempotence and the other options not set, as you said). The same problem occurs. I've also attached the log with librdkafka debug output. log.txt
I also tried to run the test with idempotence set to true, but I was flooded with "Local: Inconsistent state" and "Broker: Broker received an out of order sequence number" messages right after the rd_kafka_produce call.
Thank you!
Those idempotence errors are interesting, could you reproduce that with debug=all and upload the logs?
Why are you recreating the producers? If you're not changing configuration you should persist producers as much as possible.
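For illustration, a minimal sketch of that pattern (the function name, error handling and broker address parameter are assumptions, not taken from the attached code): the producer handle is created once at process start and then reused for every topic and message, instead of being recreated per batch.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Illustrative sketch: build the configuration and create the producer
 * handle exactly once per process. */
static rd_kafka_t *producer_init(const char *brokers) {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
        rd_kafka_conf_set(conf, "linger.ms", "10",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Config error: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return NULL;
    }

    /* rd_kafka_new() takes ownership of conf on success. */
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                  errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
    }
    return rk;
}

The same rd_kafka_t handle is then shared by all rd_kafka_topic_new()/rd_kafka_produce() calls in the process, with rd_kafka_poll() called regularly to serve delivery reports, and a single rd_kafka_flush()/rd_kafka_destroy() at shutdown.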
Just to be clear. Are you interested in logs from the enable.idempotence=true configuration?
About the logs I've previously attached: we're testing a scenario where MANY different processes push data using the librdkafka APIs (that's going to be similar to our real-world case). It's something we cannot avoid.
Just to be clear. Are you interested in logs from the enable.idempotence=true configuration?
Yes please!
I don't see any failures or retries in the (non-idempotence) log you provided.
Just to be clear: ordering is only maintained per partition in Kafka. If you don't specify an explicit partition when you produce, the message will be partitioned using the configured partitioner. If no partitioner is configured, the default consistent_random partitioner will be used, which distributes messages without keys across all available partitions, while messages with keys have their keys hashed and mapped to a partition.
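A small sketch to make the two cases concrete (illustrative helper; the buffer and key arguments mirror the produce call shown later in this thread):

#include <librdkafka/rdkafka.h>

/* Illustrative only: topic, buf/sz and key/key_sz come from the caller. */
static int produce_example(rd_kafka_topic_t *topic,
                           void *buf, size_t sz,
                           void *key, size_t key_sz) {
    /* Explicit partition 0: ordering only applies within this single partition. */
    if (rd_kafka_produce(topic, 0, RD_KAFKA_MSG_F_COPY,
                         buf, sz, key, key_sz, NULL) == -1)
        return -1;

    /* RD_KAFKA_PARTITION_UA: the configured partitioner picks the partition.
     * Keyed messages are hashed to a partition, unkeyed messages are
     * spread across the available partitions. */
    if (rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         buf, sz, key, key_sz, NULL) == -1)
        return -1;

    return 0;
}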
I forgot to mention that every topic has a single partition and a replication factor of 3. A progressive number is included as the key of every message and is checked by the consumer to discard duplicates, but if ordering is not guaranteed by Kafka, that trick won't do the job. Tomorrow I will test again with your latest library (idempotence set to true) and attach the log here.
Are you specifying the partition when you produce, or using PARTITION_UA?
Yes, partition is always set to zero when rd_kafka_produce is called
Log in attachment.
Recap of current scenario:
Producer config:
enable.idempotence=true
message.timeout.ms=0
linger.ms=10
batch.num.messages=10000
queue.buffering.max.messages=100000
message.max.bytes=1000000
debug=all
Topic replication factor is set to 3, single partition.
22 processes, each opening its own connection to Kafka. Every process creates a new topic (with a random name) and pushes data only to that single topic.
During this test run, a lot of data wasn't committed to Kafka brokers.
I also included log messages for the errors returned by rd_kafka_produce (the ones starting with "Error returned by rd_kafka_produce") and the errors caught by the commit message callback (the ones starting with "Commit error on Kafka broker: ").
Please let me know if you need further info.
Thank you!
Looking at one producer with an at-first seemingly complete log, pid 1648, shows "Inconsistent state" at 10:50:28. What is weird is that there are absolutely no debug logs for 9 minutes, when there should be at least a couple per second with debug=all:
......
debug Sep 18 10:43:44 producer producer[1648]: RDKAFKA - 7-APIVERSION: rdkafka#producer-1: [thrd:10.200.10.1:9092/bootstrap]: 10.200.10.1:9092/bootstrap: ApiKey DescribeAcls (29) Versions 0..1
debug Sep 18 10:43:44 producer producer[1648]: RDKAFKA - 7-APIVERSION: rdkafka#producer-1: [thrd:10.200.10.1:9092/bootstrap]: 10.200.10.1:9092/bootstrap: ApiKey CreateAcls (30) Versions 0..1
debug Sep 18 10:43:44 producer producer[1648]: RDKAFKA - 7-APIVERSION: rdkafka#producer-1: [thrd:10.200.10.1:9092/bootstrap]: 10.200.10.1:9092/bootstrap: ApiKey DeleteAcls (31) Versions 0..1
debug Sep 18 10:43:44 producer producer[1648]: RDKAFKA - 7-APIVERSION: rdkafka#producer-1: [thrd:10.200.10.1:9092/bootstrap]: 10.200.10.1:9092/bootstrap: ApiKey DescribeConfigs (32) Versions 0..2
debug Sep 18 10:43:44 producer producer[1648]: RDKAFKA - 7-APIVERSION: rdkafka#producer-1: [thrd:10.200.10.1:9092/bootstrap]: 10.200.10.1:9092/bootstrap: ApiKey AlterConfigs (33) Versions 0..1
err Sep 18 10:50:28 producer producer[1648]: Error returned by rd_kafka_produce: Local: Inconsistent state
err Sep 18 10:50:28 producer producer[1648]: Error returned by rd_kafka_produce: Local: Inconsistent state
err Sep 18 10:50:58 producer producer[1648]: Error returned by rd_kafka_produce: Local: Inconsistent state
err Sep 18 10:50:58 producer producer[1648]: Error returned by rd_kafka_produce: Local: Inconsistent state
err Sep 18 10:50:58 producer producer[1648]: Error returned by rd_kafka_produce: Local: Inconsistent state
err Sep 18 10:50:58 producer producer[1648]: Error returned by rd_kafka_produce: Local: Inconsistent state
.....
err Sep 18 10:52:28 producer producer[1648]: Commit error on Kafka broker: Local: Purged in queue
err Sep 18 10:52:28 producer producer[1648]: Commit error on Kafka broker: Local: Purged in queue
err Sep 18 10:52:28 producer producer[1648]: Commit error on Kafka broker: Local: Purged in queue
err Sep 18 10:52:28 producer producer[1648]: Commit error on Kafka broker: Local: Purged in queue
debug Sep 18 10:52:29 producer producer[1648]: RDKAFKA - 7-DESTROY: rdkafka#producer-1: [thrd:app]: Terminating instance (destroy flags none (0x0))
debug Sep 18 10:52:29 producer producer[1648]: RDKAFKA - 7-TERMINATE: rdkafka#producer-1: [thrd:app]: Interrupting timers
debug Sep 18 10:52:29 producer producer[1648]: RDKAFKA - 7-TERMINATE: rdkafka#producer-1: [thrd:app]: Sending TERMINATE to internal main thread
debug Sep 18 10:52:29 producer producer[1648]: RDKAFKA - 7-TERMINATE: rdkafka#producer-1: [thrd:app]: Joining internal main thread
debug Sep 18 10:52:29 producer producer[1648]: RDKAFKA - 7-TERMINATE: rdkafka#producer-1: [thrd:main]: Internal main thread terminating
The "error returned by rd_kafka_produce" means a fatal idempotent producer error was raised, but there is no corresponding debug or log message to that affect. It feels like there is alot of missing log lines.
side question: I guess the "Commit error on Kafka broker" is the delivery report callback?
Yes... I confirm that "Commit error on Kafka broker" is logged from the delivery report callback. I also confirm that there are no further log messages. I don't know why that process (pid 1648) looks frozen for 7 minutes.
Can you share relevant parts of your producer? (init, produce loop, delivery report, termination)
Initialization
rd_kafka_t *conn = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
rd_kafka_topic_t *topic = rd_kafka_topic_new(conn, topic_string, NULL);
Produce
produce:
    if (rd_kafka_produce(topic, 0,
                         RD_KAFKA_MSG_F_COPY,
                         buf, sz,
                         key, key_sz,
                         &err) == -1) {
        rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
        // behaviour taken from example in kafka library srcs
        if (resp_err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            rd_kafka_poll(conn, 1000);
            goto produce;
        } else {
            log_write(LOG_ERR, "Error returned by rd_kafka_produce: %s",
                      rd_kafka_err2str(resp_err));
            return 1;
        }
    }
Delivery report (note: msg_opaque was used in an early phase to test synchronous commit. Not used anymore)
static void storage_kafka_commit_cb (rd_kafka_t *rk,
                                     void *payload, size_t len,
                                     rd_kafka_resp_err_t err,
                                     void *opaque, void *msg_opaque) {
    // log in case of error
    if (err) {
        log_write(LOG_ERR, "Commit error on Kafka broker: %s",
                  rd_kafka_err2str(err));
    }
    // msg_opaque is a pointer to the err value inside the producer function
    rd_kafka_resp_err_t *errp = (rd_kafka_resp_err_t *) msg_opaque;
    *errp = err;
}
Terminate
rd_kafka_flush(conn, -1);
if (conn) {
    rd_kafka_destroy(conn);
    conn = NULL;
}
Looks good (you might want to change to the richer dr_msg_cb; dr_cb is legacy and will go away eventually).
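For reference, a minimal sketch of that richer callback (the log wording mirrors the existing callback; the callback name is illustrative):

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Per-message delivery report: the whole rd_kafka_message_t is available,
 * including payload, key, partition, offset and the per-message opaque. */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage,
                      void *opaque) {
    if (rkmessage->err)
        fprintf(stderr, "Commit error on Kafka broker: %s\n",
                rd_kafka_err2str(rkmessage->err));
    /* rkmessage->_private holds the msg_opaque passed to rd_kafka_produce(). */
}

Registration happens on the configuration object before the handle is created, e.g. rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb).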
So, you are only seeing reordering, not drops, correct? Is it reproducible with fewer producers? I'd really like to understand why there is so much missing log output; fewer producers might help narrow it down.
Make sure you are not forking after having created the rd_kafka_t handle.
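A sketch of the ordering that advice implies (function name and error handling are illustrative): fork first, then create the configuration and the handle inside the child.

#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* librdkafka's background threads do not survive fork(), so a handle
 * created before the fork is unusable in the child. */
static int spawn_producer_process(void) {
    pid_t pid = fork();
    if (pid < 0)
        return -1;

    if (pid == 0) {
        /* Child: only now create the conf and the producer. */
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        if (!rk) {
            fprintf(stderr, "rd_kafka_new failed: %s\n", errstr);
            rd_kafka_conf_destroy(conf);
            _exit(1);
        }
        /* ... create topic, produce, poll ... */
        rd_kafka_flush(rk, -1);
        rd_kafka_destroy(rk);
        _exit(0);
    }

    return 0; /* parent continues and may spawn more children */
}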
Quick recap:
rd_kafka_new is always called after the fork and just once per process.
I could test it again with fewer producers, as you asked.
Is the partition leader ever changing during these tests?
Also: no data seems to be lost, since you get delivery reports for the failing messages; the messages simply fail to be written to the log. Message loss is when a write is indicated (through a successful delivery report) but is not in fact persisted in the log.
Is the partition leader ever changing during these tests?
Could you please help me identify which log file I have to search (state-change.log?) and which message I should grep for?
You could look for lines like this in server.log:
[2018-09-19 10:40:56,964] INFO [Partition test1-0 broker=3] test1-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
and any corresponding logs for the topic in state-change.log.
I believe it is important to find out why the producer logs are missing log entries; a smaller number of producers might make it easier to find.
Please also make sure your ordering-check logic is correct and does not mix up messages from different producers.
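As an illustration of such a check (hypothetical helper, not taken from this thread): since every producer here writes to its own topic with a progressive number as the key, the consumer side can keep the last sequence seen per topic and flag anything that goes backwards, treating an equal value as a duplicate rather than a reorder.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical per-topic ordering check used on the consumer side. */
#define MAX_TOPICS 512

static struct {
    char    topic[256];
    int64_t last_seq;
} seen[MAX_TOPICS];
static int nseen = 0;

/* Returns 0 if 'seq' is in order (or a duplicate) for 'topic', -1 on reordering. */
static int check_order(const char *topic, int64_t seq) {
    for (int i = 0; i < nseen; i++) {
        if (strcmp(seen[i].topic, topic) != 0)
            continue;
        if (seq == seen[i].last_seq)
            return 0;  /* duplicate: discard, not a reorder */
        if (seq < seen[i].last_seq) {
            fprintf(stderr, "Reordering on %s: got %lld after %lld\n",
                    topic, (long long)seq, (long long)seen[i].last_seq);
            return -1;
        }
        seen[i].last_seq = seq;
        return 0;
    }
    if (nseen < MAX_TOPICS) {  /* first message seen for this topic */
        snprintf(seen[nseen].topic, sizeof(seen[nseen].topic), "%s", topic);
        seen[nseen].last_seq = seq;
        nseen++;
    }
    return 0;
}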
Broker log related to the test I did yesterday (18th Sep) is attached. Please note that the broker time is 1'30" behind the producer server (just to be clear... when the producer clock reads 10:00:00, the broker clock reads 09:58:30).
Just to ensure I understand you correctly @davideberra: with idempotence enabled you are no longer experiencing ordering issues, but instead loss of data.
The server logs you provided are quite tumultuous, which causes numerous produce failures that require the producer to retry. I suspect the data loss here can actually be attributed to these produce failures. These failures would normally be retried with the idempotent producer; however, you eventually run into an out-of-sequence error which, as you have noted, is fatal. At this point the producer queues are purged, which I could see being perceived as lost data.
A cursory search of existing broker issues returned the following which may be worth looking into.
https://issues.apache.org/jira/browse/KAFKA-7298
Assuming you do receive the failed produce delivery reports @edenhill mentioned earlier I am fairly confident this is a broker issue as opposed to a client issue.
This should be fixed now. https://github.com/edenhill/librdkafka/pull/2081/commits
Description
Heavy traffic load leads to a temporary "under-replicated" broker status, and message ordering is lost even though max.in.flight.requests.per.connection is set to 1.
Our scenario
3 Kafka brokers running Kafka 2.11-2 on a VM. All three brokers access the same physical disk. The producer is a process using librdkafka-0.11.5. The average message size is 190 bytes and the payload is binary data. 30 producers are spawned, each creating a new topic and pushing 100 MB of binary data. When all of them complete the task, a new batch of 30 producers is spawned. This is done 10 times, for a total of 300 topics created and filled with data.
How to reproduce
Set up a multi-broker scenario with 30 producers pushing data to 30 different topics until the brokers move to "under-replicated" status. Then start the consumer and you'll get an unordered stream of data.
Log
The following WARNING and ERROR logs appeared during the tests:
[2018-09-14 15:55:46,498] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Based on follower's leader epoch, leader replied with an unknown offset in PRFX2-0_16_187803764-0. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
[2018-09-14 15:51:46,588] ERROR [ReplicaManager broker=0] Error processing append operation on partition PRFX2-8_13_2076900336-0 (kafka.server.ReplicaManager)
[2018-09-14 15:53:59,872] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition PRFX2-2_14_1151032901-0 at offset 258706 (kafka.server.ReplicaFetcherThread)
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
librdkafka version: v0.11.5
Apache Kafka version: v2.11-2
librdkafka client configuration:
acks=-1
retries=0
message.timeout.ms=0
retry.backoff.ms=0
max.in.flight.requests.per.connection=1
linger.ms=10
batch.num.messages=10000
queue.buffering.max.messages=100000
message.max.bytes=10000000
Operating system: Red Hat Enterprise Linux Server release 7.3
Provide logs (with debug=.. as necessary) from librdkafka