apache / incubator-uniffle

Uniffle is a high performance, general purpose Remote Shuffle Service.
https://uniffle.apache.org/
Apache License 2.0

[Improvement] Add Netty buffer leak detection in integration test. #1153

Open qijiale76 opened 1 year ago

qijiale76 commented 1 year ago

What would you like to be improved?

As mentioned in https://github.com/apache/incubator-uniffle/issues/1152, running with -Dio.netty.leakDetection.level=advanced revealed a Netty ByteBuf leak. We should enable this leak detection in the integration tests so that such problems are caught early.
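Here is a minimal sketch of enabling this for an in-process integration-test JVM. It assumes a hypothetical helper, `NettyLeakDetectionConfig`, that the test harness calls before any Netty class is loaded, because Netty reads these properties when `ResourceLeakDetector` is first initialized. `io.netty.leakDetection.targetRecords` is the property named in the log below. The value 32 is an arbitrary choice. An equivalent alternative that does not depend on class-loading order is to pass the same `-D` flags to the test JVM, for example through Maven Surefire's `argLine`.

```java
/** Hypothetical helper that turns on Netty buffer leak detection for a test JVM. */
final class NettyLeakDetectionConfig {

    static final String LEVEL_PROPERTY = "io.netty.leakDetection.level";
    static final String TARGET_RECORDS_PROPERTY = "io.netty.leakDetection.targetRecords";

    private NettyLeakDetectionConfig() {}

    /**
     * Must run before any Netty class is loaded, e.g. from a static block in a shared test base class.
     * Values already passed on the command line (-D...) take precedence over these defaults.
     */
    static void enable() {
        // "paranoid" tracks every buffer; "advanced" (used in #1152) tracks only a sample of them.
        setIfAbsent(LEVEL_PROPERTY, "paranoid");
        // Netty keeps 4 access records per leak by default; more records make a leak easier to trace.
        setIfAbsent(TARGET_RECORDS_PROPERTY, "32");
    }

    private static void setIfAbsent(String key, String value) {
        if (System.getProperty(key) == null) {
            System.setProperty(key, value);
        }
    }
}
```

Only properties that are still unset are written, so a developer can still override the level from the command line when reproducing a leak locally.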

[ERROR] 2023-08-16 12:56:11,453 Grpc-146 ResourceLeakDetector reportTracedLeak - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
        io.netty.buffer.AdvancedLeakAwareByteBuf.order(AdvancedLeakAwareByteBuf.java:71)
        io.netty.buffer.CompositeByteBuf.newComponent(CompositeByteBuf.java:346)
        io.netty.buffer.CompositeByteBuf.consolidate0(CompositeByteBuf.java:1758)
        io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:571)
        io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:266)
        io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:222)
        org.apache.uniffle.server.buffer.ShuffleBuffer.updateShuffleData(ShuffleBuffer.java:272)
        org.apache.uniffle.server.buffer.ShuffleBuffer.getShuffleData(ShuffleBuffer.java:169)
        org.apache.uniffle.server.buffer.ShuffleBufferManager.getShuffleData(ShuffleBufferManager.java:231)
        org.apache.uniffle.server.ShuffleTaskManager.getInMemoryShuffleData(ShuffleTaskManager.java:516)
        org.apache.uniffle.server.ShuffleServerGrpcService.getMemoryShuffleData(ShuffleServerGrpcService.java:798)
        org.apache.uniffle.proto.ShuffleServerGrpc$MethodHandlers.invoke(ShuffleServerGrpc.java:1053)
        io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
        io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:829)

......

: 14 leak records were discarded because they were duplicates
: 1010 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.

Ref: https://netty.io/wiki/reference-counted-objects.html

How should we improve?

No response

zuston commented 1 year ago

Nice feature!

rickyma commented 10 months ago

@qijiale76 Any progress here?

qijiale76 commented 10 months ago

@rickyma I have attempted to address this issue. Adding leak-detection tests is fairly straightforward, and running them against an older version of the code (before the bug was fixed) does detect these leaks. However, I haven't found a way to make the unit test fail when a leak occurs, because Netty only logs the leak. If you have an effective solution, please let me know. Thank you for your attention.
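One possible stopgap is sketched here as an assumption, not an established Uniffle practice. Netty reports leaks through its logger with messages that start with `LEAK: `, as in the log above. A hypothetical post-test step, `LeakLogScanner`, could therefore scan the captured test logs and exit non-zero if any such line appears. This approach has two limits. It only sees leaks whose buffers were garbage-collected and reported before the JVM exited. It also cannot tell which individual test leaked.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical build helper: fails when Netty's leak detector has logged any leak. */
final class LeakLogScanner {

    // Prefix Netty's ResourceLeakDetector puts in front of every leak report (see the log above).
    static final String LEAK_MARKER = "LEAK: ";

    private LeakLogScanner() {}

    /** Returns every line that contains a Netty leak report. */
    static List<String> findLeaks(List<String> lines) {
        List<String> leaks = new ArrayList<>();
        for (String line : lines) {
            if (line.contains(LEAK_MARKER)) {
                leaks.add(line);
            }
        }
        return leaks;
    }

    /** Usage: java LeakLogScanner <log file>... ; exits with status 1 if any leak was logged. */
    public static void main(String[] args) throws IOException {
        List<String> leaks = new ArrayList<>();
        for (String arg : args) {
            // ISO-8859-1 never fails to decode, and the marker is plain ASCII.
            leaks.addAll(findLeaks(Files.readAllLines(Paths.get(arg), StandardCharsets.ISO_8859_1)));
        }
        leaks.forEach(System.err::println);
        if (!leaks.isEmpty()) {
            System.exit(1);
        }
    }
}
```

Run it over the test log files after the test phase. Reading the files with ISO-8859-1 avoids decoding errors on arbitrary log bytes.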

rickyma commented 4 months ago

I think you can use a custom ResourceLeakDetector:

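import io.netty.util.ResourceLeakDetector;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Records leak reports instead of throwing: Netty calls the report* methods lazily, on whichever thread
// tracks the next resource, so an exception there would not fail the test. Netty instantiates this class via
// the (Class, int) constructor when the JVM runs with -Dio.netty.customResourceLeakDetector=<this class's FQCN>.
public class CustomResourceLeakDetector<T> extends ResourceLeakDetector<T> {

    // Static: Netty creates one detector per tracked resource type.
    private static final Queue<String> LEAKS = new ConcurrentLinkedQueue<>();

    public CustomResourceLeakDetector(Class<?> resourceType, int samplingInterval) {
        super(resourceType, samplingInterval);
    }

    @Override
    protected void reportTracedLeak(String resourceType, String records) {
        super.reportTracedLeak(resourceType, records); // keep Netty's usual ERROR log
        LEAKS.add(resourceType + " leaked:" + records);
    }

    @Override
    protected void reportUntracedLeak(String resourceType) {
        super.reportUntracedLeak(resourceType);
        LEAKS.add(resourceType + " leaked (no access records)");
    }

    // Returns and clears the leaks reported so far.
    public static List<String> drainLeaks() {
        List<String> drained = new ArrayList<>();
        for (String leak = LEAKS.poll(); leak != null; leak = LEAKS.poll()) {
            drained.add(leak);
        }
        return drained;
    }
}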

And the demo test code could be:

import static org.junit.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

// Run the test JVM with (e.g. via the build's test argLine):
//   -Dio.netty.leakDetection.level=paranoid
//   -Dio.netty.customResourceLeakDetector=<fully qualified name of CustomResourceLeakDetector>
// Swapping the ResourceLeakDetectorFactory in @Before is too late: AbstractByteBuf creates its
// detector once, when the class is loaded.
public class NettyLeakTest {

    @Before
    public void setUp() {
        // Discard leaks already reported by earlier tests.
        CustomResourceLeakDetector.drainLeaks();
    }

    @Test
    public void testLeak() {
        // Write your business code here
        ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
        ByteBuf buffer = allocator.buffer();
        // Intentionally do not release the buffer, so tearDown() should fail
        // buffer.release();
    }

    @After
    public void tearDown() throws InterruptedException {
        // A leak is only noticed after the buffer is garbage-collected, and Netty reports it the next
        // time a buffer is tracked. System.gc() is only a hint, so this check is best-effort.
        System.gc();
        Thread.sleep(100);
        ByteBufAllocator.DEFAULT.buffer().release();
        List<String> leaks = CustomResourceLeakDetector.drainLeaks();
        // Fails the JUnit test if any leak was reported
        assertTrue("Netty buffer leaks detected: " + leaks, leaks.isEmpty());
    }
}

The code might not work as-is; please dig into it and help Uniffle gain the ability to detect potential leaks during tests.

@qijiale76