redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License

OutOfDirectMemoryError can cause different threads to read each other's data #2590

Open bironran opened 9 months ago

bironran commented 9 months ago

Bug Report

When reading from multiple connections using multiple threads, an OutOfDirectMemoryError can cause a thread to receive the result of a command sent by a different thread. For example, thread 1 issues "get(x)" and thread 2 issues "get(y)", but thread 2 receives the result for key x.

Input Code

```java
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandInterruptedException;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;

import com.sun.management.HotSpotDiagnosticMXBean;
import com.sun.management.VMOption;

public class LettuceDirectMemoryFailure {

    public static void main(String[] args) throws InterruptedException {
        // The test only reproduces the problem with a very small direct memory limit.
        final HotSpotDiagnosticMXBean hsdiag = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        if (hsdiag != null) {
            final VMOption maxDirectMemorySize = hsdiag.getVMOption("MaxDirectMemorySize");
            System.out.println(maxDirectMemorySize);
            final long maxDirectMemorySizeLong = Long.parseLong(maxDirectMemorySize.getValue());
            if (maxDirectMemorySizeLong <= 0 || maxDirectMemorySizeLong > 6_000_000) {
                System.out.println("Test must be run with -XX:MaxDirectMemorySize=5M");
                return;
            }
        }

        AtomicLong failures = new AtomicLong(0L);
        AtomicLong success = new AtomicLong(0L);

        RedisClient redisClient = RedisClient.create("redis://127.0.0.1:7000/0");
        redisClient.setOptions(ClientOptions.builder()
                .disconnectedBehavior(DisconnectedBehavior.ACCEPT_COMMANDS)
                .socketOptions(SocketOptions.builder()
                        .connectTimeout(Duration.ofMillis(50))
                        .tcpNoDelay(true)
                        .build())
                .autoReconnect(true)
                .timeoutOptions(TimeoutOptions.builder()
                        .timeoutCommands(true)
                        .fixedTimeout(Duration.ofSeconds(5))
                        .build())
                .build());

        List<StatefulRedisConnection<String, String>> connections = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            connections.add(redisClient.connect());
        }

        // Each value is roughly 1 MB and starts with the id of the thread that owns it.
        String spaces = new String(new char[1_000_000]).replace('\0', ' ');
        final int NUMBER_OF_THREADS = 100;
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            connections.get(0).sync().set("key" + i, i + ":" + spaces);
        }

        List<Thread> threads = new LinkedList<>();
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            final int threadId = i;
            Runnable runnable = () -> {
                while (failures.get() == 0) {
                    try {
                        final String key = "key" + threadId;
                        // Ten threads share each connection.
                        final StatefulRedisConnection<String, String> connection = connections.get(threadId / 10);
                        final String result = connection.async().get(key).get(20, TimeUnit.SECONDS).split(":")[0];
                        // The prefix must match this thread's id; anything else is another thread's data.
                        if (result != null && !Integer.toString(threadId).equals(result)) {
                            System.out.println("ERROR on thread " + threadId + " got " + result);
                            failures.incrementAndGet();
                        } else {
                            success.incrementAndGet();
                        }
                    } catch (RedisCommandInterruptedException | InterruptedException | ExecutionException | TimeoutException e) {
                        // ignore
                    }
                }
            };
            final Thread thread = new Thread(runnable, "worker " + i);
            thread.setDaemon(true);
            thread.start();
            threads.add(thread);
        }

        while (failures.get() == 0) {
            System.out.println("success: " + success.get() + " ; failures " + failures.get());
            Thread.sleep(100);
        }
        System.out.println("success: " + success.get() + " ; failures " + failures.get());

        for (Thread thread : threads) {
            thread.interrupt();
        }
        for (StatefulRedisConnection<String, String> connection : connections) {
            connection.close();
        }
        redisClient.shutdown();
    }
}
```
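
The reproducer detects cross-talk by prefixing every stored value with the id of the thread that owns it and comparing that prefix on read. The same idea, isolated from Redis, might look like the sketch below. `TaggedValue`, `encode`, and `decode` are hypothetical names, not part of Lettuce, and the sketch assumes keys never contain `:`.

```java
import java.util.Optional;

public class TaggedValue {

    // Hypothetical helper: store the owning key in front of the payload.
    // Assumes the key itself never contains ':'.
    public static String encode(String key, String payload) {
        return key + ":" + payload;
    }

    // Returns the payload only if the stored value was written for expectedKey.
    public static Optional<String> decode(String expectedKey, String stored) {
        int sep = stored.indexOf(':');
        if (sep < 0 || !stored.substring(0, sep).equals(expectedKey)) {
            return Optional.empty(); // missing tag, or a value that belongs to another key
        }
        return Optional.of(stored.substring(sep + 1));
    }

    public static void main(String[] args) {
        String stored = encode("key7", "payload");
        System.out.println(decode("key7", stored).isPresent()); // true
        System.out.println(decode("key8", stored).isPresent()); // false
    }
}
```

A check like this only detects a mismatched response after the fact; it does not prevent the underlying problem.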

Expected behavior/code

Even on OutOfDirectMemoryError, different threads must never read results from commands they have not issued.

Environment

bironran commented 9 months ago

Please note we consider this a super-critical bug. There's no way we can make sure this doesn't happen again, nor catch it before wrong results are returned and acted upon. Unless we can find a way to bullet-proof this, we'll be switching to Jedis with urgent priority, which is too bad as Lettuce provides quite a bit more functionality and ease of use.

mp911de commented 9 months ago

Any reason you do not switch to heap buffers? Managing direct memory, in addition to the heap size, can be quite challenging.

bironran commented 9 months ago

@mp911de how would we do that? I've looked at -Dio.netty.noPreferDirect=true but according to https://github.com/netty/netty/issues/10189 it seems it has very little effect.

mp911de commented 9 months ago

Lettuce honors -Dio.netty.noPreferDirect=true for its read and write buffers. However, netty's read path still attempts to obtain a direct buffer. Either way, the setting lets you reduce the direct memory impact.

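The programmatic alternative is:

```java
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.NettyCustomizer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;

ClientResources resources = ClientResources.builder()
        .nettyCustomizer(new NettyCustomizer() {
            @Override
            public void afterBootstrapInitialized(Bootstrap bootstrap) {
                // preferDirect = false: the pooled allocator prefers heap buffers
                bootstrap.option(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(false));
            }
        }).build();
```
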
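
For the system-property route, the flag can also be set from code instead of on the command line. This is a minimal sketch, assuming netty reads `io.netty.noPreferDirect` once when its classes are first initialized, so the call has to happen before any netty or Lettuce class is touched. The class and method names are hypothetical.

```java
public class PreferHeapBuffers {

    // Hypothetical helper: equivalent to passing -Dio.netty.noPreferDirect=true.
    // Assumption: must run before netty classes are loaded, otherwise it may have no effect.
    public static void preferHeapBuffers() {
        System.setProperty("io.netty.noPreferDirect", "true");
    }

    public static void main(String[] args) {
        preferHeapBuffers();
        System.out.println("io.netty.noPreferDirect=" + System.getProperty("io.netty.noPreferDirect"));
        // Create the RedisClient only after this point.
    }
}
```

Passing the flag on the JVM command line avoids the ordering concern entirely and is probably the safer choice when the startup sequence is not fully under your control.
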
bironran commented 9 months ago

Thanks, this seems to work well with my test class.

Wouldn't that mode be a more sensible default, given the critical nature of the bug?

codingdie commented 6 months ago

Hello, I encountered the same problem in a production environment. Are there any plans to solve it?