apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

[Bug] Memory leakage detected when IndexStoreService is enabled and TieredMessageStore's FileSegment is PosixFileSegment #8017

Open bxfjb opened 6 months ago

bxfjb commented 6 months ago

Before Creating the Bug Report

Runtime platform environment

CentOS 7

RocketMQ version

branch: develop

JDK Version

OpenJDK 8

Describe the Bug

2024-04-11 09:57:22 INFO ShutdownHook - Shutdown hook was invoked, 1

Memory usage keeps rising:

public boolean doCompactThenUploadFile(IndexFile indexFile) {
        ...  
        boolean result = flatAppendFile.commitAsync().join();
        ...
}
@Override
    @SuppressWarnings("ResultOfMethodCallIgnored")
    public CompletableFuture<Boolean> commit0(
        FileSegmentInputStream inputStream, long position, int length, boolean append) {

        return CompletableFuture.supplyAsync(() -> {
            try {
                byte[] byteArray = ByteStreams.toByteArray(inputStream);
                writeFileChannel.position(position);
                ByteBuffer buffer = ByteBuffer.wrap(byteArray);
                while (buffer.hasRemaining()) {
                    writeFileChannel.write(buffer);
                }
                writeFileChannel.force(true);
            } catch (Exception e) {
                return false;
            }
            return true;
        }, MessageStoreExecutor.getInstance().bufferCommitExecutor);
    }
private static class BufferCache {
    private ByteBuffer[] buffers;
    private int count;
    private int start;
    ...
}

private static ThreadLocal<BufferCache> bufferCache;
  1. Find a cached DirectByteBuffer big enough to hold the buffer's data; if there is none, allocate one;
  2. write the data from the DirectByteBuffer to the page cache;
  3. return the DirectByteBuffer to the BufferCache without releasing it.
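The cache described above lives in the JDK's sun.nio.ch.Util. Its growth pattern can be modeled with a simplified sketch (illustrative only; a plain array stands in for a DirectByteBuffer, and the real JDK cache keeps several buffers): each thread retains the largest buffer it has ever requested, so worst-case retained memory is roughly threads × largest write size.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BufferCacheModel {
    // Simplified stand-in for the per-thread cache in sun.nio.ch.Util:
    // it retains the largest buffer ever requested and never shrinks.
    static final ThreadLocal<int[]> CACHE = new ThreadLocal<>();

    // Returns a cached buffer if large enough, otherwise "allocates" a new
    // one and caches it (models ByteBuffer.allocateDirect + caching).
    public static int[] getTemporaryBuffer(int size) {
        int[] cached = CACHE.get();
        if (cached == null || cached.length < size) {
            cached = new int[size];
            CACHE.set(cached);
        }
        return cached;
    }

    public static int retainedByCurrentThread() {
        int[] cached = CACHE.get();
        return cached == null ? 0 : cached.length;
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = 4;
        int writeSize = 60_000;  // roughly the flush size from the report
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                getTemporaryBuffer(writeSize);  // each worker caches its own copy
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        // Worst-case retained memory grows with the executor's thread count.
        System.out.println("worst-case retained ~= " + threads * writeSize);
    }
}
```

This is why the leak stops only once every thread in the commit executor has gone through commit0 at least once: at that point each thread already holds its maximum-size buffer.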
public class FileWrite {
    private FileChannel readFileChannel;
    private FileChannel writeFileChannel;
    private ExecutorService executor;
    private final String placeHolder = new String(new char[60000]).replace('\0', '1');

    public FileWrite(String path) throws Exception {
        this.readFileChannel = new RandomAccessFile(new File(path), "r").getChannel();
        this.writeFileChannel = new RandomAccessFile(new File(path), "rwd").getChannel();
        this.executor = new ThreadPoolExecutor(
                Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
                Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
                60 * 1000,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100));
    }

    public void run() throws InterruptedException {
        for (int i = 0; i < 100; ++i) {
            writeAsync();
            Thread.sleep(5 * 1000);
        }
    }

    public CompletableFuture<Boolean> writeAsync() {
        return CompletableFuture.supplyAsync(this::write, executor);
    }

    private Boolean write() {
        ...  // body elided in the original report
    }
}

(Screenshots elided: `buffer` and `buffer_limit` metrics.)

Steps to Reproduce

2 choices:

  1. Deploy a RocketMQ cluster with TieredStore, offload cold data to HDD with PosixFileSegment, and keep producing new messages; the direct memory usage will increase without bound.
  2. Or run IndexStoreServiceTest.doCompactionTest and keep an eye on the direct buffer usage.
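To keep an eye on the direct buffer usage from inside the JVM, without attaching a profiler, the standard BufferPoolMXBean can be polled; the thread-local cached buffers are counted against its "direct" pool:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public class DirectBufferMonitor {
    // Returns the bytes currently used by the "direct" buffer pool, which
    // includes the per-thread buffers cached by sun.nio.ch.Util.
    public static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
            ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;  // no direct pool reported by this JVM
    }

    public static void main(String[] args) {
        System.out.println("direct pool used: " + directMemoryUsed() + " bytes");
    }
}
```

Printing this value periodically while the reproduction runs should show the monotonic growth described below.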

What Did You Expect to See?

The direct memory usage should not grow without bound.

What Did You See Instead?

The direct memory usage keeps increasing until an OutOfMemoryError occurs, or until every thread in commitExecutor has run commit0 at least once.

Additional Context

There are several possible solutions for this bug:

  1. Reduce the thread count of bufferCommitExecutor;
  2. Limit the size of the BufferCache with the config -Djdk.nio.maxCachedBufferSize;
  3. Limit the size of the inputStream passed to PosixFileSegment.commit0;
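In the spirit of option 3 (bounding the size handed to each write), a sketch like the following caps the slice passed to FileChannel.write(), so the per-thread temporary direct buffer the JDK caches never exceeds the chunk size. This is an illustrative sketch with assumed names (ChunkedWriter, CHUNK_SIZE), not the actual patch:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChunkedWriter {
    // Cap on the slice handed to FileChannel.write(); the JDK caches one
    // temporary direct buffer per thread sized to the largest write, so
    // this also caps the retained direct memory per executor thread.
    static final int CHUNK_SIZE = 64 * 1024;

    public static void write(FileChannel channel, long position, byte[] data)
            throws IOException {
        ByteBuffer whole = ByteBuffer.wrap(data);
        long offset = position;
        while (whole.hasRemaining()) {
            // Slice out at most CHUNK_SIZE bytes for this write call.
            ByteBuffer slice = whole.duplicate();
            slice.limit(Math.min(whole.position() + CHUNK_SIZE, whole.limit()));
            int written = channel.write(slice, offset);  // positional write
            offset += written;
            whole.position(whole.position() + written);
        }
        channel.force(true);
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("chunked", ".bin");
        byte[] data = new byte[200 * 1024];
        java.util.Arrays.fill(data, (byte) 1);
        try (FileChannel ch = FileChannel.open(path,
                StandardOpenOption.WRITE, StandardOpenOption.READ)) {
            write(ch, 0, data);
            System.out.println("file size: " + ch.size());
        }
    }
}
```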
bxfjb commented 6 months ago

We may introduce a DirectByteBuffer pool like TransientStorePool, or use MappedByteBuffer.
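A TransientStorePool-style pool might look roughly like this sketch (SimpleDirectBufferPool is a hypothetical class for illustration, not RocketMQ's actual TransientStorePool): preallocating a fixed set of direct buffers bounds direct memory regardless of how many threads borrow them.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SimpleDirectBufferPool {
    private final BlockingQueue<ByteBuffer> pool;
    private final int bufferSize;

    // Preallocate poolSize direct buffers of bufferSize bytes each;
    // direct memory is then bounded by poolSize * bufferSize.
    public SimpleDirectBufferPool(int poolSize, int bufferSize) {
        this.bufferSize = bufferSize;
        this.pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // Blocks until a buffer is free, then hands it out cleared.
    public ByteBuffer borrow() throws InterruptedException {
        ByteBuffer buffer = pool.take();
        buffer.clear();
        return buffer;
    }

    // Must be called (e.g. from a finally block) by every borrower.
    public void giveBack(ByteBuffer buffer) {
        if (buffer.capacity() == bufferSize) {
            pool.offer(buffer);
        }
    }

    public int available() {
        return pool.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleDirectBufferPool pool = new SimpleDirectBufferPool(4, 64 * 1024);
        ByteBuffer buffer = pool.borrow();
        try {
            buffer.putLong(42L);  // fill the buffer as a commit would
        } finally {
            pool.giveBack(buffer);
        }
        System.out.println("available after return: " + pool.available());
    }
}
```

The blocking take() also applies back-pressure: when all buffers are in flight, producers wait instead of allocating more direct memory.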

caigy commented 6 months ago

ByteStreams.toByteArray() creates a large array in memory and is unnecessary. FileChannel.transferFrom() with an NIO channel would seem more efficient for writing streams to a file.

bxfjb commented 6 months ago

ByteStreams.toByteArray() creates large array in memory and is unnecessary. It seems FileChannel.transferFrom() with NIO channel would be more efficient for writing streams to a file.

Thanks for the reply. I tried FileChannel.transferFrom() but found that a unit test failed:

@Test
public void consumeQueueTest() throws ClassNotFoundException, NoSuchMethodException {
    ...
    ByteBuffer cqItem1 = fileSegment.read(initPosition, unitSize);
    Assert.assertEquals(baseOffset, cqItem1.getLong());  // failed
    ...
}

java.lang.AssertionError: Expected :1000 Actual :0

Debugging shows the consume queue data was not flushed correctly. I wonder if the way I use FileChannel.transferFrom() is wrong. Before:

                byte[] byteArray = ByteStreams.toByteArray(inputStream);
                writeFileChannel.position(position);
                ByteBuffer buffer = ByteBuffer.wrap(byteArray);
                while (buffer.hasRemaining()) {
                    writeFileChannel.write(buffer);
                }
                writeFileChannel.force(true);

After:

                ReadableByteChannel readableByteChannel = Channels.newChannel(inputStream);
                writeFileChannel.transferFrom(readableByteChannel, position, length);
                writeFileChannel.force(true);
caigy commented 6 months ago


It seems the modification is not equivalent. writeFileChannel.position(position) has a side effect: subsequent writes to writeFileChannel start from position. But writeFileChannel.transferFrom() returns without transferring anything when position is greater than the current size of writeFileChannel.

You might extend the file behind writeFileChannel up to position, and also check whether the number of transferred bytes equals length.
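Putting these two suggestions together (extend the file first, then verify the transferred count) might look like the following sketch. It reuses the writeFileChannel/position/length/inputStream names from commit0 but is an illustration under those assumptions, not code tested against the RocketMQ code base:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

public class TransferFromCommit {
    // transferFrom() is a no-op when position > channel.size(), so the file
    // must first be extended up to position; afterwards the transferred
    // count must be checked, because a single transferFrom() call may
    // transfer fewer than `length` bytes.
    public static boolean commit(FileChannel writeFileChannel, InputStream inputStream,
                                 long position, int length) {
        try {
            // Extend the file so transferFrom() at `position` is not skipped.
            if (position > 0 && writeFileChannel.size() < position) {
                writeFileChannel.write(ByteBuffer.wrap(new byte[]{0}), position - 1);
            }
            ReadableByteChannel source = Channels.newChannel(inputStream);
            long transferred = 0;
            while (transferred < length) {
                long n = writeFileChannel.transferFrom(
                    source, position + transferred, length - transferred);
                if (n <= 0) {
                    return false;  // source exhausted before `length` bytes arrived
                }
                transferred += n;
            }
            writeFileChannel.force(true);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        java.nio.file.Path path = java.nio.file.Files.createTempFile("segment", ".bin");
        try (FileChannel channel = FileChannel.open(path,
                java.nio.file.StandardOpenOption.READ,
                java.nio.file.StandardOpenOption.WRITE)) {
            byte[] data = "index-data".getBytes();
            boolean ok = commit(channel,
                new java.io.ByteArrayInputStream(data), 128, data.length);
            System.out.println("commit ok: " + ok);
        }
    }
}
```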

bxfjb commented 6 months ago


Agree with you. But then I was wondering whether the failing unit test is necessary; it seems to act like:

Is it acceptable to remove step 1 in consumeQueueTest, @lizhimins? The scenario seems nonexistent in production.

Furthermore, FileChannel.read() has the same problem as FileChannel.write(), but since it has to return a ByteBuffer, calling it seems inevitable. I did not find a way to replace it completely with FileChannel.transferTo(). I'd like to use MappedByteBuffer as an alternative.

caigy commented 6 months ago

@bxfjb If you want to extend the size of writeFileChannel so that FileChannel.transferFrom() succeeds, you may try writing a byte at position - 1:

writeFileChannel.write(ByteBuffer.wrap(new byte[]{0}), position - 1);
bxfjb commented 6 months ago


Eventually I decided to use MappedByteBuffer, because of FileChannel.transferFrom()'s worse I/O performance in my tests. Could you please review #8036?
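A minimal sketch of the MappedByteBuffer approach (illustrative only, not the code in #8036): map just the target window, copy the bytes in, and force the mapped range. No temporary direct buffer is allocated per thread, because the mapping writes through the page cache.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapCommit {
    // Maps only the [position, position + data.length) window, copies the
    // bytes in, and forces them to disk. Mapping past the current file size
    // extends the file automatically.
    public static boolean commit(FileChannel channel, byte[] data, long position)
            throws IOException {
        MappedByteBuffer mapped =
            channel.map(FileChannel.MapMode.READ_WRITE, position, data.length);
        mapped.put(data);
        mapped.force();  // flush only the mapped range
        return true;
    }

    public static void main(String[] args) throws IOException {
        java.nio.file.Path path = java.nio.file.Files.createTempFile("mmap", ".bin");
        try (FileChannel channel = FileChannel.open(path,
                java.nio.file.StandardOpenOption.READ,
                java.nio.file.StandardOpenOption.WRITE)) {
            commit(channel, new byte[]{1, 2, 3, 4}, 0);
            System.out.println("size: " + channel.size());
        }
    }
}
```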

lizhimins commented 6 months ago

The default implementations in the current code are DefaultMetadataStore and PosixFileSegment. In the original design, PosixFileSegment was intended only for test cases and validation. In our production we use metadata management based on a RocksDB implementation, and FileSegment implementations based on object storage or HDFS. To achieve higher performance and lower storage costs, this part of the code will be contributed in a future PR.

lizhimins commented 6 months ago

Adding a new file type for the index looks good.

caigy commented 6 months ago


In general mmap has a greater impact on the page cache and may cause resource contention with the normal commit log. We need to evaluate the impact on overall performance carefully.

lizhimins commented 6 months ago


If the target storage is based on the POSIX protocol, using directIO is a good choice.

bxfjb commented 6 months ago


Got it. I am doing stress tests to reveal the impact as much as I can. So far it looks good at about 60k producer TPS.

bxfjb commented 6 months ago


I chose MappedByteBuffer because:

I wonder what you mean by direct IO; does it mean DirectByteBuffer or something else?

lizhimins commented 5 months ago

For the SSD-to-HDD copy scenario, PosixFileSegment in the current design is mainly for testing and can be improved. If you use MappedByteBuffer to copy data from the CommitLog to a FileSegment, the page cache will also waste memory. As for how to implement it, you can search for a Java directIO lib to solve this problem.
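Regarding a Java direct I/O library: on JDK 10+ there is also com.sun.nio.file.ExtendedOpenOption.DIRECT, which bypasses the page cache but requires the position, the transfer size, and the buffer address to be aligned to the file system's block size (whether DIRECT works depends on the JDK and the file system). A sketch of the alignment bookkeeping, with the open call shown only in comments:

```java
public class DirectIoAlign {
    // Rounds size up to a multiple of blockSize; direct I/O requires the
    // position, the transfer size, and the buffer address to all be
    // block-aligned (blockSize is typically 512 or 4096).
    public static long alignUp(long size, long blockSize) {
        return (size + blockSize - 1) / blockSize * blockSize;
    }

    public static void main(String[] args) {
        // On JDK 10+ the channel itself could be opened like:
        //   FileChannel ch = FileChannel.open(path, StandardOpenOption.WRITE,
        //       com.sun.nio.file.ExtendedOpenOption.DIRECT);
        //   ByteBuffer buf = ByteBuffer.allocateDirect((int) alignUp(len, 4096));
        // (aligning the buffer's address may additionally require
        //  ByteBuffer.alignedSlice, available since JDK 9)
        System.out.println("aligned 60000 -> " + alignUp(60_000, 4096));
    }
}
```

Note the JDK 8 environment from this report would still need a third-party library or JNI for direct I/O; the option above only exists on newer JDKs.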