apeloquin-agilysys closed this issue 14 hours ago.
Thanks a lot for filing this issue!
I took a look and found where the problem is.
These results are from my machine (Ubuntu on WSL) with identical code, running a single broker locally. I replaced log.info with console.log and defined an until method that polls the condition function every 100ms.
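The until helper itself isn't shown in the thread. Below is a minimal sketch of what it might look like, assuming it takes an async condition plus an optional `{timeout}` option, which matches the call made before producer.connect() further down. The `interval` option, the default timeout, and the error message are assumptions for illustration.

```typescript
// Hypothetical reconstruction of the test harness's `until` helper.
// It re-evaluates `condition` every `interval` ms and rejects once
// `timeout` ms have passed without the condition becoming true.
interface UntilOptions {
  timeout?: number;  // give up after this many ms (assumed default: 30000)
  interval?: number; // delay between checks in ms (default: 100, as described above)
}

async function until(
  condition: () => Promise<boolean>,
  { timeout = 30000, interval = 100 }: UntilOptions = {},
): Promise<void> {
  const deadline = Date.now() + timeout;
  for (;;) {
    if (await condition()) return;
    if (Date.now() >= deadline) {
      throw new Error(`until: condition not met within ${timeout}ms`);
    }
    // Sleep for one polling interval before checking again.
    await new Promise<void>((resolve) => setTimeout(resolve, interval));
  }
}
```

Usage mirrors the calls in this thread, for example `await until(async () => lastReceived == total - 1);`.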
Initial results, with the same code, reproduced the horrible performance:
# Confluent
Rec'd 1000 : 497836ms
Sent 1000 : 501832ms
# KafkaJS
Sent 1000 : 3895ms
Rec'd 1000 : 3897ms
I'll describe a workaround first and then explain the cause in a follow-up comment. The workaround itself is only a couple of lines:
```diff
-await producer.send({
+producer.send({
   topic,
   messages: [{value: "one"}]
 });
 if (++sentCount % 100 === 0) {
+  await producer.flush({ timeout: 5000 });
   console.log(`Sent ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
 }
```
However, this makes the comparison a bit unfair, because librdkafka batches messages internally. I added two options to the producer config to turn off batching and to ensure that no more than one produce request is in flight at a time. These options aren't required by the workaround, but they are required for a fair test.
```diff
-const producer = kafka.producer();
+const producer = kafka.producer({'linger.ms': 0, 'max.in.flight': 1});
```
With those options, for 1000 messages Confluent is still slower, but within the same order of magnitude:
# Confluent
Sent 1000 : 5493ms
Rec'd 1000 : 6494ms
# KafkaJS
Sent 1000 : 3895ms
Rec'd 1000 : 3897ms
For 10K messages (logging every 1K rather than every 100), Confluent becomes faster, and the gap grows as the number of messages increases:
# Confluent
Sent 10000 : 5810ms
Rec'd 10000 : 6813ms
# KafkaJS
Sent 10000 : 9083ms
Rec'd 10000 : 9085ms
In some runs with the Confluent client, the producer started producing before the consumer had been assigned partitions (the rebalance was still ongoing), so I added this before producer.connect() to make sure that doesn't happen:
```typescript
await until(async () => consumer.assignment().length > 0, {timeout: 5000});
```
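Putting the workaround from the diff above into a reusable function gives roughly the sketch below. The `FlushableProducer` interface and `sendWithPeriodicFlush` name are hypothetical, and the sketch goes one step beyond the diff: because send() is no longer awaited per message, a failed delivery would otherwise go unnoticed (or surface as an unhandled rejection), so it attaches a handler to every send and rethrows the first failure after each flush.

```typescript
// Minimal shape of the two producer calls the workaround relies on.
// This interface is an assumption for illustration, not the library's type.
interface FlushableProducer {
  send(record: { topic: string; messages: { value: string }[] }): Promise<unknown>;
  flush(options: { timeout: number }): Promise<unknown>;
}

// Hypothetical helper: fire off sends without awaiting each one, flush every
// `flushEvery` messages, and still report the first failed delivery.
async function sendWithPeriodicFlush(
  producer: FlushableProducer,
  topic: string,
  count: number,
  flushEvery = 100,
): Promise<number> {
  const failures: unknown[] = [];
  let pending: Promise<void>[] = [];
  let sent = 0;

  const settle = async (): Promise<void> => {
    await producer.flush({ timeout: 5000 });
    await Promise.all(pending); // wait until every outstanding send has settled
    pending = [];
    if (failures.length > 0) throw failures[0];
  };

  for (let i = 0; i < count; i++) {
    // Attach the rejection handler immediately so a failed send is recorded
    // instead of becoming an unhandled promise rejection.
    pending.push(
      producer
        .send({ topic, messages: [{ value: "one" }] })
        .then(() => {}, (err: unknown) => { failures.push(err); }),
    );
    if (++sent % flushEvery === 0) await settle();
  }
  await settle(); // flush whatever is left after the last full batch
  return sent;
}
```

With a real producer, assuming its send() and flush() fit the shape above, this would be called as `await sendWithPeriodicFlush(producer, topic, 1000);`. The 5000ms flush timeout mirrors the diff.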
Details of the issue:
Once a produce request is sent, librdkafka makes the delivery report available only when it is polled for. Internally, the Confluent library calls this polling function using setInterval, so an awaited producer.send() blocks at least until the next poll.
The interval between polls is 500ms, so when sending messages continuously, every await producer.send() blocks for roughly 500ms. That is why the performance is so horrible: every message appears to take half a second to send, purely because of the polling frequency. For 1000 sequential sends this adds up to about 1000 × 500ms = 500s, matching the roughly 500,000ms measured above.
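A back-of-the-envelope model of the effect described above (not the library's actual code): it assumes each delivery report becomes available `deliveryMs` after its send, is surfaced only at the next tick of a timer firing every `pollIntervalMs`, and that the next send starts only once the previous awaited send resolves. `deliveryMs` is an assumed round-trip time, not a measured value.

```typescript
// Deterministic model of N strictly sequential awaited sends when delivery
// reports are drained only by a periodic poll timer (ticks at k * interval).
// Assumes deliveryMs > 0.
function sequentialAwaitedSendsMs(
  messages: number,
  pollIntervalMs: number,
  deliveryMs: number,
): number {
  let now = 0;
  for (let i = 0; i < messages; i++) {
    const deliveredAt = now + deliveryMs;
    // The awaited send resolves only at the first poll tick at or after delivery.
    now = Math.ceil(deliveredAt / pollIntervalMs) * pollIntervalMs;
  }
  return now;
}
```

With the 500ms interval and an assumed 2ms delivery time, `sequentialAwaitedSendsMs(1000, 500, 2)` returns 500000, in line with the ~500s of the first run. Because every send has to wait for a fresh tick, the per-message cost is the poll interval rather than the broker round-trip.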
We're discussing a few solutions internally. I'll keep this issue open while we do.
I've merged a change that fixes this flow and added guidance to the MIGRATION.md file. For parity with librdkafka, 'linger.ms' defaults to 5ms; as a consequence, awaiting a send waits for 5ms plus the send time.
Additional info is in the MIGRATION file, specifically:
However, in case it is desired to await every message, linger.ms should be set to 0, to ensure that the default batching behaviour does not cause a delay in awaiting messages. Example:
```javascript
const kafka = new Kafka({ kafkaJS: { /* ... */ }});
const producer = kafka.producer({ 'linger.ms': 0 });
```
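The cost of the default linger.ms for strictly sequential awaits can be estimated from the earlier note that each awaited send waits for linger.ms plus the send time. A small sketch, where `sendMs` is an assumed average send time rather than a measured value:

```typescript
// Rough lower bound on wall-clock time for N strictly sequential awaited
// sends, if each one waits linger.ms before going out plus the send itself.
function awaitedSendsLowerBoundMs(messages: number, lingerMs: number, sendMs: number): number {
  return messages * (lingerMs + sendMs);
}
```

For 1000 awaited sends with the default 5ms linger, at least 5000ms is spent lingering before any send time is counted; setting linger.ms to 0 removes that term.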
Here are the perf results. The only change I made to the files is adding linger.ms as described above (no workaround or flush() calls needed).
1000 msgs
KafkaJS
Sent 1000 : 3610ms
Rec'd 1000 : 3612ms
Confluent
Sent 1000 : 1385ms
Rec'd 1000 : 4039ms
10k msgs, printing every 1000
KafkaJS
Sent 10000 : 7050ms
Rec'd 10000 : 7052ms
Confluent
Sent 10000 : 4513ms
Rec'd 10000 : 5522ms
With that, I'm marking the issue as fixed, but please reopen it or open another issue for any further performance problems. Thanks again for opening this.
Can you confirm that await producer.send({...}) will throw an error if the message/batch is not received per the acks value, as in KafkaJS?
In our scenario, the app has tracking that must be updated only after messages are successfully delivered (with the default acks: -1, i.e. all in-sync replicas), and we rely on a thrown error to indicate otherwise.
Re-running with lingerMs: 0 definitely speeds up message production.
I'm wondering if there is a similar optimization that can be made on the consumer side.
With KafkaJS, messages are picked up and processed by the consumer almost as soon as they are sent:
Sent 100 : 149ms
Rec'd 100 : 150ms
Sent 200 : 190ms
Rec'd 200 : 190ms
Sent 300 : 218ms
Rec'd 300 : 219ms
Sent 400 : 253ms
Rec'd 400 : 253ms
Sent 500 : 282ms
Rec'd 500 : 282ms
Sent 600 : 311ms
Rec'd 600 : 311ms
Sent 700 : 336ms
Rec'd 700 : 337ms
Sent 800 : 362ms
Rec'd 800 : 362ms
Sent 900 : 387ms
Rec'd 900 : 387ms
Sent 1000 : 417ms
Rec'd 1000 : 417ms
With Confluent, the send is now fast, but there is clearly an interval of nearly 1 second at play on the consumer side:
Sent 100 : 96ms
Sent 200 : 96ms
Sent 300 : 97ms
Sent 400 : 97ms
Sent 500 : 97ms
Sent 600 : 97ms
Sent 700 : 98ms
Sent 800 : 98ms
Sent 900 : 98ms
Sent 1000 : 98ms
Rec'd 100 : 1060ms <-- +1 second
Rec'd 200 : 1063ms
Rec'd 300 : 1073ms
Rec'd 400 : 1075ms
Rec'd 500 : 1076ms
Rec'd 600 : 1078ms
Rec'd 700 : 1079ms
Rec'd 800 : 2083ms <-- +1 second
Rec'd 900 : 2084ms
Rec'd 1000 : 2085ms
In our application, a "message" passes through multiple topics/consumers for processing. Ultimately, our most important performance metric is the delay between a message's origination and its reception by the final consumer. One of this flow's responsibilities is cache eviction, so timeliness matters.
I put together an example to illustrate.
KafkaJS
```typescript
import {Kafka, Producer, Consumer} from "kafkajs";

// `log` (a logger) and `until` (polls a condition until it holds) come from
// the surrounding test harness and are not shown here.
const producers: Producer[] = [];
const consumers: Consumer[] = [];
const topic = "test-kafkajs-topic";
const total = 5;
const kafka = new Kafka({brokers: ["localhost:9092"]});
let lastReceived: number | undefined;

// Consumer i reads topic-i and forwards each message to topic-(i+1).
for (let i = 0; i < total; i++) {
  const consumer = kafka.consumer({groupId: `${topic}-${i}-group`});
  await consumer.connect();
  await consumer.subscribe({topic: `${topic}-${i}`});
  await consumer.run({
    eachMessage: async ({message}) => {
      log.info(`Rec'd @${i} : ${Date.now() - start}ms`);
      if (i < total - 1) {
        await producers[i].send({
          topic: `${topic}-${i + 1}`,
          messages: [{value: "one"}]
        });
      }
      lastReceived = i;
    }
  });
  consumers.push(consumer);
}

for (let i = 0; i < total; i++) {
  const producer = kafka.producer();
  await producer.connect();
  producers.push(producer);
}

const start = Date.now();
await producers[0].send({
  topic: `${topic}-0`,
  messages: [{value: "one"}]
});
log.info(`Sent 1 : ${Date.now() - start}ms`);
await until(async () => lastReceived == total - 1);

for (let i = 0; i < total; i++) {
  await producers[i].disconnect();
  await consumers[i].disconnect();
}
```
Sent 1 : 15ms
Rec'd @0 : 18ms
Rec'd @1 : 25ms
Rec'd @2 : 30ms
Rec'd @3 : 35ms
Rec'd @4 : 37ms
Confluent
```typescript
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

// `log` (a logger) and `until` (polls a condition until it holds) come from
// the surrounding test harness and are not shown here.
const producers: Confluent.Producer[] = [];
const consumers: Confluent.Consumer[] = [];
const topic = "test-confluent-topic";
const total = 5;
const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
let lastReceived: number | undefined;

// Consumer i reads topic-i and forwards each message to topic-(i+1).
for (let i = 0; i < total; i++) {
  const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-${i}-group`}});
  await consumer.connect();
  await consumer.subscribe({topic: `${topic}-${i}`});
  await consumer.run({
    eachMessage: async ({message}) => {
      log.info(`Rec'd @${i} : ${Date.now() - start}ms`);
      if (i < total - 1) {
        await producers[i].send({
          topic: `${topic}-${i + 1}`,
          messages: [{value: "one"}]
        });
      }
      lastReceived = i;
    }
  });
  consumers.push(consumer);
}

for (let i = 0; i < total; i++) {
  // linger.ms 0 so each awaited send is not held back for batching.
  const producer = kafka.producer({"linger.ms": 0});
  await producer.connect();
  producers.push(producer);
}

const start = Date.now();
await producers[0].send({
  topic: `${topic}-0`,
  messages: [{value: "one"}]
});
log.info(`Sent 1 : ${Date.now() - start}ms`);
await until(async () => lastReceived == total - 1);

for (let i = 0; i < total; i++) {
  await producers[i].disconnect();
  await consumers[i].disconnect();
}
```
Rec'd @0 : 16ms
Sent 1 : 17ms
Rec'd @1 : 1018ms
Rec'd @2 : 2021ms
Rec'd @3 : 2044ms
Rec'd @4 : 2049ms
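To see which hop contributes the delay, the Rec'd @i timestamps can be differenced hop by hop. A small hypothetical helper for that (hopDelaysMs is not part of either library):

```typescript
// Convert cumulative "Rec'd @i : Xms" timestamps into per-hop delays:
// entry k is the time between consumer k receiving and consumer k+1 receiving.
function hopDelaysMs(receivedAtMs: number[]): number[] {
  const delays: number[] = [];
  let previous: number | undefined;
  for (const t of receivedAtMs) {
    if (previous !== undefined) delays.push(t - previous);
    previous = t;
  }
  return delays;
}
```

Applied to the outputs above, the KafkaJS timestamps [18, 25, 30, 35, 37] give hop delays of [7, 5, 5, 2] ms, while the Confluent timestamps [16, 1018, 2021, 2044, 2049] give [1002, 1003, 23, 5] ms. Nearly all of the ~2s end-to-end time comes from the first two forwarding hops.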
> Can you confirm that await producer.send({...}) will throw an error if message/batch is not received per the acks value, as in KafkaJS?
Regarding this: yes, awaiting send will throw an error if the produce fails. For example, this is what happens in a 3-broker cluster when min.insync.replicas is 2 but only 1 broker is up:
Error: Broker: Not enough in-sync replicas
at Function.createLibrdkafkaError [as create] ($HOME/git/confluent-kafka-js/lib/error.js:456:10)
at Producer.<anonymous> ($HOME/git/confluent-kafka-js/lib/producer.js:91:31) {
name: 'KafkaJSProtocolError',
retriable: false,
fatal: false,
abortable: false,
code: 19,
type: 'ERR_NOT_ENOUGH_REPLICAS'
}
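For the tracking use case described in the question, here is a minimal sketch of relying on the thrown error: the tracker is updated only after the awaited send resolves, and a failure is logged and rethrown. The `Sender` and `ProduceErrorLike` interfaces and `sendThenTrack` are hypothetical; the `type`, `code`, and `retriable` fields are taken from the error printed above, but treating them as a typed shape is an assumption.

```typescript
// Fields visible in the printed error; the shape itself is an assumption.
interface ProduceErrorLike {
  type?: string;
  code?: number;
  retriable?: boolean;
}

// Minimal producer shape for the sketch (hypothetical, mirrors send() usage).
interface Sender {
  send(record: { topic: string; messages: { value: string }[] }): Promise<unknown>;
}

// Update tracking only after the awaited send succeeds; on failure, leave the
// tracker untouched and rethrow so the caller can retry or alert.
async function sendThenTrack(
  producer: Sender,
  topic: string,
  value: string,
  markDelivered: (value: string) => void,
): Promise<void> {
  try {
    await producer.send({ topic, messages: [{ value }] });
  } catch (err) {
    const e = (err ?? {}) as ProduceErrorLike;
    console.error(`produce failed: type=${e.type ?? "unknown"} retriable=${e.retriable ?? false}`);
    throw err;
  }
  markDelivered(value);
}
```

Rethrowing rather than swallowing leaves the retry decision with the caller, which can inspect retriable (false for ERR_NOT_ENOUGH_REPLICAS in the output above).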
Regarding your second message, there are two things,
Before
Sent 100 : 4140ms
Rec'd 100 : 4167ms
Sent 200 : 4212ms
Rec'd 200 : 4239ms
Sent 300 : 4262ms
Rec'd 300 : 4302ms
Sent 400 : 4311ms
Sent 500 : 4357ms
Rec'd 400 : 4363ms
Rec'd 500 : 4364ms
Sent 600 : 4403ms
Sent 700 : 4447ms
Rec'd 600 : 4476ms
Rec'd 700 : 4478ms
Sent 800 : 4494ms
Sent 900 : 4541ms
Sent 1000 : 4586ms
Rec'd 800 : 5591ms <-- jump of 1 second
Rec'd 900 : 5595ms
Rec'd 1000 : 5597ms
After
Sent 100 : 4117ms
Rec'd 100 : 4119ms
Sent 200 : 4177ms
Rec'd 200 : 4187ms
Sent 300 : 4232ms
Rec'd 300 : 4234ms
Sent 400 : 4289ms
Rec'd 400 : 4294ms
Sent 500 : 4345ms
Rec'd 500 : 4358ms
Sent 600 : 4399ms
Rec'd 600 : 4408ms
Sent 700 : 4450ms
Rec'd 700 : 4451ms
Sent 800 : 4495ms
Rec'd 800 : 4506ms
Sent 900 : 4544ms
Rec'd 900 : 4545ms
Sent 1000 : 4602ms
Rec'd 1000 : 4604ms
Thanks for attaching the use case example, it's very helpful.
Regarding the second question above: the main culprit is that the producer takes longer to send the first message to a topic. This is because no metadata is cached for that topic yet, so the producer has to make a metadata request for it before the produce can go through. At the moment, no workaround is available within this library, but the delay affects only the first message to a topic, not subsequent ones.
Understood. It appears that KafkaJS handles this by loading metadata for all topics when the producer connects, rather than on demand when a message is sent. For our case this is preferable, because it happens during the startup phase, before the app is "ready" and considered online, so there is no delay when the first message is sent.
I understand the requirement. However, we avoid caching metadata for all topics up front because it carries a small memory penalty and a large penalty for subsequent metadata requests: follow-up metadata requests cover every topic already cached, so the size of each metadata request can grow very large.
There is a workaround for this within the underlying library. I'll work on making it accessible from the JavaScript binding, and discuss with my team whether there's anything I can do to make the process seamless.
Hi @milindl,
> There is a workaround for it within the underlying library, I will work to make it accessible from the Javascript binding, and discuss with my team if there's anything I can do to make the process seamless.
Circling back to this: is the workaround accessible, and if so, can you provide more details?
It isn't accessible yet, but I discussed this within the team and we finalized an approach. Making it accessible requires a non-trivial internal code change; I'll keep this thread updated when I start working on it.
We've added the workaround in v0.5.1. For an example of how to use it, see: https://github.com/confluentinc/confluent-kafka-javascript/blob/master/examples/kafkajs/admin/dependent-admin.js#L72
I ran a test sending and receiving 1000 messages individually (no batching) using the KafkaJS library, and then ran the same test using the Confluent library (following the migration instructions).
KafkaJS: 455ms
Confluent: 501951ms
That's not a typo: in this case, the Confluent test took about 1000x as long to complete.
I presume some tuning can be done via configuration, but this was an "out of the box" conversion, and my attempts at tuning the configuration did not yield any noticeable difference.