Azure / iotedge

The IoT Edge OSS project
MIT License
Unable to requeue Abandoned messages when using MQTT #900

Closed raghavan20 closed 4 years ago

raghavan20 commented 5 years ago

I have an Edge device that runs two custom modules: a publisher and a consumer. The publisher is a simulator module that continuously produces messages. The consumer module receives messages from the publisher module through edgeHub routing and POSTs them to a third-party API. I am testing that, when messages cannot be sent to the third-party API, they are queued so that the same message can be retried without loss.

I have been talking with some technical people from the IoT Hub Docs team and have tried various changes, which I describe below along with logs.

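The code below demonstrates two points: the message callback returns ABANDONED when ingestion fails, and the client sets a retry policy.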

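import json

from iothub_client import IoTHubMessageDispositionResult


def receive_message_callback(message, hubManager):
    # Decode the raw message body as UTF-8 JSON.
    message_buffer = message.get_bytearray()
    size = len(message_buffer)
    message_text = message_buffer[:size].decode('utf-8')

    data = json.loads(message_text)

    # `ingester` is defined elsewhere in the module; push() returns True on success.
    # ACCEPTED completes the message; ABANDONED is meant to have edgeHub redeliver it.
    if ingester.push(data):
        return IoTHubMessageDispositionResult.ACCEPTED
    else:
        print('Abandoning message: {}'.format(message_text[:200]))
        return IoTHubMessageDispositionResult.ABANDONED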

def __init__(self, protocol=IoTHubTransportProvider.MQTT):
    self.client_protocol = protocol
    self.client = IoTHubModuleClient()
    self.client.create_from_environment(protocol)
    self.client.set_module_twin_callback(module_twin_callback, self)
    self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)

    interval = 100
    print("setting retry policy, value={}".format(interval))
    self.client.set_retry_policy(IoTHubClientRetryPolicy.RETRY_INTERVAL, interval)

    returned_retry_policy = self.client.get_retry_policy()
    print("Got retry policy: strategy={}, timeout={}".format(returned_retry_policy.retryPolicy,
                                                             returned_retry_policy.retryTimeoutLimitInSeconds))

    self.client.set_message_callback("metrics", receive_message_callback, self)

Edge Hub config that sets longer TTL

"$edgeHub": {
  "properties.desired": {
    "schemaVersion": "1.0",
    "storeAndForwardConfiguration": {
      "timeToLiveSecs": 2592000 
    },
  ...
  }
}
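
As a quick sanity check on the value above, the timeToLiveSecs setting can be converted to days. This is only arithmetic on the number in the config; the constant name is made up for illustration.

```python
from datetime import timedelta

# Value copied from the storeAndForwardConfiguration block above.
# TIME_TO_LIVE_SECS is an illustrative name, not an SDK or edgeHub identifier.
TIME_TO_LIVE_SECS = 2592000

ttl = timedelta(seconds=TIME_TO_LIVE_SECS)
print(ttl.days)  # 30
```

So the store-and-forward TTL here is 30 days, far longer than the few seconds between the failed and successful deliveries in the logs that follow.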

Logs showing that messages are not redelivered after processing fails, even though they are marked ABANDONED

Message received: total calls: 424
Unable to ingest measurements - measurements: [{'value': '115', 'timestamp': '2019-02-20T07:56:50+00:00', 'alternateId': 'AirConditioner_1.Humidity'}, {'value': '183', 'timestamp': '2019-02-20T07:56:50+00:00', 'alternateId': 'AirConditioner_2.Humidity'}], error: HTTPSConnectionPool(host='<upstream-domain>', port=443): Max retries exceeded with url: /api/measurements/for/alternate-id/all (Caused by ConnectTimeoutError(<urllib3.connection.VerifiedHTTPSConnection object at 0x7ff1defa1f28>, 'Connection to <upstream-domain> timed out. (connect timeout=10)'))
Abandoning message: [{'dateTime': '2019-02-20T07:56:50+00:00', 'value': '115', 'tag': 'AirConditioner_1.Humidity'}, {'dateTime': '2019-02-20T07:56:50+00:00', 'value': '183', 'tag': 'AirConditioner_2.Humidity'}]
Message received: total calls: 425
Unable to ingest measurements - measurements: [{'value': '165', 'timestamp': '2019-02-20T07:56:55+00:00', 'alternateId': 'AirConditioner_1.Humidity'}, {'value': '142', 'timestamp': '2019-02-20T07:56:55+00:00', 'alternateId': 'AirConditioner_2.Humidity'}], error: HTTPSConnectionPool(host='<upstream-domain>', port=443): Max retries exceeded with url: /api/measurements/for/alternate-id/all (Caused by ConnectTimeoutError(<urllib3.connection.VerifiedHTTPSConnection object at 0x7ff1def9d828>, 'Connection to <upstream-domain> timed out. (connect timeout=10)'))
Abandoning message: [{'dateTime': '2019-02-20T07:56:55+00:00', 'value': '165', 'tag': 'AirConditioner_1.Humidity'}, {'dateTime': '2019-02-20T07:56:55+00:00', 'value': '142', 'tag': 'AirConditioner_2.Humidity'}]
Message received: total calls: 426
ingesting measurements: "[{'value': '240', 'timestamp': '2019-02-20T07:57:00+00:00', 'alternateId': 'AirConditioner_1.Humidity'}, {'value': '273', 'timestamp': '2019-02-20T07:57:00+00:00', 'alternateId': 'AirConditioner_2.Hum" ...
Message received: total calls: 427
ingesting measurements: "[{'value': '252', 'timestamp': '2019-02-20T07:57:05+00:00', 'alternateId': 'AirConditioner_1.Humidity'}, {'value': '284', 'timestamp': '2019-02-20T07:57:05+00:00', 'alternateId': 'AirConditioner_2.Hum" ...
Message received: total calls: 428
ingesting measurements: "[{'value': '188', 'timestamp': '2019-02-20T07:57:10+00:00', 'alternateId': 'AirConditioner_1.Humidity'}, {'value': '237', 'timestamp': '2019-02-20T07:57:10+00:00', 'alternateId': 'AirConditioner_2.Hum" ...
Message received: total calls: 429
ingesting measurements: "[{'value': '195', 'timestamp': '2019-02-20T07:57:15+00:00', 'alternateId': 'AirConditioner_1.Humidity'}, {'value': '190', 'timestamp': '2019-02-20T07:57:15+00:00', 'alternateId': 'AirConditioner_2.Hum" ...
Message received: total calls: 430
ingesting measurements: "[{'value': '271', 'timestamp': '2019-02-20T07:57:20+00:00', 'alternateId': 'AirConditioner_1.Humidity'}, {'value': '192', 'timestamp': '2019-02-20T07:57:20+00:00', 'alternateId': 'AirConditioner_2.Hum" ...
...

Context (Environment)

Device (Host) Operating System

Architecture

amd64

Container Operating System

Docker on Ubuntu

Runtime Versions

iotedged

iotedge 1.0.6.1 (3fa6cbef8b7fc3c55a49a622735eb1021b8a5963)

Edge Agent

mcr.microsoft.com/azureiotedge-agent 1.0

Edge Hub

mcr.microsoft.com/azureiotedge-hub 1.0

Docker

server: 18.06.1-ce

ancaantochi commented 5 years ago

Hi @raghavan20,

Thanks for reporting the issue. I will try to reproduce it and come back to you.

varunpuranik commented 5 years ago

@raghavan20 - I have tested this scenario with a module running the C# SDK, and Abandoned messages are re-queued and resent to the module by the EdgeHub as expected. I will try with the Node.js SDK and get back to you.

Also note that the retry policy you are using only affects the client-side behavior. For example, it kicks in when the client is sending messages and the send fails. It does not affect the server-side behavior. The EdgeHub uses an exponential backoff policy, which is currently not configurable.
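
For readers unfamiliar with the term, here is a minimal sketch of what a capped exponential backoff schedule looks like in general. The function name and the base, factor, cap, and attempt values are all assumptions chosen for illustration; they are not the EdgeHub's actual settings, which are not stated in this thread.

```python
def backoff_delays(base_secs=1.0, factor=2.0, max_secs=60.0, attempts=8):
    """Yield a capped exponential sequence of retry delays in seconds.

    All defaults are illustrative assumptions, not EdgeHub values.
    """
    delay = base_secs
    for _ in range(attempts):
        # Never wait longer than the cap, however many attempts have failed.
        yield min(delay, max_secs)
        delay *= factor


print(list(backoff_delays()))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The point of the shape is that redelivery attempts get rarer the longer a failure lasts, which is different from the fixed RETRY_INTERVAL policy set on the client.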

raghavan20 commented 5 years ago

Thanks for your help and for clarifying the retry policy. I use the Python SDK to develop my custom module.

ancaantochi commented 5 years ago

Hi @raghavan20, it looks like there is a bug with MQTT: the message is resent from edgeHub but is not received by the module. AMQP works; can you use AMQP from your module as a workaround?

raghavan20 commented 5 years ago

Hi @ancaantochi. I can try AMQP; can you point me to a code sample that can consume messages with appropriate queue / route declarations?

I tried switching just the protocol in my custom module and got a few errors; it looks like I have to change some other bits of code or configuration.

Code

    def __init__(self, protocol=IoTHubTransportProvider.AMQP):
        self.client_protocol = protocol
        self.client = IoTHubModuleClient()
        self.client.create_from_environment(protocol)

Error

Error: Time:Tue Mar  5 06:00:50 2019 File:/usr/sdk/src/c/iothub_client/src/iothubtransportamqp.c Func:IotHubTransportAMQP_Subscribe_InputQueue Line:163 AMQP does not support input queues
Error: Time:Tue Mar  5 06:00:50 2019 File:/usr/sdk/src/c/iothub_client/src/iothub_client_core_ll.c Func:IoTHubClientCore_LL_SetInputMessageCallbackImpl Line:2819 IoTHubTransport_Subscribe_InputQueue failed
Unexpected error IoTHubClient.set_input_message_callback, IoTHubClientResult.ERROR from IoTHub

avranju commented 5 years ago

At this time, unfortunately, the only option is for you to wait for the IoT Hub C SDK to gain support for AMQP on Edge. This work has been scheduled but I don't have dates for you yet.

raghavan20 commented 5 years ago

Okay. Is the bug with MQTT requeueing to be fixed sometime soon?

avranju commented 5 years ago

With MQTT, message disposition isn't supported at all. This feature is supported only on AMQP, and AMQP support is not yet available in the C SDK, which the Python SDK relies upon. Unfortunately, we don't have a good story at this time.

cc @tameraw @massand
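
One possible stopgap, not proposed by anyone in this thread, is for the module to stop relying on ABANDONED over MQTT and instead hold failed payloads itself, always returning ACCEPTED to the hub. The sketch below is a hypothetical in-memory buffer; `RetryBuffer`, `push`, and `max_items` are illustrative names, not SDK APIs. Because it lives in memory and is bounded, pending payloads are lost on a module restart and the oldest are dropped when the buffer fills, so it does not give true at-least-once delivery.

```python
from collections import deque


class RetryBuffer:
    """Hold payloads whose push failed so they can be retried in arrival order.

    Hypothetical helper for illustration; not part of any Azure IoT SDK.
    """

    def __init__(self, push, max_items=1000):
        self._push = push  # callable that returns True when the payload was delivered
        self._pending = deque(maxlen=max_items)  # oldest entry is dropped when full

    def submit(self, payload):
        """Queue a new payload, then try to drain everything queued so far."""
        self._pending.append(payload)
        return self.flush()

    def flush(self):
        """Push pending payloads until one fails; return how many remain."""
        while self._pending:
            if not self._push(self._pending[0]):
                break  # keep the failed payload at the head for the next attempt
            self._pending.popleft()
        return len(self._pending)
```

In the consumer above, the message callback would call `submit(data)` with a `push` function wrapping `ingester.push`, so that a later successful delivery also drains earlier failures.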

lt72 commented 4 years ago

Closing the issue for now, please feel free to reopen if necessary.

alaendle commented 4 years ago

I encountered the same problem while trying to implement a transactional consumer with the C SDK. I have to use the C SDK because I depend on other libraries that are only available for C, and I need at-least-once message-processing semantics. So what are my options? As far as I understand, the problem of abandoned messages not being redelivered over MQTT still persists, and trying to use AMQP raises errors even when connecting to the local edge hub. cc @avranju @lt72