iohao / ioGame

无锁异步化、事件驱动架构设计的 java netty 网络编程框架; 轻量级,无需依赖任何第三方中间件或数据库就能支持集群、分布式; 适用于网络游戏服务器、物联网、内部系统及各种需要长连接的场景; 通过 ioGame 你可以很容易的搭建出一个集群无中心节点、集群自动化、分布式的网络服务器;FXGL、Unity、UE、Cocos Creator、Godot、Netty、Protobuf、webSocket、tcp、socket;java Netty 游戏服务器框架;
http://game.iohao.com
GNU Affero General Public License v3.0
904 stars 201 forks source link

userId 与 Executor 关联,确保同一玩家使用的业务线程是相同的 #77

Closed iohao closed 1 year ago

iohao commented 1 year ago

框架提供 RequestMessageClientProcessorHook 新的默认实现 DefaultRequestMessageClientProcessorHook 功能

  1. userId 与 Executor 关联,确保同一玩家使用的业务线程是相同的。
  2. 如果没有登录,使用 channelId 计算;如果 channelId 不存在,则使用 cmd

目前的 RequestMessageClientProcessorHook 默认实现较为简单,如下

RequestMessageClientProcessorHook requestMessageClientProcessorHook = BarSkeleton::handle;

在 bolt 用户线程中直接调用业务框架来消费请求,在某些业务场景中,这种默认配置更适合,因为 bolt 处理业务时是多线程的,使用的是 bolt 的用户线程;(注意:这个用户线程并不是指的玩家线程,这个会在之后的相关文档中说明)。

由于 bolt 处理业务时是多线程的,当同一个玩家的多个请求同时修改一个共享数据时,会有并发问题。所以通常需要开发者自定义线程关联来处理这个问题,让开发者自定义线程的关联,具有更高的灵活性与扩展性,这个在稍后举例说明;

如何定制 RequestMessageClientProcessorHook

这里给出一些参考:使用JDK的、使用 Netty 的、使用 TTL

新默认实现 DefaultRequestMessageClientProcessorHook

框架提供将要提供的,新的默认实现;使用的JDK 提供的

final class DefaultRequestMessageClientProcessorHook implements RequestMessageClientProcessorHook {
    final Executor[] executors;
    ... ... 省略部分代码
    public DefaultRequestMessageClientProcessorHook() {
        int availableProcessors = availableProcessors2n();
        ... ... 省略部分代码

        for (int i = 0; i < availableProcessors; i++) {
            // 线程名:RequestMessage-线程总数-当前线程编号
            int threadNo = i + 1;
            String threadNamePrefix = String.format("RequestMessage-%s-%s", availableProcessors, threadNo);
            executors[i] = ExecutorKit.newSingleThreadExecutor(new TheThreadFactory(threadNamePrefix));
        }
    }

    @Override
    public void processLogic(BarSkeleton barSkeleton, FlowContext flowContext) {
        long userId = flowContext.getUserId();

        if (userId == 0) {
            HeadMetadata headMetadata = flowContext.getRequest().getHeadMetadata();
            // 如果没有登录,使用 channelId 计算;如果 channelId 不存在,则使用 cmd
            userId = Optional.ofNullable(headMetadata.getChannelId())
                    .map(String::hashCode)
                    .map(Math::abs)
                    .orElseGet(headMetadata::getCmdMerge);
        }

        // 根据 userId 获取对应的 Executor
        int index = (int) (userId & (executorLength - 1));

        // 执行业务框架
        executors[index].execute(() -> barSkeleton.handle(flowContext));
    }
}

使用 Netty 的 DefaultThreadFactory

使用 netty 提供的 DefaultThreadFactory 线程工厂创建的 Executor。 DefaultThreadFactory 提供了 FastThreadLocal、FastThreadLocalRunnable、FastThreadLocalThread 相关的实现,重写了 JDK 提供的部分,开发者可以按需参考

final class NettyRequestMessageClientProcessorHook implements RequestMessageClientProcessorHook {
    ... ... 省略部分代码
    public NettyRequestMessageClientProcessorHook() {
        ... ... 省略部分代码

        for (int i = 0; i < availableProcessors; i++) {
            // io.netty.util.concurrent.DefaultThreadFactory
            var threadFactory = new DefaultThreadFactory("RequestMessage-" + i);
            executors[i] = ExecutorKit.newSingleThreadExecutor(threadFactory);
        }
    }
}

使用 TTL

在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

如果关注的是异步执行时上下文传递的问题,可以参考 https://github.com/alibaba/transmittable-thread-local

小结

除了列出的参考实现外,还有很多其他的实现,开发者按需参考与选择;

参考文档:业务线程编排-钩子接口

线程相关文档将在 colse issu 后更新与添加更多的内容

iohao commented 1 year ago

ioGame 线程相关文档

iohao commented 1 year ago

17.1.38 及之后版本