bulldog2011 / bigqueue

A big, fast and persistent queue based on memory mapped file.
http://bulldog2011.github.com/bigqueue
Apache License 2.0
558 stars 219 forks source link

MappedByteBuffer not release due to JDK11 does not support sun.misc.Cleaner #39

Open 0tanks opened 4 years ago

0tanks commented 4 years ago

Hi,

MappedPageImpl current using sun.misc.Cleaner to clean up the mapped byte buffer of the index/data page files.

class sun.misc.Cleaner doest not exist in JDK11, for those app run on JDK11 will have issue on disk release.

with Ubuntu 18/OpenJDK11, use below command you can see the deleted files, and there didn't didn't release, until restart the app.

lsof -p | grep DEL

use htop command will see the process with high VIRT (run to 100G+)

There have sun.misc.Unsafe.invokeCleaner(java.nio.ByteBuffer directBuffer) available able to resolve that issue.

patch.diff.tar.gz

kcbaltz commented 2 years ago

@0tanks Do you have an example for how to reproduce this issue? I tried running the unit tests and didn't see any output from the lsof -p <pid> | grep DEL command you gave.

0tanks commented 2 years ago

@kcbaltz

I am grad that you are looking on it.

Refer to the test code below, and the screenshots after running for 1hrs+ (Ubuntu 22.04 + openjdk11)

import com.leansoft.bigqueue.BigQueueImpl; import com.leansoft.bigqueue.IBigQueue; import java.io.IOException; import static com.leansoft.bigqueue.BigArrayImpl.MINIMUM_DATA_PAGE_SIZE;

public class Test {

public static void main(String[] args) throws Exception {

    new Test().test();
}

public void test() throws IOException, InterruptedException {
    String queueDir = "./bigqueue";
    String queueName = "demo";
    long pid = getPID();
    long time = System.currentTimeMillis();
    System.out.println("pid: " + pid);

    IBigQueue bigQueue = new BigQueueImpl(queueDir, queueName, MINIMUM_DATA_PAGE_SIZE);
    bigQueue.gc();

    DataProducer producer = new DataProducer(bigQueue);
    DataConsumer consumer = new DataConsumer(bigQueue);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        System.out.println("shutdown.." + ((System.currentTimeMillis() - time) / 1000));
        producer.stop();
        consumer.stop();
        try {
            System.out.println("gc..");
            bigQueue.gc();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            System.out.println("close..");
            bigQueue.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }));

    new Thread(consumer, "consumer").start();
    new Thread(producer, "producer").start();
    int count = Integer.MAX_VALUE / 10;
    int i = 0;
    while (i < count) {
        i++;
        Thread.sleep(10_000);
        bigQueue.gc();
        System.out.println("queue size: " + bigQueue.size());
    }
}

public static long getPID() {
    String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
    return Long.parseLong(processName.split("@")[0]);
}

private static final String data = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";

class DataProducer implements Runnable {
    boolean stop = false;
    IBigQueue bigQueue = null;

    public DataProducer(IBigQueue bigQueue) {
        this.bigQueue = bigQueue;
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        int noOfMsg = 0;
        while (!stop) {
            noOfMsg++;
            try {
                bigQueue.enqueue(data.getBytes());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (noOfMsg % 10000 == 0) {
                System.out.println("write: noOfMsg: " + noOfMsg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }
    }
}

class DataConsumer implements Runnable {
    boolean stop = false;
    IBigQueue bigQueue = null;

    public DataConsumer(IBigQueue bigQueue) {
        this.bigQueue = bigQueue;
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        int noOfMsg = 0;
        while (!stop) {
            try {
                byte[] data = bigQueue.dequeue();
                if (data == null) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    noOfMsg++;
                }
                if (noOfMsg % 10000 == 0) {
                    System.out.println("consume: noOfMsg: " + noOfMsg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

}

bq_Screenshot from 2022-06-18 18-22-33 Screenshot from 2022-06-18 19-55-16