aws / aws-iot-device-sdk-python

SDK for connecting to AWS IoT from a device using Python.
Apache License 2.0

configureOfflinePublishQueueing(0) setting in AWSIoTMQTTClient object not working. #293

Closed kirklwilliams closed 3 years ago

kirklwilliams commented 3 years ago

==============================================================================

Python 2.7.16
Python 3.7.3
AWSIoTPythonSDK 1.4.9

Raspberry Pi reference 2020-05-27
Raspberry Pi 4 Model B Rev 1.1

Distributor ID: Raspbian
Description: Raspbian GNU/Linux 10 (buster)
Release: 10
Codename: buster

==============================================================================

I was asked to open this as an issue by AWS Support, with whom we have a Business support plan. What follows describes basically the same behavior I have seen in a couple of other links I found here, but to follow instructions, I am creating this issue with my specifics.

==============================================================================

Just for a little more background: the way we use IoT MQTT is a little different, in that our application's interaction over MQTT is real-time human interaction, where an immediate action (publish) is followed by an immediate reaction (subscribe). For example, we have a Raspberry Pi with a scanner and display connected, and the operator scans a LOGIN barcode. That (action) message is immediately published to our back-end system. Our back-end system then publishes a message back to the Pi stating “SCAN USER ID”. The Raspberry Pi, which is subscribed, then (reaction) immediately receives that message and shows it on the display. This is one real-life example, but this type of action/reaction is how our application uses MQTT. So, as you can see, when a communication interruption happens to occur just as our application is trying to publish a message, it is crucial that at least one copy of the message gets through after reconnection and that it is not duplicated.

With that in mind, and after more testing with different combinations of configurations to best suit how we use MQTT, it appears that a setting of configureOfflinePublishQueueing(0) may work best for us. Since we actually perform multiple .publish() attempts until one succeeds, this setting appears to get at least one message through. There can still be duplicates, but only a few.

Here are the connection settings we are currently using for this approach:

myAWSIoTMQTTClient = None

myAWSIoTMQTTClient = AWSIoTMQTTClient(iot_clientid)
myAWSIoTMQTTClient.configureEndpoint(iot_host, iot_port)
myAWSIoTMQTTClient.configureCredentials(iot_rootcapath, iot_privatekeypath, iot_certificatepath)

myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(0)
myAWSIoTMQTTClient.configureDrainingFrequency(2)
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)
myAWSIoTMQTTClient.configureMQTTOperationTimeout(10)
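For readers unfamiliar with the three arguments to configureAutoReconnectBackoffTime(1, 32, 20): they are the base quiet time, the maximum quiet time, and the stable-connection time, all in seconds. The sketch below only illustrates the doubling schedule that those first two values imply, assuming the wait doubles after each failed reconnect until it reaches the maximum. The helper name `backoff_schedule` is hypothetical and is not part of the SDK, and the SDK's internal timing may differ in detail.

```python
def backoff_schedule(base, maximum, attempts):
    """Return the assumed wait (in seconds) before each reconnect attempt.

    Hypothetical helper: models a wait that doubles after every failed
    attempt and is capped at `maximum`.
    """
    waits = []
    wait = base
    for _ in range(attempts):
        waits.append(wait)
        wait = min(wait * 2, maximum)
    return waits


# Values taken from the settings above: base 1 s, maximum 32 s.
schedule = backoff_schedule(1, 32, 7)
print(schedule)  # [1, 2, 4, 8, 16, 32, 32]
```

Under this assumption, a device that stays offline keeps retrying about every 32 seconds once the cap is reached, so the publish loop may spin for a while before the connection returns.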

Here is the code snippet around the .publish()

import time

from AWSIoTPythonSDK.exception import AWSIoTExceptions

message_to_publish = '{"message": "3BF16D01-D0EA-450F-A297-29543CE11640~ttyUSB0~PASTREDTM~0000000CCFC4LOGIN\r", "messagetype": "FAASPUBLISHJOB", "urlsetid": 500} '

# Retry the QoS 1 publish once per second until a call succeeds.
while True:
    try:
        myAWSIoTMQTTClient.publish(pub_topic, message_to_publish, 1)
        break  # publish succeeded; stop retrying
    except AWSIoTExceptions.publishQueueFullException:
        time.sleep(1)
        continue
    except AWSIoTExceptions.publishQueueDisabledException:
        # Expected while offline when offline queueing is set to 0.
        time.sleep(1)
        continue
    except AWSIoTExceptions.publishTimeoutException:
        time.sleep(1)
        continue
    except Exception:
        time.sleep(1)
        continue

When using this method, this is how everything unfolds.

So without some type of consistency in how the setting behaves, I do not feel comfortable relying on this approach to ensure that 1) at least one publish of the message succeeds and 2) that one message is not published more than once.

QUESTION: Is there some reason I am not aware of that would cause this inconsistent behavior with this setting? I have run my sandbox application both on Windows and directly on the Raspberry Pi where our normal application runs, and I have seen this happen on both platforms.

But when the configureOfflinePublishQueueing(0) setting is recognized and the expected results occur, it works well enough. Therefore, I went ahead and applied the configureOfflinePublishQueueing(0) setting to our real application (with MQTT logging turned on). Sure enough, even though the MQTT logger shows "Configuring offline requests queueing: max queue size: 0", when I disconnect Wi-Fi, .publish() just throws constant publishTimeoutException exceptions instead of the expected single publishTimeoutException followed by many publishQueueDisabledException exceptions while offline. As a result, the publishes attempted offline during the loop are all sent on reconnection, so the same message is published many times. I then stop my real application, run my sandbox application right then and there on the same Pi, and it works as expected. I try the real application again right away, and it does not work as expected, even though both applications call configureOfflinePublishQueueing(0). So this is also part of the inconsistency I was referring to.
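To make the expected sequence concrete, here is a minimal simulation of it: one timeout for the publish that was in flight when the link dropped, then queue-disabled errors while offline, then exactly one delivery after reconnection. Everything in it is a stand-in. `FakeClient`, `publish_until_sent`, and the two locally defined exception classes are hypothetical and only mimic the names of the SDK's exceptions. It models the behavior the post expects, not what the SDK is guaranteed to do.

```python
import time


# Local stand-ins that mimic the SDK's exception names (illustration only).
class publishTimeoutException(Exception):
    pass


class publishQueueDisabledException(Exception):
    pass


class FakeClient:
    """Hypothetical client that is offline for the first `offline_calls` publishes."""

    def __init__(self, offline_calls):
        self.offline_calls = offline_calls
        self.calls = 0
        self.delivered = []

    def publish(self, topic, payload, qos):
        self.calls += 1
        if self.calls == 1 and self.offline_calls > 0:
            # The publish that was in flight when the link dropped.
            raise publishTimeoutException()
        if self.calls <= self.offline_calls:
            # Offline with queueing disabled: nothing is stored for later.
            raise publishQueueDisabledException()
        self.delivered.append(payload)
        return True


def publish_until_sent(client, topic, payload, qos=1, retry_delay=0.0):
    """Retry until one publish call succeeds; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            if client.publish(topic, payload, qos):
                return attempts
        except (publishTimeoutException, publishQueueDisabledException):
            pass
        time.sleep(retry_delay)


client = FakeClient(offline_calls=3)
attempts = publish_until_sent(client, "demo/topic", "LOGIN")
print(attempts, client.delivered)  # 4 ['LOGIN']
```

In this model the message is delivered exactly once because the failed offline attempts leave nothing behind. The duplicates described above would correspond to those failed attempts being retained and sent on reconnection.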

There was also a suggestion in a link here to empty the publish queue myself in our application when .publish() fails. So I tried emptying the [._mqtt_core._internal_async_client._paho_client._out_messages queue] as well. The strange thing is that, just as with the configureOfflinePublishQueueing(0) setting, when I run my simple sandbox program right from Visual Code, this solution actually worked, and stepping through the code in DEBUG I could see that I was emptying the queue. However, when I add the exact same code to our real application, it does not work, and I still get the same constant publishTimeoutException exceptions.

The one big difference between the sandbox and the real application is that in the real application, all of the MQTT business logic runs in its own thread. I could be wrong, but it looks like there is some thread-safety logic in the SDK code that would restrict access to objects such as this publish queue from another thread, which could be why this approach does not work in the real application.
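One way to rule threading in or out is to confine every call on the client to a single owner thread and have other threads hand over requests through a queue. The sketch below shows that pattern with the standard library only. `PublishWorker` and `RecordingClient` are hypothetical names, the recording client stands in for the real MQTT client, and whether this would change the behavior described here is untested.

```python
import queue
import threading


class RecordingClient:
    """Hypothetical stand-in for the MQTT client; records what it is asked to send."""

    def __init__(self):
        self.sent = []
        self.thread_names = set()

    def publish(self, topic, payload, qos):
        self.sent.append((topic, payload, qos))
        self.thread_names.add(threading.current_thread().name)
        return True


class PublishWorker:
    """Owns the client: only the worker thread ever calls client.publish()."""

    def __init__(self, client):
        self._client = client
        self._requests = queue.Queue()
        self._thread = threading.Thread(target=self._run, name="mqtt-owner", daemon=True)
        self._thread.start()

    def submit(self, topic, payload, qos=1):
        # Safe to call from any thread; queue.Queue handles the locking.
        self._requests.put((topic, payload, qos))

    def stop(self):
        # A None sentinel tells the worker to finish after pending requests.
        self._requests.put(None)
        self._thread.join()

    def _run(self):
        while True:
            item = self._requests.get()
            if item is None:
                break
            topic, payload, qos = item
            self._client.publish(topic, payload, qos)


recorder = RecordingClient()
worker = PublishWorker(recorder)
worker.submit("demo/topic", "LOGIN")
worker.submit("demo/topic", "LOGOUT")
worker.stop()
print(recorder.sent)
print(recorder.thread_names)  # {'mqtt-owner'}
```

With this layout, any clean-up applied to the client after a failed publish would also run on the owner thread, which makes the sandbox and the real application easier to compare.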

I also see the suggestion to try the Python v2 SDK, but from reading its open issues, it appears that controlling this offline-publish behavior is a problem in that version as well: all messages are queued, and v2 has no setting to control this like v1 does. Therefore we would basically still have the same problem as we currently do in v1.

In closing, since our application is an action/reaction type of message exchange, you can easily see when a message was published more than once unexpectedly, because our back-end system responds to each one. If 2 “LOGIN” messages were published, 2 “SCAN USER ID” messages would be received, so we would get charged for “duplicate” messages, and we want to avoid that as well.
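The check described here, spotting duplicate publishes from the number of replies, can be sketched as a simple count comparison. The function name `duplicate_replies` and the idea of keeping lists of expected and received replies are assumptions made for illustration. The post does not say how the application tracks replies.

```python
from collections import Counter


def duplicate_replies(expected_replies, received_replies):
    """Return {reply: extra_count} for replies received more often than expected."""
    expected = Counter(expected_replies)
    received = Counter(received_replies)
    # Counter subtraction keeps only positive counts, i.e. the surplus replies.
    return dict(received - expected)


# One LOGIN scan should produce one "SCAN USER ID" reply; two arrived.
extra = duplicate_replies(["SCAN USER ID"], ["SCAN USER ID", "SCAN USER ID"])
print(extra)  # {'SCAN USER ID': 1}
```

An empty result means every action received exactly the replies expected, so no duplicate publish is visible from the device side.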

Thanks for all your help!

Kirk

jmklix commented 3 years ago

You should have gotten an answer from AWS Support. If not, or if you have any more questions, please comment here or open a new issue.

github-actions[bot] commented 3 years ago

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see. If you need more assistance, please either tag a team member or open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.