alibaba / transmittable-thread-local

📌 The missing Java std lib (simple & 0-dependency) for framework/middleware: provides an enhanced InheritableThreadLocal that transmits values between threads even when thread-pooling components are used.
https://github.com/alibaba/transmittable-thread-local
Apache License 2.0

Discussion: using TransmittableThreadLocal in a session-level cache scenario #122

Closed olove closed 5 years ago

olove commented 5 years ago

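Scenario

Some business flows involve complex computation, and the service that reads the underlying data may need to be called many times.

We would like a thread-level cache (more precisely a session-level cache, because several upstream and downstream threads are involved) so that the external service is not called repeatedly.

Problem

The data may be produced in a child thread. When TransmittableThreadLocal copies data, it ignores keys that do not exist in the main thread, so the main thread cannot read a ThreadLocal value that was newly created in a child thread.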
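
The effect described above can be reproduced without TTL. The following is a minimal sketch that uses the JDK's `InheritableThreadLocal` as a stand-in for `TransmittableThreadLocal` and a freshly created thread instead of a pool; the class name `ChildWriteNotVisibleDemo` and the method `run` are made up for illustration. It only shows that a value set for the first time in a child thread does not flow back to the parent.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ChildWriteNotVisibleDemo {
    // Assumption: InheritableThreadLocal stands in for TransmittableThreadLocal here.
    static final InheritableThreadLocal<Object> CACHE = new InheritableThreadLocal<>();

    /** Returns {value seen in the child thread, value seen in the parent thread}. */
    static Object[] run() {
        CACHE.remove(); // start from a clean state in the calling thread
        AtomicReference<Object> seenInChild = new AtomicReference<>();
        Thread child = new Thread(() -> {
            CACHE.set(new Object());      // the value is first created in the child
            seenInChild.set(CACHE.get());
        });
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the child", e);
        }
        // The parent's own slot was never set, so this read yields null.
        return new Object[] {seenInChild.get(), CACHE.get()};
    }

    public static void main(String[] args) {
        Object[] seen = run();
        System.out.println("child has a value: " + (seen[0] != null)); // prints true
        System.out.println("parent sees: " + seen[1]);                 // prints null
    }
}
```

Values are copied from parent to child only; nothing is copied in the other direction, which matches the `main:null` line in the results further down.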

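Proposed solution

TransmittableThreadLocal could add a parameter that controls whether newly added ThreadLocal values are passed back to the parent thread.

Sample code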

package com.example.practice.java.modern.tech.sandbox.library.hello;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.junit.Before;
import org.junit.Test;

public class EasySample {

    @Before
    public void setUp() {
        RxJavaPlugins.setScheduleHandler(TtlRunnable::get);
    }

    @Test
    public void getSomethingByCache() {
        BizService bizService = new BizService();

        final Consumer<Object> consumer = result -> System.out.println(Thread.currentThread().getName() + ":" + bizService.getCache());

        Flowable.just(bizService)
                .observeOn(Schedulers.io())
                .map(BizService::getSomethingByCache)
                .doOnNext(consumer)
                .blockingSubscribe(consumer);

        // The business logic needs the value again at some later point
        Object object = bizService.getSomethingByCache();

        // doSomething...
    }

    @Test
    public void getSomethingByCache2() {
        BizService bizService = new BizService();

        // If the value is created in the parent thread, the child thread can use it directly
        Object object = bizService.getSomethingByCache();

        // doSomething...

        final Consumer<Object> consumer = result -> System.out.println(Thread.currentThread().getName() + ":" + bizService.getCache());

        Flowable.just(bizService)
                .observeOn(Schedulers.io())
                .map(BizService::getSomethingByCache)
                .doOnNext(consumer)
                .blockingSubscribe(consumer);
    }

    private static class BizService {
        public final TransmittableThreadLocal<Object> cache = new TransmittableThreadLocal<>();

        public Object getSomething() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
            }

            return new Object();
        }

        /**
         * Fetches business data. Spring Cache would normally do the caching; a simple implementation is used here.
         */
        public Object getSomethingByCache() {
            Object value = cache.get();
            if (value == null) {
                value = getSomething();
                cache.set(value);
                return value;
            } else {
                return cache.get();
            }
        }

        public Object getCache() {
            return cache.get();
        }

        public void remove() {
            cache.remove();
        }
    }
}

The results:

com.alibaba.nageer.EasySample#getSomethingByCache

RxCachedThreadScheduler-1:java.lang.Object@29d7d137
main:null

com.alibaba.nageer.EasySample#getSomethingByCache2

RxCachedThreadScheduler-1:java.lang.Object@740a86a8
main:java.lang.Object@740a86a8
oldratlee commented 5 years ago

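The "cache problem" and the "TTL transmission problem" are orthogonal.

Explained as follows: @olove

Design

At the level of design / approach / concepts,

Implementation notes

In code

Your demo code, adjusted accordingly, looks like this: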

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

public class SessionCacheDemo {
    @BeforeClass
    public static void beforeClass() {
        // Integrate TTL with RxJava
        RxJavaPlugins.setScheduleHandler(TtlRunnable::get);
    }

    @Test
    public void getSomethingByCache() throws Exception {
        BizService bizService = new BizService();

        final Consumer<Object> printer = result -> System.out.printf("[%30s]: %s%n", Thread.currentThread().getName(), bizService.getCache());

        Flowable.just(bizService)
                .observeOn(Schedulers.io())
                .map(BizService::getItemByCache)
                .doOnNext(printer)
                .blockingSubscribe(printer);

        // The business logic needs the value again at a later point
        Object object = bizService.getItemByCache();
        printer.accept(object);
    }

    /**
     * Mock Service
     */
    private static class BizService {
        private static final String ONLY_KEY = "ONLY_KEY";

        private final TransmittableThreadLocal<ConcurrentMap<String, Item>> cache_context = new TransmittableThreadLocal<ConcurrentMap<String, Item>>() {
            @Override
            protected ConcurrentMap<String, Item> initialValue() {
                return new ConcurrentHashMap<>(); // init cache
            }
        };

        public BizService() {
            // NOTE: AVOID cache object lazy init
            cache_context.get();
        }

        public Item getItem() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // do nothing
            }
            return new Item(ThreadLocalRandom.current().nextInt(0, 10_000));
        }

        /**
         * Fetches business data. Spring Cache would normally do the caching; a simple implementation is used here.
         */
        public Item getItemByCache() {
            final ConcurrentMap<String, Item> cache = cache_context.get();
            return cache.computeIfAbsent(ONLY_KEY, key -> getItem());
        }

        public Item getCache() {
            return cache_context.get().get(ONLY_KEY);
        }

        public void clearCache() {
            cache_context.get().clear();
        }
    }

    /**
     * Mock Cache Data
     */
    public static class Item {
        private int id;

        public Item(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "Item{id=" + id + '}';
        }
    }
}

Output

[     RxCachedThreadScheduler-1]: Item{id=248}
[                          main]: Item{id=248}
[                          main]: Item{id=248}
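
The demo above works because the thread-local holds a reference to one shared, mutable cache map that is created in the parent before any child runs (the `NOTE: AVOID cache object lazy init` comment). The sketch below isolates that idea with the JDK only. It assumes `InheritableThreadLocal` as a stand-in for `TransmittableThreadLocal` and a new thread instead of a pool; `SharedCacheContextDemo` and `parentViewAfterChildWrite` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SharedCacheContextDemo {
    static final String KEY = "ONLY_KEY";

    /**
     * Lets a child thread fill the cache, then returns what the parent reads:
     * the cached value, or null if the child's write is not visible to the parent.
     */
    static Integer parentViewAfterChildWrite(boolean eagerInit) {
        // Assumption: InheritableThreadLocal stands in for TransmittableThreadLocal.
        InheritableThreadLocal<ConcurrentMap<String, Integer>> context =
                new InheritableThreadLocal<ConcurrentMap<String, Integer>>() {
                    @Override
                    protected ConcurrentMap<String, Integer> initialValue() {
                        return new ConcurrentHashMap<>(); // the cache container
                    }
                };
        if (eagerInit) {
            context.get(); // create the map in the parent BEFORE the child thread exists
        }
        // With eager init the child inherits a reference to the parent's map;
        // with lazy init the child creates a private map of its own.
        Thread child = new Thread(() -> context.get().computeIfAbsent(KEY, k -> 42));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the child", e);
        }
        return context.get().get(KEY);
    }

    public static void main(String[] args) {
        System.out.println("eager init: " + parentViewAfterChildWrite(true));  // prints 42
        System.out.println("lazy init : " + parentViewAfterChildWrite(false)); // prints null
    }
}
```

With eager initialization only the reference to the container is transmitted, while the cached entries live in the container itself. This is one way to read the statement that the cache problem and the transmission problem are orthogonal.
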
oldratlee commented 5 years ago

Converted this into demo code (SessionCacheDemo) and added it to the repository:

https://github.com/alibaba/transmittable-thread-local/blob/b0854e767f1022ba1ce41f1d9562d966b879a67b/src/test/java/com/alibaba/demo/session_cache/SessionCacheDemo.kt#L31-L45

Output:

[     pool-1-thread-2] cache: Item(id=9335)
[                main] cache: Item(id=9335)
[                main] cache: Item(id=9335)
[     RxCachedThreadScheduler-1] cache: Item(id=8624)
[                          main] cache: Item(id=8624)
[                          main] cache: Item(id=8624)
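
The Java demos in this thread hook TTL into RxJava with `RxJavaPlugins.setScheduleHandler(TtlRunnable::get)`, which wraps every scheduled task. The sketch below illustrates why pooled threads need some such wrapping. It is not TTL's implementation: `withCapturedContext` is a hypothetical helper that captures the submitter's value when the task is created, replays it in the worker, and restores the worker's previous value afterwards.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CaptureAtSubmitDemo {
    static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    /** Hypothetical helper: capture now, replay in the worker, restore afterwards. */
    static <T> Callable<T> withCapturedContext(Callable<T> task) {
        final String captured = CONTEXT.get(); // runs in the submitting thread
        return () -> {
            final String backup = CONTEXT.get(); // the worker's current value
            CONTEXT.set(captured);
            try {
                return task.call();
            } finally {
                CONTEXT.set(backup);
            }
        };
    }

    /** Returns {value seen by a plain task, value seen by a wrapped task}. */
    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> read = () -> CONTEXT.get();
        try {
            CONTEXT.set("request-1");
            pool.submit(read).get(); // the worker thread is created here and inherits "request-1"
            CONTEXT.set("request-2");
            String plain = pool.submit(read).get();                        // reused worker: stale value
            String wrapped = pool.submit(withCapturedContext(read)).get(); // value captured at submission
            return new String[] {plain, wrapped};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("plain task  : " + seen[0]); // prints request-1
        System.out.println("wrapped task: " + seen[1]); // prints request-2
    }
}
```

Inheritance happens once, when the worker thread is created, so a reused worker keeps seeing the value from that moment unless the task carries the submitter's context along with it.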

Thanks @olove for describing the scenario and providing the sample code! ❤️

olove commented 5 years ago

Thanks @oldratlee for such a detailed explanation and demo.