alibaba / DataX

DataX是阿里云DataWorks数据集成的开源版本。
Other
15.55k stars 5.35k forks source link

BufferedRecordExchanger 线程安全吗? #2142

Closed makeloffve closed 4 weeks ago

makeloffve commented 1 month ago

ReaderRunner和WriteRunner都共用一个BufferedRecordExchanger,我看里面的bufferIndex会被并发修改,不会有问题嘛,即使是RingBuffer应该也是head和tail区分啊?为什么这样设计

FuYouJ commented 4 weeks ago

当时我看这里也有点迷惑,你的提问让我又去复习了一下。 这里要结合TaskGroupContaioner如何初始化ReaderRunner和WriterRunner开始看起。 1:在源码里,ReaderRunner,WriterRunner虽然是共享一个channel,但是却不是共享的一个BufferedRecordExchanger,而是各自new了一个实例。 2:BufferedRecordExchanger将数据缓存在buffer里,达到了一定批次再往channel里塞,channel是线程安全的,这有利于减少锁的冲突,这个设计非常了不起。

makeloffve commented 4 weeks ago

当时我看这里也有点迷惑,你的提问让我又去复习了一下。 这里要结合TaskGroupContaioner如何初始化ReaderRunner和WriterRunner开始看起。 1:在源码里,ReaderRunner,WriterRunner虽然是共享一个channel,但是却不是共享的一个BufferedRecordExchanger,而是各自new了一个实例。 2:BufferedRecordExchanger将数据缓存在buffer里,达到了一定批次再往channel里塞,channel是线程安全的,这有利于减少锁的冲突,这个设计非常了不起。

👍又去学习了一下,确实是一个线程一个buffer,只有full、empty、flush、terminate的时候才会操作BlockingQueue. 就是exchanger这个名字让我想到了共享