Azure / azure-iot-sdk-node

A Node.js SDK for connecting devices to Microsoft Azure IoT services
https://docs.microsoft.com/en-us/azure/iot-hub/

[Technical Question] Messages from edgeHub to IoT Edge modules may be dropped at the time of SAS token update. #1215

Open hiroyha1 opened 4 months ago

hiroyha1 commented 4 months ago

By default, a SAS token is renewed every 45 minutes, and each renewal makes the module client disconnect and reconnect. The problem is that if edgeHub sends a message to the module during this reconnection, the message is lost. How can I avoid these dropped messages?
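
The renewal interval follows from the two `tokenRenewal` settings, which the sink sample also overrides. Here is a rough sketch. It assumes the SDK renews the token `tokenRenewalMarginInSeconds` before it expires, with defaults of 3600 s and 900 s, which matches the 45 minutes quoted above. `renewalIntervalSeconds` is a hypothetical helper, not an SDK function.

```javascript
'use strict';

// Assumed SDK defaults: a 1-hour token renewed 15 minutes before it expires.
const DEFAULT_TOKEN_RENEWAL = {
  tokenValidTimeInSeconds: 3600,
  tokenRenewalMarginInSeconds: 900
};

// Hypothetical helper: seconds between SAS token renewals (and therefore
// between the reconnects seen in the sink log), under the assumption above.
function renewalIntervalSeconds(tokenRenewal = DEFAULT_TOKEN_RENEWAL) {
  const { tokenValidTimeInSeconds, tokenRenewalMarginInSeconds } = tokenRenewal;
  // This sketch rejects a margin that would leave no time before renewal.
  if (tokenRenewalMarginInSeconds >= tokenValidTimeInSeconds) {
    throw new RangeError('tokenRenewalMarginInSeconds must be smaller than tokenValidTimeInSeconds');
  }
  return tokenValidTimeInSeconds - tokenRenewalMarginInSeconds;
}

console.log(renewalIntervalSeconds() / 60); // 45 (minutes)
console.log(renewalIntervalSeconds({ tokenValidTimeInSeconds: 10, tokenRenewalMarginInSeconds: 5 })); // 5 (seconds)
```

With the sink's reproduction settings (10 s validity, 5 s margin), a reconnect would happen roughly every 5 seconds. That makes the race far easier to hit than with the default 45-minute interval.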

With the C# SDK, IoT Edge modules do not seem to reconnect when the token is renewed, and the problem does not occur.

Code sample exhibiting the issue

Source module (C#)

    async Task SendEvents(CancellationToken cancellationToken)
    {
        int messageDelay = 100;
        int count = 1;
        int dataSize = 10;

        while (!cancellationToken.IsCancellationRequested)
        {
            string messageString = new string('*', dataSize);
            using Message message = new(Encoding.UTF8.GetBytes(messageString));
            message.ContentEncoding = "utf-8";
            message.ContentType = "text/plain";
            message.Properties.Add("sequenceNumber", count.ToString());
            _logger.LogInformation($"Send message: {count}, Size: [{dataSize}]");
            await _moduleClient!.SendEventAsync("output1", message, cancellationToken);
            count++;
            await Task.Delay(messageDelay, cancellationToken);
        }
    }

Sink module (Node.js)

    'use strict';

    var Transport = require('azure-iot-device-mqtt').Mqtt;
    var Client = require('azure-iot-device').ModuleClient;

    Client.fromEnvironment(Transport, function (err, client) {
      if (err) {
        console.error(err.toString());
        process.exit(-1);
      } else {
        // Start from the options the SDK configured for this module (read from the
        // private _methodClient._options field, which may change between SDK versions)
        // and shorten the SAS token lifetime so that a renewal, and the reconnect it
        // triggers, happens roughly every 5 seconds instead of every 45 minutes.
        let options = client._methodClient._options;
        options.tokenRenewal = {
          tokenValidTimeInSeconds: 10,
          tokenRenewalMarginInSeconds: 5
        };
        client.setOptions(options);
        client.open(function (err) {
          if (err) {
            console.error(err.toString());
            process.exit(-1);
          }
          console.log('IoT Hub module client initialized');
          client.on('inputMessage', function (inputName, msg) {
            pipeMessage(client, inputName, msg);
          });
        });
      }
    });

    function pipeMessage(client, inputName, msg) {
      // Acknowledge the message so that edgeHub receives a completion for it.
      client.complete(msg, printResultFor('Receiving message'));

      if (inputName === 'input1') {
        var message = msg.getBytes().toString('utf8');
        console.log('message size: ' + message.length);
      }
    }

    function printResultFor(op) {
      return function printResult(err, res) {
        if (err) {
          console.log(op + ' error: ' + err.toString());
        }
        if (res) {
          console.log(op + ' status: ' + res.constructor.name);
        }
      };
    }
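
To confirm which messages go missing around a token renewal, the sink can check the `sequenceNumber` property that the C# source module attaches to every message. `SequenceGapDetector` is a hypothetical helper, not part of the SDK. It assumes messages arrive in order and that the property value is the decimal string produced by `count.ToString()`. In `pipeMessage`, it could presumably be fed with `msg.properties.getValue('sequenceNumber')`.

```javascript
'use strict';

// Hypothetical helper: records gaps in the sequence numbers stamped by the source module.
class SequenceGapDetector {
  constructor() {
    this.last = null; // highest sequence number seen so far
    this.gaps = [];   // [firstMissing, lastMissing] pairs
  }

  // Accepts the property value as a string (e.g. "42") and returns the gap it
  // revealed, or null if the number does not skip ahead of the previous one.
  observe(sequenceNumber) {
    const n = Number.parseInt(sequenceNumber, 10);
    if (Number.isNaN(n)) {
      throw new TypeError(`invalid sequence number: ${sequenceNumber}`);
    }
    let gap = null;
    if (this.last !== null && n > this.last + 1) {
      gap = [this.last + 1, n - 1];
      this.gaps.push(gap);
    }
    // Duplicates or late deliveries (n <= last) do not move the high-water mark.
    if (this.last === null || n > this.last) {
      this.last = n;
    }
    return gap;
  }
}

const detector = new SequenceGapDetector();
for (const seq of ['1', '2', '3', '5', '6']) {
  const gap = detector.observe(seq);
  if (gap) {
    console.log(`missing messages ${gap[0]}..${gap[1]}`);
  }
}
// prints "missing messages 4..4"
```

A message that is redelivered late still leaves its gap recorded. Treat the `gaps` list as candidates to cross-check against the edgeHub "Did not receive ack" warnings rather than as proof of loss.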

Log (Sink Module)

    2024-03-06T00:55:49.703Z azure-iot-device-mqtt:MqttTwinClient Message received on a topic we can ignore: devices/...
    2024-03-06T00:55:49.804Z azure-iot-mqtt-base:MqttBase connected -> reconnecting (connected.updateSharedAccessSignature)
    2024-03-06T00:55:49.805Z azure-iot-mqtt-base:MqttBase disconnecting mqtt client
    2024-03-06T00:55:49.805Z azure-iot-mqtt-base:MqttBase removing all listeners
    ...
    2024-03-06T00:55:49.813Z mqttjs:client end :: finish :: calling process.nextTick on closeStores
    2024-03-06T00:55:49.813Z mqttjs:client end :: closeStores: closing incoming and outgoing stores
    2024-03-06T00:55:49.813Z mqttjs:client end :: closeStores: emitting end
    2024-03-06T00:55:49.813Z mqttjs:client end :: closeStores: invoking callback with args
    2024-03-06T00:55:49.813Z azure-iot-mqtt-base:MqttBase mqtt client disconnected - reconnecting

Log (edgeHub)

    <7> 2024-03-06 00:55:49.808 +00:00 [DBG] [Microsoft.Azure.Devices.Edge.Hub.Core.Routing.ModuleEndpoint] - Sending 1 message(s) to module iotedge1/ModuleC.
    <7> 2024-03-06 00:55:49.808 +00:00 [DBG] [Microsoft.Azure.Devices.Edge.Hub.Core.Device.DeviceMessageHandler] - Sent message with correlation ID 0d64f7d1-be58-466e-8429-c09370c9a8c2 to iotedge1/ModuleC
    <6> 2024-03-06 00:55:49.808 +00:00 [INF] [EdgeHub] - "Closing connection for device: iotedge1/ModuleC, , "
    ...
    <4> 2024-03-06 00:56:19.809 +00:00 [WRN] [Microsoft.Azure.Devices.Edge.Hub.Core.Device.DeviceMessageHandler] - Did not receive ack for message 0d64f7d1-be58-466e-8429-c09370c9a8c2 from device/module iotedge1/ModuleC
    <4> 2024-03-06 00:56:19.810 +00:00 [WRN] [Microsoft.Azure.Devices.Edge.Hub.Core.Routing.ModuleEndpoint] - Error sending messages to module iotedge1/ModuleC
    System.TimeoutException: Message completion response not received
       at Microsoft.Azure.Devices.Edge.Hub.Core.Device.DeviceMessageHandler.SendMessageAsync(IMessage message, String input) in /home/hiroyha/iotedge/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs:line 498
       at Microsoft.Azure.Devices.Edge.Hub.Core.Routing.ModuleEndpoint.ModuleMessageProcessor.ProcessAsync(ICollection`1 routingMessages, IDeviceProxy dp, CancellationToken token) in /home/hiroyha/iotedge/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/ModuleEndpoint.cs:line 166
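
Putting the two logs on one timeline makes the race easier to see. The sink starts reconnecting at 00:55:49.804 and disconnects at 00:55:49.805. edgeHub sends the message at 00:55:49.808, and the "Did not receive ack" warning follows about 30 seconds later. The sketch below uses hypothetical helper names and only understands the two timestamp formats shown in these logs. It parses both formats and sorts the lines; the log lines are abbreviated with "...".

```javascript
'use strict';

// Parses the two timestamp styles seen in this issue into epoch milliseconds:
//   sink (debug output): "2024-03-06T00:55:49.804Z ..."
//   edgeHub:             "<7> 2024-03-06 00:55:49.808 +00:00 [DBG] ..."
function parseTimestamp(line) {
  let m = line.match(/^(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2}\.\d{3})Z/);
  if (!m) {
    m = line.match(/^<\d>\s+(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}\.\d{3}) \+00:00/);
  }
  return m ? Date.parse(`${m[1]}T${m[2]}Z`) : NaN;
}

// Merges lines from several named sources into one list ordered by timestamp;
// lines without a recognizable timestamp are dropped.
function mergeLogs(sources) {
  const entries = [];
  for (const [name, lines] of Object.entries(sources)) {
    for (const line of lines) {
      const t = parseTimestamp(line);
      if (!Number.isNaN(t)) entries.push({ t, name, line });
    }
  }
  // Array.prototype.sort is stable, so equal timestamps keep their input order.
  return entries.sort((a, b) => a.t - b.t);
}

const timeline = mergeLogs({
  sink: [
    '2024-03-06T00:55:49.804Z azure-iot-mqtt-base:MqttBase connected -> reconnecting (connected.updateSharedAccessSignature)',
    '2024-03-06T00:55:49.805Z azure-iot-mqtt-base:MqttBase disconnecting mqtt client',
  ],
  edgeHub: [
    '<7> 2024-03-06 00:55:49.808 +00:00 [DBG] ... Sending 1 message(s) to module iotedge1/ModuleC.',
    '<4> 2024-03-06 00:56:19.809 +00:00 [WRN] ... Did not receive ack for message ... from device/module iotedge1/ModuleC',
  ],
});

for (const entry of timeline) {
  console.log(`${new Date(entry.t).toISOString()} [${entry.name}] ${entry.line}`);
}
console.log(`ack wait: ${(timeline[3].t - timeline[2].t) / 1000} s`);
```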

vishnureddy17 commented 3 months ago

It looks like in your Node sample you are using Mqtt. What transport are you using in the C# case?

hiroyha1 commented 3 months ago

I use Mqtt in the C# case, too.

vishnureddy17 commented 3 months ago

Thank you for bringing this up; we will investigate. If you need to mitigate this issue immediately, it may be worth using Amqp as a workaround for now.
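
Following the suggestion above, switching the sink to AMQP should mainly mean passing a different transport to `ModuleClient.fromEnvironment`. The `azure-iot-device-amqp` package exports an `Amqp` transport, just as `azure-iot-device-mqtt` exports `Mqtt`. Here is a minimal sketch that lets the workaround be toggled without editing code. It assumes a hypothetical `SINK_TRANSPORT` environment variable; `resolveTransport` and `loadTransport` are illustrative names, not SDK functions.

```javascript
'use strict';

// Transport packages used with azure-iot-device and the class each one exports.
const TRANSPORTS = {
  mqtt: { packageName: 'azure-iot-device-mqtt', exportName: 'Mqtt' },
  amqp: { packageName: 'azure-iot-device-amqp', exportName: 'Amqp' },
};

// Hypothetical helper: maps a transport name (e.g. the value of SINK_TRANSPORT)
// to its package/export pair, defaulting to AMQP when no name is given.
function resolveTransport(name = 'amqp') {
  const entry = TRANSPORTS[String(name).toLowerCase()];
  if (!entry) {
    throw new Error(`unsupported transport "${name}"; expected one of: ${Object.keys(TRANSPORTS).join(', ')}`);
  }
  return entry;
}

// Loads the transport class lazily, so only the selected package must be installed.
function loadTransport(name) {
  const { packageName, exportName } = resolveTransport(name);
  return require(packageName)[exportName];
}

module.exports = { resolveTransport, loadTransport };
```

In the sink module, `var Transport = loadTransport(process.env.SINK_TRANSPORT);` would then replace the hard-coded `require('azure-iot-device-mqtt').Mqtt`, leaving the rest of the sample unchanged.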