apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[improve] [broker] replace HashMap with inner implementation ConcurrentLongLongPairHashMap in Negative Ack Tracker. #23582

Closed thetumbled closed 6 days ago

thetumbled commented 1 week ago

Motivation

The negative ack feature needs to retain the message ID and timestamp of each negatively acknowledged message in the memory of the consumer client, which leads to high memory consumption. This PR aims to replace the HashMap with the internal map implementation ConcurrentLongLongPairHashMap to reduce that memory consumption. Although HashMap is faster than ConcurrentLongLongPairHashMap in some cases, the most important issue here is memory consumption rather than speed.

Some test data is listed below:

experiment 1


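    // Imports needed by this snippet:
    //   java.util.HashMap
    //   org.apache.pulsar.client.api.MessageId
    //   org.apache.pulsar.client.impl.MessageIdImpl
    //   org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap
    public static void main(String[] args) {
        // Candidate: (ledgerId, entryId) -> (partitionIndex, timestamp), stored as primitive longs.
        ConcurrentLongLongPairHashMap map1 = ConcurrentLongLongPairHashMap.newBuilder()
                .autoShrink(true)
                .concurrencyLevel(16)
                .build();
        // Baseline: MessageId -> timestamp, one key object and one boxed Long per entry.
        HashMap<MessageId, Long> map2 = new HashMap<>();
        long numMessages = 5000000;
        long ledgerId, entryId, partitionIndex, timestamp;
        for (long i = 0; i < numMessages; i++) {
            ledgerId = 10000 + i;
            entryId = i;
            partitionIndex = 0;
            timestamp = System.currentTimeMillis();
            map1.put(ledgerId, entryId, partitionIndex, timestamp);
            map2.put(new MessageIdImpl(ledgerId, entryId, (int) partitionIndex), timestamp);
        }
        System.out.println("map1 size: " + map1.size());
        System.out.println("map2 size: " + map2.size());
        // Keep the JVM alive so that the heap can be inspected while both maps are still reachable.
        try {
            Thread.sleep(10000000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }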

HashMap: 178 MB, ConcurrentLongLongPairHashMap: 64 MB

HashMap: 566 MB, ConcurrentLongLongPairHashMap: 256 MB

HashMap: 1132 MB. Each entry consumes approximately 1132 MB / 10,000,000 ≈ 118 bytes.

ConcurrentLongLongPairHashMap: 512 MB. Each entry consumes approximately 512 MB / 10,000,000 ≈ 53 bytes.

With this improvement, memory consumption drops by more than 50%.
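The per-entry figures above can be re-derived with a few lines of Java. This is only a sketch of the arithmetic: the class and method names are illustrative, and it assumes that "MB" in the measurements means MiB (2^20 bytes), since that is the reading under which 118 and 53 bytes come out.

```java
public class PerEntryCost {
    // Assumption: the measured "MB" values are MiB (2^20 bytes).
    static final long MIB = 1024L * 1024L;

    /** Average bytes per entry, truncated to a whole byte. */
    static long bytesPerEntry(long totalMiB, long entries) {
        return totalMiB * MIB / entries;
    }

    /** Relative memory saving of the smaller footprint, in percent. */
    static double savingPercent(long beforeMiB, long afterMiB) {
        return 100.0 * (beforeMiB - afterMiB) / beforeMiB;
    }

    public static void main(String[] args) {
        long entries = 10_000_000L;
        System.out.println("HashMap bytes/entry: " + bytesPerEntry(1132, entries));
        System.out.println("ConcurrentLongLongPairHashMap bytes/entry: " + bytesPerEntry(512, entries));
        System.out.println("saving in percent: " + savingPercent(1132, 512));
    }
}
```

For the largest measurement this gives 118 and 53 bytes per entry and a saving of roughly 55%, which matches the "more than 50%" claim.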

experiment 2

Test three candidate data structures:

- ConcurrentLongLongPairHashMap
- HashMap<LongPair, Long>
- HashMap<LongLongPair, Long>

Test code:

    // The original snippet does not show its imports. LongLongPair.of(...) is presumably
    // fastutil's it.unimi.dsi.fastutil.longs.LongLongPair; the origin of LongPair is not stated.
    public static void main(String[] args) {
        ConcurrentLongLongPairHashMap map1 = ConcurrentLongLongPairHashMap.newBuilder()
                .autoShrink(true)
                .concurrencyLevel(16)
                .build();
        HashMap<LongPair, Long> map4 = new HashMap<>();
        HashMap<LongLongPair, Long> map5 = new HashMap<>();
        long numMessages = 5000000, numLedgers = 100;
        long numEntries = numMessages / numLedgers;
        long ledgerId, entryId, partitionIndex, timestamp;
        // 100 ledgers with 50,000 entries each, 5,000,000 keys in total.
        for (long i = 0; i < numLedgers; i++) {
            ledgerId = 10000 + i;
            for (long j = 0; j < numEntries; j++) {
                entryId = 10000 + j;
                partitionIndex = 0;
                timestamp = System.currentTimeMillis();
                map1.put(ledgerId, entryId, partitionIndex, timestamp);
                map4.put(new LongPair(ledgerId, entryId), timestamp);
                map5.put(LongLongPair.of(ledgerId, entryId), timestamp);
            }
        }

        System.out.println("map1 size: " + map1.size());
        System.out.println("map4 size: " + map4.size());
        System.out.println("map5 size: " + map5.size());
        // Keep the JVM alive so that the heap can be inspected while all maps are still reachable.
        try {
            Thread.sleep(10000000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

The results are as follows:

- HashMap<LongPair, Long>: 91 MB
- HashMap<LongLongPair, Long>: 114 MB
- ConcurrentLongLongPairHashMap: 64 MB

Conclusion: ConcurrentLongLongPairHashMap is still the best option for storing a very large number of entries.

Modifications

Replace HashMap with ConcurrentLongLongPairHashMap in Negative Ack Tracker.

Verifying this change

This change is already covered by existing tests, such as (please describe tests).

Documentation

Matching PR in forked repository

PR in forked repository: https://github.com/thetumbled/pulsar/pull/63

github-actions[bot] commented 1 week ago

@thetumbled Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

lhotari commented 1 week ago

HashMap<LongLongPair, Long>

btw. In Fastutil, there's also the Object2LongMap interface, which would be applicable in this case since the value is a long, for example using the Object2LongOpenHashMap implementation. In Object2LongOpenHashMap, there's a trim method to reduce the size. I guess the benefit of ConcurrentLongLongPairHashMap is that it has the auto-shrink feature.

thetumbled commented 1 week ago

[...] there's also the Object2LongMap interface, which would be applicable in this case since the value is a long, for example using the Object2LongOpenHashMap implementation. In Object2LongOpenHashMap, there's a trim method to reduce [...]

No, no shrink logic is triggered in the test code, because I only add new items to the map and never delete any. Shrinking is triggered by item deletion. ConcurrentLongLongPairHashMap is space-efficient because it uses open addressing with linear probing, which needs less space than the data structure behind HashMap, and because it stores keys and values without any wrapper objects. As for Object2LongOpenHashMap, I guess it takes up more space than ConcurrentLongLongPairHashMap too, as it uses a wrapper object for each key.
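To illustrate the point about open addressing with linear probing and no wrappers, here is a minimal single-threaded sketch. It is not Pulsar's ConcurrentLongLongPairHashMap: the class name FlatLongPairMap, the hash function, the load factor and the -1 sentinel are all assumptions made for the example, and it leaves out concurrency, removal and shrinking. Each entry occupies four consecutive longs in one flat array, so no key object, boxed Long or node object is allocated per entry.

```java
import java.util.Arrays;

public class FlatLongPairMap {
    private static final long EMPTY = -1L; // sentinel for a free slot; keys must be >= 0
    private long[] table;                  // slot layout: key1, key2, value1, value2
    private int capacity;                  // number of slots, always a power of two
    private int size;

    public FlatLongPairMap() {
        allocate(16);
    }

    private void allocate(int newCapacity) {
        capacity = newCapacity;
        table = new long[4 * capacity];
        Arrays.fill(table, EMPTY);
        size = 0;
    }

    private int slotFor(long key1, long key2) {
        long h = (key1 * 0x9E3779B97F4A7C15L) ^ key2;
        h *= 0x9E3779B97F4A7C15L;
        return (int) ((h >>> 32) & (capacity - 1));
    }

    public void put(long key1, long key2, long value1, long value2) {
        if (key1 < 0 || key2 < 0) {
            throw new IllegalArgumentException("keys must be non-negative");
        }
        if ((size + 1) * 3L > capacity * 2L) {
            resize(); // keep the load factor at or below 2/3
        }
        int slot = slotFor(key1, key2);
        while (true) {
            int i = slot * 4;
            if (table[i] == EMPTY) {           // free slot: insert a new entry
                table[i] = key1;
                table[i + 1] = key2;
                table[i + 2] = value1;
                table[i + 3] = value2;
                size++;
                return;
            }
            if (table[i] == key1 && table[i + 1] == key2) { // same key: overwrite
                table[i + 2] = value1;
                table[i + 3] = value2;
                return;
            }
            slot = (slot + 1) & (capacity - 1); // linear probing: try the next slot
        }
    }

    // Returns the array index of the entry, or -1 if the key is absent.
    private int find(long key1, long key2) {
        int slot = slotFor(key1, key2);
        while (true) {
            int i = slot * 4;
            if (table[i] == EMPTY) {
                return -1;
            }
            if (table[i] == key1 && table[i + 1] == key2) {
                return i;
            }
            slot = (slot + 1) & (capacity - 1);
        }
    }

    /** First value for the key, or -1 if absent (so -1 is not usable as a value). */
    public long getFirst(long key1, long key2) {
        int i = find(key1, key2);
        return i < 0 ? EMPTY : table[i + 2];
    }

    /** Second value for the key, or -1 if absent. */
    public long getSecond(long key1, long key2) {
        int i = find(key1, key2);
        return i < 0 ? EMPTY : table[i + 3];
    }

    public int size() {
        return size;
    }

    private void resize() {
        long[] old = table;
        allocate(capacity * 2);
        for (int i = 0; i < old.length; i += 4) {
            if (old[i] != EMPTY) {
                put(old[i], old[i + 1], old[i + 2], old[i + 3]);
            }
        }
    }

    public static void main(String[] args) {
        FlatLongPairMap map = new FlatLongPairMap();
        // (ledgerId, entryId) -> (partitionIndex, timestamp), as in the experiments.
        map.put(10005L, 5L, 0L, System.currentTimeMillis());
        System.out.println("size: " + map.size());
    }
}
```

In this sketch the only per-entry cost is the 32 bytes of the slot itself plus the share of unused slots implied by the load factor, whereas a HashMap entry additionally needs a node object, a key object and a boxed Long.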