kerryjiang / SuperSocket

SuperSocket is a light weight, cross platform and extensible socket server application framework.
Apache License 2.0

A strange message-sending problem #735

Closed x0gundam1133 closed 4 months ago

x0gundam1133 commented 4 months ago

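I was on beta.22 and upgraded to beta.23, but the problem is still there. It is a strange one: two programs on the same machine exchange messages over 127.0.0.1.

Program A sends with:

    Connection.SendAsync(new ReadOnlyMemory<byte>(buffers)).DoNotAwait();

Program B receives in:

    public ValueTask Handle(IAppSession session, ProtobufInfo package, CancellationToken cancellationToken)

Comparing the printed send and receive times, the data normally arrives within a few milliseconds, but occasionally it takes 0.6 to 1 second to arrive.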

x0gundam1133 commented 4 months ago

Setting receiveBufferSize and sendBufferSize does not fix it either.

kerryjiang commented 4 months ago

Could you write a unit test that reproduces the problem? For example, loop an echo 1000 times and fail if a round trip takes more than 10 ms.
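
A minimal sketch of the kind of echo latency check suggested above. It uses plain TcpListener/TcpClient over loopback rather than SuperSocket, so it only shows the measurement pattern; the class name EchoLatencyProbe, the method name, and the 33-byte payload are assumptions made for illustration.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class EchoLatencyProbe
{
    // Runs `rounds` echo round trips over loopback and returns the slowest one in milliseconds.
    public static async Task<double> MaxRoundTripMsAsync(int rounds, int payloadSize = 33)
    {
        var listener = new TcpListener(IPAddress.Loopback, 0); // port 0: the OS picks a free port
        listener.Start();
        var port = ((IPEndPoint)listener.LocalEndpoint).Port;

        // Minimal echo server (a stand-in for the server under test): copies bytes straight back.
        var serverTask = Task.Run(async () =>
        {
            using var serverSide = await listener.AcceptTcpClientAsync();
            var serverStream = serverSide.GetStream();
            var buf = new byte[4096];
            try
            {
                int got;
                while ((got = await serverStream.ReadAsync(buf, 0, buf.Length)) > 0)
                    await serverStream.WriteAsync(buf, 0, got);
            }
            catch (IOException) { /* peer closed abruptly; nothing left to echo */ }
        });

        double worst = 0;
        using (var client = new TcpClient())
        {
            await client.ConnectAsync(IPAddress.Loopback, port);
            client.NoDelay = true; // avoid Nagle batching of small messages
            var stream = client.GetStream();
            var payload = new byte[payloadSize];
            var reply = new byte[payloadSize];

            for (var i = 0; i < rounds; i++)
            {
                var sw = Stopwatch.StartNew();
                await stream.WriteAsync(payload, 0, payload.Length);

                // TCP is a byte stream, so keep reading until the whole echo is back.
                var read = 0;
                while (read < reply.Length)
                {
                    var n = await stream.ReadAsync(reply, read, reply.Length - read);
                    if (n == 0) throw new InvalidOperationException("Connection closed early.");
                    read += n;
                }

                worst = Math.Max(worst, sw.Elapsed.TotalMilliseconds);
            }
        } // disposing the client closes the socket, which ends the server loop

        await serverTask;
        listener.Stop();
        return worst;
    }
}
```

In a unit test, the result of MaxRoundTripMsAsync(1000) would be compared against the 10 ms limit, with the plain echo server swapped for the SuperSocket server and the test run in the environment where the delay shows up.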

x0gundam1133 commented 4 months ago

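I wrote a standalone test program, with both sides on beta.23. Results so far:

1. Standalone test on my local machine: no problem.
2. Standalone test in a virtual machine: no problem.

Then I tested again with the actual project:

3. Project on my local machine: no problem. Other physical machines are fine too.
4. Project in a virtual machine: the delay always appears, and fairly often. A virtual machine on a different physical host shows the same delay.

This is a real headache, mainly because the whole path from application A sending to application B receiving is SuperSocket interfaces, with none of our own logic in between.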

x0gundam1133 commented 4 months ago

I checked out the earliest 2.0 code from last year, beta.8, and the problem is still there.

kerryjiang commented 4 months ago

Add a test case and run it on the virtual machine (the environment where the problem reproduces). Then share your test case.

x0gundam1133 commented 4 months ago

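The test case is done, and I also wrote a completely independent test app. Neither reproduces the problem on the virtual machine. I do not know what else is influencing it. Any other suggestions for tracking this down?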

kerryjiang commented 4 months ago

Where is your test code?

x0gundam1133 commented 4 months ago

Uploaded to the QQ group.

x0gundam1133 commented 4 months ago

Below is the log output from the project, printed directly at the send call and at the receive handler. Because this crosses processes, the printed time is DateTime.Now.TimeOfDay.TotalSeconds.

Gate 2024-07-02 22:08:23,735 [.NET TP Worker] INFO SuperSocketService - GateSend msgid: 15008 time :79703.7352055
Logic 2024-07-02 22:08:24,739 [.NET TP Worker] INFO SuperSocketService - LogicRecv msgid: 15008 time :79704.7397662
Logic 2024-07-02 22:08:24,740 [.NET TP Worker] INFO SuperSocketService - LogicSend msgid: 20008 time: 79704.7403157
Gate 2024-07-02 22:08:24,751 [.NET TP Worker] INFO SuperSocketService - GateRecv msgid: 20008 time :79704.751815

x0gundam1133 commented 4 months ago

I pulled the SuperSocket source code into the project and added logging around the receive and send calls in PipeConnection:

                Logger.LogInformation($"supersocket recv1 time:{DateTime.Now.TimeOfDay.TotalSeconds}");
                var bytesRead = await FillPipeWithDataAsync(memory, cancellationToken).ConfigureAwait(false);
                Logger.LogInformation($"supersocket recv2 size:{bytesRead} time:{DateTime.Now.TimeOfDay.TotalSeconds}");

                Logger.LogInformation($"supersocket send1 time:{DateTime.Now.TimeOfDay.TotalSeconds}");
                var len = await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false);
                Logger.LogInformation($"supersocket send2 size:{len} time:{DateTime.Now.TimeOfDay.TotalSeconds}");

The output is below. Logic receives the data quickly, but for some reason there is a long delay (about 0.7 s in this run) between the low-level receive and the IPackageHandler call.

Gate 2024-07-03 13:11:42,858 [.NET TP Worker] INFO  IConnection - supersocket recv2 size:22 time:47502.8584043
Gate 2024-07-03 13:11:42,858 [.NET TP Worker] INFO  IConnection - supersocket recv1 time:47502.8584902
Gate 2024-07-03 13:11:42,861 [.NET TP Worker] INFO  SuperSocketService - GateSend msgid: 15008 size:33 time :47502.8611115
Gate 2024-07-03 13:11:42,861 [.NET TP Worker] INFO  IConnection - supersocket send1 time:47502.8612225
Gate 2024-07-03 13:11:42,861 [.NET TP Worker] INFO  IConnection - supersocket send2 size:33 time:47502.8613853
Logic 2024-07-03 13:11:42,861 [.NET TP Worker] INFO  IConnection - supersocket recv2 size:33 time:47502.86158
Logic 2024-07-03 13:11:42,861 [.NET TP Worker] INFO  IConnection - supersocket recv1 time:47502.8616623
Logic 2024-07-03 13:11:43,561 [.NET TP Worker] INFO  SuperSocketService - LogicRecv msgid: 15008 time :47503.5619249
Logic 2024-07-03 13:11:43,562 [.NET TP Worker] INFO  SuperSocketService - LogicSend msgid:20008 size:89 time: 47503.5621652
Logic 2024-07-03 13:11:43,562 [.NET TP Worker] INFO  IConnection - supersocket send1 time:47503.5626357
Logic 2024-07-03 13:11:43,562 [.NET TP Worker] INFO  IConnection - supersocket send2 size:89 time:47503.562856
Gate 2024-07-03 13:11:43,563 [.NET TP Worker] INFO  IConnection - supersocket recv2 size:89 time:47503.5633888
Gate 2024-07-03 13:11:43,563 [.NET TP Worker] INFO  IConnection - supersocket recv1 time:47503.5635426
Gate 2024-07-03 13:11:43,574 [.NET TP Worker] INFO  SuperSocketService - GateRecv msgid: 20008 time :47503.5739918
Gate 2024-07-03 13:11:43,574 [.NET TP Worker] INFO  IConnection - supersocket send1 time:47503.5741234
Gate 2024-07-03 13:11:43,574 [.NET TP Worker] INFO  IConnection - supersocket send2 size:71 time:47503.5742357
x0gundam1133 commented 4 months ago

It is now clear that the Gate log can be ignored. The delay is in Logic, between receiving the data and passing it up to the upper layer.

2024-07-03 13:56:33,348 [.NET TP Worker] INFO IConnection - supersocket recv2 size:33 time:50193.3488561
2024-07-03 13:56:33,349 [.NET TP Worker] INFO IConnection - supersocket recv1 time:50193.3490576
2024-07-03 13:56:33,967 [.NET TP Worker] INFO SuperSocketService - HandlePackage time:50193.967511
2024-07-03 13:56:33,970 [.NET TP Worker] INFO SuperSocketService - LogicRecv msgid: 15008 time :50193.9704912
2024-07-03 13:56:33,970 [.NET TP Worker] INFO SuperSocketService - LogicSend msgid:20008 size:112 time: 50193.9707768
2024-07-03 13:56:33,971 [.NET TP Worker] INFO IConnection - supersocket send1 time:50193.9713833
2024-07-03 13:56:33,971 [.NET TP Worker] INFO IConnection - supersocket send2 size:112 time:50193.9715028

The relevant logic is in SuperSocketService, in private async ValueTask HandleSession(AppSession session, IConnection connection):

#if NET6_0_OR_GREATER
                using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif

                await foreach (var p in packageStream)
                {
                    if (_packageHandlingContextAccessor != null)
                    {
                        _packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
                    }

#if !NET6_0_OR_GREATER
                    using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif
                    Logger.LogInformation($"HandlePackage time:{DateTime.Now.TimeOfDay.TotalSeconds}");
                    await packageHandlingScheduler.HandlePackage(session, p, cancellationTokenSource.Token);

#if NET6_0_OR_GREATER
                    cancellationTokenSource.TryReset();
#endif
                }
x0gundam1133 commented 4 months ago

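The delay occurs somewhere between FillPipeWithDataAsync receiving the data and packageHandlingScheduler.HandlePackage being called. The logic in between is a bit convoluted and I do not fully follow it.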

x0gundam1133 commented 4 months ago

I narrowed it down further, to PipeConnectionBase:

2024-07-03 14:18:14,761 [.NET TP Worker] INFO IConnection - supersocket recv2 size:33 time:51494.7612592
2024-07-03 14:18:14,761 [.NET TP Worker] INFO IConnection - supersocket recv1 time:51494.7614657
2024-07-03 14:18:15,416 [.NET TP Worker] INFO IConnection - RunAsync2 time:51495.4162788
2024-07-03 14:18:15,416 [.NET TP Worker] INFO SuperSocketService - HandlePackage time:51495.4164495

            while (!_cts.IsCancellationRequested)
            {
                Logger.LogInformation($"RunAsync1 time:{DateTime.Now.TimeOfDay.TotalSeconds}");
                var package = await packagePipe.ReadAsync().ConfigureAwait(false);
                Logger.LogInformation($"RunAsync2 time:{DateTime.Now.TimeOfDay.TotalSeconds}");
                if (package == null)
                {
                    yield break;
                }

                yield return package;
            }
x0gundam1133 commented 4 months ago

I narrowed it down further still, to the data exchange between the PipeReader and the PipeWriter:

2024-07-03 15:02:11,789 [.NET TP Worker] INFO IConnection - supersocket recv size:33 time:54131.7891618
2024-07-03 15:02:11,789 [.NET TP Worker] INFO IConnection - writer FlushAsync time:54131.7893135
2024-07-03 15:02:12,794 [.NET TP Worker] INFO IConnection - reader ReadAsync time:54132.7949019
2024-07-03 15:02:12,795 [.NET TP Worker] INFO IConnection - Read time:54132.7950985
2024-07-03 15:02:12,795 [.NET TP Worker] INFO SuperSocketService - HandlePackage time:54132.7953468
2024-07-03 15:02:12,795 [.NET TP Worker] INFO IConnection - Write2 time:54132.7955658
2024-07-03 15:02:12,795 [.NET TP Worker] INFO IConnection - ReadPipe1 time:54132.7955976
2024-07-03 15:02:12,797 [.NET TP Worker] INFO SuperSocketService - LogicRecv msgid: 15008 time :54132.7977909

In FillPipeAsync:

                // Make the data available to the PipeReader
                var result = await writer.FlushAsync().ConfigureAwait(false);
                Logger.LogInformation($"writer FlushAsync time:{DateTime.Now.TimeOfDay.TotalSeconds}");

In ReadPipeAsync:

                    Logger.LogInformation($"ReadPipe1 time:{DateTime.Now.TimeOfDay.TotalSeconds}");
                    result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
                    Logger.LogInformation($"reader ReadAsync time:{DateTime.Now.TimeOfDay.TotalSeconds}");

Is this a Microsoft problem or a system-level problem? Figuring out how to fix it is giving me a headache...

x0gundam1133 commented 4 months ago

I changed the log output to make it clearer:

2024-07-03 18:06:12,655 [.NET TP Worker] INFO IConnection - FillPipeWithDataAsync size:33 time:65172.6552277
2024-07-03 18:06:12,655 [.NET TP Worker] INFO IConnection - Input.writer FlushAsync time:65172.6553468
2024-07-03 18:06:13,680 [.NET TP Worker] INFO IConnection - Input.reader ReadAsync time:65173.6800612
2024-07-03 18:06:13,680 [.NET TP Worker] INFO SuperSocketService - HandlePackage time:65173.6801506

x0gundam1133 commented 4 months ago

When the client sends a protocol message, Gate hits the same problem on receive, so this should not be related to the upper-layer logic.

kerryjiang commented 4 months ago

Could you paste your code here, especially the piece about these logs?

2024-07-03 18:06:12,655 [.NET TP Worker] INFO IConnection - Input.writer FlushAsync time:65172.6553468
2024-07-03 18:06:13,680 [.NET TP Worker] INFO IConnection - Input.reader ReadAsync time:65173.6800612

It looks like the logs above were not generated by the piece of code you pasted:

Logger.LogInformation($"ReadPipe1 time:{DateTime.Now.TimeOfDay.TotalSeconds}");
result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
Logger.LogInformation($"reader ReadAsync time:{DateTime.Now.TimeOfDay.TotalSeconds}");
x0gundam1133 commented 4 months ago

In PipeConnection.cs:

        internal virtual async Task FillPipeAsync(PipeWriter writer, ISupplyController supplyController, CancellationToken cancellationToken)
        {
            var options = Options;

            if (supplyController != null)
            {
                cancellationToken.Register(() =>
                {
                    supplyController.SupplyEnd();
                });
            }

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    if (supplyController != null)
                    {
                        await supplyController.SupplyRequired().ConfigureAwait(false);

                        if (cancellationToken.IsCancellationRequested)
                            break;
                    }

                    var bufferSize = options.ReceiveBufferSize;
                    var maxPackageLength = options.MaxPackageLength;

                    if (bufferSize <= 0)
                        bufferSize = 1024 * 4; //4k

                    var memory = writer.GetMemory(bufferSize);

                    var bytesRead = await FillPipeWithDataAsync(memory, cancellationToken).ConfigureAwait(false);
                    Logger.LogInformation($"FillPipeWithDataAsync size:{bytesRead} time:{DateTime.Now.TimeOfDay.TotalSeconds}");

                    if (bytesRead == 0)
                    {
                        if (!CloseReason.HasValue)
                            CloseReason = Connection.CloseReason.RemoteClosing;

                        break;
                    }

                    UpdateLastActiveTime();

                    // Tell the PipeWriter how much was read
                    writer.Advance(bytesRead);
                }
                catch (Exception e)
                {
                    if (!IsIgnorableException(e))
                    {
                        if (!(e is OperationCanceledException))
                            OnError("Exception happened in ReceiveAsync", e);

                        if (!CloseReason.HasValue)
                        {
                            CloseReason = cancellationToken.IsCancellationRequested
                                ? Connection.CloseReason.LocalClosing : Connection.CloseReason.SocketError;
                        }
                    }
                    else if (!CloseReason.HasValue)
                    {
                        CloseReason = Connection.CloseReason.RemoteClosing;
                    }

                    break;
                }

                // Make the data available to the PipeReader
                var result = await writer.FlushAsync().ConfigureAwait(false);                
                Logger.LogInformation($"Input.writer FlushAsync time:{DateTime.Now.TimeOfDay.TotalSeconds}");

                if (result.IsCompleted)
                {
                    break;
                }
            }

            // Signal to the reader that we're done writing
            await writer.CompleteAsync().ConfigureAwait(false);
            // And don't allow writing data to outgoing pipeline
            await Output.Writer.CompleteAsync().ConfigureAwait(false);
        }

in PipeConnectionBase.cs

protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
{
    var pipelineFilter = _pipelineFilter as IPipelineFilter<TPackageInfo>;

    while (!cancellationToken.IsCancellationRequested)
    {
        ReadResult result;

        try
        {
            result = await reader.ReadAsync().ConfigureAwait(false);
            Logger.LogInformation($"Input.reader ReadAsync time:{DateTime.Now.TimeOfDay.TotalSeconds}");
            OnInputPipeRead(result);
        }
        catch (Exception e)
        {
            if (!IsIgnorableException(e) && !(e is OperationCanceledException))
                OnError("Failed to read from the pipe", e);

            break;
        }

        var buffer = result.Buffer;

        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        if (result.IsCanceled)
        {
            break;
        }

        var completed = result.IsCompleted;

        try
        {
            if (buffer.Length > 0)
            {
                var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, out examined, out var currentPipelineFilter);

                if (currentPipelineFilter != null)
                {
                    pipelineFilter = currentPipelineFilter;
                }

                if (!needReadMore)
                {
                    completed = true;
                    break;
                }
            }

            if (completed)
            {
                break;
            }
        }
        catch (Exception e)
        {
            OnError("Protocol error", e);
            // close the connection if get a protocol error
            CloseReason = Connection.CloseReason.ProtocolError;
            Close();
            break;
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    reader.Complete();
    WriteEOFPackage();
}
x0gundam1133 commented 4 months ago

in SuperSocketService.cs

        private async ValueTask HandleSession(AppSession session, IConnection connection)
        {
            if (!await InitializeSession(session, connection))
                return;

            try
            {
                var pipelineFilter = _pipelineFilterFactory.Create(connection);
                pipelineFilter.Context = CreatePipelineContext(session);

                var packageStream = connection.RunAsync<TReceivePackageInfo>(pipelineFilter);

                await FireSessionConnectedEvent(session);

                var packageHandlingScheduler = _packageHandlingScheduler;

#if NET6_0_OR_GREATER
                using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif

                await foreach (var p in packageStream)
                {
                    if (_packageHandlingContextAccessor != null)
                    {
                        _packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
                    }

#if !NET6_0_OR_GREATER
                    using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif
                    Logger.LogInformation($"HandlePackage time:{DateTime.Now.TimeOfDay.TotalSeconds}");
                    await packageHandlingScheduler.HandlePackage(session, p, cancellationTokenSource.Token);

#if NET6_0_OR_GREATER
                    cancellationTokenSource.TryReset();
#endif
                }
            }
            catch (Exception e)
            {
                _logger.LogError(e, $"Failed to handle the session {session.SessionID}.");
            }
        }
x0gundam1133 commented 4 months ago

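Latest test results (CPUs tested: Intel 14700f and 14490f, AMD 1700x and 5900x):

Intel CPU, physical machine: no stalls
Intel CPU, virtual machine: no stalls
AMD CPU, physical machine: no stalls
AMD CPU, virtual machine: always stalls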

x0gundam1133 commented 4 months ago

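I found where the problem is. I went through every third-party library in the project, and after removing FluentNHibernate and the related database logic the delay went away. It is only an ORM library for MySQL, so I do not know why it conflicts.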

kerryjiang commented 4 months ago

thread leak issue caused by this lib?

x0gundam1133 commented 4 months ago

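I switched to the third-party library Dapper and the delay came back. The database module uses only one thread and one connection (created at startup, closed at exit) and processes data through a double-buffered swap queue, so there should be no thread leak at runtime. I will keep digging, but most likely this is still a matter of thread resource usage and context switching.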

x0gundam1133 commented 4 months ago

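Solved. The cause turned out to be a system thread-management API left over from the old framework: ThreadPool.QueueUserWorkItem. I suspect a conflict at a lower level. After switching to async Task the problem went away.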
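
The thread does not show the original code, so the following is only a hypothetical sketch of the kind of change described above: a queue-draining loop started with ThreadPool.QueueUserWorkItem that holds a pool thread, next to an async Task version that returns its thread to the pool while idle. QueueDrainer, its members, and the 10 ms polling interval are all invented for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class QueueDrainer
{
    private readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
    private int _processed;

    public int Processed => Volatile.Read(ref _processed);

    public void Enqueue(string item) => _queue.Enqueue(item);

    // Before (hypothetical): the work item never returns, so one pool thread is
    // occupied for the lifetime of the loop, even while it only sleeps.
    public void StartBlocking(CancellationToken token) =>
        ThreadPool.QueueUserWorkItem(_ =>
        {
            while (!token.IsCancellationRequested)
            {
                Drain();
                Thread.Sleep(10);
            }
        });

    // After: the loop releases its thread at every await, so it never holds a
    // pool thread while it is idle.
    public async Task RunAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Drain();
                await Task.Delay(10, token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path: Task.Delay was cancelled.
        }
    }

    // Stand-in for the real per-item work.
    private void Drain()
    {
        while (_queue.TryDequeue(out _))
            Interlocked.Increment(ref _processed);
    }
}
```

If the work genuinely has to block (for example a synchronous database call), a dedicated Thread or Task.Factory.StartNew with TaskCreationOptions.LongRunning keeps it off the shared pool threads that socket continuations depend on.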

kerryjiang commented 4 months ago

Sounds like thread leak.
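
One way to check for the kind of thread-pool exhaustion suspected here is to measure how long a trivial work item waits before a pool thread picks it up. By default System.IO.Pipelines dispatches reader continuations to the thread pool, so a busy or blocked pool could plausibly produce a gap like the one logged between FlushAsync and ReadAsync, although the thread does not confirm that this was the mechanism. The sketch below is a diagnostic aid only; ThreadPoolProbe and its method names are made up for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class ThreadPoolProbe
{
    // Returns how long (in ms) a trivial work item sat in the pool's queue
    // before a pool thread started running it.
    public static double MeasureQueueDelayMs()
    {
        var sw = Stopwatch.StartNew();
        double elapsed = 0;
        using var done = new ManualResetEventSlim();

        ThreadPool.QueueUserWorkItem(_ =>
        {
            elapsed = sw.Elapsed.TotalMilliseconds;
            done.Set();
        });

        done.Wait(); // block the caller until the work item has actually run
        return elapsed;
    }

    // Deliberately occupies `count` pool threads for `holdMs` each by blocking
    // inside the work item. This is the anti-pattern under test, not a fix.
    public static void BlockPoolThreads(int count, int holdMs)
    {
        for (var i = 0; i < count; i++)
            ThreadPool.QueueUserWorkItem(_ => Thread.Sleep(holdMs));
    }
}
```

Calling MeasureQueueDelayMs() in an idle process gives a baseline. Calling BlockPoolThreads first, with a count taken from ThreadPool.GetMinThreads, and then measuring again shows how long new work waits once every minimum thread is blocked. The pool adds threads beyond its minimum only gradually, so the second figure is often far larger, though the exact numbers depend on the runtime version and core count. On .NET Core 3.0 or later, logging ThreadPool.ThreadCount and ThreadPool.PendingWorkItemCount alongside the delay can help tell a leak from a temporary backlog.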