[Enhancement] Improve Pulsar Broker memory usage metrics which are also used in the Pulsar Load Manager #21973

Open · lhotari opened this issue 5 months ago

lhotari commented 5 months ago

Motivation

Currently Pulsar uses heap usage as one of the resource metrics for the Pulsar Load Manager. This is the code in question: https://github.com/apache/pulsar/blob/3158fd3550f9e3a0b2c0316c92265318b209f4f5/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java#L321-L336 Instantaneous heap usage is not a very useful metric: because of the way JVM memory management works, the reported value includes garbage that has not yet been collected and fluctuates between GC cycles, so it says little about how much memory is actually retained by live objects.

Solution

The OpenJDK JVM provides better ways to track actual memory usage. One possibility is to observe the memory usage after GC events (minor and major): the usage measured right after a collection gives a better estimate of how much memory is retained by live objects in the heap.

On OpenJDK there is a JMX API for registering a listener that receives GC events.

This is documented in com.sun.management.GarbageCollectionNotificationInfo; each event carries a com.sun.management.GcInfo with per-pool memory usage before and after the collection.

GC events are reported separately for the different memory pools (or spaces, referred to as "eden", "survivor", "tenured", etc., depending on the GC implementation). It usually only makes sense to track the usage of the tenured memory pool ("G1 Old Gen" for G1GC) after a GC event. The value is accurate after a Full GC; after GC events where the tenured space is only partially collected, an estimate can be calculated, for example by taking the value from the last Full GC event and averaging it with the values seen after partial collections. This works for at least CMS, Parallel and G1GC on OpenJDK. ZGC might behave differently, but it should be possible to find a good metric for ZGC based on its GC events as well. A sketch of extracting the tenured-pool value follows the listener example below.

ChatGPT generated this kind of sample code for observing GC events on OpenJDK. It looks about right:

import java.lang.management.*;
import javax.management.*;
import javax.management.openmbean.CompositeData;
import com.sun.management.GarbageCollectionNotificationInfo;
import com.sun.management.GcInfo;

public class GcNotificationListener {

    public static void main(String[] args) throws Exception {
        // Listen for GarbageCollectorMXBean notifications
        for (GarbageCollectorMXBean gcMxBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            NotificationEmitter emitter = (NotificationEmitter) gcMxBean;
            NotificationListener listener = (notification, handback) -> {
                if (notification.getType().equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) {
                    // Extract the GarbageCollectionNotificationInfo
                    CompositeData cd = (CompositeData) notification.getUserData();
                    GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd);

                    // Get the GcInfo object
                    GcInfo gcInfo = info.getGcInfo();

                    // Here you can use the gcInfo object
                    System.out.println("GC Name: " + info.getGcName());
                    System.out.println("GC Action: " + info.getGcAction());
                    System.out.println("GC Cause: " + info.getGcCause());
                    System.out.println("GC Info: " + gcInfo.toString());
                }
            };

            emitter.addNotificationListener(listener, null, null);
        }

        // Keep the program running to listen for GC events
        System.out.println("Listening for GC events...");
        Thread.sleep(Long.MAX_VALUE);
    }
}
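
To connect this to the tenured-pool tracking described above, here is a minimal sketch (not from the issue; the class, field, and method names are hypothetical) that filters the notifications down to the old-generation pool via GcInfo.getMemoryUsageAfterGc(), which maps each pool name to its usage after the collection:

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import javax.management.NotificationEmitter;
import javax.management.openmbean.CompositeData;
import com.sun.management.GarbageCollectionNotificationInfo;

public class TenuredUsageTracker {

    // Fraction of the tenured pool still in use after the most recent GC (hypothetical field)
    private static volatile double tenuredUsedFraction = Double.NaN;

    public static void install() {
        for (GarbageCollectorMXBean gcMxBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            ((NotificationEmitter) gcMxBean).addNotificationListener((notification, handback) -> {
                if (!GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION
                        .equals(notification.getType())) {
                    return;
                }
                GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo
                        .from((CompositeData) notification.getUserData());
                // Pool names depend on the collector: "G1 Old Gen", "PS Old Gen", "Tenured Gen", ...
                info.getGcInfo().getMemoryUsageAfterGc().forEach((poolName, usage) -> {
                    if (poolName.contains("Old Gen") || poolName.contains("Tenured")) {
                        update(info, usage);
                    }
                });
            }, null, null);
        }
    }

    private static void update(GarbageCollectionNotificationInfo info, MemoryUsage usage) {
        if (usage.getMax() <= 0) {
            return; // getMax() is -1 when the pool has no defined maximum
        }
        double fraction = (double) usage.getUsed() / usage.getMax();
        // The value is accurate after a full collection ("end of major GC"); after a
        // minor GC the tenured space was not fully collected, so treat it as an estimate
        if ("end of major GC".equals(info.getGcAction())) {
            tenuredUsedFraction = fraction;
        } else {
            // Crude estimate: average with the last known value, as suggested above
            tenuredUsedFraction = Double.isNaN(tenuredUsedFraction)
                    ? fraction : (tenuredUsedFraction + fraction) / 2;
        }
    }

    public static double getTenuredUsedFraction() {
        return tenuredUsedFraction;
    }
}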

Alternatives

There aren't really any good alternatives for estimating the memory usage of long-lived objects in the JVM heap. On OpenJDK, adding a listener for GC events is the best way to estimate this.

Anything else?

The JVM also has a separate notification interface for detecting low-memory conditions. It's possible to set a threshold on memory usage, and an event will be emitted when the threshold is crossed. This is documented in MemoryPoolMXBean. It is useful for JVM health checks, but that is a different problem from the memory usage metrics discussed here.

ChatGPT generated this kind of example for MemoryPoolMXBean.setCollectionUsageThreshold. Sharing it here just to show a complete picture of the JVM interfaces for monitoring low memory conditions:

import javax.management.*;
import javax.management.openmbean.CompositeData;
import java.lang.management.*;
import java.util.List;

public class MemoryThresholdListener {

    public static void main(String[] args) throws Exception {
        // Obtain the MemoryMXBean
        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

        // Register a listener for memory notifications
        NotificationEmitter emitter = (NotificationEmitter) memoryMXBean;
        NotificationListener listener = (notification, handback) -> {
            if (MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED.equals(notification.getType())) {
                CompositeData cd = (CompositeData) notification.getUserData();
                MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
                System.out.println("Memory collection threshold exceeded");
                System.out.println("Memory usage details: " + info.getUsage());
            }
        };
        emitter.addNotificationListener(listener, null, null);

        // Set a usage threshold on a MemoryPoolMXBean
        List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
        for (MemoryPoolMXBean pool : pools) {
            // Example: set the threshold on the Old Gen pool
            if (pool.getName().contains("Old Gen") && pool.isCollectionUsageThresholdSupported()) {
                long max = pool.getUsage().getMax();
                if (max > 0) { // getMax() returns -1 when the maximum is undefined
                    // Set the threshold to 90% of the pool's maximum size
                    pool.setCollectionUsageThreshold((long) (max * 0.9));
                    System.out.println("Threshold set for " + pool.getName());
                }
            }
        }

        // Keep the program running to listen for memory threshold exceed notifications
        System.out.println("Listening for memory threshold exceed notifications...");
        Thread.sleep(Long.MAX_VALUE);
    }
}

lhotari commented 5 months ago

Related to the problem described in the description of PR #3199

lhotari commented 5 months ago

The problem is also reported in #16564

lhotari commented 4 months ago

also related: https://github.com/apache/pulsar/pull/21168 and https://github.com/apache/pulsar/pull/19559

lhotari commented 1 month ago

There's also the OTel runtime-telemetry-java17 module that collects JVM metrics using JMX and JFR: https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/runtime-telemetry/runtime-telemetry-java17/library. Its process.runtime.jvm.memory.usage_after_last_gc metric is similar to what is described in the issue description: observing the memory usage after GC events.

The benefit of using the JMX API directly is that it won't add dependencies on the OTel libraries.
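
For illustration (this snippet is not from the thread), a dependency-free equivalent of that OTel metric can be sketched with plain java.lang.management: MemoryPoolMXBean.getCollectionUsage() returns the usage recorded after the most recent collection of each pool, so the value can simply be polled:

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;

public class UsageAfterLastGc {

    // Returns the fraction of heap used right after the last GC, or NaN if unavailable
    public static double heapUsedFractionAfterLastGc() {
        long used = 0;
        long max = 0;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() != MemoryType.HEAP) {
                continue;
            }
            // getCollectionUsage() reflects the usage after the most recent collection
            // of this pool; it returns null if the JVM does not support it for the pool
            MemoryUsage usage = pool.getCollectionUsage();
            if (usage == null) {
                continue;
            }
            used += usage.getUsed();
            if (usage.getMax() > 0) {
                max += usage.getMax();
            }
        }
        return max > 0 ? (double) used / max : Double.NaN;
    }

    public static void main(String[] args) {
        System.out.printf("Heap used after last GC: %.1f%%%n",
                100 * heapUsedFractionAfterLastGc());
    }
}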