chronoxor / NetCoreServer

Ultra fast and low latency asynchronous socket server & client C# .NET Core library with support TCP, SSL, UDP, HTTP, HTTPS, WebSocket protocols and 10K connections problem solution
https://chronoxor.github.io/NetCoreServer
MIT License
2.76k stars 575 forks

Proxy Behaviour - High amount of throughput causes buffer to not be emptied correctly #248

Open b0ykoe opened 1 year ago

b0ykoe commented 1 year ago

I've built a "Proxy" behaviour with NetCoreServer (code below). It works great, but memory usage keeps increasing. Profiling the binary with dotMemory pointed me towards NetCoreServer.

I'm using .NET 7 and the latest NetCoreServer (7.0.0).

Stacktrace from dotMemory, file attached.

System.Byte[]
  Objects : n/a
  Bytes   : 3306768968

 100%  Reserve • 3,08 GB / 3,08 GB • NetCoreServer.Buffer.Reserve(Int64)
  >99,9%  Append • 3,08 GB / - • NetCoreServer.Buffer.Append(ReadOnlySpan<T>)
    >99,9%  SendAsync • 3,08 GB / - • NetCoreServer.TcpSession.SendAsync(ReadOnlySpan<T>)
      >99,9%  SendAsync • 3,08 GB / - • NetCoreServer.TcpSession.SendAsync(Byte[], Int64, Int64)
        >99,9%  OnReceived • 3,08 GB / - • SilkroadSecurityFake_impl.FakeRemoteClient.OnReceived(Byte[], Int64, Int64)
          >99,9%  ProcessReceive • 3,08 GB / - • NetCoreServer.TcpClient.ProcessReceive(SocketAsyncEventArgs)
             80,2%  OnAsyncCompleted • 2,47 GB / - • NetCoreServer.TcpClient.OnAsyncCompleted(Object, SocketAsyncEventArgs)
               80,2%  RunInternal • 2,47 GB / - • System.Threading.ExecutionContext.RunInternal(ExecutionContext, ContextCallback, Object)
                 80,2%  <.cctor>b__176_0 • 2,47 GB / - • System.Net.Sockets.SocketAsyncEventArgs+<>c.<.cctor>b__176_0(UInt32, UInt32, NativeOverlapped)
                   80,2%  Invoke • 2,47 GB / - • System.Threading.PortableThreadPool+IOCompletionPoller+Callback.Invoke(PortableThreadPool+IOCompletionPoller+Event)
                     80,2%  Execute • 2,47 GB / - • System.Threading.ThreadPoolTypedWorkItemQueue<T, TCallback>.Execute()
                       80,2%  Dispatch • 2,47 GB / - • System.Threading.ThreadPoolWorkQueue.Dispatch()
                         80,2%  WorkerThreadStart • 2,47 GB / - • System.Threading.PortableThreadPool+WorkerThread.WorkerThreadStart()
                          ►  42,0%  [AllThreadsRoot] • 1,29 GB / - • [AllThreadsRoot]
                           38,2%  RunWorker • 1,18 GB / - • System.Threading.Thread+StartHelper.RunWorker()
                             38,2%  Run • 1,18 GB / - • System.Threading.Thread+StartHelper.Run()
                               38,2%  StartCallback • 1,18 GB / - • System.Threading.Thread.StartCallback()
                                ►  38,2%  [AllThreadsRoot] • 1,18 GB / - • [AllThreadsRoot]


dotMemory file: dotMemory - SilkroadSecurityFake_impl.zip

The dotMemory file comes from a test run in which the Packet object and parsing were still enabled. Sadly, disabling them does not change the memory leak behaviour.

Test case: about 400 clients stand in the same spot and move around inside each other. Each move is a request to the server, which then sends a response to all 400 clients saying that this one client moved. The proxy receives all 400 responses and forwards them to the clients, so that is basically 160,000 packets (400 × 400) every tick. I couldn't notice any other strange behaviour while the memory kept rising.
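The load in this test case can be estimated with a little arithmetic. The sketch below is only an illustration: the class and method names are hypothetical, and the packet size and per-tick drain capacity used in the usage note are assumptions, since neither is stated in the report. It shows how queued bytes pile up whenever the proxy enqueues more per tick than the client sockets can flush.

```csharp
using System;

// Hypothetical helper for estimating proxy load; not part of NetCoreServer.
public static class FanOutEstimate
{
    // Every client's move is broadcast to every client:
    // clients * clients packets per tick.
    public static long PacketsPerTick(int clients) => (long)clients * clients;

    // Bytes still queued after `ticks` ticks when each tick enqueues
    // `inBytes` and the sockets can flush at most `drainBytes`.
    public static long PendingAfter(int ticks, long inBytes, long drainBytes)
    {
        long pending = 0;
        for (int i = 0; i < ticks; i++)
            pending = Math.Max(0, pending + inBytes - drainBytes);
        return pending;
    }
}
```

For 400 clients, PacketsPerTick(400) gives 160,000. With an assumed 32 bytes per packet that is 5,120,000 bytes per tick. If the sockets could flush only 5,000,000 bytes per tick (again an assumption), PendingAfter(100, 5_120_000, 5_000_000) gives 12,000,000 bytes still queued after 100 ticks, and the backlog keeps growing linearly from there.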

Code: RemoteClient

public class FakeRemoteClient : TcpClient
{
    private FakeSession FakeSession { get; }
    // private readonly Memory<byte> _memory;

    public FakeRemoteClient(FakeSession fakeSession, string remoteIp, int remotePort) : base(remoteIp, remotePort)
    {
        FakeSession = fakeSession;
        OptionKeepAlive = true;
        OptionNoDelay = true;
        // _memory = new Memory<byte>(new byte[ushort.MaxValue]);
    }

    protected override void OnConnected()
    {
        Console.WriteLine($"Server connected a new session with Id {Id}");
    }

    protected override void OnDisconnected()
    {
        Console.WriteLine($"Server disconnected a session with Id {Id}");
        FakeSession.Disconnect();
    }

    // Receive from Server
    protected override void OnReceived(byte[] buffer, long offset, long size)
    {
        // var packet = PacketPool.Rent()
        //     .SetInitialData(buffer, size);

        // Console.WriteLine(packet.ToString());
        // Console.WriteLine($"FakeRemoteClient bytes: {BitConverter.ToString(buffer, (int)offset, (int)size)}");

        // buffer.CopyTo(_memory);
        // FakeSession.SendAsync(_memory.ToArray(), 0, size);
        // _memory.Span.Clear();

        FakeSession.SendAsync(buffer, offset, size);
        // packet.Return();
    }

    protected override void OnError(SocketError error)
    {
        Console.WriteLine($"Chat TCP client caught an error with code {error}");
        FakeSession.Disconnect();
    }
}

Server

public class FakeServer : TcpServer
{
    private readonly string _remoteIp;
    private readonly int _remotePort;

    public FakeServer(string localip, int localport, string remoteip, int remoteport) : base(localip, localport)
    {
        _remoteIp = remoteip;
        _remotePort = remoteport;
    }

    protected override TcpSession CreateSession() { return new FakeSession(this, _remoteIp, _remotePort); }

    protected override void OnError(SocketError error)
    {
        Console.WriteLine($"Chat TCP server caught an error with code {error}");
    }
}

Session

public class FakeSession : TcpSession
{
    private FakeRemoteClient FakeRemoteClient { get; }
    // private readonly Memory<byte> _memory;
    public FakeSession(TcpServer server, string remoteIp, int remotePort) : base(server)
    {
        FakeRemoteClient = new FakeRemoteClient(this, remoteIp, remotePort);
        FakeRemoteClient.ConnectAsync();
        // _memory = new Memory<byte>(new byte[ushort.MaxValue]);
    }

    protected override void OnConnected()
    {
        Console.WriteLine($"Client with Id {Id} connected!");
    }

    protected override void OnDisconnected()
    {
        Console.WriteLine($"Client with Id {Id} disconnected!");
        FakeRemoteClient.Disconnect();
    }

    // Receive from Client
    protected override void OnReceived(byte[] buffer, long offset, long size)
    {
        // var packet = PacketPool.Rent()
        //     .SetInitialData(buffer, size);

        // Console.WriteLine(packet.ToString());
        // Console.WriteLine($"FakeSession bytes: {BitConverter.ToString(buffer, (int)offset, (int)size)}");

        // buffer.CopyTo(_memory);
        // FakeRemoteClient.SendAsync(_memory.ToArray(), 0, size);
        // _memory.Span.Clear();

        FakeRemoteClient.SendAsync(buffer, offset, size);
        // packet.Return();
    }

    protected override void OnError(SocketError error)
    {
        Console.WriteLine($"Chat TCP session caught an error with code {error}");
        FakeRemoteClient.Disconnect();
    }
}

Program

public static void Main(string[] args)
{
    var gw1 = new FakeServer("10.10.0.2", 5001, "10.3.10.140", 15779);
    var as1 = new FakeServer("10.10.0.2", 5002, "10.3.10.140", 16004);
    var as2 = new FakeServer("10.10.0.2", 5003, "10.3.10.141", 16040);

    gw1.OptionReceiveBufferSize = 2048;
    gw1.OptionSendBufferSize = 2048;
    gw1.OptionNoDelay = true;

    as1.OptionReceiveBufferSize = 2048;
    as1.OptionSendBufferSize = 2048;
    as1.OptionNoDelay = true;

    as2.OptionReceiveBufferSize = 2048;
    as2.OptionSendBufferSize = 2048;
    as2.OptionNoDelay = true;

    // Start the servers
    Console.Write("Server starting...");
    gw1.Start();
    as1.Start();
    as2.Start();
    Console.WriteLine("Done!");

    Console.WriteLine("Press Enter to stop the server or '!' to restart the server...");

    // Perform text input
    for (;;)
    {
        string line = Console.ReadLine();
        if (string.IsNullOrEmpty(line))
            break;

        // Restart the servers
        if (line == "!")
        {
            Console.Write("Server restarting...");
            gw1.Restart();
            as1.Restart();
            as2.Restart();
            Console.WriteLine("Done!");
            continue;
        }
    }

    // Stop the servers
    Console.Write("Server stopping...");
    gw1.Stop();
    as1.Stop();
    as2.Stop();
    Console.WriteLine("Done!");
}

b0ykoe commented 1 year ago

Some further investigation:

The issue seems to start here, leading into public virtual bool SendAsync(ReadOnlySpan<byte> buffer): https://github.com/chronoxor/NetCoreServer/blob/master/source/NetCoreServer/TcpSession.cs#L314

This leads into the Buffer#Append function and later on into the Reserve function: https://github.com/chronoxor/NetCoreServer/blob/master/source/NetCoreServer/Buffer.cs#L194

My hot take is that Reserve will only ever increase the size of the buffer and never decrease it again: https://github.com/chronoxor/NetCoreServer/blob/master/source/NetCoreServer/Buffer.cs#L122
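To make the suspicion concrete, here is a minimal sketch of a grow-only buffer. It is a hypothetical stand-in, not the NetCoreServer implementation: the class name GrowOnlyBuffer, the doubling growth policy, and the TrimIfEmpty method are all assumptions made for illustration. If the real Reserve behaves similarly, a single burst would pin the buffer at its peak capacity even after all data has been sent.

```csharp
using System;

// Hypothetical stand-in for a growable byte buffer; NOT the NetCoreServer code.
public sealed class GrowOnlyBuffer
{
    private byte[] _data = new byte[0];
    private int _size;

    public int Capacity => _data.Length;
    public int Size => _size;

    // Grow the backing array when needed; never shrink it.
    public void Reserve(int capacity)
    {
        if (capacity > _data.Length)
        {
            var grown = new byte[Math.Max(capacity, 2 * _data.Length)];
            Array.Copy(_data, grown, _size);
            _data = grown;
        }
    }

    // Copy the bytes to the end of the buffer, growing it first if required.
    public void Append(ReadOnlySpan<byte> bytes)
    {
        Reserve(_size + bytes.Length);
        bytes.CopyTo(_data.AsSpan(_size));
        _size += bytes.Length;
    }

    // Resets the logical size only; the backing array keeps its peak capacity.
    public void Clear() => _size = 0;

    // Assumed remedy: drop an oversized backing array once the buffer is empty.
    public void TrimIfEmpty(int maxIdleCapacity)
    {
        if (_size == 0 && _data.Length > maxIdleCapacity)
            _data = new byte[maxIdleCapacity];
    }
}
```

In this sketch, appending 1,000 bytes and then calling Clear leaves Size at 0 but Capacity at 1,000; only the assumed TrimIfEmpty call releases the large array. Note that trimming would not help if data is appended faster than it is flushed, because the buffer is then never empty; that case needs some form of back-pressure on the sender instead.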