apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.16k stars 3.57k forks source link

PIP-174: Provide new implementation for broker dispatch cache #15954

Open merlimat opened 2 years ago

merlimat commented 2 years ago

Motivation

The current implementation of the read cache in the Pulsar broker has largely remained unchanged for a long time, except for a few minor tweaks.

While the implementation is stable and reasonably efficient for typical workloads, the overhead required for managing the cache evictions in a broker that is running many topics can be pretty high in terms of extra CPU utilization and on the JVM garbage collection to track an increased number of medium-lived objects.

The goal is to provide an alternative implementation that can adapt better to a wider variety of operating conditions.

Current implementation details

The broker cache is implemented as part of the ManagedLedger component, which sits in the Pulsar broker and provides a higher level of abstraction of top of BookKeeper.

Each topic (and managed-ledger) has its own private cache space. This cache is implemented as a ConcurrentSkipList sorted map that maps (ledgerId, entryId) -> payload. The payload is a ByteBuf reference that can either be a slice of a ByteBuf that we got when reading from a socket, or it can be a copied buffer.

Each topic cache is allowed to use the full broker max cache size before an eviction is triggered. The total cache size is effectively a resource shared across all the topics, where a topic can use a more prominent portion of it if it "asks for more".

When the eviction happens, we need to do an expensive ranking of all the caches in the broker and do an eviction in a proportional way to the currently used space for each of them.

The bigger problem is represented by the ConcurrentSkipList and the ByteBuf objects that need to be tracked. The skip list is essentially like a "tree" structure and needs to maintain Java objects for each entry in the cache. We also need to potentially have a huge number of ByteBuf objects.

A cache workload is typically the worst-case scenario for each garbage collector implementation because it involves creating objects, storing them for some amount of time and then throwing them away. During that time, the GC would have already tenured these objects and copy them into an "old generation" space, and sometime later, a costly compaction of that memory would have to be performed.

To mitigate the effect of the cache workload on the GC, we're being very aggressive in purging the cache by triggering time-based eviction. By putting a max TTL on the elements in the cache, we can avoid keeping the objects around for too long to be a problem for the GC.

The reverse side of this is that we're artificially reducing the cache capacity to a very short time frame, reducing the cache usefulness.

The other problem is the CPU cost involved in doing these frequent evictions, which can be very high when there are 10s of thousands of topics in a broker.

Proposed changes

Instead of dealing with individual caches for each topic, let's adopt a model where there is a single cache space for the broker.

This cache is broken into N segments which act as a circular buffer. Whenever a segment is full, we start writing into the next one, and when we reach the last one, we will restart recycling the first segment.

This model has been working very well for the BookKeeper ReadCache: https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java

The eviction becomes a completely trivial operation, buffers are just rotated and overwritten. We don't need to do any per-topic task or keep track of utilization.

Today, there are 2 ways of configuring the cache, one that "copies" data into the cache and another that will just use reference-counting on the original buffers to avoid payload copies.

Memory copies into the cache

Each segment is composed of a buffer, an offset, and a hashmap which maps (ledgerId, entryId) -> offset.

The advantage of this approach is that entries are copied into the cache buffer (in direct memory), and we don't need to keep any long-lived Java objects around

Keeping reference-counted buffers in the cache

Each segment in the cache will contain a map (ledgerId, entryId) -> ByteBuf. Buffers will have an increase reference count that will keep the data alive as long as the buffer is in the cache and it will be released when the cache segment is rotated.

The advantage is we avoid any memory copy when inserting into or reading from the cache. The disadvantage is that we will have references to all the ByteBuf objects that are in the cache.

API changes

No user-facing API changes are required.

New configuration options

The existing cache implementation will not be removed at this point. Users will be able to configure the old implementation in broker.conf.

This option will be helpful in case of performance regressions would be seen for some use cases with the new cache implementation.

lhotari commented 2 years ago

btw. The current broker cache seems to be broken for scenarios where there are multiple active cursors (/consumers). That happened in 2.8.2 / 2.9.0 . I have created an issue #16054 .

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.