safakgur / socket-awaitable

An easy-to-use library that provides socket utilities for taking advantage of async/await operators.
MIT License
41 stars 15 forks source link

seems got connections/sockets/objects release problem #2

Open imkow opened 10 years ago

imkow commented 10 years ago

After testing with many connections, the server can not accept connections as many as it runs first time. the amount of subsequent connections depends on the total reusable SocketAwaitable minus the prior connected clients. The shutdown SocketAwaitable objects were put back to the pool.

Any ideas?

giulianob commented 10 years ago

Do you have some test code? It has been working for me. On Jun 19, 2014 10:43 AM, "imkow" notifications@github.com wrote:

After testing with many connections, the server can not accept connections as many as it runs first time. the amount of subsequent connections depends on the total reusable SocketAwaitable minus the prior connected clients. The shutdown SocketAwaitable objects were put back to the pool.

Any ideas?

— Reply to this email directly or view it on GitHub https://github.com/safakgur/Dawn.SocketAwaitable/issues/2.

imkow commented 10 years ago

I'm using 'Next' branch

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dawn.Net.Sockets;

namespace SocketAwaitableTest
{
    class Program
    {
        public static GCLatencyMode DefaultLatencyMode = GCSettings.LatencyMode;
        static void Main(string[] args)
        {

            //             Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.AboveNormal;
            var gcServer = GCSettings.IsServerGC;
            int minWorkerThreads = 0, minIOCPThreads = 0;
            ThreadPool.GetMinThreads(out minWorkerThreads, out minIOCPThreads);
            ThreadPool.SetMinThreads(minWorkerThreads * 3, minIOCPThreads * 8);
            var server = new Server();
            server.OnDisconnected += async (server1, socket) =>
            {
                Console.WriteLine("disconnect {0}");

            };

            server.Init();
            server.ListenAsync(59876);

            Task.Run(
                async () =>
                {
                    while (true)
                    {
                        await Task.Delay(TimeSpan.FromSeconds(30));
                        Console.WriteLine("\nConnections: {0}", server.Connections);

                        if (server.Connections < 10)
                            GC.Collect();

                        if (server.Connections < 100)
                            GC.Collect(1);

                    }

                });
            Console.Write("Press Enter to Exit...");
            Console.ReadLine();
        }
    }

    class Server
    {

        private readonly SocketAwaitablePool _pool = new SocketAwaitablePool(10000);

        private readonly BlockingBufferManager _bufferManager = new BlockingBufferManager(8192, 10000);
        private Socket _listenSocket;
        public event Action<Server, Socket> OnDisconnected;
        public int Connections;

        public void Init()
        {

            foreach (var socketAwaitable in _pool)
            {
                socketAwaitable.DisconnectReuseSocket = true;
            }
        }

        public async Task ListenAsync(int port)
        {

            //             var a = pool.Take();
            _listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            _listenSocket.ReceiveBufferSize = 819200;
            _listenSocket.SendBufferSize = 819200;

            _listenSocket.Bind(new IPEndPoint(IPAddress.Any, port));
            _listenSocket.Listen(500);

            Console.WriteLine("\r\n\r\n*************************\r\n** Server is listening @{0}**\r\n*************************\r\n", port);
            await AcceptAsync(_listenSocket);

        }

        private async Task AcceptAsync(Socket s)
        {

            SocketAwaitable a = null;// = this.pool.Take();
            a = _pool.Take();

            try
            {

                while (true)
                {

                    Console.WriteLine("Accepting Connection {0} ...");

                    var sa = await s.AcceptAsync(a);

                    if (sa == SocketError.Success)
                    {
                       Interlocked.Increment(ref Connections);
                        //                 a.AcceptSocket.ReceiveBufferSize = 819200;
                        //                 a.AcceptSocket.SendBufferSize = 819200;
                        //                 a.AcceptSocket.NoDelay = true;

                        this.OnAccepted(a.AcceptSocket); // No need to call `a.AcceptSocket = null` every time.
                    }
                    else
                    {

                        if (a.AcceptSocket != null)
                        {
                            //                             s.Disconnect(true);
                            a.AcceptSocket.Shutdown(SocketShutdown.Both);
                            a.AcceptSocket.Close(1);
                            //                             RaiseDisconnected(a.AcceptSocket);
                        }

                        if (_pool.Count < 10000)
                        {
                            a.Clear(); // Clear the awaitable arguments.
                            this._pool.Add(a); // Add the `SocketAwaitable` back to the pool.
                        }
                        else
                        {
                            if (!a.IsDisposed)
                                a.Dispose();
                        }

                    }

                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                //                 s.DisonnectAsync(a);
                if (_pool.Count < 10000)
                {
                    a.Clear(); // Clear the awaitable arguments.
                    this._pool.Add(a); // Add the `SocketAwaitable` back to the pool.
                }
                else
                {
                    if (!a.IsDisposed)
                        a.Dispose();
                }

                Interlocked.Decrement(ref Connections);

            }

            await AcceptAsync(s);

        }

        private async Task OnAccepted(Socket s)
        {
            await EchoAsync(s);
        }

        private async Task EchoAsync(Socket s)
        {
            var a = this._pool.Take(); // Take a `SocketAwaitable` from the pool.
            var b = this._bufferManager.GetBuffer(); // Take a buffer from the manager.
            a.Buffer = b; // Buffer is an ArraySegment<byte>.
            try
            {
                //                 GCSettings.LatencyMode = GCLatencyMode.LowLatency;

                while (true)
                {
                    var sa = await s.ReceiveAsync(a);

                    if (sa == SocketError.Success && a.Transferred.Count > 0)
                    {
                        int remainCount = 0;
                        a.Buffer = a.Transferred; // Set the buffer to send what is received.
                        while (true)
                        {
                            if (await s.SendAsync(a) != SocketError.Success)
                            {
                                if (a.AcceptSocket != null)
                                {
                                    a.AcceptSocket.Shutdown(SocketShutdown.Both);
                                    a.AcceptSocket.Close();
                                }

                                //                                 RaiseDisconnected(a.AcceptSocket);
                                return; // Return if can't send.
                            }

                            if (a.Buffer.Count == a.Transferred.Count)
                            {
                                if (a.AcceptSocket != null)
                                {
                                    a.AcceptSocket.Shutdown(SocketShutdown.Both);
                                    a.AcceptSocket.Close();
                                }
                                break; // Break if all the data is sent.
                            }

                            // Set the buffer to send the remaining data.

                            remainCount = a.Buffer.Count - a.Transferred.Count;
                            if (remainCount > 0)
                            {

                                a.Buffer = new ArraySegment<byte>(
                                    a.Buffer.Array,
                                    a.Buffer.Offset + a.Transferred.Count,
                                    remainCount);

                                // Set the original buffer back to continue receiving.

                            }

                            a.Buffer = b;
                        }

                    }
                    else
                    {
                        if (a.AcceptSocket != null)
                        {
                            a.AcceptSocket.Shutdown(SocketShutdown.Both);
                            a.AcceptSocket.Close();

                            // 
                            //                             RaiseDisconnected(a.AcceptSocket);
                        }

                        break;
                        // 
                    }

                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                if (a.AcceptSocket != null)
                {
                    a.AcceptSocket.Shutdown(SocketShutdown.Both);
                    a.AcceptSocket.Close();

                    // 
                }

                RaiseDisconnected(s);

                if (_pool.Count < 10000)
                {  
                    a.Clear(); // Clear the awaitable arguments.
                    this._pool.Add(a); // Add the `SocketAwaitable` back to the pool.
                }
                else
                {
                    if(!a.IsDisposed)
                        a.Dispose();
                }

                this._bufferManager.ReleaseBuffer(b); // Release the buffer. 
                GCSettings.LatencyMode = Program.DefaultLatencyMode;
                Interlocked.Decrement(ref Connections);

            }
        }

        void RaiseDisconnected(Socket s)
        {
            Action<Server, Socket> handler = OnDisconnected;
            if (handler != null)
            {
                handler(this, s);
            }

        }

    }
}