* <p> If this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation. If the connection
* is established immediately, as can happen with a local connection, then
* this method returns <tt>true</tt>. Otherwise this method returns
* <tt>false</tt> and the connection operation must later be completed by
* invoking the {@link #finishConnect finishConnect} method.
/* In the "Plaintext" setting, we use socketChannel to read & write to the network. But for the "SSL" setting,
 * we encrypt the data before we use socketChannel to write it to the network, and decrypt responses before returning them.
 * This requires additional buffers to be maintained as we read from the network: since the data on the wire is encrypted,
 * we won't be able to read the exact number of bytes the Kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
 * application buffer size, which means we might read more bytes than the requested size.
 * If there is no further data to read from the socketChannel, the selector won't select that channel again, and we'd be left
 * with extra bytes in the buffer. To overcome this issue we added a "stagedReceives" map which contains a per-channel deque. When we
 * read a channel we read as many responses as we can, store them in "stagedReceives", and pop one response during
 * the poll to add to completedReceives. If there are any active channels in "stagedReceives" we set the poll "timeout" to 0,
 * pop a response, and add it to completedReceives.
 * At most one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
 * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
 * by SocketServer to the request queue may be processed by different request handler threads, requests on each
 * channel must be processed one-at-a-time to guarantee ordering.
 */
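The buffering problem this comment describes can be illustrated with a small sketch: drain every complete size-prefixed frame out of a buffer (imagine it holds already-decrypted SSL bytes) into a deque, leaving any partial frame buffered for the next read. The class and method names here are illustrative, not Kafka's actual code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

public class StagedReceivesSketch {

    /** Pops all complete [4-byte length][payload] frames; partial data stays in buf. */
    static Deque<String> drainFrames(ByteBuffer buf) {
        Deque<String> staged = new ArrayDeque<>();
        buf.flip();                                  // switch the buffer to reading mode
        while (buf.remaining() >= 4) {
            buf.mark();
            int size = buf.getInt();
            if (buf.remaining() < size) {            // incomplete frame: put the length
                buf.reset();                         // bytes back and stop draining
                break;
            }
            byte[] payload = new byte[size];
            buf.get(payload);
            staged.addLast(new String(payload, StandardCharsets.UTF_8));
        }
        buf.compact();                               // keep leftover bytes for the next read
        return staged;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        // Two complete frames plus the first half of a third arrive in one read.
        buf.putInt(3).put("foo".getBytes(StandardCharsets.UTF_8));
        buf.putInt(3).put("bar".getBytes(StandardCharsets.UTF_8));
        buf.putInt(10);                              // length of a frame whose body hasn't arrived
        Deque<String> staged = drainFrames(buf);
        System.out.println(staged);                  // [foo, bar]
        System.out.println(buf.position());          // 4 leftover bytes (the dangling length)
    }
}
```

Because more bytes than one response may be decrypted at once, everything complete is staged, and the selector's silence on a drained channel no longer strands data.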
We know that Kafka is built on TCP connections. Unlike much other middleware, it does not use Netty as its TCP server; instead it implements its own networking on top of Java NIO. (The reasons Kafka did not choose Netty are discussed elsewhere.)
Readers not familiar with Java NIO may want to read the two articles below first; this post assumes some understanding of NIO.
https://segmentfault.com/a/1190000012316621
https://www.jianshu.com/p/0d497fe5484a
More articles can be found on my personal blog: https://github.com/farmerjohngit/myblog
Several important classes
Let's first look at the network-layer architecture of the Kafka client (the diagram comes from another article).
This post mainly analyzes the Network layer.
From the network layer's point of view, Kafka has a client side (producer and consumer; a broker is also a client when it fetches as a follower) and a server side (broker). This post analyzes how the client establishes connections and sends and receives data. The server relies on the same Selector and KafkaChannel machinery for network transport, and at the Network layer the two sides differ very little.
The Network layer has two important classes: Selector and KafkaChannel. They are roughly analogous to java.nio.channels.Selector and Channel in Java NIO. Selector maintains a few key fields.
Establishing a connection
When the Kafka client starts, it calls the Selector#connect method (below, unless otherwise noted, Selector refers to org.apache.kafka.common.network.Selector) to establish connections. The flow is much like the standard NIO flow; the one case worth discussing separately is when SocketChannel#connect returns true, which the method's Javadoc, quoted above, describes. In other words, in non-blocking mode a local connection may be established immediately, in which case the method returns true and no CONNECT event will fire for it afterwards. Kafka therefore records these special connections in a separate collection, immediatelyConnectedKeys, and gives them special treatment in the following steps. It then calls the poll method to listen for network events:
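A minimal sketch of this connect path, assuming a throwaway local server to connect to, with immediatelyConnectedKeys mirrored as a plain Set (none of this is Kafka's actual code): if connect(...) returns true, no CONNECT event will ever fire, so the key is remembered and handled separately; otherwise we wait for OP_CONNECT and call finishConnect().

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.HashSet;
import java.util.Set;

public class ConnectSketch {

    static boolean connectLocal() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // throwaway local server

            Selector selector = Selector.open();
            Set<SelectionKey> immediatelyConnectedKeys = new HashSet<>();

            SocketChannel ch = SocketChannel.open();
            ch.configureBlocking(false);
            boolean connected = ch.connect(server.getLocalAddress());
            SelectionKey key = ch.register(selector, connected ? 0 : SelectionKey.OP_CONNECT);
            if (connected) {
                // Local connection completed immediately: no CONNECT event will fire,
                // so remember the key and give it special treatment on the next poll.
                immediatelyConnectedKeys.add(key);
            }

            boolean established = connected;
            while (!established) {
                selector.select(1000);                      // poll for network events
                for (SelectionKey k : selector.selectedKeys()) {
                    if (k.isConnectable() && ch.finishConnect()) {
                        k.interestOps(0);                   // handshake done
                        established = true;
                    }
                }
                selector.selectedKeys().clear();
            }
            ch.close();
            selector.close();
            return established;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("established=" + connectLocal());
    }
}
```

Whether connect returns true for a loopback address varies by platform, which is exactly why both branches must be handled.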
Because connections in immediatelyConnectedKeys never trigger a CONNECT event, poll makes a separate pass that calls finishConnect on the channels in immediatelyConnectedKeys. In plaintext mode this call lands in PlaintextTransportLayer#finishConnect. More details on immediatelyConnectedKeys can be found elsewhere.
Sending data
Kafka sends data in two steps:
1. Call Selector#send to store the data to be sent in the corresponding KafkaChannel; this method performs no actual network I/O.
2. Call Selector#poll. Step 1 registered interest in the WRITE event on the channel, so when the channel becomes writable, pollSelectionKeys is invoked to actually send the data. When the channel is writable, KafkaChannel#write is called, and that method performs the real network I/O.
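The two-step send path above can be sketched with a java.nio.channels.Pipe standing in for the socket. Step 1 only parks the buffer and registers WRITE interest; the actual I/O happens later inside the poll loop, once the channel is writable. The pendingSend field stands in for the send a KafkaChannel holds; it is illustrative, not Kafka's API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;

public class TwoStepSendSketch {
    static ByteBuffer pendingSend;            // step 1 parks the outgoing data here

    static String demo() throws IOException {
        Pipe pipe = Pipe.open();
        pipe.sink().configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey key = pipe.sink().register(selector, 0);

        // Step 1: "send" does no network I/O; it only queues data and WRITE interest.
        pendingSend = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        key.interestOps(SelectionKey.OP_WRITE);

        // Step 2: poll; when the channel reports writable, actually write.
        while (pendingSend.hasRemaining()) {
            selector.select();
            for (SelectionKey k : selector.selectedKeys()) {
                if (k.isWritable()) {
                    ((WritableByteChannel) k.channel()).write(pendingSend);
                    if (!pendingSend.hasRemaining())
                        k.interestOps(0);     // done: stop watching for WRITE
                }
            }
            selector.selectedKeys().clear();
        }

        // Read from the other end of the pipe to show the bytes went through.
        ByteBuffer in = ByteBuffer.allocate(16);
        pipe.source().read(in);
        selector.close();
        pipe.sink().close();
        pipe.source().close();
        return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());           // hello
    }
}
```

Deregistering WRITE interest once the send drains matters: a socket is almost always writable, so leaving OP_WRITE set would make poll spin.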
Receiving data
If the remote end sends data over, it is processed during the call to poll. Data read off the socket is first placed into the stagedReceives collection; the subsequent addToCompletedReceives method then, for each channel, takes one NetworkReceive out of stagedReceives (if there is one) and moves it into completedReceives. There are two reasons for this design:
1. For SSL connections the bytes on the wire are encrypted, so we cannot read exactly the number of bytes the Kafka protocol asks for; we may read more than requested, and if no further data arrives on that channel the selector will not select it again, so the extra bytes must be staged rather than dropped.
2. Kafka must guarantee that requests from one channel are processed in the order they were sent, so at most one receive per channel is exposed in each poll. On the server side, after reading a request the channel is muted, i.e. no more data is read from it; only when processing completes is the channel unmuted, so data can be read from that socket again. On the client side this is controlled through InFlightRequests#canSendMore instead. The comment in the code describing this logic is quoted at the top of this post.
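The one-receive-per-poll rule can be sketched with plain collections. Class and field names follow the description of Kafka's Selector above, heavily simplified for illustration: each poll moves at most one receive per unmuted channel from stagedReceives to completedReceives.

```java
import java.util.*;

public class CompletedReceivesSketch {
    final Map<String, Deque<String>> stagedReceives = new HashMap<>();
    final List<String> completedReceives = new ArrayList<>();
    final Set<String> muted = new HashSet<>();

    void stage(String channel, String receive) {
        stagedReceives.computeIfAbsent(channel, c -> new ArrayDeque<>()).add(receive);
    }

    /** One poll: pop at most one staged receive per unmuted channel. */
    void addToCompletedReceives() {
        Iterator<Map.Entry<String, Deque<String>>> it = stagedReceives.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<String>> e = it.next();
            if (muted.contains(e.getKey()))
                continue;                         // server mutes a channel while a request is in flight
            completedReceives.add(e.getValue().pollFirst());
            if (e.getValue().isEmpty())
                it.remove();
        }
    }

    public static void main(String[] args) {
        CompletedReceivesSketch s = new CompletedReceivesSketch();
        s.stage("ch1", "req1");
        s.stage("ch1", "req2");                   // two requests read from one channel in one go
        s.addToCompletedReceives();
        System.out.println(s.completedReceives);  // [req1] -- req2 stays staged until the next poll
        s.addToCompletedReceives();
        System.out.println(s.completedReceives);  // [req1, req2]
    }
}
```

Since request handler threads may run concurrently, letting only one request per channel through per poll is what preserves per-channel ordering.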
End
This post analyzed the implementation of Kafka's network layer. When reading the Kafka source, it is easy to get lost without a clear picture of the network layer first: for example, the mechanism that guarantees request/response ordering, or the fact that the real network I/O happens not in the send method but during poll.