awr / KafkaClient

Native C# client for Kafka
Apache License 2.0

Streams #22

Closed awr closed 7 years ago

awr commented 7 years ago

Really the successor to #15

ligu commented 7 years ago

Hi, I've checked the changes. I don't think introducing ReconnectingSocket is a good idea. It ties SocketTransport and SslTransport together, so they can't be independent. The connect/reconnect process should be different for SSL, because creating the stream and setting up the SSL connection should also be part of it, like this:

```csharp
public async Task ConnectAsync(CancellationToken cancellationToken)
{
    if (_disposeToken.IsCancellationRequested) throw new ObjectDisposedException(nameof(StreamTransport));

    if (_socket?.Connected ?? cancellationToken.IsCancellationRequested) return;

    await _connectSemaphore.LockAsync(async () =>
    {
        // Re-check inside the lock: another caller may have connected in the meantime.
        if (_socket?.Connected ?? cancellationToken.IsCancellationRequested) return;
        var socket = _socket ?? CreateSocket();
        await _configuration.ConnectionRetry.AttemptAsync(
            //action
            async (attempt, timer) =>
            {
                if (!socket.Connected)
                {
                    _log.Info(() => LogEvent.Create($"Connecting to {_endpoint}"));
                    _configuration.OnConnecting?.Invoke(_endpoint, attempt, timer.Elapsed);

                    var connectTask = socket.ConnectAsync(_endpoint.Ip.Address, _endpoint.Ip.Port);
                    if (await connectTask.IsCancelled(_disposeToken.Token).ConfigureAwait(false))
                    {
                        throw new ObjectDisposedException(
                            $"Object is disposing (TcpSocket for endpoint {_endpoint})");
                    }

                    await connectTask.ConfigureAwait(false);
                    if (!socket.Connected) return RetryAttempt<object>.Retry;

                    _log.Info(() => LogEvent.Create($"Connection established to {_endpoint}"));
                    _configuration.OnConnected?.Invoke(_endpoint, attempt, timer.Elapsed);
                }
                else
                {
                    _log.Debug(() => LogEvent.Create($"Socket already connected to {_endpoint}"));
                }

                // Wrap the socket used for this attempt (the local, which may have been recreated).
                var networkStream = new NetworkStream(socket, false);

                if (_configuration.SslConfiguration == null)
                {
                    _stream = networkStream;
                    return new RetryAttempt<object>(null);
                }

                var sslStream = new SslStream(
                    networkStream,
                    false,
                    _configuration.SslConfiguration.RemoteCertificateValidationCallback,
                    _configuration.SslConfiguration.LocalCertificateSelectionCallback,
                    _configuration.SslConfiguration.EncryptionPolicy ?? EncryptionPolicy.RequireEncryption
                );

                try
                {
                    await sslStream.AuthenticateAsClientAsync(_endpoint.Host).ConfigureAwait(false);
                    _log.Info(() => LogEvent.Create($"Successful SSL connection, SslProtocol:{sslStream.SslProtocol}, KeyExchange:{sslStream.KeyExchangeAlgorithm}.{sslStream.KeyExchangeStrength}, Cipher:{sslStream.CipherAlgorithm}.{sslStream.CipherStrength}, Hash:{sslStream.HashAlgorithm}.{sslStream.HashStrength}, Authenticated:{sslStream.IsAuthenticated}, MutuallyAuthenticated:{sslStream.IsMutuallyAuthenticated}, Encrypted:{sslStream.IsEncrypted}, Signed:{sslStream.IsSigned}"));
                    _stream = sslStream;
                }
                catch (Exception ex)
                {
                    _log.Warn(() => LogEvent.Create(ex, $"SSL connection failed: {ex.Message}"));
                    sslStream.Dispose();
                    // Treat a failed SSL setup as a failed attempt so the retry logic kicks in.
                    return RetryAttempt<object>.Retry;
                }

                return new RetryAttempt<object>(null);
            },
            //onRetry
            (attempt, retry) => _log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint}: Will retry in {retry}")),
            //onFinal
            attempt =>
            {
                _log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint} on attempt {attempt}"));
                throw new ConnectionException(_endpoint);
            },
            //onException
            (ex, attempt, retry) =>
            {
                if (_disposeToken.IsCancellationRequested) throw new ObjectDisposedException(nameof(StreamTransport), ex);
                _log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint}: Will retry in {retry}"));

                if (ex is ObjectDisposedException || ex is PlatformNotSupportedException)
                {
                    DisposeSocket();
                    _log.Info(() => LogEvent.Create($"Creating new socket to {_endpoint}"));
                    socket = CreateSocket();
                }
            },
            //onFinalException
            (ex, attempt) =>
            {
                _log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint} on attempt {attempt}"));
                if (ex is SocketException || ex is PlatformNotSupportedException)
                {
                    throw new ConnectionException(_endpoint, ex);
                }
            },
            cancellationToken
        ).ConfigureAwait(false);

    }, cancellationToken).ConfigureAwait(false);
}
```

awr commented 7 years ago

I'm not sure I really understand -- can you write a test to expose the issue?

ligu commented 7 years ago

The issue is that the SSL setup is currently not part of the reconnection logic. So if something goes wrong during the SSL setup, we end up with an open but unusable TCP stream, the reconnection logic doesn't kick in, and the next read/write fails. (Keep in mind also that the SSL config contains two callbacks that must be implemented by the user of this library, so we can't even control the full SSL setup process.)
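
To make the failure mode concrete, here is a minimal sketch of the idea, not the library's code. `ConnectSketch`, `ConnectWithRetryAsync`, `tcpConnect` and `sslHandshake` are hypothetical names introduced only for illustration. The point is that a single attempt covers both the TCP connect and the SSL handshake, so a handshake failure triggers another attempt instead of leaving a half-initialized stream behind:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of KafkaClient.
public static class ConnectSketch
{
    // Returns the number of the attempt that succeeded.
    public static async Task<int> ConnectWithRetryAsync(
        Func<Task> tcpConnect,    // stands in for the socket connect
        Func<Task> sslHandshake,  // stands in for the SSL setup, including user callbacks
        int maxAttempts)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await tcpConnect().ConfigureAwait(false);
                await sslHandshake().ConfigureAwait(false);
                return attempt; // the stream is usable only after both steps succeed
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // A failure in either step counts as a failed attempt; try again.
                // On the last attempt the filter is false and the exception propagates.
            }
        }

        throw new InvalidOperationException("maxAttempts must be at least 1");
    }
}
```

With a handshake delegate that throws on its first two calls and `maxAttempts` set to 5, the method returns on the third attempt. If the handshake sat outside the loop, the same failure would surface only on the first read or write.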

My other reason is merely theoretical/architectural: I think it's a cleaner solution to keep the two quite different implementations of ITransport separate.
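
As a rough sketch of what that separation could look like (the interface and class names below are hypothetical and deliberately suffixed with `Sketch`; the real ITransport in this repository may have a different shape), each implementation owns its whole connect sequence, and only the SSL one knows about the handshake:

```csharp
using System;
using System.IO;
using System.Net.Security;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical interface for illustration only.
public interface ITransportSketch : IDisposable
{
    Task<Stream> ConnectAsync(string host, int port);
}

public sealed class PlainTransportSketch : ITransportSketch
{
    private TcpClient _client;

    public async Task<Stream> ConnectAsync(string host, int port)
    {
        _client?.Dispose();               // drop any previous connection
        _client = new TcpClient();
        await _client.ConnectAsync(host, port).ConfigureAwait(false);
        return _client.GetStream();
    }

    public void Dispose() => _client?.Dispose();
}

public sealed class SslTransportSketch : ITransportSketch
{
    private TcpClient _client;
    private SslStream _ssl;

    public async Task<Stream> ConnectAsync(string host, int port)
    {
        Dispose();                        // a reconnect always starts from a fresh socket
        _client = new TcpClient();
        await _client.ConnectAsync(host, port).ConfigureAwait(false);
        _ssl = new SslStream(_client.GetStream(), leaveInnerStreamOpen: false);
        // If the handshake throws, the caller's retry logic can simply call ConnectAsync again.
        await _ssl.AuthenticateAsClientAsync(host).ConfigureAwait(false);
        return _ssl;
    }

    public void Dispose()
    {
        _ssl?.Dispose();
        _client?.Dispose();
    }
}
```

Any retry policy can then wrap `ConnectAsync` without knowing which transport it is dealing with.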

awr commented 7 years ago

To close the loop on this one: I've opened #24 to track fixing the reconnection bug so this doesn't get lost.