Alluxio / alluxio

Alluxio, data orchestration for analytics and machine learning in the cloud
https://www.alluxio.io
Apache License 2.0

UfsSyncPathCache Cache Eviction Recursion Issue #18641

Open ssyssy opened 1 month ago

ssyssy commented 1 month ago

Alluxio Version: master-2.x

Describe the bug: In master-2.x, the onCacheEviction handler of UfsSyncPathCache can enter a recursion loop when the Guava cache is full and about 1000 threads are syncing paths concurrently. The loop leads to a stack overflow and locks all of the RPC threads.
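One way such a loop can arise, assuming the eviction handler writes back into the same bounded cache while the cache is full, is sketched below with a toy stdlib-only cache. `BoundedSet`, `maxDepthInline`, and `maxDepthDeferred` are hypothetical names for illustration and are not Alluxio or Guava code. The `limit` parameter caps the nesting so the demo terminates instead of overflowing the stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

public class ReentrantEvictionDemo {

    /** Toy FIFO-bounded key set that calls its listener synchronously on eviction. */
    static final class BoundedSet {
        private final int capacity;
        private final Deque<String> order = new ArrayDeque<>();
        private final Set<String> keys = new HashSet<>();
        private Consumer<String> listener = k -> { };

        BoundedSet(int capacity) {
            this.capacity = capacity;
        }

        void setListener(Consumer<String> listener) {
            this.listener = listener;
        }

        void put(String key) {
            if (!keys.add(key)) {
                return; // already present
            }
            order.addLast(key);
            if (keys.size() > capacity) {
                String evicted = order.removeFirst();
                keys.remove(evicted);
                listener.accept(evicted); // runs on the caller's stack
            }
        }
    }

    /** Listener writes straight back into the full cache: each write evicts again. */
    public static int maxDepthInline(int capacity, int limit) {
        BoundedSet cache = new BoundedSet(capacity);
        int[] depth = {0};
        int[] max = {0};
        int[] counter = {0};
        cache.setListener(evicted -> {
            depth[0]++;
            max[0] = Math.max(max[0], depth[0]);
            if (depth[0] < limit) {
                cache.put("invalidated-" + counter[0]++); // re-enters put -> evict -> listener
            }
            depth[0]--;
        });
        for (int i = 0; i <= capacity; i++) {
            cache.put("path-" + i);
        }
        return max[0];
    }

    /** Listener only records the key; the write-back happens later, outside the listener. */
    public static int maxDepthDeferred(int capacity, int limit) {
        BoundedSet cache = new BoundedSet(capacity);
        Deque<String> pending = new ArrayDeque<>();
        int[] depth = {0};
        int[] max = {0};
        cache.setListener(evicted -> {
            depth[0]++;
            max[0] = Math.max(max[0], depth[0]);
            pending.add(evicted);
            depth[0]--;
        });
        for (int i = 0; i <= capacity; i++) {
            cache.put("path-" + i);
        }
        int processed = 0;
        while (!pending.isEmpty() && processed < limit) {
            pending.poll();
            cache.put("invalidated-" + processed++);
        }
        return max[0];
    }

    public static void main(String[] args) {
        System.out.println("inline listener depth: " + maxDepthInline(2, 50));     // prints 50
        System.out.println("deferred listener depth: " + maxDepthDeferred(2, 50)); // prints 1
    }
}
```

With the inline listener the nesting grows until the artificial limit is hit; without that limit it would grow until the stack overflows. Deferring the write-back keeps the listener depth at one.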

To Reproduce: Run a large number (~1000) of threads that concurrently sync different paths.
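A reproduction along these lines could be driven by a small harness like the sketch below. It is stdlib-only and makes assumptions: `ConcurrentSyncHarness`, `run`, and the `/sync-test/dir-N` paths are hypothetical, and `syncOp` stands in for whatever call triggers a path sync in the system under test. A start latch releases all threads at once so the calls really overlap.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ConcurrentSyncHarness {

    /**
     * Calls syncOp once per thread, each with a distinct path, all released together.
     * Returns how many calls returned normally before the timeout.
     */
    public static int run(int threadCount, Consumer<String> syncOp, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threadCount);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            final String path = "/sync-test/dir-" + i; // hypothetical path layout
            pool.execute(() -> {
                try {
                    start.await();            // wait until every thread is ready
                    syncOp.accept(path);
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        start.countDown();
        try {
            done.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();               // interrupt anything still stuck
        }
        return completed.get();
    }

    public static void main(String[] args) {
        // Stand-in operation; replace with the real sync call. Raise the thread
        // count toward ~1000 to match the scale described in the report.
        Set<String> seen = ConcurrentHashMap.newKeySet();
        int completed = run(100, seen::add, 30);
        System.out.println(completed + " of 100 sync calls completed, "
            + seen.size() + " distinct paths");
    }
}
```

A completed count below the thread count after the timeout would suggest that some callers are stuck, which is the symptom described in this issue.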

Expected behavior: Threads do not get locked, even under heavy traffic.

Urgency: At large scale this can leave every RPC thread in the master dead, which blocks all services.

Are you planning to fix it: https://github.com/Alluxio/alluxio/pull/18640

Additional context: Here is the jstack output of one thread we found while investigating the bug:

"master-rpc-executor-TPE-thread-31670" #73207 daemon prio=5 os_prio=0 cpu=490.13ms elapsed=114135.12s tid=0x0000ffc5c8545800 nid=0x539fb waiting on condition [0x0000ffc3fedf9000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.23/Native Method)
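To check how many threads are in the same state as the one in that dump, a rough in-process count can be taken with `Thread.getAllStackTraces()`. The sketch below is an assumption-laden illustration: `ParkedThreadCounter` and its methods are hypothetical, it only sees threads in its own JVM, and for a running master the usual route is still an external jstack dump.

```java
import java.util.concurrent.CountDownLatch;

public class ParkedThreadCounter {

    /** Counts live threads in this JVM whose name starts with namePrefix and that are WAITING. */
    public static long countWaiting(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(t -> t.getName().startsWith(namePrefix))
            .filter(t -> t.getState() == Thread.State.WAITING)
            .count();
    }

    /** Demo helper: starts a daemon thread that blocks until the latch is released. */
    public static Thread startParked(String name, CountDownLatch release) {
        Thread t = new Thread(() -> {
            try {
                release.await(); // parks the thread; reported as WAITING (parking)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Thread t = startParked("master-rpc-executor-demo-1", release);
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(10); // give the thread time to reach the latch
        }
        System.out.println("waiting rpc threads: " + countWaiting("master-rpc-executor-"));
        release.countDown();
    }
}
```

A WAITING count is only a hint: idle pool threads also park, so the stack frames above `Unsafe.park` are what distinguish a stuck thread from an idle one.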

ljluestc commented 3 weeks ago
package alluxio.master.file.meta;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class UfsSyncPathCache {

    private final LoadingCache<String, String> mCache;
    private final LinkedBlockingQueue<String> mEvictionQueue = new LinkedBlockingQueue<>();
    private boolean mEvictionInProgress = false;
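    // Daemon worker so that an idle eviction thread does not keep the JVM alive.
    private final ExecutorService mEvictionExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "ufs-sync-path-cache-eviction");
        t.setDaemon(true);
        return t;
    });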

    public UfsSyncPathCache() {
        mCache = CacheBuilder.newBuilder()
                .maximumSize(1000)
                .removalListener(new RemovalListener<String, String>() {
                    @Override
                    public void onRemoval(RemovalNotification<String, String> notification) {
                        handleEviction(notification.getKey());
                    }
                })
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) throws Exception {
                        return syncPath(key);
                    }
                });
    }

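    // Called from the removal listener, i.e. on whichever thread touched the cache.
    // It only enqueues the key, so the invalidation never runs on the caller's stack.
    private void handleEviction(String key) {
        mEvictionQueue.add(key);
        processEvictionQueue();
    }

    // mEvictionInProgress is only read or written while holding this object's monitor.
    private synchronized void processEvictionQueue() {
        if (mEvictionInProgress) {
            return;
        }
        mEvictionInProgress = true;
        mEvictionExecutor.execute(this::drainEvictionQueue);
    }

    // Runs on the eviction executor thread.
    private void drainEvictionQueue() {
        while (true) {
            String key = mEvictionQueue.poll();
            if (key != null) {
                try {
                    notifyInvalidationInternal(key);
                } catch (RuntimeException e) {
                    // Keep draining; one failed invalidation must not strand the rest.
                    System.err.println("Failed to invalidate path " + key + ": " + e);
                }
                continue;
            }
            synchronized (this) {
                // Re-check under the lock: a key enqueued after the poll above must
                // either be seen here or find the flag cleared and start a new drain.
                if (mEvictionQueue.isEmpty()) {
                    mEvictionInProgress = false;
                    return;
                }
            }
        }
    }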

    private void notifyInvalidationInternal(String key) {
        // Existing invalidation logic
        System.out.println("Invalidating path: " + key);
    }

    private String syncPath(String path) {
        // Simulate path syncing logic
        System.out.println("Syncing path: " + path);
        return path;
    }

    public void sync(String path) {
        mCache.getUnchecked(path);
    }
}