Azure / azure-iot-sdk-csharp

A C# SDK for connecting devices to Microsoft Azure IoT services

In some circumstances DeviceClient.SendEventAsync can go wild and consume all available memory #3342

Closed ccruden-aspire closed 9 months ago

ccruden-aspire commented 1 year ago

Context

Description of the issue

We have a service whose only job is to read messages from files in a directory and send the contents of those files as IoT hub messages. It has been in service at dozens of sites for several years, transferring gigabytes a day without this problem. However, starting a day ago, two IoT hubs appear to have entered a state in which a message sent with SendEventAsync blocks or loops and consumes all available memory. Within seconds the process grows from 70 MB of memory to 25 GB, and it either has to be killed or the machine rebooted. This only seems to occur under particular circumstances:

Code sample exhibiting the issue

It's impossible to isolate this in a standalone repro, since the issue only happens at a couple of production sites and we don't have access to experiment with those IoT hubs. The relevant code is included below. The service pulls IQueuedMessage items from a BlockingCollection and awaits a call to SendToHub for each one.
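The consumer side described above can be sketched as follows. This is a minimal sketch, assuming a single consumer that awaits each send before taking the next item; the generic `T` stands in for the reporter's IQueuedMessage, and `QueueConsumer`/`RunAsync` are hypothetical names, not part of the SDK or the reporter's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class QueueConsumer
{
    // Drains the queue until CompleteAdding() has been called and the queue is empty,
    // or until the token is cancelled (which throws OperationCanceledException).
    // Each send is awaited, so at most one message is in flight at a time.
    public static async Task RunAsync<T>(
        BlockingCollection<T> queue,
        Func<T, Task> sendToHub,
        CancellationToken token = default)
    {
        // GetConsumingEnumerable blocks the calling thread while the queue is empty,
        // so in a real service this loop would typically be started with Task.Run.
        foreach (T message in queue.GetConsumingEnumerable(token))
        {
            await sendToHub(message);
        }
    }
}
```

For example, `Task.Run(() => QueueConsumer.RunAsync(queue, hubClient.SendToHub, token))`, where `hubClient` is an instance of the IotHubClient class below and `queue` holds IQueuedMessage items.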

    private const int MaxMessageSize = 262144;

    public bool IsSending { get; private set; }
    private DeviceClient _IOTHubClient;
    private ILogger<IotHubClient> _log;
    private SemaphoreSlim _ioHubLock;
    private DateTime firstDisabled;
    public event EventHandler UnrecoverableError;
    private int messagesSent;
    private int bytesSent;
    public IotHubClient(ILogger<IotHubClient> log)
    {
        _log = log;
        _ioHubLock = new SemaphoreSlim(1, 1);
        IsSending = true; //so that we at least try once to process some files on startup            
    }

    public string ConnectionString { get; set; }
    public string ContentType { get; set; }
    public string ContentEncoding { get; set; }
    public IDictionary<string, string> Properties { get; set; }

    private async Task ConnectToHub()
    {
        if (_IOTHubClient == null)
        {
            // Wait for the lock before entering try/finally so Release() runs only if the lock was acquired.
            await _ioHubLock.WaitAsync();
            try
            {
                if (_IOTHubClient == null)
                {
                    (string host, string device) = GetHostAndDevice(ConnectionString);
                    _log.LogDebug("IOTHubConsumer Store And Forward created for host {host}, device {device}", host, device);
                    DeviceClient client = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
                    try
                    {
                        await client.OpenAsync();
                    }
                    catch
                    {
                        // Don't leak a client whose open failed; the outer catch logs and rethrows.
                        client.Dispose();
                        throw;
                    }
                    IRetryPolicy retryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100));
                    client.SetRetryPolicy(retryPolicy);
                    client.SetConnectionStatusChangesHandler(ConnectionStatusChangedHandler);
                    _IOTHubClient = client;
                    _log.LogDebug("Successfully opened IOT hub");
                    firstDisabled = DateTime.MaxValue;
                    IsSending = true;
                }
            }
            catch (Exception ex)
            {
                _log.LogError(ex, "Exception opening IOT hub");
                _IOTHubClient = null;
                throw;
            }
            finally
            {
                _ioHubLock.Release();
            }
        }
    }

    private async Task DisconnectClient()
    {
        IsSending = false;
        DeviceClient local = _IOTHubClient;
        _IOTHubClient = null;
        if (local != null)
        {
            await local.CloseAsync();
            local.Dispose();
        }
    }

    private Task Bail()
    {
        IsSending = false;
        UnrecoverableError?.Invoke(this, new EventArgs());
        return Task.CompletedTask;
    }

    private async void ConnectionStatusChangedHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
    {
        switch (status)
        {
            case ConnectionStatus.Connected:
                _log.LogDebug("Successfully connected to IOT hub");
                IsSending = true;
                break;

            case ConnectionStatus.Disconnected_Retrying:
                _log.LogDebug("IOT hub disconnected - retrying.");
                IsSending = false;
                break;

            case ConnectionStatus.Disabled:
                _log.LogInformation("IOT hub disabled - reconnecting manually.");
                await DisconnectClient();
                break;

            case ConnectionStatus.Disconnected:
                switch (reason)
                {
                    case ConnectionStatusChangeReason.Bad_Credential:
                        _log.LogInformation("IOT hub credentials rejected. Retrying.");
                        try
                        {
                            DeviceClient client = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
                            await client.OpenAsync();
                            client.SetConnectionStatusChangesHandler(ConnectionStatusChangedHandler);
                            IRetryPolicy retryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100));
                            client.SetRetryPolicy(retryPolicy);
                            await _ioHubLock.WaitAsync();
                            _IOTHubClient = client;
                            _log.LogInformation("Retry successful.");
                            _ioHubLock.Release();
                        }
                        catch (Exception ex)
                        {
                            _log.LogCritical(ex, "Exception re-establishing connection.");
                            await Bail();
                        }
                        break;

                    case ConnectionStatusChangeReason.Client_Close:
                        _log.LogDebug("IOT hub closed gracefully.");
                        break;

                    case ConnectionStatusChangeReason.Communication_Error:
                        _log.LogInformation("IOT hub disconnected because of non-retryable exception. Restarting.");
                        await DisconnectClient();
                        break;

                    case ConnectionStatusChangeReason.Device_Disabled:
                        // This is gonna lose data, but at this point that's unavoidable.
                        // Elapsed time is now minus the first-disabled time; DateTime.MaxValue means "not disabled yet".
                        if (firstDisabled != DateTime.MaxValue && DateTime.Now - firstDisabled > TimeSpan.FromMinutes(5))
                        {
                            _log.LogCritical($"The IOT hub device for the store and forward has been disabled or deleted for more than 5 minutes. Aborting.");
                            await Bail();
                        }
                        else
                        {
                            // Hopefully the device has just been temporarily disabled and it will be enabled before the data piling up kills us off.
                            if (firstDisabled == DateTime.MaxValue)
                            {
                                firstDisabled = DateTime.Now;
                            }
                            _log.LogCritical($"The IOT hub device for the store and forward has been disabled or deleted. Retrying, but it's not looking good.");
                            await DisconnectClient();
                        }
                        break;

                    case ConnectionStatusChangeReason.Retry_Expired:
                        _log.LogInformation("IOT hub disconnected because retry expired. Retrying more forcibly.");
                        await DisconnectClient();
                        break;

                    // No_Network is not used.
                    case ConnectionStatusChangeReason.No_Network:
                    // Expired_SAS_Token is not used
                    case ConnectionStatusChangeReason.Expired_SAS_Token:
                    default:
                        _log.LogError($"ConnectionStatus {status}, ConnectionStatusChangeReason {reason}: this should never happen. Contact support if you see this message.");
                        break;
                }
                break;

            default:
                _log.LogError($"ConnectionStatus {status}, ConnectionStatusChangeReason {reason}: this should never happen. Contact support if you see this message.");
                break;
        }
    }

    public async Task SendToHub(IQueuedMessage message)
    {
        try
        {
            await ConnectToHub();
            byte[] messageBytes = await message.GetBytes();
            if (messageBytes.Length == 0)
            {
                await message.MarkProcessed();
            }
            else if (messageBytes.Length > 0 && messageBytes.Length < MaxMessageSize)
            {
                using Message msg = new(messageBytes);
                if (!string.IsNullOrEmpty(ContentEncoding))
                {
                    msg.ContentEncoding = ContentEncoding;
                }
                if (!string.IsNullOrEmpty(ContentType))
                {
                    msg.ContentType = ContentType;
                }
                if (Properties != null)
                {
                    foreach(KeyValuePair<string,string> pair in Properties)
                    {
                        msg.Properties[pair.Key] = pair.Value;
                    }
                }

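                // Capture the client once: the status handler may set _IOTHubClient to null concurrently.
                DeviceClient client = _IOTHubClient ?? throw new InvalidOperationException("IoT hub client was disconnected before the send.");
                await client.SendEventAsync(msg);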
                _log.LogDebug($"Sent {messageBytes.Length} byte message to IoTHub");
                Interlocked.Increment(ref messagesSent);
                Interlocked.Add(ref bytesSent, messageBytes.Length);
                await message.MarkProcessed();
            }
            else
            {
                await message.MarkPoisoned();
            }
        }
        catch (Exception ex)
        {
            _log.LogWarning(ex, "Error encountered while sending data to IoTHub");
            await message.Requeue();
        }
    }
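The Device_Disabled branch is meant to bail out once the device has stayed disabled for more than five minutes. The elapsed time has to be computed as now minus the first-disabled timestamp: subtracting the other way round gives a negative span once a timestamp has been recorded, and a huge positive one while firstDisabled still holds the DateTime.MaxValue sentinel, so the check would fire on the very first notification. Below is a minimal standalone sketch of that grace-period logic, assuming the caller passes in UTC timestamps; `DisabledGracePeriod`, `ShouldGiveUp` and `Reset` are hypothetical names, not SDK APIs.

```csharp
using System;

// Hypothetical helper: tracks how long the device has been continuously reported as disabled.
public sealed class DisabledGracePeriod
{
    private readonly TimeSpan _limit;
    private DateTime? _firstDisabledUtc; // null means "not currently disabled"

    public DisabledGracePeriod(TimeSpan limit) => _limit = limit;

    // Call on each Device_Disabled notification. Records the first occurrence and returns
    // true once more than _limit has elapsed since then (elapsed = now - first).
    public bool ShouldGiveUp(DateTime nowUtc)
    {
        if (_firstDisabledUtc == null)
        {
            _firstDisabledUtc = nowUtc;
        }
        return nowUtc - _firstDisabledUtc.Value > _limit;
    }

    // Call after the client opens successfully, so the next outage starts a fresh grace period.
    public void Reset() => _firstDisabledUtc = null;
}
```

In the handler, `ShouldGiveUp(DateTime.UtcNow)` would take the place of the time comparison, and `Reset()` would go where firstDisabled is set to DateTime.MaxValue after a successful open. Using UTC also avoids the jumps that daylight-saving changes cause in DateTime.Now.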

Console log of the issue

At Debug level, the service's log ends with a "Sent # byte message to IoTHub" line several seconds before the process had to be killed. Before that, this message was being logged several times a second. Attached log: adc_000001.zip

ccruden-aspire commented 1 year ago

On a hunch, I tried replacing the DotNetty 0.7.0 assemblies included with MADC (Microsoft.Azure.Devices.Client) 1.40 with DotNetty 0.7.5 assemblies, and that appears to have resolved the main problem in this issue: the giant memory consumption. The service still regularly stops and restarts sending, but that's probably related to IoT hub...
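After swapping assemblies by hand, it may be worth confirming which DotNetty version the process actually loaded, since a stale copy or a binding redirect can determine what gets picked up. This is a minimal sketch, assuming the DotNetty assembly simple names match their NuGet package names (for example `DotNetty.Transport`); `LoadedAssemblies` and `VersionOf` are hypothetical names, and the AssemblyVersion it returns may not match the package version string exactly.

```csharp
using System;
using System.Linq;
using System.Reflection;

public static class LoadedAssemblies
{
    // Returns the AssemblyVersion of an assembly with the given simple name that is
    // already loaded into the current AppDomain, or null if none is loaded yet.
    public static Version VersionOf(string simpleName)
    {
        AssemblyName match = AppDomain.CurrentDomain.GetAssemblies()
            .Select(a => a.GetName())
            .FirstOrDefault(n => string.Equals(n.Name, simpleName, StringComparison.OrdinalIgnoreCase));
        return match?.Version;
    }
}
```

Assemblies load lazily, so the check is only meaningful after DeviceClient.OpenAsync() has succeeded over MQTT, for example by logging `LoadedAssemblies.VersionOf("DotNetty.Transport")` once at startup.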

romandres commented 11 months ago

I'm encountering a similar issue when calling OpenAsync() for a device that is disabled in the IoT hub: the process then allocates all available memory almost instantly.

I'm on

Updating to DotNetty 0.7.5 seems to have fixed that issue, thanks @ccruden-aspire!

timtay-microsoft commented 9 months ago

We've upgraded the DotNetty dependency to the latest version in #3401, so I'll mark this as "fix checked in".

timtay-microsoft commented 9 months ago

This issue has been fixed as of this release, so I'll close this thread.