Matrix-Zhang / tokio_kcp

A Kcp implementation for tokio
MIT License
176 stars 44 forks source link

空data支持 #37

Open TheCGDF opened 3 months ago

TheCGDF commented 3 months ago

当客户端发送一个空的数据包时:

let mut buffer = [0u8; 0];
...
stream.write_all(&buffer).await.unwrap();

服务端的read会卡住,或者说,忽略这个空数据包

while let Ok(n) = stream.read(&mut buffer).await {
...
}

这导致一些兼容性问题。

以及某些kcp库的client在connect时会发送一个空数据包作为握手。

是否有办法接受空数据包?

zonyitoo commented 3 months ago

read() 返回 0 表示 EOF ,所以不能实现。 如果要实现握手,那应该在KCP Session层实现,即使是空数据包也可以发一个完整的KCP Segment,接收端也可以发一个ACK

zonyitoo commented 3 months ago

https://github.com/Matrix-Zhang/tokio_kcp/blob/bf3466b74ec8a93622998e776e2e42fa70fcb9d3/src/skcp.rs#L138-L185

从逻辑上看没有不允许发送 len() = 0 的数据,你是否是启用了 stream 模式?

TheCGDF commented 3 months ago

从逻辑上看没有不允许发送 len() = 0 的数据,你是否是启用了 stream 模式?

没有开启stream

我用wireshark抓包试了,当send buffer为空时,tokio_kcp不会发出任何udp包。

当用其他kcp库向tokio_kcp发一个buffer为空的udp包时,stream.read不会返回

TheCGDF commented 3 months ago

我看到作为依赖的kcp库中有 https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L505-L507 这样的逻辑,是否和这个有关(我不太确定)

zonyitoo commented 3 months ago

这段代码仅在 stream = true 时生效,所以问你是否有开启 stream

TheCGDF commented 3 months ago

真没开。 KcpConfig::default()默认是关闭的,我也试了下面这样的显式指定:

    let config =KcpConfig {
        mtu: 1400,
        nodelay: KcpNoDelayConfig {
            nodelay: false,
            interval: 40,
            resend: 0,
            nc: false,
        },
        wnd_size: (256, 256),
        session_expire: std::time::Duration::from_secs(90),
        flush_write: true,
        flush_acks_input: true,
        stream: false,
    };

nctrue/false也都试了没影响。

TheCGDF commented 3 months ago

https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L1246-L1249 这里还有一处判断,可能是这里?我打断点调试看到这里被跳过了

zonyitoo commented 3 months ago

https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L521-L539

至少会往 self.snd_queue 里面放 1 个 segment ,按理说不会被跳过

TheCGDF commented 3 months ago

https://github.com/Matrix-Zhang/kcp/blob/58e863fcbdf4cdd0df9e9c378a864dd4d0c8f58f/src/kcp.rs#L521-L539

至少会往 self.snd_queue 里面放 1 个 segment ,按理说不会被跳过

打断点看了,空数组的时候走不到那段代码,poll_sendsend都不会被调用,但是数组里有元素的时候这两个函数就会被调用。但是我找不到哪个分支判断处理的

TheCGDF commented 3 months ago

翻了tokio源码终于找到了

https://github.com/tokio-rs/tokio/blob/c8f3539bc11e57843745c68ee60ca5276248f9f9/tokio/src/io/util/write_all.rs#L42-L51

数组为空时不会调用poll_write,断点调试也验证了确实是这一个判断的区别

那我估计这是难修了,除非多一层封装先把 segment 放进去再调用poll

但是read那边也不支持空数组,还是有些头疼,难道也要连着 segment 读出来?

zonyitoo commented 3 months ago

read() 返回 0 是肯定不可以的,返回 0 表示的是 EOF ,不能去破坏这个约定 。要么就去另外搞一个函数去读,不用 AsyncRead trait

TheCGDF commented 3 months ago

read() 返回 0 是肯定不可以的,返回 0 表示的是 EOF ,不能去破坏这个约定 。要么就去另外搞一个函数去读,不用 AsyncRead trait

那目前的实现如果不打算改的话还是在文档或者example里标记一下比较好。。。

比如len() == 0视为UB什么的,毕竟也算是一个不兼容的特性

zonyitoo commented 3 months ago

不需要的,因为Rust所有的API都是类POSIX标准,read() = 0 就是 EOF ,不需要特意说明

zonyitoo commented 3 months ago

解决办法是加个option,如果设置了的话,write 允许写空包,那不要用 write_all 就好了,用 write() 是没有这个判断的 https://github.com/Matrix-Zhang/tokio_kcp/blob/bf3466b74ec8a93622998e776e2e42fa70fcb9d3/src/stream.rs#L165-L171

zonyitoo commented 3 months ago

不过你的业务层代码也直接用 write() 不就好了?不要用 write_all()

TheCGDF commented 3 months ago

还是有个比较奇怪的点,tokio的UdpSocket用的是send,而不是write,我试了下send可以发空包:

let u = UdpSocket::bind("127.0.0.1:6666".parse::<SocketAddr>().unwrap()).await.unwrap();
u.connect("127.0.0.1:5555").await.unwrap();
u.send(&[0u8;0]).await.unwrap();

也就是说socket可能应该用send/recv,而且发空包本身是一个被支持的操作。

于是我想着tokio_tcp是不是也可以用send,但是发现tokio_tcp的send无论数组是否为空,都不会发出任何包。

网络库的API应该用UdpSocketsend/recv那套吧?用文件io的read/write这套应该不太合理?

zonyitoo commented 3 months ago

KCP协议是一种流协议,因此与它最相似的就是 TcpStream ,因此设计了 KcpStream 。相类似的就是 TcpStream 实现了 AsyncReadAsyncWriteKcpStream 也实现了相同的接口。

底层用 UdpSocket 只是一般都这么用,实际上并没有规定一定要用 UDP 。目前我在尝试把底层实现交给用户来自定义,不一定要用 UdpSocket

对于这里提到的问题,是一种基于 KCP 实现的一种特例,它在 stream=false 时允许使用类似 Datagram 协议的方式来发包,但包的顺序又是固定的(与UDP完全不同)。因此我上面建议的是另外做两个接口 sendrecv 来支持这样的特性(仅 stream=false 时有意义)

TheCGDF commented 3 months ago

KCP协议是一种流协议,因此与它最相似的就是 TcpStream

试了下TcpStream在空包上处理方式确实和UDPSocket有些不一样:

let mut t = TcpStream::connect("127.0.0.1:6666").await.unwrap();
t.write_all(&[0u8;0]).await.unwrap();

TcpStream在进入write_all之后也跳过了空包的处理,然后TcpStream被析构的时候会发一个EOF包(KcpStream好像没有这个行为?),

从而服务端TcpListener那边在TcpStream生命周期结束后才会read出来一个len() ==0的空数组(显然太迟了)

let bytes_read = stream.read(&mut buffer).unwrap();

我也觉得send/recv方案可能好些,或者加个KcpSocket(对应UdpSocket)来区分流模式与非流模式。。。

zonyitoo commented 3 months ago
  1. TcpStream 在析构时没有 “发出一个 EOF 包”,请了解 TCP 协议原理。
  2. KcpStream 没有这个行为,是因为 KCP 协议没有 FIN 和 RST
TheCGDF commented 3 months ago

学到了。🙏

zonyitoo commented 3 months ago

当前就有 send

https://github.com/Matrix-Zhang/tokio_kcp/blob/bf3466b74ec8a93622998e776e2e42fa70fcb9d3/src/stream.rs#L84-L87

recv ,可以直接用

https://github.com/Matrix-Zhang/tokio_kcp/blob/bf3466b74ec8a93622998e776e2e42fa70fcb9d3/src/stream.rs#L140-L143

TheCGDF commented 3 months ago

当前就有 send

上面我说过了。。。tokio_tcp的send无论数组是否为空,都不会发出任何包。

zonyitoo commented 3 months ago

你是说 write_all 吧,send 是单独的函数

TheCGDF commented 3 months ago

你是说 write_all 吧,send 是单独的函数

重新测了下,之前测错了,应该是send只有长度为0时会发出包。

    let config = KcpConfig::default();
    let server_addr = "127.0.0.1:5555".parse::<SocketAddr>().unwrap();
    let mut stream = KcpStream::connect(&config, server_addr).await.unwrap();
    stream.send(&[0u8; 10]).await.unwrap();

这里的[0u8; 10]改成[0u8; 0]就能发出包,但是改成1或者其他什么数字就发不出去。

不知是我代码哪里写错了还是什么问题?

zonyitoo commented 3 months ago

在代码中加了一个这样的 test ,没有问题

#[cfg(test)]
mod test {
    use crate::KcpListener;

    use super::*;

    #[tokio::test]
    async fn test_stream_echo() {
        let config = KcpConfig::default();
        let server_addr = "127.0.0.1:5555".parse::<SocketAddr>().unwrap();

        let mut listener = KcpListener::bind(config.clone(), server_addr).await.unwrap();
        let listener_hdl = tokio::spawn(async move {
            loop {
                let (mut stream, peer_addr) = listener.accept().await.unwrap();
                println!("accepted {}", peer_addr);

                tokio::spawn(async move {
                    let mut buffer = [0u8; 8192];
                    loop {
                        match stream.recv(&mut buffer).await {
                            Ok(n) => {
                                println!("server recv: {:?}", &buffer[..n]);
                                stream.send(&buffer[..n]).await.unwrap();
                                println!("server sent: {:?}", &buffer[..n]);
                            }
                            Err(err) => {
                                println!("recv error: {}", err);
                                break;
                            }
                        }
                    }
                });
            }
        });

        let mut stream = KcpStream::connect(&config, server_addr).await.unwrap();

        let test_payload = b"HELLO WORLD";
        stream.send(test_payload).await.unwrap();
        println!("client sent: {:?}", test_payload);

        let mut recv_buffer = [0u8; 1024];
        let recv_n = stream.recv(&mut recv_buffer).await.unwrap();
        println!("client recv: {:?}", &recv_buffer[..recv_n]);
        assert_eq!(recv_n, test_payload.len());
        assert_eq!(&recv_buffer[..recv_n], test_payload);

        listener_hdl.abort();
    }
}

其中会输出

client sent: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
accepted 127.0.0.1:49853
server recv: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
server sent: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
client recv: [72, 69, 76, 76, 79, 32, 87, 79, 82, 76, 68]
test stream::test::test_stream_echo ... ok

应该是你测试的问题,你直接这么测,update() Task 都还没来得及跑一次就程序就已经退出了吧。反而写 [0u8; 0] 会有问题,服务端会 recv() 不出来

zonyitoo commented 3 months ago

recv() 读不出来是因为把 0 等同于 RecvQueueEmpty 了,这个 Fix 很简单

https://github.com/Matrix-Zhang/tokio_kcp/blob/bf3466b74ec8a93622998e776e2e42fa70fcb9d3/src/skcp.rs#L207

TheCGDF commented 3 months ago

确实是我测试方法不对,加了

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

就能发出去了

这样看来就只剩recv()需要被修了🥲

zonyitoo commented 3 months ago

不过用发一个空的来表示一种特殊的功能,似乎并不是一种稳定靠谱的做法。但凡后面多发一个字节的数据过去,读出来就不是空的

TheCGDF commented 3 months ago

不过用发一个空的来表示一种特殊的功能,似乎并不是一种稳定靠谱的做法。但凡后面多发一个字节的数据过去,读出来就不是空的

但毕竟kcp协议没有禁止。。。

还有个,当conv为0的时候,服务端会重新生成一个随机的conv,这个行为好像不是kcp标准的行为?

有次随机生成的conv正好为0,导致连不上服务器,调试了好久都没复现,翻了代码才发现tokio_kcp里面有这么个逻辑😂。

zonyitoo commented 3 months ago

本身也没规定conv要怎么生成

TheCGDF commented 3 months ago

本身也没规定conv要怎么生成

主要是有这样的逻辑是不是在文档或者注释里标一下比较好。。。不然出了bug还挺难复现的。。。

TheCGDF commented 3 months ago

recv在超时的时候也是返回一个n=0而不是error,接收方怎么判断是发送方发了空数据还是超时了呢?

zonyitoo commented 3 months ago

判断不了。本身读到size=0表示的就是EOF,session超时本身这里的设计是想表示为关闭,关闭就向上层返回EOF表示结束。你这里一定要读到一个size=0的数据,很难做。

TheCGDF commented 3 months ago

判断不了。本身读到size=0表示的就是EOF,session超时本身这里的设计是想表示为关闭,关闭就向上层返回EOF表示结束。你这里一定要读到一个size=0的数据,很难做。

超时无法返回一个Timeout的Error吗?🥲

zonyitoo commented 3 months ago

可以实现,更好的办法是不是应该让你设置成不超时,由你自己来处理

TheCGDF commented 3 months ago

你是指像这样↓,自己设置一个last_received来记录上一次recv的时间,然后在tokio::select里自己判断和处理超时吗?

let mut last_received = Utc::now();
tokio::select! {
    _ = async {
        //最多超时两秒,否则发送队列会被塞爆,send会被阻塞
        let ms = (last_received + Duration::seconds(2) - Utc::now()).num_milliseconds();
        tokio::time::sleep(
            if ms <= 0 {
                std::time::Duration::ZERO
            } else {
                std::time::Duration::from_millis(ms as u64)
            }
        ).await;
    } =>{
        //session timeout
        break;
    }

    result = stream.recv(&mut buffer) => {
        last_received = Utc::now();
        //handle buffer
    }
}

我个人感觉如果能提供个超时的KcpResult还是挺有用的,这样后面如果有兴趣支持try_send的话也可以派得上用场。。。

在已有session_expire这一设定的前提下,还得send/recv前再多写一层超时检查的逻辑,总感觉有点丑陋。。。

zonyitoo commented 2 months ago

不用那么复杂,直接创建一个 Sleep 出来,每次如果收到了消息就把它 reset 一下就好了。

实际上可能更好的是:

loop {
    let n = match tokio::time::timeout(stream.recv(&mut buffer)) {
        Ok(Ok(n)) => n,
        Ok(Err(err)) => ... // socket error,
        Err(..) => ... // timedout
    };
}

这样写不是挺优雅的,没必要 select 。

我个人感觉如果能提供个超时的KcpResult还是挺有用的,这样后面如果有兴趣支持try_send的话也可以派得上用场。。。

研究了一下写起来可能会有点丑陋,tokio::time::timeout 的方案更优。因为实际上我想要规避的是如果业务代码没有主动去写超时,那么会直接爆内存。但只要业务代码写了,session 自动超时是没有必要的。

TheCGDF commented 2 months ago

这样写不是挺优雅的,没必要 select 。

我之前没说清楚,用select是因为还要从(联机)游戏的主世界channelreceiver接收主世界发来的逻辑帧。

主世界每15ms更新一次逻辑帧然后通过channel推送给世界里的所有玩家。

(这种模式应该是比较普遍的,我看一些其他issue里的代码也采用了这种模式,比如:https://github.com/Matrix-Zhang/tokio_kcp/issues/33

let mut last_received = Utc::now();
loop{
    tokio::select! {
        //接收主世界channel每隔15ms发来的逻辑帧
        recieved = world_receiver.recv() => {
            //将逻辑帧序列化为二进制
            if tokio::time::timeout(std::time::Duration::from_secs(2), stream.send(&response)).await.is_err(){
                //逻辑帧的send只能超时两秒
                break;
            }
        }

        _ = async {
            //recv最多等待两秒,否则发送队列会被塞爆,send会被阻塞
            let ms = (last_received + Duration::seconds(2) - Utc::now()).num_milliseconds();
            tokio::time::sleep(
                if ms <= 0 {
                    std::time::Duration::ZERO
                } else {
                    std::time::Duration::from_millis(ms as u64)
                }
            ).await;
        } =>{
            //session timeout
            break;
        }

        //这里如果使用tokio::time::timeout会永远触发不了timeout
        //因为会被主世界的逻辑帧抢先
        result = stream.recv(&mut buffer) => {
            last_received = Utc::now();
            //handle buffer
        }
    }
}

一共需要手动处理两个超时

在这种情况下,给recv套上tokio::time::timeout的方案应该是用不了的,因为主世界的channel总是会在15ms内发来消息,recvtokio::time::timeout超时永远不会触发。所以我目前能想到的方案就是弄一个不会被select中其他分支影响的last_received变量记录最后一次收到消息的时间。

tokio_kcp这边如果要设置超时的KcpResult,我猜应该也会需要弄一个last_received变量记录最后一次收到消息的时间?

tokio::time::timeout应该只适用于比较简单的一问一答?