alibaba / transmittable-thread-local

📌 a missing Java std lib(simple & 0-dependency) for framework/middleware, provide an enhanced InheritableThreadLocal that transmits values between threads even using thread pooling components.
https://github.com/alibaba/transmittable-thread-local
Apache License 2.0
7.59k stars 1.69k forks source link

ConcurrentModificationException问题及其系统设计讨论 #293

Open wmq930626 opened 3 years ago

wmq930626 commented 3 years ago

image image 使用过程中,多线程开启任务时报错

Markkkkks commented 3 years ago

java版本:java version "1.8.0_301" ttl版本:2.12.1 操作步骤:

  1. 线程池声明:
    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
  2. 提交任务
    List<Future> futures = new ArrayList<>();
    for (List<RuleDTO> ruleList : ruleContainer) {
     ruleList.forEach(futures.add(EXECUTOR.submit(Objects.requireNonNull(TtlRunnable.get(() -> ...)));
    }

    3.主线程获取任务结果

    futures.foreach(f->{
        try {
             future.get();
        } catch (Exception e) {
             log.error("多线程异常",e);
        }
    })

    异常堆栈: 企业微信截图_20210915143622

wuwen5 commented 3 years ago

与#220 重复,建议提供可复现的demo,运行时环境,是否引入其他agent

oldratlee commented 2 years ago

@wmq930626 你的业务逻辑实现,是不是在

这2个业务回调(业务逻辑)中,有触发增减TransmittableThreadLocal实例的操作(即holder WeakHashMap的变更,put/remove),如TransmittableThreadLocal#remove

@wmq930626 请排查确认一下,并给一下分析说明 与 结果。💕


虽然在业务回调(beforeExecute/afterExecute)有触发增TransmittableThreadLocal实例的操作,应该是不合理/可靠的操作;

但即使如此,TTL抛异常的方式来反馈,感觉是值得优化的。

@wuwen5 你怎么看? :")

Markkkkks commented 2 years ago

@oldratlee 我代为回答一下,业务逻辑中并没有做回调一下两个方法。堆栈中的错误是在future.get();这个代码中抛出的异常。

目前猜测是否短时间内新建多个ttlRunable的时候,TTL框架在做threadLocal相关的拷贝时,在TransmittableThreadLocal的内部类com.alibaba.ttl.TransmittableThreadLocal#holder上发生了并发修改的问题?

Markkkkks commented 2 years ago

在提交到TtlRunnable到线程池的时候,在TtlRunnable结束前有调用com.alibaba.ttl.TransmittableThreadLocal#remove做当前线程Context的清除,不知道是否是这个操作不合理

wuwen5 commented 2 years ago

image

有个疑点,你的截图中的栈信息里面,restoreTtlValues方法与doExecuteCallback 方法之间有一段置入代码,我看目前版本中不存在的,是否有使用agent修改过呢?

wuwen5 commented 2 years ago

@oldratlee 你看下呢?这个正常么, 我翻代码看应该不存在.access$400这次调用,怀疑是不是被其他agent修改过? image

wuwen5 commented 2 years ago

@Markkkkks 你确认有引入其他agent么?

Markkkkks commented 2 years ago

@Markkkkks 你确认有引入其他agent么?

启动参数里面仅有这个agent -javaagent \IDEA-U\ch-0\212.5080.55\plugins\java\lib\rt\debugger-agent.jar

wuwen5 commented 2 years ago

忽略我的疑问,栈信息没问题,那个是内部类调用产生的

oldratlee commented 2 years ago

目前猜测是否短时间内新建多个ttlRunable的时候,TTL框架在做threadLocal相关的拷贝时,在TransmittableThreadLocal的内部类com.alibaba.ttl.TransmittableThreadLocal#holder上发生了并发修改的问题?

@Markkkkks
TransmittableThreadLocal#holder成员本身是个ThreadLocal,多个ttlRunable的修改是隔离的,不会有并发修改的问题。

在提交到TtlRunnable到线程池的时候,在TtlRunnable结束前有调用com.alibaba.ttl.TransmittableThreadLocal#remove做当前线程Context的清除,不知道是否是这个操作不合理

TtlRunnable结束前,调用TransmittableThreadLocal#remove, 即是在业务的Runnable逻辑中调用TransmittableThreadLocal#remove,是吧?@Markkkkks

如果是这个情况,我可以很肯定是安全的。并且这样的用法,在业务中是很常见的。

oldratlee commented 2 years ago

根据上面提供的信息写了一个可以运行的验证demo(demo代码附下面):

没有出现问题。 @Markkkkks @wmq930626 @wuwen5

当然,这个demo并不能说明 一定没有 并发的ConcurrentModificationException问题。 只能概率上说明,上面提到的信息还不够有效复现;即还没有找到实际引发问题的关键。


import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentModificationExceptionDemo {
    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(
            16, 16, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(), Thread::new, new ThreadPoolExecutor.CallerRunsPolicy());

    private static final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

    public static void main(String[] args) {
        context.set("set-in-main");

        List<Future<?>> futures = IntStream.range(0, 10_000).mapToObj(num -> {
            TtlRunnable task = TtlRunnable.get(() -> {
                long ms = ThreadLocalRandom.current().nextLong(0, 7);
                sleep(ms);
                System.out.println("run in thread[" + Thread.currentThread().getName()
                        + "] with context[" + context.get() + "], num " + num + " sleep " + ms);

                // remove in summited runnable
                context.remove();
            });
            return EXECUTOR.submit(task);
        }).collect(Collectors.toList());

        futures.forEach(f -> {
            try {
                f.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        EXECUTOR.shutdown();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

PS:源码文件

oldratlee commented 2 years ago

@Markkkkks @wmq930626 期望能从你这边出这个问题的业务代码中, 分离整理 一个可以(有较大概率)复现问题的极简可运行Demo工程:❤️

否则:

Markkkkks commented 2 years ago

期望能从你这边出这个问题的业务代码中, 分离整理 一个可以(有较大概率)复现问题的极简Demo工程

@oldratlee 是这种做法,谢谢回复与test code,了解这种处理方式是比较合适的。 我尝试在业务代码中寻找不安全的并发操作 and 用剥离业务代码的test code复现这个问题。

另外根据堆栈的信息,是在holderWeakHashMapfast fail机制下触发的并发修改异常,也无法从堆栈中快速发现可疑的并发操作。

请问下

  1. 是否能优化堆栈的报错信息
  2. 除了demo代码外,框架中是否有其他操作or机制会,调用这个WeakHashMapput方法,导致在iterate这个Map的时候出现并发异常?
oldratlee commented 2 years ago

1. 是否能优化堆栈的报错信息

这看起来是个JVM的基础加强,一方面不合适TTL来做, 另一方面在业界我也没有发现可用方案,可能不可实现 或是 实现很困难。 @Markkkkks

2. 除了demo代码外,框架中是否有其他操作or机制会,调用这个WeakHashMapput方法,导致在iterate这个Map的时候出现并发异常?

holder WeakHashMap的变更(put/remove方法,即增减TransmittableThreadLocal的操作) 只会在下面的方法中调用:

@Markkkkks 你可以通过TransmittableThreadLocal的源码再确认一遍。

oldratlee commented 2 years ago

@Markkkkks ConcurrentModificationException这个问题在你的业务代码可以有一定的复现率(容易运行出来)吗?


TransmittableThreadLocal#holder成员本身是个ThreadLocal,多个ttlRunable的修改是隔离的,不会有并发修改的问题。

@Markkkkks 从TTL自身实现逻辑的分析来看,应该只可能是下面这2个业务回调引起。 我改个测试版,然后你试一下,看看会不会好了?

  • com.alibaba.ttl.TransmittableThreadLocal#beforeExecute
  • com.alibaba.ttl.TransmittableThreadLocal#afterExecute

这2个业务回调(业务逻辑)中,有触发增减TransmittableThreadLocal的操作,如TransmittableThreadLocal#remove

oldratlee commented 2 years ago

修改好了,在分支expt/hotfix-293

@Markkkkks 安装到本地,使用TTL版本2.12.2-SNAPSHOT试一下, 看看ConcurrentModificationException这个问题是不是好了?

git checkout expt/hotfix-293

mvn install -Dmaven.test.skip
Markkkkks commented 2 years ago

修改好了,在分支expt/hotfix-293

@Markkkkks 安装到本地,使用TTL版本2.12.2-SNAPSHOT试一下?

git checkout expt/hotfix-293

mvn install -Dmaven.test.skip

好的😊,我试试

oldratlee commented 2 years ago

holder WeakHashMap的变更(put/remove方法) 只会在下面的方法中调用:

  • TransmittableThreadLocalget / set / remove方法
  • Transmitterreplay / restore方法

还有一个可能是TransmittableThreadLocal被 GC 了。 (当然出现这个情况,ThreadLocal使用方式往往不合理。ThreadLocal的使用方式应该是作为static字段,是不会被 GC 的。)


还要进一步确认:

WeakHashMap在遍历过程中,Key被 GC 了,会不会导致ConcurrentModificationException异常。

上面情况应该不会出现,否则WeakHashMap的使用会非常复杂难用;我可以构造一个case 来实测证明。

如果这个情况会出现,hotfix的实现要再加强。

oldratlee commented 2 years ago

修改好了,在分支expt/hotfix-293。 @Markkkkks 安装到本地,使用TTL版本2.12.2-SNAPSHOT试一下?

git checkout expt/hotfix-293

mvn install -Dmaven.test.skip

好的,我试试

测试了吗,结果如何? 😄 @Markkkkks

oldratlee commented 2 years ago

还要进一步确认:

WeakHashMap在遍历过程中,Key被 GC 了,会不会导致ConcurrentModificationException异常。

上面情况应该不会出现,否则WeakHashMap的使用会非常复杂难用;我可以构造一个case 来实测证明。

如果这个情况会出现,hotfix的实现要再加强。

写了测试Demo 实证: WeakHashMap在遍历过程中,Key被 GC 了,并不会导致ConcurrentModificationException异常。 @Markkkkks

验证Demo

https://github.com/oldratlee/HelloKt

运行方式:

./gradlew execTestMain -P mainClass=playground.weakhashmap.WeakHashMapGcIterationKt

demo代码:

https://github.com/oldratlee/HelloKt/blob/ec5f62d1d08b8d3a5fd2abf6ae5cc13dab59f9a0/src/test/kotlin/playground/weakhashmap/WeakHashMapGcIteration.kt#L22-L34

一次运行结果

Key(num=271828)
Preparing data...
[round 0] begin! keyListSize: 1000, weakHashMapSize: 1000
[round 0] KeyList: removed Key(num=692), keyListSize: 999, weakHashMapSize: 1000
[round 0] KeyList: removed Key(num=561), keyListSize: 998, weakHashMapSize: 1000
[round 0] KeyList: removed Key(num=45), keyListSize: 997, weakHashMapSize: 999
finalize Key(num=692)
[round 0] KeyList: removed Key(num=743), keyListSize: 996, weakHashMapSize: 998
finalize Key(num=45)
[round 0] KeyList: removed Key(num=263), keyListSize: 995, weakHashMapSize: 997
finalize Key(num=743)
[round 0] KeyList: removed Key(num=400), keyListSize: 994, weakHashMapSize: 996
finalize Key(num=263)
[round 0] KeyList: removed Key(num=503), keyListSize: 993, weakHashMapSize: 995
finalize Key(num=400)
finalize Key(num=561)
finalize Key(num=271828)
[round 0] KeyList: removed Key(num=347), keyListSize: 992, weakHashMapSize: 994
[round 0] KeyList: removed Key(num=456), keyListSize: 991, weakHashMapSize: 994
finalize Key(num=503)
finalize Key(num=347)
[round 0] KeyList: removed Key(num=710), keyListSize: 990, weakHashMapSize: 992
finalize Key(num=456)
[round 0] KeyList: removed Key(num=336), keyListSize: 989, weakHashMapSize: 991
finalize Key(num=710)
[round 0] KeyList: removed Key(num=92), keyListSize: 988, weakHashMapSize: 990
finalize Key(num=336)
......
[round 0] KeyList: removed Key(num=197), keyListSize: 950, weakHashMapSize: 952
finalize Key(num=481)
[round 0] KeyList: removed Key(num=79), keyListSize: 949, weakHashMapSize: 951
finalize Key(num=709)
[round 0] KeyList: removed Key(num=295), keyListSize: 948, weakHashMapSize: 950
finalize Key(num=197)
finalize Key(num=79)
[round 0] end! keyListSize: 1000 -> 948, weakHashMapSize: 1000 -> 949
[round 1] begin! keyListSize: 948, weakHashMapSize: 949
[round 1] KeyList: removed Key(num=985), keyListSize: 947, weakHashMapSize: 949
[round 1] KeyList: removed Key(num=268), keyListSize: 946, weakHashMapSize: 949
finalize Key(num=295)
[round 1] KeyList: removed Key(num=145), keyListSize: 945, weakHashMapSize: 947
finalize Key(num=985)
[round 1] KeyList: removed Key(num=78), keyListSize: 944, weakHashMapSize: 946
finalize Key(num=268)
[round 1] KeyList: removed Key(num=291), keyListSize: 943, weakHashMapSize: 945
finalize Key(num=145)
finalize Key(num=78)
[round 1] KeyList: removed Key(num=364), keyListSize: 942, weakHashMapSize: 944
[round 1] KeyList: removed Key(num=44), keyListSize: 941, weakHashMapSize: 943
finalize Key(num=291)
[round 1] KeyList: removed Key(num=712), keyListSize: 940, weakHashMapSize: 942
finalize Key(num=364)
[round 1] KeyList: removed Key(num=904), keyListSize: 939, weakHashMapSize: 941
finalize Key(num=44)
[round 1] KeyList: removed Key(num=951), keyListSize: 938, weakHashMapSize: 941
finalize Key(num=712)
......
Markkkkks commented 2 years ago

使用hotfix版本的ttl这个问题没有重现了,thanks a lot @oldratlee

holder WeakHashMap的变更(put/remove方法) 只会在下面的方法中调用:

TransmittableThreadLocal的get / set / remove方法 Transmitter 的 replay / restore方法

业务代码中,在子线程future中调用了remove方法,同时主线程使用了future.get()方法, 可能是这个原因导致的ConcurrentModificationException,我尝试复现下。

oldratlee commented 2 years ago

使用hotfix版本的ttl这个问题没有重现了,thanks a lot @oldratlee

业务代码中,在子线程future中调用了remove方法,同时主线程使用了future.get()方法, 可能是这个原因导致的ConcurrentModificationException,我尝试复现下。

发布了正式版v2.12.2,包含了这个hotfix的修改内容。 https://github.com/alibaba/transmittable-thread-local/releases/tag/v2.12.2


这个Issue 先关闭了;如果复现问题,随时 Reopen~ 💕 @Markkkkks @wmq930626

happyomg commented 2 years ago

我今天也遇到了这个报错。我这边的case是能够稳定复现。

先说最后排查结论:业务方在二方包里实现了TTL的 beforeExecute 和 afterExecute 方法,并在该方法里触发了addValue()和removeValue(),最终导致了ConcurrentModificationException异常的发生。

以下是我的排查过程:

  1. 观察错误堆栈,完全看不出来哪里触发的,具体错误堆栈如下:
Exception in thread "HXXBizProcessor-DEFAULT-12-thread-104" java.util.ConcurrentModificationException
  at java.util.WeakHashMap$HashIterator.nextEntry(WeakHashMap.java:806)
  at java.util.WeakHashMap$EntryIterator.next(WeakHashMap.java:845)
  at java.util.WeakHashMap$EntryIterator.next(WeakHashMap.java:843)
  at com.alibaba.ttl.TransmittableThreadLocal.doExecuteCallback(TransmittableThreadLocal.java:164)
  at com.alibaba.ttl.TransmittableThreadLocal.access$300(TransmittableThreadLocal.java:54)
  at com.alibaba.ttl.TransmittableThreadLocal$Transmitter.replay(TransmittableThreadLocal.java:328)
  at com.alibaba.ttl.TtlRunnable.run(TtlRunnable.java:49)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:874)
  1. 研读TTL代码,反复确认,发现在ttl内部是没有泄露 hold 持有的 WeakHashMap 对象,基本确认TTL不应该出现线程不安全的问题

  2. 出现该类报错几乎只应该在 beforeExecute()afterExecute() 内,且调用到了 addValue()removeValue() 两个方法(因为除了反射外,整个TTL内部只有这2个方法可能修改到 hold 内的 WeakHashMap 对象 )

  3. 但由于没去申请业务代码权限(深刻自我检讨),一直没找到在哪里有实现了 beforeExecute()afterExecute()方法,甚至一度怀疑起公司内部自编译的 jdk 的协程里是否有 bug

  4. 查看本issue,发现v2.12.2版本做了一个fix,通过new一个新map的方式规避了该问题。 让业务方升级该版本后确实有用,但并没有找出问题的根因。

  5. 通过arthaswatch命令观察:发现执行方法一溜的都是 com.alibaba.ttl.TransmittableThreadLocal.beforeExecute ,看起来非常正常。

    watch com.alibaba.ttl.TransmittableThreadLocal beforeExecute {@Thread@currentThread().getName()}
  6. 然而在watch结果中一个非常不起眼的地方,发现了一处不太预期的地方:arthas抓到有一次执行 beforeExecute 的方法是在一个业务类里 com.xx.xx.xx.service.xxxx.xxxxServiceImpl$1

    method=com.alibaba.ttl.TransmittableThreadLocal.beforeExecute location=AtExit
    ts=2021-12-24 01:32:44; [cost=0.006412ms] result=@ArrayList[
    @String[RxComputationThreadPool-7],
    null,
    ]
    method=com.alibaba.ttl.TransmittableThreadLocal.beforeExecute location=AtExit
    ts=2021-12-24 01:32:44; [cost=0.004131ms] result=@ArrayList[
    @String[RxComputationThreadPool-7],
    null,
    ]
    method=com.xx.xx.xx.service.xxxx.xxxxServiceImpl$1.beforeExecute location=AtExit
    ts=2021-12-24 01:32:44; [cost=0.290056ms] result=@ArrayList[
    @String[RxComputationThreadPool-7],
    null,
    ]
  7. arthas反编译该类,发现业务方某一个二方包内实现了 beforeExecute()afterExecute()

    jad com.xx.xx.xx.service.xxxx.xxxxServiceImpl

    业务方实现的代码如下:

                protected void beforeExecute() {
    /* 58*/             super.beforeExecute();
    /* 59*/             Object ctx = this.get();
    /* 60*/             if (ctx != null && ctx != Trace.getRpcContext()) {
    /* 61*/                 Trace.setRpcContext((Object)ctx);
                    }
                }
    
                protected void afterExecute() {
    /* 67*/             super.afterExecute();
    /* 68*/             Trace.clearRpcContext();
                }
  8. Trace.getRpcContext()Trace.setRpcContext()方法内,触发了TTL的addValue()removeValue()方法,具体调用链路如下:

    doExecuteCallback --> iteraror.next() --> beforeExecute --> getRpcContext() 
    -->  ttl.get()  --> setRpcContext() --> ttl.set(null) & ttl.set(newCapture) 
    --> iteraror.next() --> ConcurrentModificationException
happyomg commented 2 years ago

我今天也遇到了这个报错。我这边的case是能够稳定复现。

先说最后排查结论:业务方在二方包里实现了TTL的 beforeExecute 和 afterExecute 方法,并在该方法里触发了addValue()和removeValue(),最终导致了ConcurrentModificationException异常的发生。

以下是我的排查过程:

……

跟 @zavakid 讨论了一种可能的检测方式:

oldratlee commented 2 years ago

@happyomg 非常专业的问题说明、排查说明 及其 设计讨论~ 👍


  • 建议:是否可以在 beforeExecuteafterExecute 的doc上加上一些明显的提示,提示用户在该实现内务必反复确认不要触发到addValue()removeValue()方法(无论间接还是直接)

@happyomg 『doc上加上一些明显的提示』赞成!

容易出问题的地方 值得说明出来。

  • 讨论:如果我们通过代码分析出来,TTL几乎不可能存在线程不安全的操作,那么是否还有必要做 v2.12.2 版本的那个修正? 该修正会掩盖用户错误实现( ConcurrentModificationException本身就是为了fail-fast,帮助用户发现bug ),并且还可能跟用户的预期不符(比如导致某些TTL丢失或者多传了)

@happyomg 『掩盖用户错误实现( ConcurrentModificationException本身就是为了fail-fast,帮助用户发现bug ),并且还可能跟用户的预期不符(比如导致某些TTL丢失或者多传了)』 赞成!

有问题不能fail-fast/掩盖问题 是不好的设计。

v2.12.2中对这个ConcurrentModificationException问题的修复方式过于 Quick,这些进一步的问题没有想清楚。


跟 @zavakid 讨论了一种可能的检测方式:

  • 在调用 beforeExecuteafterExecute 之前,将对应TTL实例标记一个in iterator 的状态;
  • 然后在 addValueremoveValue 这类可能触发 ConcurrentModificationException 的方法里判断当时是否处于 in iterator 状态。
  • 如果是,则抛异常出来,这样会比文档的约束效果更强,而且也能记录到调用堆栈的信息,方便用户直观找到问题所在。

@happyomg @zavakid

addValueremoveValue这样方法中,检测并提示有问题或风险的业务使用做法, 包含了业务调用栈,能直接定位出业务使用的问题点,非常方便了问题的排查,COOL~ 👍 💕 对用户使用问题,库做了主动的 trouble shooting!


如何具体修改方式,如

我再想一下。

欢迎一起讨论考虑解决方式及其选型评估~ ❤️ @happyomg @zavakid @大家

zavakid commented 2 years ago

如何具体修改方式,如

  • 是 强力的fail-fast
  • 还是 只做 使用问题或风险 的信息提示
    • 想到这样做而不fail-fast的原因是:
    • 我现在并不能完全确定:回调的方法(beforeExecute和afterExecute)中调整增减TTL实例在业务上一定是不需要的、不合理的。
    • 回调的方法的异常 是想被忽略的,回调不影响外部的主业务流程。

我再想一下。

cool! 确实要做这个决定的前提是要确定清楚 TTL 本身支不支持在 回调的方法(beforeExecute和afterExecute)中调整增减TTL实例,更进一步是,站在用户角度上,在用户需求上,是否有这个需要,值得好好分析。

我也还不确定用户在这个场景做了这样的事情,他的初始预期是什么?现在还缺少这样的信息。也需要 @wmq930626 @Markkkkks @happyomg 提供一下场景的使用初衷,虽然并不能代表全部用户,但还是能提供一下输入,也能一起帮助对设计做出更好的判断。

同时,我认为还有一点需要注意,TTL 一般都是被底层中间件或者作为很基础的横向组件在使用,在修改时必须要考虑向前的兼容性,既然 v2.12.2 中已经修复(或者说已经不让这个异常发生),现在突然直接 fastfail,这会让跑在 v2.12.2 的应用场景中可能突然故障。所以即使后续决定要进行 TTL 层面的 fast-fail,也需要有一个缓和的过渡。