akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net
Other
4.73k stars 1.04k forks source link

[Akka.IO] TcpListener connection queue problem #5988

Open Arkatufus opened 2 years ago

Arkatufus commented 2 years ago

Version Information Version of Akka.NET? dev branch Which Akka.NET Modules? Akka.IO

Describe the bug The way TcpListener is accepting new connection is buggy. JVM managed TCP sockets through the nio channel selector pattern while we're doing it closer to bare metal by requesting actual sockets for each incoming connection.

The way it is done in JVM is to use an accepting limit that is being decremented each time a new channel is created, in our case we used SocketAsyncEventArgs as an async non-blocking way of accepting connections (https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/IO/TcpListener.cs#L68-L79).

  1. In JVM all channels are passed to the TcpConnection class and have its lifetime managed by it, in ours, SAEA are stored inside an array and they are supposed to be disposed on TcpListener PostStop. But they're not. This array is being overwritten when a ResumeAccepting command is received, possibly creating a memory leak (https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/IO/TcpListener.cs#L100-L103)
  2. Instead of being limited by the accept number, we're reusing SAEA objects (which is a really bad idea) and kept accepting new connections when new socket connections were made (https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/IO/TcpListener.cs#L90-L97)

We can probably improve this this by ditching SAEA and use _socket.AcceptAsync(null, cancellationToken) instead. The SAEA would be managed by the method and all pending accept command can be cancelled using the token.

Aaronontheweb commented 2 years ago

We can probably improve this this by ditching SAEA and use _socket.AcceptAsync(null, cancellationToken) instead. The SAEA would be managed by the method and all pending accept command can be cancelled using the token.

Is that .NET 6 only?

Arkatufus commented 2 years ago

Not sure, I can see the code in .NET 5.0, would have to check the .NET Standard API to see if its there

Arkatufus commented 2 years ago

Nope, not available in .NET Standard 2.0. Introduced in .NET 5.0.

Aaronontheweb commented 2 years ago

We can do an #if #endif for that in the .NET 6 version in Akka.NET v1.5 then

F0b0s commented 1 year ago

I'm working with this issue and find that TcpListener and its PullMode params: _acceptLimit = bind.PullMode ? 0 : _tcp.Settings.BatchAcceptLimit; probably doesn't work as expected. If it mean that TcpListener should accept new connection only on Pull, then it doesn't work in this way. It will accept any new connection once it has accepted connection:

                case SocketEvent evt:
                    var saea = evt.Args;
                    if (saea.SocketError == SocketError.Success)
                        Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));

                    saea.AcceptSocket = null;
                    if (!_socket.AcceptAsync(saea))
                        Self.Tell(new SocketEvent(saea));
                    return true;

_socket.AcceptAsync will be called automatically after accepting current connection.

Do we need to change it to real pull mode?

F0b0s commented 1 year ago

Second question about io.tcp.batch-accept-limit setting. It manages size of array of SocketAsyncEventArgs in TcpListener and documentations says:

The maximum number of connection that are accepted in one go,
higher numbers decrease latency, lower numbers increase fairness on
the worker-dispatcher
batch-accept-limit = 10

If we take a look at MSDN example of SocketAsyncEventArgs usage at this page we can see that it use only one object for socket.AcceptAsync() because its usage is very short and all io operations is performed on connected socket:

SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
readEventArgs.UserToken = e.AcceptSocket;

// As soon as the client is connected, post a receive to the connection
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);

Moreover, we have this code in TcpListener

case SocketEvent evt:
    var saea = evt.Args;
    if (saea.SocketError == SocketError.Success)
        Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));

    saea.AcceptSocket = null;
    if (!_socket.AcceptAsync(saea))
        Self.Tell(new SocketEvent(saea));
    return true;

If usage of SAEA is very short in fact we can use only one element of array SAEA. My proposal is to remove array of SAEA and mark io.tcp.batch-accept-limit setting as obsolete. Then create single SAEA object and hardly use it inside TcpListener.

Arkatufus commented 1 year ago

We can't reduce the SAEA to a single SAEA. The point of using asynchronous event model is to make sure that the server socket can accept multiple incoming connection asynchronously without blocking.

This is described in the MS documentation:

In the new System.Net.Sockets.Socket class enhancements, asynchronous socket operations are described by reusable SocketAsyncEventArgs objects allocated and maintained by the application. High-performance socket applications know best the amount of overlapped socket operations that must be sustained. The application can create as many of the SocketAsyncEventArgs objects that it needs. For example, if a server application needs to have 15 socket accept operations outstanding at all times to support incoming client connection rates, it can allocate 15 reusable SocketAsyncEventArgs objects for that purpose.

Arkatufus commented 1 year ago

I'm working with this issue and find that TcpListener and its PullMode params: _acceptLimit = bind.PullMode ? 0 : _tcp.Settings.BatchAcceptLimit; probably doesn't work as expected. If it mean that TcpListener should accept new connection only on Pull, then it doesn't work in this way. It will accept any new connection once it has accepted connection:

                case SocketEvent evt:
                    var saea = evt.Args;
                    if (saea.SocketError == SocketError.Success)
                        Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));

                    saea.AcceptSocket = null;
                    if (!_socket.AcceptAsync(saea))
                        Self.Tell(new SocketEvent(saea));
                    return true;

_socket.AcceptAsync will be called automatically after accepting current connection.

Do we need to change it to real pull mode?

Wouldn't AcceptAsync() be actually needed to start the socket? It just returns a bool to signal if the connection has been accepted, if false then then connection is still being negotiated (handshake) and we can't use the socket yet.

F0b0s commented 1 year ago

We can't reduce the SAEA to a single SAEA. The point of using asynchronous event model is to make sure that the server socket can accept multiple incoming connection asynchronously without blocking.

Ok, i was confused by MS example in the same post.

Wouldn't AcceptAsync() be actually needed to start the socket? It just returns a bool to signal if the connection has been accepted, if false then then connection is still being negotiated (handshake) and we can't use the socket yet.

Yes, but after you call AcceptAsync(saea) you ready to accept new connection and it would be accepted and processed because you have Completed handler. It doesn't wait for next Pull. Take a look:

                case SocketEvent evt:
                    var saea = evt.Args;
                    if (saea.SocketError == SocketError.Success)
                        Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));

                    saea.AcceptSocket = null;
                    if (!_socket.AcceptAsync(saea))
                        Self.Tell(new SocketEvent(saea));

SocketEvent here means we already got a connection, but we call AcceptAsync(saea) and Self.Tell(new SocketEvent(saea)); in the same handler, we don't wait for Pull. Do we need to change it to real Pull mode?

F0b0s commented 1 year ago

@Arkatufus

Arkatufus commented 1 year ago

You're right that we're not implementing the tcp listener pull mode correctly, in fact, it is an undocumented "feature" that we do not fully support. What we're doing right now is to have the TCP port accept N number of connection attempt all the time, where N equals to akka.io.tcp.batch-accept-limit.

If you're going to implement this feature inside Akka, there are several things to consider.

Using a single saea only works if the TcpListener is listening with pull mode turned on. Removing the other saea will change the TcpListener behavior in that only the first connection attempt will be honored, any other concurrent connection attempt will be refused. This is a very dangerous behavior change as it have a very high chance of breaking existing user implementation in the wild.

In the Scala version, the TCP port will only accept N numbers of connection and will stop accepting when N number of connections have been made. They decrement _acceptLimit every time a connection has been made and stops accepting new connections when it reaches 0, the user has to issue a ResumeAccepting tell to the TcpListener to have it start accepting new connections again. With pull mode turned on, they simply set _acceptLimit to 0. We can not change our implementation behavior to match the scala behavior because that have the potential to break existing user implementation in the wild.

You would need to keep these in mind when you try to implement the pull mode in Akka.IO.Tcp, which isn't trivial. We have to honor backward compatibility because we could not afford to break existing implementations.

F0b0s commented 10 months ago

Ok, we're not implemented pull mode correctly, so, in my opinion, if we are not going to change this logic, Tcp.ResumeAccepting message is useless(because we always accept new connections):

                case Tcp.ResumeAccepting resumeAccepting:
                    _acceptLimit = resumeAccepting.BatchSize;
                    // TODO: this is dangerous, previous async args are not disposed and there's no guarantee that they're not still receiving data
                    _saeas = Accept(_acceptLimit).ToArray();
                    return true;

we can just rewrite it like this:

                case Tcp.ResumeAccepting resumeAccepting:
                    _acceptLimit = resumeAccepting.BatchSize;
                    return true;

and it should work. Is it what are we need to do in this issue?

Aaronontheweb commented 10 months ago

@Arkatufus @F0b0s maybe we should just remove the functionality altogether?

F0b0s commented 10 months ago

@Aaronontheweb what functionality do you mean?