I am curious where this slowdown comes from. Is it from how we are polling Kafka, or just the fact that every next call on the iterator has to go through the lock? If it is the latter, we need to come up with something in the stdlib. Potentially a few @inlinable annotations could also go a long way here.
Hi @FranzBusch! Thank you for your thoughts. Sorry for the late response - I was on vacation for a while. Unfortunately, I did not profile the code, so I can't say for sure immediately.
My assumption is that it is a coincidence of two factors (I have to profile and check to prove it, though).
However, there is something else here, because I think I tried making a sequence that returns messages one by one but internally operates on arrays, and it was not as good as providing the arrays as-is. I need to double-check that as well.
I've made two examples in the fork, for single and for multiple messages transferred through the async sequence. Note that I could not see the difference in Docker because the overall execution was too slow, so the tests were performed on bare metal against Redpanda (Redpanda simply because it was already installed).
bulk messages
Start consuming
read up to 127730 in partition 2, ctr: 750031, rate: 231962 (10839KB/s), avgRate: (10839KB/s), timePassed: 3sec
read up to 260882 in partition 5, ctr: 1500101, rate: 187978 (8933KB/s), avgRate: (9786KB/s), timePassed: 7sec
read up to 371852 in partition 4, ctr: 2250147, rate: 187244 (8959KB/s), avgRate: (9491KB/s), timePassed: 11sec
read up to 492048 in partition 3, ctr: 3000200, rate: 187757 (8984KB/s), avgRate: (9358KB/s), timePassed: 15sec
read up to 621339 in partition 2, ctr: 3750202, rate: 187910 (8991KB/s), avgRate: (9282KB/s), timePassed: 19sec
read up to 749399 in partition 1, ctr: 4500242, rate: 188128 (9002KB/s), avgRate: (9234KB/s), timePassed: 23sec
read up to 872846 in partition 0, ctr: 5250322, rate: 241929 (11576KB/s), avgRate: (9510KB/s), timePassed: 26sec
read up to 999513 in partition 4, ctr: 6000339, rate: 187922 (8992KB/s), avgRate: (9442KB/s), timePassed: 30sec
read up to 1113565 in partition 3, ctr: 6750400, rate: 187136 (8954KB/s), avgRate: (9385KB/s), timePassed: 34sec
read up to 1253273 in partition 5, ctr: 7500486, rate: 187697 (8981KB/s), avgRate: (9343KB/s), timePassed: 38sec
read up to 1376083 in partition 3, ctr: 8250571, rate: 189206 (9053KB/s), avgRate: (9315KB/s), timePassed: 42sec
read up to 1495120 in partition 2, ctr: 9000604, rate: 187918 (8992KB/s), avgRate: (9288KB/s), timePassed: 46sec
read up to 1636007 in partition 0, ctr: 9750698, rate: 188045 (8998KB/s), avgRate: (9265KB/s), timePassed: 50sec
read up to 1753974 in partition 0, ctr: 10500766, rate: 241701 (11723KB/s), avgRate: (9408KB/s), timePassed: 53sec
read up to 1872605 in partition 0, ctr: 11250822, rate: 186572 (9109KB/s), avgRate: (9387KB/s), timePassed: 57sec
read up to 1996171 in partition 0, ctr: 12000853, rate: 188279 (9193KB/s), avgRate: (9374KB/s), timePassed: 61sec
read up to 2128611 in partition 5, ctr: 12750944, rate: 187247 (9142KB/s), avgRate: (9360KB/s), timePassed: 65sec
read up to 2252451 in partition 5, ctr: 13500975, rate: 187131 (9137KB/s), avgRate: (9347KB/s), timePassed: 69sec
read up to 2383333 in partition 5, ctr: 14250999, rate: 187491 (9154KB/s), avgRate: (9336KB/s), timePassed: 73sec
All read up to ctr: 15000000, avgRate: (9222KB/s), timePassed: 78sec
Finish consuming
single messages
Start consuming
read up to 127699 in partition 2, ctr: 750000, rate: 232178 (10849KB/s), avgRate: (10849KB/s), timePassed: 3sec
read up to 260781 in partition 5, ctr: 1500000, rate: 188189 (8943KB/s), avgRate: (9797KB/s), timePassed: 7sec
read up to 371705 in partition 4, ctr: 2250000, rate: 187622 (8978KB/s), avgRate: (9505KB/s), timePassed: 11sec
read up to 491848 in partition 3, ctr: 3000000, rate: 187857 (8989KB/s), avgRate: (9369KB/s), timePassed: 15sec
read up to 621137 in partition 2, ctr: 3750000, rate: 187526 (8973KB/s), avgRate: (9287KB/s), timePassed: 19sec
read up to 749157 in partition 1, ctr: 4500000, rate: 187730 (8983KB/s), avgRate: (9234KB/s), timePassed: 23sec
read up to 872524 in partition 0, ctr: 5250000, rate: 242031 (11581KB/s), avgRate: (9511KB/s), timePassed: 26sec
read up to 999174 in partition 4, ctr: 6000000, rate: 187677 (8980KB/s), avgRate: (9441KB/s), timePassed: 30sec
read up to 1130808 in partition 2, ctr: 6750000, rate: 188368 (9013KB/s), avgRate: (9391KB/s), timePassed: 34sec
read up to 1252787 in partition 5, ctr: 7500000, rate: 187679 (8980KB/s), avgRate: (9348KB/s), timePassed: 38sec
read up to 1375512 in partition 3, ctr: 8250000, rate: 188217 (9006KB/s), avgRate: (9316KB/s), timePassed: 42sec
read up to 1494516 in partition 2, ctr: 9000000, rate: 188002 (8996KB/s), avgRate: (9288KB/s), timePassed: 46sec
read up to 1635309 in partition 0, ctr: 9750000, rate: 188564 (9023KB/s), avgRate: (9267KB/s), timePassed: 50sec
read up to 1753208 in partition 0, ctr: 10500000, rate: 241318 (11704KB/s), avgRate: (9409KB/s), timePassed: 53sec
read up to 1871783 in partition 0, ctr: 11250000, rate: 186998 (9130KB/s), avgRate: (9390KB/s), timePassed: 57sec
read up to 1995318 in partition 0, ctr: 12000000, rate: 187522 (9156KB/s), avgRate: (9375KB/s), timePassed: 61sec
read up to 2127667 in partition 5, ctr: 12750000, rate: 187701 (9165KB/s), avgRate: (9362KB/s), timePassed: 65sec
read up to 2251476 in partition 5, ctr: 13500000, rate: 187220 (9141KB/s), avgRate: (9349KB/s), timePassed: 69sec
read up to 2382334 in partition 5, ctr: 14250000, rate: 187237 (9142KB/s), avgRate: (9338KB/s), timePassed: 73sec
read up to 2501196 in partition 1, ctr: 15000000, rate: 187098 (9135KB/s), avgRate: (9327KB/s), timePassed: 77sec
All read up to ctr: 15000000, avgRate: (9327KB/s), timePassed: 77sec
single messages
Start consuming
read up to 132363 in partition 1, ctr: 750000, rate: 42409 (1981KB/s), avgRate: (1981KB/s), timePassed: 17sec
read up to 248823 in partition 0, ctr: 1500000, rate: 42277 (2009KB/s), avgRate: (1995KB/s), timePassed: 35sec
read up to 377336 in partition 4, ctr: 2250000, rate: 43744 (2093KB/s), avgRate: (2027KB/s), timePassed: 52sec
read up to 500301 in partition 3, ctr: 3000000, rate: 42628 (2039KB/s), avgRate: (2030KB/s), timePassed: 70sec
read up to 624243 in partition 2, ctr: 3750000, rate: 42295 (2023KB/s), avgRate: (2029KB/s), timePassed: 87sec
read up to 748529 in partition 1, ctr: 4500000, rate: 42859 (2050KB/s), avgRate: (2032KB/s), timePassed: 105sec
read up to 877719 in partition 0, ctr: 5250000, rate: 42891 (2052KB/s), avgRate: (2035KB/s), timePassed: 122sec
read up to 1000660 in partition 5, ctr: 6000000, rate: 42502 (2033KB/s), avgRate: (2035KB/s), timePassed: 140sec
read up to 1123272 in partition 3, ctr: 6750000, rate: 43075 (2061KB/s), avgRate: (2038KB/s), timePassed: 157sec
read up to 1245166 in partition 2, ctr: 7500000, rate: 43042 (2059KB/s), avgRate: (2040KB/s), timePassed: 175sec
read up to 1377618 in partition 1, ctr: 8250000, rate: 43203 (2067KB/s), avgRate: (2042KB/s), timePassed: 192sec
read up to 1502962 in partition 0, ctr: 9000000, rate: 43525 (2082KB/s), avgRate: (2046KB/s), timePassed: 209sec
read up to 1625774 in partition 5, ctr: 9750000, rate: 43286 (2071KB/s), avgRate: (2047KB/s), timePassed: 227sec
read up to 1749433 in partition 4, ctr: 10500000, rate: 43500 (2109KB/s), avgRate: (2052KB/s), timePassed: 244sec
read up to 1873427 in partition 4, ctr: 11250000, rate: 43004 (2099KB/s), avgRate: (2055KB/s), timePassed: 261sec
read up to 1995651 in partition 4, ctr: 12000000, rate: 42952 (2097KB/s), avgRate: (2058KB/s), timePassed: 279sec
read up to 2118730 in partition 4, ctr: 12750000, rate: 43041 (2101KB/s), avgRate: (2060KB/s), timePassed: 296sec
read up to 2246627 in partition 4, ctr: 13500000, rate: 42915 (2095KB/s), avgRate: (2062KB/s), timePassed: 314sec
read up to 2373631 in partition 5, ctr: 14250000, rate: 41108 (2007KB/s), avgRate: (2059KB/s), timePassed: 332sec
read up to 2502202 in partition 4, ctr: 15000000, rate: 41532 (2027KB/s), avgRate: (2057KB/s), timePassed: 350sec
All read up to ctr: 15000000, avgRate: (2057KB/s), timePassed: 350sec
Finish consuming
The slight difference between (1) and (2) is, I guess, related to the rate calculation, while for the 3rd case the async sequence overhead seems to make the most impact.
Even though we know the performance problem is somewhere in AsyncSequence and might require better synchronisation primitives or other improvements in the stdlib, it would still be nice to have bulk updates. Our use case is to write those updates directly to our cache database, and it would be preferable to receive them in bulks. Since eventPoll accumulates an array of events, it can aggregate all consumer messages into an array. Additionally, if an end user needs a per-message iterator, that iterator could easily be wrapped around the bulk one (as shown in the 1st branch), for example along the lines of the sketch below.
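A minimal sketch of such a wrapper, using a generic element type rather than the library's actual KafkaConsumerMessage/KafkaConsumerMessages types:

```swift
// Hypothetical wrapper: flattens an AsyncSequence of arrays back into a
// per-element AsyncSequence, so the async hop is paid once per batch.
struct FlattenedBatches<Base: AsyncSequence, Element>: AsyncSequence
where Base.Element == [Element] {
    let base: Base

    struct AsyncIterator: AsyncIteratorProtocol {
        var baseIterator: Base.AsyncIterator
        var buffer: [Element] = []
        var index = 0

        mutating func next() async throws -> Element? {
            // Refill from the next batch once the current one is exhausted.
            while index >= buffer.count {
                guard let batch = try await baseIterator.next() else { return nil }
                buffer = batch
                index = 0
            }
            defer { index += 1 }
            return buffer[index]
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(baseIterator: base.makeAsyncIterator())
    }
}
```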
@FranzBusch could you suggest if you have something in mind on that, please?
Hi @felixschlegel, @FranzBusch! I think I need to take a step back and elaborate a bit on why @blindspotbounty suggests implementing reading in batches.
We did some measurements internally with a small app that just reads data from Kafka and does nothing else, simply to understand the maximum reading throughput we can get from Kafka. At the moment the Swift client is slower (@blindspotbounty will provide exact numbers).
@blindspotbounty, maybe it's even better to add a benchmark test to the swift-kafka-client library so that @felixschlegel and @FranzBusch have a reproducible test at hand.
So we are thinking about what kind of improvements we can suggest for the Swift client to reach parity with native librdkafka reading throughput.
One possible solution, as @FranzBusch mentioned, is to optimise AsyncStream to make it much more efficient for reading/writing. Another, much more extreme and maybe not desirable, is for the Swift client to expose the native poll interface rather than a Swift-concurrency-friendly interface with streams (roughly as sketched below). Yet another possible approach is to provide data to the consumer in batches, which seems to give per-message reading performance quite close to librdkafka's.
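For illustration, a direct poll loop over librdkafka might look roughly like this. This is only a sketch: the C module name (Crdkafka) and the assumption of an already configured, subscribed consumer handle are mine, not part of the library's public API.

```swift
import Crdkafka  // name of the C librdkafka module assumed here

/// Hypothetical "native poll" loop: reads up to `messageCount` records by
/// calling librdkafka directly, bypassing the AsyncSequence machinery.
/// `handle` is assumed to be an already configured, subscribed rd_kafka_t*.
func pollDirectly(handle: OpaquePointer, messageCount: Int) {
    var consumed = 0
    while consumed < messageCount {
        // Block for at most 100 ms waiting for the next record.
        guard let message = rd_kafka_consumer_poll(handle, 100) else {
            continue // timeout, nothing to read yet
        }
        defer { rd_kafka_message_destroy(message) }

        if message.pointee.err == RD_KAFKA_RESP_ERR_NO_ERROR {
            // message.pointee.payload / .len give direct access to the bytes.
            consumed += 1
        }
    }
}
```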
So, to summarise, I would rename the issue to "Optimise reading performance to get as close as possible to librdkafka performance". I hope we all agree that would be very desirable! As a next step it would be great to get your input on how you envision this optimisation. Based on that we can do some work in our fork, which we can later share with you and turn into PR(s).
I do understand the necessity to improve performance, and it is something we haven't focused on yet. I agree that the first step would be to set up a benchmarking target similar to what we have done in swift-nio and swift-certificates. This will give us a good baseline to see where we are starting from.
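For reference, a minimal benchmark declaration using the package-benchmark library (the approach those repositories use, as far as I know) could look roughly like this; the file path, names, and measured work are illustrative only:

```swift
// Benchmarks/Benchmarks/KafkaConsumer/Benchmarks.swift (illustrative path)
import Benchmark

let benchmarks = {
    Benchmark(
        "Consume prepared topic",
        configuration: .init(metrics: [.wallClock, .throughput, .mallocCountTotal])
    ) { benchmark in
        for _ in benchmark.scaledIterations {
            // Placeholder work; a real benchmark would drive KafkaConsumer
            // over a topic pre-filled with messages here.
            blackHole((0..<1_000).reduce(0, +))
        }
    }
}
```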
On the different approaches, I think the poll-based one is really a last resort, and I am quite sure we can do better than that. I do think that batch consumption is a very pressing topic for AsyncSequences in general; we want to solve this holistically, and I would love to avoid having special code in Kafka just to cater for it.
Though first things first: let's add a benchmark and then actually look at the Instruments traces to see where we are spending our time and allocations.
@mr-swifter thank you for your input!
Bulks are not the target but rather one possible solution to speed up the current implementation and make it comparable with librdkafka.
Sorry that I didn't provide this before... But to show the difference, I've made two tests in one executable (available at https://github.com/ordo-one/swift-kafka-client/tree/pure-librdkafka-poll). The branch is based on swift-kafka-client/main.
The test creates a unique topic (similar to the existing tests) and produces 15_000_000 messages, then reads them in one of the following ways: (1) through the swift-kafka-client consumer interface, or (2) through the native librdkafka poll directly.
To reproduce locally, they can be run with the following command:
swift run -c release
Results for (1):
All read up to ctr: 15000000, avgRate: (1979KB/s), timePassed: 364sec
Results for (2):
All read up to ctr: 15000000, avgRate: (133266KB/s), timePassed: 5sec
I've also added docker-compose-consumer-performance.yaml to the branch. Unfortunately, in Docker it is too slow to generate 15_000_000 messages, so I've set the number of messages to 3_000_000 (it can be tweaked with the MESSAGES_NUMBER environment variable).
The Docker command is similar to the one for the tests:
docker-compose -f docker/docker-compose-consumer-performance.yaml run test
Results for (1) in docker:
All read up to ctr: 3000000, avgRate: (2442KB/s), timePassed: 58sec
Results for (2) in docker:
All read up to ctr: 3000000, avgRate: (159903KB/s), timePassed: 0sec
Finally, the difference between running with native librdkafka and the swift-kafka-client interface is about 65x.
I believe there are two places where swift-kafka-client spends a lot of time:
KafkaConsumerMessage
The above branch is main with slight modifications made exclusively for this test. Therefore, it is possible to test/profile the swift-kafka-client code directly using that test executable.
@FranzBusch, @felixschlegel, maybe that code will help with benchmarking and with a comparison against librdkafka as a baseline.
@blindspotbounty we should set up an embedded benchmark target as suggested - please coordinate with @mr-swifter as I'll be away for a couple of days.
@hassila, @mr-swifter I've added a draft PR with both tests. Hope that will be helpful :)
@blindspotbounty are we happy with the performance now?
@FranzBusch yeah, after moving to direct access to librdkafka it is not a problem anymore: https://github.com/swift-server/swift-kafka-client/pull/158
So we should consider this issue closed!
Currently, the messages sequence in KafkaConsumer provides messages one by one. That is convenient; however, it is not efficient for reading big topics, especially during service recovery.
I've made a small experiment by changing the consumerMessages enum to accept an array instead of a single message and packing all messages from a single poll into one event. I also changed messages and KafkaConsumerMessages to provide bulks.
Then I tested that with simple consumer applications. For single messages:
With results:
For bulk:
Results:
The latter shows results that are near the 1 Gbps network limit.
This is interesting because the batching is mostly done inside the library already and is very natural with the current poll implementation (as in some librdkafka examples), but it is not exposed to the end user. From our perspective, this is especially useful when an application requires recovery from huge topic(s) and needs to cache data, e.g. in a database, so it can receive and use the data in bulk.
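To make the idea concrete, here is a minimal, self-contained sketch of the batching scheme described above. It uses a stand-in Message type and a plain AsyncStream rather than the library's real consumerMessages event and KafkaConsumerMessages sequence:

```swift
// Stand-in for KafkaConsumerMessage, just for this sketch.
struct Message {
    let partition: Int
    let value: [UInt8]
}

// Producer side: one poll produces one array and one yield, so the stream's
// synchronisation cost is paid once per batch rather than once per message.
func runPollLoop(
    poll: () -> [Message],
    continuation: AsyncStream<[Message]>.Continuation,
    isDone: () -> Bool
) {
    while !isDone() {
        let batch = poll()
        if !batch.isEmpty {
            continuation.yield(batch)
        }
    }
    continuation.finish()
}

// Consumer side: the application still sees every message, but the async
// hop happens per batch.
func consume(_ batches: AsyncStream<[Message]>) async {
    var bytesRead = 0
    for await batch in batches {
        for message in batch {
            bytesRead += message.value.count
        }
    }
    print("consumed \(bytesRead) bytes")
}
```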