xiwenAndlejian / my-blog

Java基础学习练习题
1 stars 0 forks source link

RPC 实现笔记(五)Consul 的初步使用 #25

Open xiwenAndlejian opened 5 years ago

xiwenAndlejian commented 5 years ago

在之前的基础上,引入Consul作为分布式调度组件,主要作用:服务发现、服务健康检查、订阅中心。

引入 Consul 后的结构:

RpcServer -> Consul:Server 在 Consul 中注册自己,成为 Consul 中的 Service。

Consul -> RpcServer:Consul 使用 check 对 Server 的健康状况进行检查。

RpcClient -> Consul:Client 连接 Consul,读取 Client 所需的服务,并订阅服务。

Consul -> RpcClient:向启用订阅的 Client 发送当前健康的 Service 列表。

RpcServer <--> RpcClient(旧):Client 发起 RPC 请求,Server 响应。

此处应有结构图(待补充)

引入 Consul 流程

启动 Consul

这里使用的是docker镜像的方式,也可直接安装

docker 启动单节点 server 模式 consul 源自博客

docker run -p 8500:8500/tcp \
    -d \
    consul \
    agent -server \
    -ui \
    -bootstrap-expect=1 \
    -client=0.0.0.0

宿主机端口访问问题

Consul 提供了很多检查服务健康的方式(Register Check APIChecks),包含httpttltcpgrpc等方式。由于我的 RPC 实现使用是基于 TCP 的,因此使用的是tcp check。这就面临一个问题,docker容器(consul)需要访问宿主机(server)端口,来检查 Server 是否运行良好。

想要访问宿主机端口并不是直接通过127.0.0.1即可,这里宿主机与容器之间似乎是建立了一个内网的关系,它们同在一个内网中,只能通过内网 IP 来互相访问。

幸好docker提供了解决方案:

工程中使用 Consul

可选择的 Java 客户端:

这里我选择的是 consul-client,API 的调用命名和调用方式较为舒服。

RpcServer:注册 Consul 服务

// 健康检查延时
int CONSUL_CHECK_INTERVAL = 3;
// host 与 port 的分隔符
char HOST_PORT_SEPARATOR = ':';
// 根据配置连接 consul
Consul client = Consul.builder()
                  .withHostAndPort(HostAndPort.fromString(config.getConsulHostAndPort()))
                  .build();

// 注册 service
Registration consulService = ImmutableRegistration.builder()
    .id(rpcHost + ":" + rpcPort)
    .name(serviceName)
    .address(rpcHost)
    .port(rpcPort)
    //                .check(Registration.RegCheck.tcp(rpcHost + HOST_PORT_SEPARATOR + rpcPort, CONSUL_CHECK_INTERVAL))
    // todo mac docker 容器与宿主机通信ip
    .check(Registration.RegCheck.tcp("docker.for.mac.localhost"+ HOST_PORT_SEPARATOR + rpcPort, CONSUL_CHECK_INTERVAL))// 注意这里的延时单位为 s
    .tags(Collections.singletonList("rpc"))// 标签
    .meta(Collections.singletonMap("version", "1.0"))
    .build();

// consul 中注册服务
consul.agentClient().register(consulService);

:warning:注意:上述代码中存在缺陷尚未解决。由于涉及容器访问宿主机端口问题,因此 consul 中 check 的 host 使用的是docker.for.mac.localhost,实际应使用变量rpcHost

RpcClient:获取服务列表 & 订阅服务

虽然向 consul 订阅服务之后,consul 会推送一次服务信息。但我在初始化服务信息时,还是手动获取了一遍服务列表。

private void init(List<String> consumerPackages) {
    // 根据配置文件中配置的扫描包列表,获取 consumer 所需的 serviceName 列表
    var serviceNameSet = ClassScanUtil.scanner(consumerPackages)
        .stream()
        .filter(clazz -> clazz.isAnnotationPresent(ProxyConsumer.class))
        .map(clazz -> clazz.getAnnotation(ProxyConsumer.class).serverName())
        .collect(Collectors.toSet());
    // 初始化:
    // 1. 从 consul 中,根据 serviceName 获取 healthyService 列表
    // 2. 根据 service 信息,建立长连接,并存储单个 service 的连接集合 serviceConnectMap = address -> connect
    // 3. 根据 serviceName 存储所有 service 的连接集合,connectMap = serviceName -> serviceConnectMap
    // 4. 构建负载均衡集合 loadBalanceMap =
    serviceNameSet.forEach(serviceName -> {
        Map<ServiceAddress, RpcConnect> initConnectMap = new HashMap<>();
        var healthyServices = consul.healthClient()
            .getHealthyServiceInstances(serviceName)
            .getResponse();
        healthyServices.stream()
            .map(ServiceHealth::getService)
            .forEach(service -> {
                ServiceAddress address = new ServiceAddress(service.getAddress(), service.getPort());
                RpcConnect     connect = new RpcConnect(address, this).connect();
                initConnectMap.put(address, connect);
            });
        connectMap.put(serviceName, initConnectMap);
        // 负载均衡
        LoadBalance<RpcConnect> loadBalance = loadBalanceFunction.apply(initConnectMap.values().toArray(new RpcConnect[0]));
        loadBalanceMap.put(serviceName, loadBalance);
    });
    // 订阅健康服务
    serviceNameSet.forEach(serviceName -> {
        serviceHealthCacheMap.put(serviceName, healthCacheFunction.apply(consul.healthClient()).apply(serviceName));
    });

}

healthCacheFunction:

private Function<HealthClient, Function<String, ServiceHealthCache>> healthCacheFunction = healthClient -> serviceName -> {
    log.debug("==== services update ====");
    ServiceHealthCache healthCache = ServiceHealthCache.newCache(healthClient, serviceName);
    healthCache.addListener((Map<ServiceHealthKey, ServiceHealth> newValues) -> {
        Map<ServiceAddress, RpcConnect> oldConnectMap = connectMap.get(serviceName);
        Map<ServiceAddress, RpcConnect> newConnectMap = new HashMap<>();
        // Set<serviceAddress>
        var newList = newValues.values()
            .stream()
            .map(ServiceHealth::getService)
            .map(service -> new ServiceAddress(service.getAddress(), service.getPort()))
            .collect(Collectors.toSet());
        // Set<serviceAddress>
        var oldList = connectMap.get(serviceName).keySet();
        //  在当前列表(N),但不在旧列表(O)。N - (N ∩ O)。创建新的 rpc 连接
        var createList = newList.stream()
            .filter(x -> !oldList.contains(x))
            .collect(Collectors.toSet());
        createList.forEach(address -> {
            RpcConnect connect = new RpcConnect(address, this).connect();
            newConnectMap.put(address, connect);
        });
        // 在旧列表(O),但不在当前列表(N)。O - (N ∩ O)。N关闭 rpc 连接 (释放资源、中断尝试重连)
        var closeList = oldList.stream()
            .filter(x -> !newList.contains(x))
            .collect(Collectors.toSet());
        closeList.forEach(address -> {
            RpcConnect connect = oldConnectMap.remove(address);
            connect.disConnect();
        });
        // 当前列表和旧列表都存在的(N ∩ O)不做处理
        // 旧 Map 中的剩余连接不做操作,放入新 Map 中
        newConnectMap.putAll(oldConnectMap);
        connectMap.replace(serviceName, newConnectMap);
        LoadBalance<RpcConnect> loadBalance = loadBalanceFunction.apply(newConnectMap.values().toArray(new RpcConnect[0]));
        loadBalanceMap.replace(serviceName, loadBalance);
    });
    // 启动订阅
    healthCache.start();
    return healthCache;
};

LoadBalance.java:

public abstract class LoadBalance<T> {
    protected List<T> data;
    protected int size;

    public LoadBalance(T[] data) {
        this.data = Arrays.asList(data);
        this.size = data.length;
    }

    public abstract T get();

    public int size() {
        return size;
    }

    public synchronized void add(T t) {
        data.add(t);
        size = data.size();
    }
}

轮询:

public class RoundRobinBalance<T> extends LoadBalance<T> {
    private int pos = 0;

    public RoundRobinBalance(T[] data) {
        super(data);
    }

    @Override
    public synchronized T get() {
        T result = data.get(pos);
        this.pos++;
        if (pos >= size) {
            pos = 0;
        }
        return result;
    }
}

获取服务:

public RpcConnect getConnect(String serviceName) {
    try {
        return loadBalanceMap.get(serviceName).get();
    } catch (NullPointerException e) {
        throw new ProxyException(e);
    }
}