xiwenAndlejian / my-blog

Java基础学习练习题
1 stars 0 forks source link

RPC 实现笔记(一) #20

Open xiwenAndlejian opened 5 years ago

xiwenAndlejian commented 5 years ago

RPC 实现笔记(一)

wiki :远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用远程方法调用

简单来说就是实现一个工具,使得两个不同(也可以相同)服务器的程序能够互相调用函数

首先,实现服务端与客户端的通信

实现基于TCP的服务端与客户端通信

相关技术:

需要实现的大致流程

  1. 开启服务端,监听某个端口(eg:8000)
  2. 启动客户端,绑定 8000 端口
  3. 客户端发起请求:
    1. 客户端:组装请求参数 => 序列化 => 编码器
    2. 服务端:过滤器 => 解码器 => 反序列化 => 还原请求参数
  4. 服务端响应请求:
    1. 服务端:组装响应信息 => 序列化 => 编码器
    2. 客户端端:过滤器 => 解码器 => 反序列化 => 还原响应信息

所以,需要的组件:

  1. 服务端 & 客户端
  2. 序列化 & 反序列化工具
  3. 编码器 & 解码器
  4. 请求参数 & 响应信息

注:其中服务端 & 客户端可以参考Netty学习笔记(一)、(二)

序列化 & 反序列化:protostuff

protostuff

使用起来极为方便,参考 github 上的代码示例即可。

public class ProtoStuffSerializer {

    @SuppressWarnings("unchecked")
    public <T> byte[] serialize(T obj) {
        Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(obj.getClass());
        // Re-use (manage) this buffer to avoid allocating on every serialization
        LinkedBuffer buffer = LinkedBuffer.allocate(512);
        // ser
        final byte[] protostuff;
        try {
            protostuff = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear();
        }
        return protostuff;
    }

    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T         obj    = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
        return obj;
    }

}

编码器 & 解码器

首先,定义一个通信中使用的数据格式也可称为协议(protocol)

然后根据我们定义的数据格式,实现我们的编解码工具

数据格式

固定长度(首部):

  1. 魔法数(magic_number):4 字节,一个固定的数值,用于提供过滤器过滤请求的条件。请求以 magic_number 开头的才允许执行后续流程,否则返回错误信息,并断开连接。
  2. 协议版本(version):1 字节,用于后续协议升级。
  3. 序列化方式(serializer):1 字节,告诉消息接受方,应该以何种反序列化方式来处理此报文。
  4. 消息类型长度(messageTypeLength):4 字节,传输的消息类型序列化后的长度。
  5. 消息长度(messageLength):4 字节,消息序列化后的长度。

不定长(数据):

  1. 消息类型(messageType):长度 M,消息类型序列化的字节数组。
  2. 消息(message):长度 N,消息序列化的字节数组。

注意:尽量分开固定长度和不固定长度的,这样数据更便于处理粘包、拆包等情况。

编解码工具

public <T extends Packet> ByteBuf encode(T packet) {
    ByteBuf    byteBuf    = ByteBufAllocator.DEFAULT.ioBuffer();
    Serializer serializer = Serializer.DEFAULT_SERIALIZER;
    // packet 类型序列化
    // todo 类对象重复利用
    byte[] clazzBytes = serializer.serialize(new PacketInfo(packet.getClass()));
    // 将消息序列化
    byte[] msgBytes = serializer.serialize(packet);

    // 首部:协议中固定长度的部分
    // 1. magic_number 4 字节
    byteBuf.writeInt(MAGIC_NUMBER);
    // 2. 版本号 1 字节
    byteBuf.writeByte(packet.getVersion());
    // 3. 序列化方式 1 字节
    byteBuf.writeByte(serializer.getSerializerType());
    // 4. 消息类型数据长度 4 字节
    byteBuf.writeInt(clazzBytes.length);
    // 5. 消息数据长度 4 字节
    byteBuf.writeInt(msgBytes.length);

    // 尾部:协议中不定长的部分
    // 6. 消息类型 长度 N
    byteBuf.writeBytes(clazzBytes);
    // 7. 消息数据长度 M
    byteBuf.writeBytes(msgBytes);

    return byteBuf;
}

@SuppressWarnings("unchecked")
public <T extends Packet> T decode(ByteBuf byteBuf) {
    // 跳过 magic_number 和 版本号
    // 这两个部分应当在 inboundHandler 中进行校验,不通过校验的消息将被舍去,并关闭连接。
    byteBuf.skipBytes(4).skipBytes(1);
    // 3. 序列化方式
    byte serializerCode = byteBuf.readByte();
    // 4. 数据类型 1 字节
    int clazzLength = byteBuf.readInt();
    // 5. 数据长度 4 字节
    int msgLength = byteBuf.readInt();

    // 6. 类型
    byte[] clazzBytes = new byte[clazzLength];
    // 7. 数据
    byte[] msgBytes = new byte[msgLength];
    byteBuf.readBytes(clazzBytes);
    byteBuf.readBytes(msgBytes);

    Serializer              serializer = getSerializer(serializerCode);
    Class<? extends Packet> clazz      = serializer.deserialize(PacketInfo.class, clazzBytes).getClazz();

    return (T) serializer.deserialize(clazz, msgBytes);
}

@AllArgsConstructor
@Data
class PacketInfo {
    private Class<? extends Packet> clazz;
}

备注:由于无法直接序列化Class对象,因此用PacketInfo包装了一下

Packet.java

@Data
public abstract class Packet<ID> {

    private transient byte version = 1;
    public abstract ID getRequestId();
    public abstract void setRequestId(ID id);

}

解码器:

public class PacketDecoder extends ByteToMessageDecoder {

    private static final PacketCodec codec = PacketCodec.INSTANCE;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Packet packet = codec.decode(in);
        out.add(packet);
    }

}

编码器:

public class PacketEncoder extends MessageToByteEncoder<Packet> {

    private static final PacketCodec codec = PacketCodec.INSTANCE;

    @Override
    protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
        out.writeBytes(codec.encode(msg));
    }

}

RPC 请求 & 响应信息

请求:

@EqualsAndHashCode(callSuper = false)
@Data
@Builder
public class RpcRequestPacket extends Packet<Long> {

    private Class    clazz;
    private String   methodName;
    private Object[] parameters;
    private Class[]  parameterTypes;
    private Long     requestId;

}

以上成员属性除requestId外均是提供动态代理使用的属性,即调用某个类的某个方法名,以及此方法需要的参数列表和对应参数类型列表。

requestId:用于客户端关联异步执行的 RPC 请求与结果,服务端不做任何操作,直接返回即可。

响应:

@EqualsAndHashCode(callSuper = false)
@Data
public class RpcResponsePacket extends Packet<Long> {

    private Class          clazz;
    private Object         response;
    private ProxyException exception;
    private Long           requestId;

    @SuppressWarnings("unchecked")
    public <T> T getRpcResult() {
        if (exception != null) {
            throw exception;
        }
        return (T) response;
    }

}