xiwenAndlejian / my-blog

Java基础学习练习题
1 stars 0 forks source link

Rpc 实现笔记(三)半包粘包 & 使用 EmbeddedChannel 进行单元测试 #23

Open xiwenAndlejian opened 5 years ago

xiwenAndlejian commented 5 years ago

Rpc 实现笔记(三)半包粘包 & 使用 EmbeddedChannel 进行单元测试

半包粘包现象

由于底层数据传输是通过TCP协议,因此在消息传输中可能遇到粘包半包现象。

具体现象

服务端与客户端建立连接后,发送1000hello,服务端将接受的结果打印到终端。

for (int i = 0; i < 1_000; i++) {
    // 注:需要增加字符串编解码器
    channel.writeAndFlush("hello");
}

一下是部分服务端接受结果:

hello
hello
hellohellohellohello
hellohellohellohellohellohellohello
hello
hello
hellohello
hello
hellohellohellohellohellohellohellohellohellohellohellohellohell
o
hellohello
hellohellohellohello
hellohellohellohellohellohellohellohellohellohello
hellohellohellohellohellohello
hellohellohellohellohello
hellohellohellohellohellohellohello
hellohellohello
hellohellohellohellohello

根据输出我们可以看出存在三种接受情况

这里暂不讨论出现上述粘包半包现象的具体原理,仅给出解决方案。

Netty 解码器

Netty内部提供了解决上述现象的一些解码器。

例如:

具体选择哪一个解码器,需要根据 TCP 传输时使用的协议格式而定。这里我选择的是LengthFieldBasedFrameDecoder。使用的协议格式如下:

魔法数[4]-版本号[1]-数据长度[4]-序列化方式[1]-消息类型长度[4]-消息类型[N]-消息长度[4]-消息-[M]

注:

  1. -只是为了展示协议格式,区分各个数据段而使用的分隔符,实际协议中并不包含。
  2. []中的数字代表当前数据段的具体长度。其中NM表示变量,表示不确定具体长度
  3. 数据长度代表其后数据的长度,不包括它自己,即数据长度值=1 + 4 + N + 4 + M

代码

public class RpcFrameDecoder extends LengthFieldBasedFrameDecoder {

    // 长度域起始字节:魔法数+版本号
    private static final int LENGTH_FIELD_OFF_SET = 5;
    // 长度域的长度
    private static final int LENGTH_FIELD_LENGTH = 4;
    // 长度域校准
    // eg:当长度不只是代表数据块的长度时,需要对数据块的长度进行校准,减去额外的部分,获取实际数据块的长度
    private static final int LENGTH_FIELD_ADJUSTMENT = 0;
    // 抛弃头部字节的长度:魔法数+版本号+长度域
    private static final int DISCARD_HEADER_LENGTH = 9;

    public RpcFrameDecoder() {
        super(Integer.MAX_VALUE, LENGTH_FIELD_OFF_SET, LENGTH_FIELD_LENGTH, LENGTH_FIELD_ADJUSTMENT, DISCARD_HEADER_LENGTH);
    }

}
xiwenAndlejian commented 5 years ago

单元测试

Netty提供了EmbeddedChannel类,方便我们对ChannelHandler进行测试。

例如,对上述解码器进行测试

@Test
public void testChannel() {
    // 消息
    RpcRequestPacket in = RpcRequestPacket.builder()
        .clazz(Hello.class)
        .methodName("hello")
        .parameters(null)
        .build();

    EmbeddedChannel channel = new EmbeddedChannel(
        new RpcFrameDecoder(),
        new PacketEncoder(),
        new PacketDecoder()
    );
    // 写入入站消息
    Assert.assertTrue(channel.writeInbound(in));
    Assert.assertTrue(channel.writeInbound(in));
    // 读取消息,并判断消息是否成功解析
    RpcRequestPacket read = channel.readInbound();
    Assert.assertEquals(in, read);
    read = channel.readInbound();
    Assert.assertEquals(in, read);
}