dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License
10.14k stars 2.04k forks source link

Non-blocking ClusterClient on startup #8162

Open AndreiShenets opened 2 years ago

AndreiShenets commented 2 years ago

Background

Imagine that you have a Web API that starts in Kubernetes. Part of its functionality involves Orleans, and the rest can be used independently. When you start such a Web API and the Orleans silo is not running, ClusterClient throws a "cannot connect" error and terminates execution. The behavior can be improved if you register an IClientConnectionRetryFilter, which is not registered by default, has no public default implementation, and is not mentioned in the documentation. The filter at least gives you the ability to retry, but because of the internal implementation, using it blocks your service at startup until the host with the silo is up.

As a result, our hypothetical Web API is continuously restarted by Kubernetes, either because of the startup failure or because of the liveness probe.

Proposal

Although a silo host is usually not expected to be down at the moment a client starts, I propose improving the behavior to better handle this partial-degradation scenario.

ClusterClient should have at least an option to switch it to a non-blocking startup mode. In this mode the client should continuously retry connecting to the host in the background. That shouldn't prevent other background services and the application itself from starting.

Also it would be great to have a default implementation for IClientConnectionRetryFilter. For example with exponential retries.

I can help and contribute a PR, but first I need to understand whether my vision matches that of the package development team.

Workaround

The behavior can be improved as follows, but to me this approach is a hack. The workaround can also be broken by future changes inside the Orleans implementation, whereas I would like to have a stable built-in solution.

/// <summary>
/// Workaround extensions that register the Orleans client and keep its hosted-service
/// startup from blocking the rest of the application.
/// </summary>
public static class OrleansWebApplicationBuilderExtensions
{
    /// <summary>
    /// Registers the Orleans client on <paramref name="builder"/>'s service collection and then
    /// calls <see cref="PreventStartupBlockingByOrleansClient"/> on the same collection.
    /// </summary>
    /// <param name="builder">The web application builder whose services are configured.</param>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> is <c>null</c>.</exception>
    public static void ConfigureLocalOrleansClient(this WebApplicationBuilder builder)
    {
        ArgumentNullException.ThrowIfNull(builder);

        builder.Services.AddOrleansClient(
            clientBuilder =>
            {
                // Client configuration is elided in the original snippet.
...   
            }
        );

        builder.Services.PreventStartupBlockingByOrleansClient();
    }

    /// <summary>
    /// Replaces every <see cref="IHostedService"/> registration with a factory-based one that
    /// returns the original service, except that a service which is also an
    /// <see cref="IClusterClient"/> is returned wrapped in a
    /// <see cref="ClusterClientHostedServiceWrapper"/>.
    /// </summary>
    /// <remarks>
    /// Only registrations present at the time of the call are rewritten, so call this after all
    /// hosted services (including the Orleans client) have been added.
    /// </remarks>
    /// <param name="services">The service collection to rewrite.</param>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
    public static void PreventStartupBlockingByOrleansClient(this IServiceCollection services)
    {
        ArgumentNullException.ThrowIfNull(services);

        // Snapshot first: the registrations are removed right below.
        ServiceDescriptor[] existingHostedServiceDescriptors =
            services
                .Where(descriptor => descriptor.ServiceType == typeof(IHostedService))
                .ToArray();

        services.RemoveAll(typeof(IHostedService));

        services.AddTransient<ClusterClientHostedServiceWrapper>();

        foreach (ServiceDescriptor descriptor in existingHostedServiceDescriptors)
        {
            services.Add(
                new ServiceDescriptor(
                    typeof(IHostedService),
                    sp => WrapIfClusterClient(sp, ResolveOriginalService(sp, descriptor)),
                    descriptor.Lifetime
                )
            );
        }
    }

    /// <summary>
    /// Produces the service instance that the original <paramref name="descriptor"/> described.
    /// </summary>
    private static object ResolveOriginalService(IServiceProvider sp, ServiceDescriptor descriptor)
    {
        if (descriptor.ImplementationType is not null)
        {
            // Hosted services are commonly registered only under IHostedService (for example by
            // AddHostedService<T>()), so the implementation type is usually NOT resolvable as a
            // service of its own. GetServiceOrCreateInstance returns the registered instance when
            // there is one and otherwise constructs the type with dependencies from the provider.
            return ActivatorUtilities.GetServiceOrCreateInstance(sp, descriptor.ImplementationType);
        }

        if (descriptor.ImplementationFactory is not null)
        {
            return descriptor.ImplementationFactory(sp);
        }

        return descriptor.ImplementationInstance
            ?? throw new NotSupportedException(
                $"The {nameof(IHostedService)} registration has no implementation type, factory or instance"
            );
    }

    /// <summary>
    /// Returns <paramref name="targetService"/> unchanged unless it is an
    /// <see cref="IClusterClient"/>, in which case it is assigned to a freshly resolved
    /// <see cref="ClusterClientHostedServiceWrapper"/> and the wrapper is returned instead.
    /// </summary>
    private static object WrapIfClusterClient(IServiceProvider sp, object targetService)
    {
        if (targetService is not IClusterClient)
        {
            return targetService;
        }

        if (targetService is not IHostedService clusterClientHostedService)
        {
            throw new NotSupportedException(
                $"Something went wrong and {nameof(IClusterClient)} is not {nameof(IHostedService)} anymore"
            );
        }

        ClusterClientHostedServiceWrapper wrapper =
            sp.GetRequiredService<ClusterClientHostedServiceWrapper>();

        wrapper.Assign(clusterClientHostedService);

        return wrapper;
    }
}
/// <summary>
/// Wraps another <see cref="IHostedService"/> (intended for the Orleans cluster client) so that
/// <see cref="StartAsync"/> returns immediately while the wrapped service is started, with
/// retries, on a background task.
/// </summary>
public sealed class ClusterClientHostedServiceWrapper : IHostedService
{
    // Effectively unlimited retries with exponential backoff between 0 and 120 seconds.
    private static readonly RetryStrategy RetryStrategy = new ExponentialBackoffRetryStrategy(
        int.MaxValue,
        TimeSpan.FromSeconds(0),
        TimeSpan.FromSeconds(120),
        TimeSpan.FromSeconds(2)
    );

    private readonly ILogger<ClusterClientHostedServiceWrapper> _logger;
    private readonly CancellationTokenSource _cancellationTokenSource = new ();
    private IHostedService? _hostedService;

    // Background connect loop started by StartAsync; awaited by StopAsync.
    private Task? _startTask;

    /// <summary>Creates the wrapper.</summary>
    /// <param name="logger">Logger used for connection and shutdown diagnostics.</param>
    public ClusterClientHostedServiceWrapper(ILogger<ClusterClientHostedServiceWrapper> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Sets the hosted service to wrap. Must be called before <see cref="StartAsync"/> or
    /// <see cref="StopAsync"/>.
    /// </summary>
    /// <param name="hostedService">The service whose start is moved to the background.</param>
    public void Assign(IHostedService hostedService)
    {
        _hostedService = hostedService;
    }

    /// <summary>
    /// Kicks off the background connect loop and returns a completed task without waiting for it.
    /// </summary>
    /// <param name="cancellationToken">
    /// Not used: the background loop is controlled by the wrapper's own token, which is
    /// cancelled in <see cref="StopAsync"/>.
    /// </param>
    /// <exception cref="InvalidOperationException"><see cref="Assign"/> was not called.</exception>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        IHostedService hostedService = _hostedService
            ?? throw new InvalidOperationException(
                $"The hosted service is not initialized. {nameof(Assign)} must be called first"
            );

        CancellationToken stoppingToken = _cancellationTokenSource.Token;

        // Keep the task so StopAsync can wait for an in-flight start attempt to finish.
        _startTask = Task.Run(
            () => ConnectWithRetryAsync(hostedService, stoppingToken),
            stoppingToken
        );

        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels the background connect loop, waits for it to finish, and then stops the wrapped
    /// service. An <c>OrleansException</c> thrown while stopping is logged and ignored.
    /// </summary>
    /// <param name="cancellationToken">Token that bounds the graceful shutdown.</param>
    /// <exception cref="InvalidOperationException"><see cref="Assign"/> was not called.</exception>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        IHostedService hostedService = _hostedService
            ?? throw new InvalidOperationException(
                $"The hosted service is not initialized. {nameof(Assign)} must be called first"
            );

        _cancellationTokenSource.Cancel();

        Task? startTask = _startTask;
        if (startTask is not null)
        {
            try
            {
                // Do not call StopAsync on the wrapped service while its StartAsync may still be running.
                await startTask.WaitAsync(cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Either the loop was cancelled before it ran or the shutdown deadline expired;
                // in both cases continue with stopping the wrapped service.
            }
        }

        try
        {
            await hostedService.StopAsync(cancellationToken);
        }
        catch (OrleansException error)
        {
            _logger.LogWarning(error, "Error while gracefully disconnecting from Orleans cluster. Will ignore and continue to shutdown");
        }
    }

    /// <summary>
    /// Repeatedly calls <c>StartAsync</c> on <paramref name="hostedService"/> until it succeeds,
    /// <paramref name="stoppingToken"/> is cancelled, or the retry strategy disallows another attempt.
    /// </summary>
    private async Task ConnectWithRetryAsync(IHostedService hostedService, CancellationToken stoppingToken)
    {
        int retryCount = 0;

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await hostedService.StartAsync(stoppingToken);
                return;
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested: leave quietly. A cancellation that did NOT come from
                // our token (for example an internal timeout) falls through and is retried.
                return;
            }
            catch (Exception ex)
            {
                RetryCondition retryCondition = RetryStrategy.GetShouldRetryHandler().Invoke(retryCount++, ex);
                if (!retryCondition.RetryAllowed)
                {
                    // Without this exit the loop would spin without delay and without logging.
                    _logger.LogError(ex, "Failed to connect to Orleans cluster after {RetryCount} attempts. Giving up", retryCount);
                    return;
                }

                _logger.LogWarning(ex, "Failed to connect to Orleans cluster on attempt {RetryCount}", retryCount);

                try
                {
                    await Task.Delay(retryCondition.DelayBeforeRetry, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    return;
                }
            }
        }
    }
}
hnazari13701371 commented 1 month ago

hi , i also encounter same issue !