Tencent / spring-cloud-tencent

Spring Cloud Tencent is a Spring Cloud based Service Governance Framework provided by Tencent.
Other
3.17k stars 491 forks source link

元数据路由支持线程池场景使用 #372

Closed lingxiao-wu closed 2 years ago

lingxiao-wu commented 2 years ago

What is the feature you want to add? 目前Spring Cloud Tencent的元数据由InheritableThreadLocal保存,可以保证InheritableThreadLocal中的元数据在父子线程中传递,但是InheritableThreadLocal不能解决线程场景,因为线程池存在线程复用问题,不会每次new,所以线程取InheritableThreadLocal 的值还是残留第一次传递进来值,导致结果不正确

image

Why do you want to add this feature? 用于保证在线程池场景下,可以获取正确的元数据

How to implement this feature? transmittable-thread-local是解决复用的通用解决方案,可以通过文档的方式引导用户添加transmittable-thread-local解决,但是这种处理方式不太优雅。 Spring Cloud Tencent作为元数据路由功能的提供方,应该提供一套机制解决线程复用问题。

目前比较简单的方式是为用户提供线程池的包装类,在提交给线程前将ThreadLocal中的数据保存起来,在提交时将数据写入子线程的ThreadLocal中

伪代码如下:


public class MetadataThreadSupport {

    static class MetadataRunnable implements Runnable {

        private final Runnable runnable;

        private final MetadataContext metadataContext;

        public MetadataRunnable(Runnable runnable) {
            this.runnable = runnable;
            this.metadataContext = MetadataContextHolder.get();
        }

        @Override
        public void run() {
            // 线程旧的MetadataContext
            MetadataContext pre = MetadataContextHolder.get();
            MetadataContextHolder.set(metadataContext);
            try {
                runnable.run();
            } finally {
                // 还原线程旧的MetadataContext
                MetadataContextHolder.set(pre);
            }
        }
    }

    static class MetadataCallable<V> implements Callable<V> {

        private final Callable<V> callable;

        private final MetadataContext metadataContext;

        public MetadataCallable(Callable<V> callable) {
            this.callable = callable;
            this.metadataContext = MetadataContextHolder.get();
        }

        @Override
        public V call() throws Exception {
            MetadataContext pre = MetadataContextHolder.get();
            MetadataContextHolder.set(this.metadataContext);
            try {
                return callable.call();
            } finally {
                // 还原线程旧的MetadataContext
                MetadataContextHolder.set(pre);
            }
        }
    }

    static class MetadataExecutorService implements ExecutorService {

        private final ExecutorService executorService;

        public MetadataExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
        }

        @Override
        public void shutdown() {
            executorService.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            return executorService.shutdownNow();
        }

        @Override
        public boolean isShutdown() {
            return executorService.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return executorService.isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return executorService.awaitTermination(timeout, unit);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return executorService.submit(new MetadataCallable<>(task));
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return executorService.submit(new MetadataRunnable(task), result);
        }

        @Override
        public Future<?> submit(Runnable task) {
            return executorService.submit(new MetadataRunnable(task));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
            return executorService.invokeAll(convert(tasks));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
            return executorService.invokeAll(convert(tasks), timeout, unit);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            return executorService.invokeAny(convert(tasks));
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return executorService.invokeAny(convert(tasks), timeout, unit);
        }

        @Override
        public void execute(Runnable command) {
            executorService.execute(new MetadataRunnable(command));
        }

        private <T> Collection<? extends Callable<T>> convert(Collection<? extends Callable<T>> tasks) {
            return tasks.stream().map(
                    task -> (Callable<T>) new MetadataCallable<>(task)
            ).collect(Collectors.toCollection(ArrayList::new));
        }
    }

    public static void main(String[] args) {
        MetadataExecutorService metadataExecutorService = new MetadataExecutorService(Executors.newSingleThreadExecutor());
        metadataExecutorService.submit(
                () -> {
                    // do business
                }
        );
    }
}

Additional context Add any other context or screenshots about the feature request here.

lepdou commented 2 years ago

很好的建议,我们将会在周会中详细讨论你的建议。后续作为优化点优化。 https://github.com/Tencent/spring-cloud-tencent/discussions/351

lepdou commented 2 years ago

@lingxiao-wu

我们周会讨论了一下,你提供的这种方案确实是可行的,方便帮忙提交一个 pr 支持一下不~

感谢

lingxiao-wu commented 2 years ago

@lingxiao-wu

我们周会讨论了一下,你提供的这种方案确实是可行的,方便帮忙提交一个 pr 支持一下不~

感谢

方便的,我明天提一下

misselvexu commented 2 years ago

The scenario of passing ThreadLocal values in the case of using thread pools and other execution components that will pool multiplex threads can be solved using existing three-party open source solutions directly.

lingxiao-wu commented 2 years ago

在Spring环境下,为了线程池能优雅关闭,一般会选择将线程池提交给IOC容器管理,我们是否有必要提供一个后置处理器,用于将Java的线程池包装成TTL线程池,这样可以减少TTL对用户代码的侵入,也可以避免使用TTL java agent。

此外,METADATA_CONTEXT目前使用InheritableThreadLocal,TTL版本建议使用2.3.0之后的版本,之前的版本可能无法正确处理没有经过TransmittableThreadLocal包装的ThreadLocal对象



private static final ThreadLocal<MetadataContext> METADATA_CONTEXT = new InheritableThreadLocal<>();
`
SkyeBeFreeman commented 2 years ago

This problem will not be considered in this project for the time being, and restarting will be considered later if necessary.