sofastack / sofa-bolt

SOFABolt is a lightweight, easy to use and high performance remoting framework based on Netty.
https://www.sofastack.tech/projects/sofa-bolt/
Apache License 2.0
2.4k stars 856 forks source link

RpcClient 的 oneway 方法怎么设置发送时的顺序 #298

Closed iohao closed 1 year ago

iohao commented 2 years ago

Your question

代码中,消息可以发送成功; 但是没有按预期的发送顺序。


Connection connection = ...;
// 这是一个自定义的结构
TheHelloMessage message = .... ;

for (int i = 0; i < 10; i++) {
    TheHelloMessage message = new TheHelloMessage();
    message.amount = i;

    this.rpcClient.oneway(connection, message);
}

大概接收的日志如下

TheHelloMessage(amount=1)

TheHelloMessage(amount=0)

TheHelloMessage(amount=2)

TheHelloMessage(amount=3)

... ... 省略部份

就是接收的消息没有按预期的顺序,请问如何设置相关的参数!

Environment

iohao commented 2 years ago

接收方的日志

INFO [tor-7-thread-12]
INFO [tor-7-thread-15]
INFO [tor-7-thread-19]
INFO [tor-7-thread-18]
INFO [utor-7-thread-1]
INFO [utor-7-thread-4]
INFO [utor-7-thread-1]
INFO [utor-7-thread-9]
INFO [utor-7-thread-8]
INFO [utor-7-thread-3]
INFO [tor-7-thread-10]
INFO [utor-7-thread-7]

会不会是接收时的线程不是同一个导致的,如果想调整,如何配置。

iohao commented 2 years ago

知道了,在 AbstractUserProcessor 实现类中重写 getExecutor() 方法;使用一个线程处理就可以得到正确的顺序了。

chuailiwu commented 1 year ago

是的,时序要考虑清楚发送线程和处理线程的逻辑

iohao commented 1 year ago

@chuailiwu

image

请问下,接收方重写 getExecutor() 后,理论上可以保证顺序。但是现在接收到的顺序是乱的,在发送方还需要做什么设置吗。

... 省略部分代码

for (int i = 1; i <= 100; i++) {
    StringValue ready1 = new StringValue();
    ready1.value = "data == " + i;

    rpcClient.oneway(connection, ready1);
}
chuailiwu commented 1 year ago

client 用一个connection 顺序发送,服务端用一个线程处理应该是保证顺序的 抓包看下呢,看网络层数据是有序的吗?

iohao commented 1 year ago

client 用一个connection 顺序发送,服务端用一个线程处理应该是保证顺序的 抓包看下呢,看网络层数据是有序的吗?

还没抓包, #323 这里有个可复现的 demo

JavaTen commented 1 year ago

client 用一个connection 顺序发送,服务端用一个线程处理应该是保证顺序的 抓包看下呢,看网络层数据是有序的吗?

你好,这个bug能定位到问题吗?

OrezzerO commented 1 year ago

@iohao 看了下你的demo, 你把验证是否乱序的逻辑写到了客户端的 Callback中, 我觉得这是个问题点.

我们整体看一次调用, 会涉及到三个线程池:

  1. 客户端发送的线程池
  2. 服务端处理的线程池
  3. 客户端处理回调的线程池

现在 客户端处理回调的线程池 出现了乱序, 我看了下,这个线程池是 org.java_websocket.client.WebSocketClient 控制的,并不能代表 2.服务端处理的线程池 出现了乱序.

iohao commented 1 year ago

@iohao 看了下你的demo, 你把验证是否乱序的逻辑写到了客户端的 Callback中, 我觉得这是个问题点.

我们整体看一次调用, 会涉及到三个线程池:

  1. 客户端发送的线程池
  2. 服务端处理的线程池
  3. 客户端处理回调的线程池

现在 客户端处理回调的线程池 出现了乱序, 我看了下,这个线程池是 org.java_websocket.client.WebSocketClient 控制的,并不能代表 2.服务端处理的线程池 出现了乱序.

org.java_websocket.client.WebSocketClient 是指 DemoWebsocketClient 这个启动类吧,这个启动类的主要作用是为了触发请求。

也就是为了触发下面 DemoAction.order() 方法

@ActionController(1)
public class DemoAction {
    @ActionMethod(3)
    public void order() {

        // 
        var broadcastContext = BrokerClientHelper.getBroadcastOrderContext();

        CmdInfo cmdInfo = CmdInfo.getCmdInfo(1, 3);
        for (int i = 1; i <= 100; i++) {
            IntValue intValue = new IntValue();
            intValue.value = i;

            broadcastContext.broadcastOrder(cmdInfo, intValue);
        }

    }
}

broadcastContext.broadcastOrder(cmdInfo, intValue); 内部是调用的 rpcClient.oneway 方法,已经确定使用的是同一连接 channel 了。

此时的调用连链是:RpcClient -> RpcServer -> RpcClient -> DemoWebsocketClient

与之对应的调用处理代码点就是: 1 broadcastContext.broadcastOrder -> 2 BroadcastOrderMessageBrokerProcessor.java -> 3 BroadcastOrderMessageExternalProcessor.java -> 4 DemoWebsocketClient.java

所以,只需要看 2、3 这两个环节就可以了,

看下图的打印

以 - : 打头的是 BroadcastOrderMessageBrokerProcessor.java 打印的数据,分别打印的是:
 - 30
 - 31
 - 32

以 local ------ : 打头的是 BroadcastOrderMessageExternalProcessor.java 打印的数据,分别打印的是:
local ------ :  29
local ------ :  32
local ------ :  30

所以,在还没有给到模拟客户端这个环节数据就乱了;BroadcastOrderMessageBrokerProcessor、BroadcastOrderMessageExternalProcessor 都是设置的单线程。

image

最后才到模拟客户端的类 DemoWebsocketClient.java 在 DemoWebsocketClient.java 的控制台中,打印的数据分别是:29、32、30

那么也就是说,从 BroadcastOrderMessageExternalProcessor 给到模拟客户端的数据是对得上的。

image
iohao commented 1 year ago

@OrezzerO

OrezzerO commented 1 year ago

@iohao 看了下确实有你说的问题. 尝试调试了下,没查出问题. 现在 Client 和 Server 再一个 JVM 里面, 我打了断点很难确认是客户端逻辑还是服务端逻辑, 能不能提供一个 客户端和服务端分开启动的 Demo 呢?

iohao commented 1 year ago

@OrezzerO

@iohao 看了下确实有你说的问题. 尝试调试了下,没查出问题. 现在 Client 和 Server 再一个 JVM 里面, 我打了断点很难确认是客户端逻辑还是服务端逻辑, 能不能提供一个 客户端和服务端分开启动的 Demo 呢?

拉取一下 order 分支的代码 https://github.com/iohao/ioGameSimpleOne

.
├── DemoAction.java
├── DemoApplication.java
├── DemoLogicServer.java
├── DemoWebsocketClient.java
└── multiple
    ├── MyBrokerServerApp.java
    ├── MyExternalApp.java
    └── MyLogicApp.java

新增了 MyBrokerServerApp、MyExternalApp、MyLogicApp 三个类,可以分别的单独启动。

启动顺序为

  1. MyBrokerServerApp
  2. MyExternalApp
  3. MyLogicApp
  4. DemoWebsocketClient

之后可以多启动几次 DemoWebsocketClient.java 发起请求来测试。

OrezzerO commented 1 year ago

@iohao 找到原因了: RpcCommandHandler.handle 在处理批量请求的时候,会引入一个线程池,这个线程池导致了后续处理乱序. 加上这行代码关掉这个功能之后, 我测试就不乱序了:

        System.setProperty(RpcConfigs.DISPATCH_MSG_LIST_IN_DEFAULT_EXECUTOR,"false");
iohao commented 1 year ago

@iohao 找到原因了: RpcCommandHandler.handle 在处理批量请求的时候,会引入一个线程池,这个线程池导致了后续处理乱序. 加上这行代码关掉这个功能之后, 我测试就不乱序了:

        System.setProperty(RpcConfigs.DISPATCH_MSG_LIST_IN_DEFAULT_EXECUTOR,"false");

感谢!可以分享一下分析过程吗。

OrezzerO commented 1 year ago

感谢!可以分享一下分析过程吗。

1.通过抓包和分析请求可以发现, 在 bolt 包 content 中 第 438, 349 (350) 两到三位字节代表 proto 中的那个int ,我们抓包, 日志都通过观察这三位.其中 438 固定是 8 , 439 从0 开始 以2 为 step 递增.

  1. 一开始用 wireshark 抓包,想看看包是不是有序的. 看上去是有序的, 但实际上最后发现,包也是乱序的. 用wireshrk看还是不清晰.
  2. 改动bolt Encoder , Decoder 代码,分别输出 线程号和 439位数据. 发现服务端 Decoder是顺序的, encoder 是乱序的.
  3. debug 服务端处理代码, 发现在 userProcessor executor 的阻塞队列中的数据已经是乱序的了.
  4. 在创建 com.alipay.remoting.rpc.protocol.RpcRequestProcessor.ProcessTask#ProcessTask 的时候打印 线程名称,发现有意料之外的线程名, debug 到这边,往上翻翻 stack 就看到 RpcCommandHandler 的代码了 @iohao
iohao commented 1 year ago

感谢!可以分享一下分析过程吗。

1.通过抓包和分析请求可以发现, 在 bolt 包 content 中 第 438, 349 (350) 两到三位字节代表 proto 中的那个int ,我们抓包, 日志都通过观察这三位.其中 438 固定是 8 , 439 从0 开始 以2 为 step 递增. 2. 一开始用 wireshark 抓包,想看看包是不是有序的. 看上去是有序的, 但实际上最后发现,包也是乱序的. 用wireshrk看还是不清晰. 3. 改动bolt Encoder , Decoder 代码,分别输出 线程号和 439位数据. 发现服务端 Decoder是顺序的, encoder 是乱序的. 4. debug 服务端处理代码, 发现在 userProcessor executor 的阻塞队列中的数据已经是乱序的了. 5. 在创建 com.alipay.remoting.rpc.protocol.RpcRequestProcessor.ProcessTask#ProcessTask 的时候打印 线程名称,发现有意料之外的线程名, debug 到这边,往上翻翻 stack 就看到 RpcCommandHandler 的代码了 @iohao

谢谢分享这个过程。问一个问题,当开启了 System.setProperty(RpcConfigs.DISPATCH_MSG_LIST_IN_DEFAULT_EXECUTOR,"false"); 是会影响全局吗,就是不能使用这个特性了。

OrezzerO commented 1 year ago

是的, 如果要修改这一行为, 需要定制 RpcCommandHandler, 然后重新注册一个 Protocol , .现在默认启用的是: com.alipay.remoting.rpc.protocol.RpcProtocol

iohao commented 1 year ago

是的, 如果要修改这一行为, 需要定制 RpcCommandHandler, 然后重新注册一个 Protocol , .现在默认启用的是: com.alipay.remoting.rpc.protocol.RpcProtocol

🆗

nobodyiam commented 1 year ago

@OrezzerO

RpcConfigs.DISPATCH_MSG_LIST_IN_DEFAULT_EXECUTOR 的默认值是true,所以默认就会放到线程池执行,无法保证顺序。

@chuailiwu 针对这些配置,我们可以在文档里面补充一下?

chuailiwu commented 1 year ago

@OrezzerO

RpcConfigs.DISPATCH_MSG_LIST_IN_DEFAULT_EXECUTOR 的默认值是true,所以默认就会放到线程池执行,无法保证顺序。

@chuailiwu 针对这些配置,我们可以在文档里面补充一下?

ok, i create a demo(https://github.com/sofastack/sofa-bolt/pull/325) first