openmessaging / benchmark

OpenMessaging Benchmark Framework
Apache License 2.0

Large Number of Consumers Prevent Test Completion (Kafka Driver) #416

Open ColinSullivan1 opened 2 weeks ago

ColinSullivan1 commented 2 weeks ago

Hello OpenMessaging maintainers - thank you for your work on this project!

I'm running some very large benchmarks (20k consumers) spread out over 8 very large machines to simulate a large scale test. As the test nears completion (I suspect when results are aggregated) there are numerous consumer errors. These result in the WorkloadGenerator timing out.

Even with smaller tests, I see the timeouts due to consumer errors at the end of the tests.

For example, I can see the aggregate high level stats are OK:


15:30:11.050 [main] INFO WorkloadGenerator - Pub rate  8983.0 msg/s /  8.8 MB/s | Pub err     0.0 err/s | Cons rate  8983.0 msg/s /  8.8 MB/s | Backlog:  0.0 K | Pub Latency (ms) avg:  2.0 - 50%:  2.0 - 99%:  2.5 - 99.9%:  6.7 - Max: 31.0 | Pub Delay Latency (us) avg: 57.1 - 50%: 57.0 - 99%: 61.0 - 99.9%: 65.0 - Max: 1462.0
15:30:21.169 [main] INFO WorkloadGenerator - Pub rate  8988.9 msg/s /  8.8 MB/s | Pub err     0.0 err/s | Cons rate  8988.5 msg/s /  8.8 MB/s | Backlog:  0.0 K | Pub Latency (ms) avg:  2.0 - 50%:  2.0 - 99%:  2.5 - 99.9%:  4.6 - Max: 18.3 | Pub Delay Latency (us) avg: 57.3 - 50%: 57.0 - 99%: 64.0 - 99.9%: 67.0 - Max: 124.0
15:30:21.255 [main] INFO WorkloadGenerator - ----- Aggregated Pub Latency (ms) avg:  2.0 - 50%:  2.0 - 95%:  2.3 - 99%:  2.5 - 99.9%:  6.7 - 99.99%: 18.3 - Max: 61.4 | Pub Delay (us)  avg: 57.9 - 50%: 57.0 - 95%: 60.0 - 99%: 64.0 - 99.9%: 70.0 - 99.99%: 1462.0 - Max: 20531.0

There are no errors, no backlog, and steady throughput at the rate I've specified. However, the test times out and the consumer logs show errors. Note that with high-throughput tests and fewer consumers I do not see the issue.

Example Test Setup


name: MyTestDriver
driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver

# Kafka client-specific configuration
replicationFactor: 3
reset: true

topicConfig: |

commonConfig: |
  # add additional time for large scale tests. = 30000 = 60000

producerConfig: |

consumerConfig: |
  # try to fix the issue of consumer not consuming messages
  max.poll.records = 50 = 300000 = 90000


name: services-1

# overprovision partitions to rule out contention
partitionsPerTopic: 15
messageSize: 1024
payloadFile: "payload/"
subscriptionsPerTopic: 1
consumerPerSubscription: 12
producersPerTopic: 1
producerRate: 9000
consumerBacklogSizeGB: 0
warmupDurationMinutes: 1
testDurationMinutes: 3

Consumer Errors

Some of the consumer errors include:

15:24:15.049 [pool-116-thread-1] ERROR ConsumerCoordinator - [Consumer clientId=consumer-sub-000-Pp8hAjg-115, groupId=sub-000-Pp8hAjg] Offset commit with offsets {test-topic-0000022-tQSNfvA-0=OffsetAndMetadata{offset=2082, leaderEpoch=null, metadata=''}} failed

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

15:24:14.921 [pool-13-thread-1] ERROR ConsumerCoordinator - [Consumer clientId=consumer-sub-000-trBz_PI-12, groupId=sub-000-trBz_PI] Offset commit with offsets {test-topic-0000141-ZzmWoJA-0=OffsetAndMetadata{offset=2080, leaderEpoch=null, metadata=''}} failed
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

I've attached output and logs of a test that exhibits these symptoms.

benchmark-output.txt sanitized-benchmark-worker.log

Would you have any suggestions for running the benchmarks with extremely large numbers of consumers? Happy to provide more information if you need it.

Edit: Perhaps all of the rebalancing is blocking some requests around shutdown/closing consumers?


ColinSullivan1 commented 2 weeks ago

FYI, I tried setting the internal property, = false which didn't seem to make a difference on the system I was benchmarking.

Adding some instrumentation showed that the Kafka driver's close API was taking up to 800 milliseconds to complete. Waiting for all consumers to close would have taken quite a long time.
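To put a rough number on that, here is a back-of-the-envelope sketch. It assumes the 20k consumers are spread evenly over the 8 machines, that each worker closes its consumers one after another, and that every close hits the 800 ms worst case. The class and method names are made up for illustration and are not part of the benchmark.

```java
import java.time.Duration;

final class CloseTimeEstimate {
    private CloseTimeEstimate() {}

    /** Worst-case time for one worker to close its share of consumers one after another. */
    static Duration sequentialPerWorker(int totalConsumers, int workers, long closeMillis) {
        // Ceiling division: assumes consumers are spread as evenly as possible (an assumption).
        long perWorker = (totalConsumers + workers - 1L) / workers;
        return Duration.ofMillis(perWorker * closeMillis);
    }
}
```

Under those assumptions, `CloseTimeEstimate.sequentialPerWorker(20_000, 8, 800)` comes to 2,000 seconds, roughly 33 minutes per worker.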

One more data point: as an experiment, I commented out consumer.close() in the Kafka driver, and this test made it to completion.

I'm wondering if gathering stats and writing the output file before closing the consumers would work, followed by invoking the consumer.close() APIs in an executor to parallelize work that occurs during the close.

Another option might be to use the close API that accepts a duration, but with 20k consumers I'm not sure if that'd help enough on its own.
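A minimal sketch of the executor idea, combined with an overall time budget. This is not the benchmark's code: `ParallelCloser` and `closeAll` are hypothetical names, and the resources are plain `AutoCloseable`s so the example has no Kafka dependency.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ParallelCloser {
    private ParallelCloser() {}

    /**
     * Closes every resource on a fixed-size pool. Returns how many did not close cleanly:
     * those that threw, plus those still pending when the overall timeout expired.
     * If the calling thread is interrupted, conservatively reports all as failed.
     */
    static int closeAll(List<? extends AutoCloseable> resources, int threads, Duration timeout) {
        if (resources.isEmpty()) {
            return 0;
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (AutoCloseable r : resources) {
                tasks.add(() -> { r.close(); return null; });
            }
            // Blocks until all tasks finish or the timeout expires; unfinished tasks are cancelled.
            List<Future<Void>> futures =
                    pool.invokeAll(tasks, timeout.toMillis(), TimeUnit.MILLISECONDS);
            int failures = 0;
            for (Future<Void> f : futures) {
                try {
                    f.get(); // every future is already done here, so this does not block
                } catch (ExecutionException | CancellationException e) {
                    failures++;
                }
            }
            return failures;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return resources.size();
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With Kafka, each entry could be a lambda such as `() -> consumer.close(Duration.ofSeconds(5))`, assuming that consumer's polling loop has already stopped, since KafkaConsumer is not safe for concurrent use from multiple threads. Whether this shortens shutdown enough with 20k consumers depends on how much of the close time is spent waiting on the brokers, which this sketch does not address.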


ColinSullivan1 commented 2 weeks ago

I notice that worker.stopAll() is called in the before the method exits (and before the results file is generated). This prevents the results from being generated, because worker.stopAll() takes a very long time to complete and the benchmark times out. Note that worker.stopAll() is also called later on during the workload shutdown.

Removing this line allows me to generate the results file. The benchmark still times out when the subsequent worker.stopAll() call is made, but at least I can get results.
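The ordering described here, results first and teardown last, can be sketched as follows. This is an illustration under assumptions, not the benchmark's actual code: `ResultsFirst`, `runThenStop`, and the `Worker` interface are hypothetical stand-ins, and only the `stopAll()` name comes from the discussion above.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

final class ResultsFirst {
    private ResultsFirst() {}

    /** Hypothetical stand-in for the benchmark worker; only stopAll() is taken from the issue. */
    interface Worker {
        void stopAll();
    }

    /** Collects and writes results before the (possibly slow) teardown runs. */
    static <R> R runThenStop(Supplier<R> collectResults, Consumer<R> writeResults, Worker worker) {
        try {
            R result = collectResults.get();
            writeResults.accept(result); // persist results before any slow shutdown work
            return result;
        } finally {
            worker.stopAll(); // still runs if collecting or writing throws
        }
    }
}
```

With this shape, a slow or timed-out `stopAll()` can no longer prevent the results file from being written, and the teardown happens once rather than twice.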

Is worker.stopAll() necessary in