eclipse / paho.mqtt.python


Prevent ack of received message until final #348

Closed HaraldGustafsson closed 9 months ago

HaraldGustafsson commented 5 years ago

Hi,

Would it be possible to adapt paho so that qos=1 messages are not acknowledged until after the on_message call has been handled? I was thinking of client.py, where _handle_publish() has the following code:

    elif message.qos == 1:
        rc = self._send_puback(message.mid)
        self._handle_on_message(message)
        return rc

Could it be changed to:

    elif message.qos == 1:
        try:
            self._handle_on_message(message)
        except SomeSuitableException:
            return MQTT_ERR_AGAIN  # Would this be a good error?
        rc = self._send_puback(message.mid)
        return rc

And then allow _handle_on_message to pass on SomeSuitableException, which should be some Paho-defined exception. This would make it possible to not acknowledge a message if, for example, the callback fails to permanently store the message due to an error in that storage.
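To make the proposed ordering concrete, here is a minimal standalone sketch. It is not paho's actual code: `HandlerFailed`, `handle_qos1_publish` and the dict-shaped message are hypothetical names used only for illustration, and the `send_puback` argument stands in for whatever the library does to send the acknowledgement.

```python
class HandlerFailed(Exception):
    """Hypothetical exception a callback raises to withhold the PUBACK."""


def handle_qos1_publish(message, on_message, send_puback):
    """Run the callback first and acknowledge only if it returns cleanly.

    Returns True if the PUBACK was sent, False if it was withheld.
    """
    try:
        on_message(message)
    except HandlerFailed:
        return False  # no PUBACK: the sender still owns the message
    send_puback(message["mid"])
    return True


def store_ok(message):
    pass  # pretend the message was persisted successfully


def store_broken(message):
    raise HandlerFailed("storage unavailable")


acked = []  # packet identifiers for which a PUBACK was "sent"
first = handle_qos1_publish({"mid": 1, "payload": b"a"}, store_ok, acked.append)
second = handle_qos1_publish({"mid": 2, "payload": b"b"}, store_broken, acked.append)
print(first, second, acked)
```

Only the message whose callback returned cleanly ends up acknowledged; the second one stays unacknowledged on the sender's side.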

vrst37 commented 5 years ago

While this is possible, why would you want to do this??

Imagine you block with a sleep(60) in on_message, then qos=1 ack is not sent to the broker. Then the broker keeps trying to send the message again and again, queuing the message in the library buffer. I don't think that behaviour is acceptable.

The problem in general terms is, you are trying to use protocol level handshaking concepts to signify application level signals. Which I think is bad, we should not confuse the two.

HaraldGustafsson commented 5 years ago

@vrst37 it is common for many broker protocols to allow waiting with the ack until a message is processed. The general thinking is that a message is not persistently received until it has been e.g. stored. A broker should not resend a message while the connection is not lost. In MQTT 5 it is even forbidden to resend while the connection is alive. The exception is just meant as a typical Python way of signaling that something bad happened.

yatwang commented 5 years ago

@vrst37 I find that there is a risk of message loss even with QoS 1, because the PUBACK is sent back to the broker before the on_message callback function is called. If there is a power outage, hardware failure or other system failure after the PUBACK is sent and before the on_message callback is called, the message will be lost, and there seems to be no way to avoid it.

The Java implementation of the Eclipse Paho client does not send the acknowledgement until the messageArrived method returns cleanly. From org.eclipse.paho.client.mqttv3 - Interface MqttCallback:

void messageArrived(java.lang.String topic, MqttMessage message) throws java.lang.Exception

This method is called when a message arrives from the server. This method is invoked synchronously by the MQTT client. An acknowledgment is not sent back to the server until this method returns cleanly.

Besides, I have also tried the mqtt.js client library (the MQTT client for Node.js), and it also sends the PUBACK after the on message callback function returns.

Other MQ products also support a similar feature, e.g. RabbitMQ. From the RabbitMQ tutorial - Work Queues:

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.

In the on_message function, programmers may want to write the message to a highly available store (Redis, an MQ, a database, etc.) so that workers can process it further, which is how reliable and scalable systems are built.

yatwang commented 5 years ago

MQTT is an application layer protocol on top of the TCP transport layer protocol. If the PUBACK is not tied to the client or the broker actually handling the message, then TCP already provides reliable message delivery and the extra acknowledgement would not be needed. Just my two cents.

dalbani commented 4 years ago

Coming from a RabbitMQ / Google Cloud PubSub background, I find it strange as well that MQTT messages are automatically ACK'ed, even if a problem occurred in the handler. The fact that the Java version of the Paho client doesn't acknowledge in case of an error seems a good reason to implement it in the Python client as well. Isn't it?

dalbani commented 4 years ago

I've looked at other Paho implementations, and it seems that the C implementation is the only (other) one that supports explicit acknowledgement, via the return value of the MQTTAsync_messageArrived function. (This functionality also exists in the synchronous version of the library.)

baldoalessandro commented 4 years ago

I found this https://github.com/eclipse/paho.mqtt.c/issues/522 to be something worth looking at! It seems to me that this has pushed people to revive the Python C binding of paho for the sake of conceptual form.

The problem in general terms is, you are trying to use protocol level handshaking concepts to signify application level signals. Which I think is bad, we should not confuse the two.

Please reconsider this. I would be more than happy to contribute code/tests.

chrismaes87 commented 4 years ago

Hello, this is really a drawback for the paho-mqtt library compared to other libraries. We now have no way to guarantee that we will not lose any message. The solution proposed (processing before sending the ack message) would be perfect and doesn't seem complicated to implement.

This is a requirement for paho-mqtt to be production-grade! We cannot afford to risk losing a message due to network glitches or other failures.

munluk commented 4 years ago

Hi @chrismaes87, I just wanted to make a note at this point. MQTT is not a message queue. The re-delivery of messages (probably depending on the MQTT version) is specified differently than you might expect. Please read up on the official documentation for v3.1.1.

When a Client reconnects with CleanSession set to 0, both the Client and Server MUST re-send any unacknowledged PUBLISH Packets (where QoS > 0) and PUBREL Packets using their original Packet Identifiers [MQTT-4.4.0-1]. This is the only circumstance where a Client or Server is REQUIRED to redeliver messages.

As I read it, if you cannot process a message, you can only be sure that the broker will re-send the PUBLISH packet IF you reconnect. Of course, this applies only to publishes with QoS > 0. This means that if you do not acknowledge the PUBLISH with a PUBACK, the broker will store it, and it is only required to send the message to the client again when you reconnect.
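A toy model may make that reading easier to check. This is only a sketch of the quoted rule, not of any real broker: `ToySession` and its methods are made-up names, and the second tuple element mimics the DUP flag that the spec requires on a re-delivered PUBLISH.

```python
class ToySession:
    """Toy model of a broker-side persistent session (CleanSession=0)."""

    def __init__(self):
        self.unacked = {}  # packet id -> payload, still awaiting a PUBACK
        self.sent = []     # (packet id, dup flag) in the order packets went out

    def publish(self, mid, payload):
        self.unacked[mid] = payload
        self.sent.append((mid, False))

    def puback(self, mid):
        self.unacked.pop(mid, None)

    def reconnect(self):
        # The only point where redelivery is REQUIRED: resend everything
        # still unacknowledged, using the original packet identifiers.
        for mid in list(self.unacked):
            self.sent.append((mid, True))


session = ToySession()
session.publish(1, b"a")
session.publish(2, b"b")
session.puback(1)     # message 1 was handled; message 2 is never acknowledged
session.reconnect()   # the client comes back with CleanSession set to 0
print(session.sent)
```

In this model nothing is re-sent while the connection stays up; message 2 only goes out again at the reconnect.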

Please correct me if I am wrong.

FirstGitCommentEver :)

HaraldGustafsson commented 4 years ago

@munluk I already commented on this in my comment above. The reason not to send the ack early is the case where you get a crash or a silent failure between receiving the message and handling it. When the client crashes or hangs, it will reconnect. You can't use this as a way to handle every exception, since at the very least you need to reconnect. But with this change you won't lose the message, unlike how it is implemented today.

linuxbasic commented 3 years ago

The MQTT specification says:

In the QoS 1 delivery protocol, the receiver MUST respond with a PUBACK packet containing the Packet Identifier from the incoming PUBLISH packet, having accepted ownership of the Application Message

Figure 4.2 – QoS 1 protocol flow diagram, non-normative example goes into more detail:

The receiver does not need to complete delivery of the Application Message before sending the PUBACK. When its original sender receives the PUBACK packet, ownership of the Application Message is transferred to the receiver.

I personally find it annoying and misleading to bury this small fact in the specs, but that's the downside of a lightweight and telemetry-focused protocol. 🙈

Besides this "non-normative example" the specification is extremely vague about the definition of "accepting ownership of a message". I would prefer the application and not the messaging library to decide when the ownership of a message is accepted.

A "manual ack" mode would be extremely valuable (like NATS Streaming has). I think a "manual ack" mode would not conflict with the specification as one can easily include the processing of a message (e.g. store it in a database) into the "taking ownership process".

I understand that this change would be somewhat revolutionary in the MQTT library landscape, but I see a clear need and no obvious downsides in adding it as an optional feature.

munluk commented 3 years ago

@linuxbasic, the manual acknowledging is built in the java version of the paho client :wink: https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/IMqttAsyncClient.java#L823

petersilva commented 3 years ago

I have made a patch 6225aff21571d7bc8e2e1507954dea037bb1af46 It's ... I dunno, a dozen lines of code? most of the change is documenting the api change.

It adds auto_ack as an argument to the init function, defaulting to True (existing behaviour). If you set it to False, then the library no longer provides acks.

messages already have the "mid" attribute defined. Just need to provide that as an argument to the new entry point ack(mid)

Also added an auto_ack(on=True) entry point to turn it off... the purpose of that was this bit of logic, in my client code:


    if hasattr(self.client, 'auto_ack'):
        self.client.auto_ack(False)
        logger.info("Switching off auto_ack for higher reliability. Using explicit acknowledgements.")
    else:
        logger.info("paho library without auto_ack support. Loses data every crash or restart.")

This gives the ability to test for the presence of the feature, and use it if it is there. It would be nice if people could try it out and comment.

It is available in: https://github.com/MetPX/paho.mqtt.python
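For anyone who wants to see the intended usage without installing the fork, here is a rough sketch against a stand-in object. `StandInClient` and its `deliver` method are hypothetical and exist only so the example runs on its own; only the `auto_ack(on)` and `ack(mid)` entry points mirror what the patch is described as adding, and a real client may behave differently in details.

```python
from types import SimpleNamespace


class StandInClient:
    """Hypothetical stand-in exposing the auto_ack()/ack() entry points."""

    def __init__(self):
        self._auto_ack = True  # default: the library acknowledges for you
        self.acked = []        # mids for which an ack was issued
        self.on_message = None

    def auto_ack(self, on=True):
        self._auto_ack = on

    def ack(self, mid):
        self.acked.append(mid)

    def deliver(self, mid, payload):
        # Simulates the arrival of one QoS 1 PUBLISH.
        message = SimpleNamespace(mid=mid, payload=payload)
        if self.on_message is not None:
            self.on_message(self, None, message)
        if self._auto_ack:
            self.ack(mid)


stored = {}  # a dict stands in for a database


def on_message(client, userdata, message):
    stored[message.mid] = message.payload  # persist first
    client.ack(message.mid)                # then acknowledge explicitly


client = StandInClient()
if hasattr(client, "auto_ack"):
    client.auto_ack(False)  # feature present: switch to explicit acks
client.on_message = on_message
client.deliver(7, b"reading")
print(stored, client.acked)
```

With auto_ack switched off, the only ack issued is the one the application sends after the message has been stored.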

BBBSnowball commented 3 years ago

With the current implementation, puback says that the message has reached an intermediate point between the server and the application. If we say that the paho library is the receiver, this conforms to the spec ("arrives at the receiver at least once"). However, I would argue that it does so in a very narrow way that is next to useless. That way, QoS 1 slightly increases the chance of delivering the message but it doesn't give any guarantees that are meaningful to the application, i.e. the message can still get lost before it has any effect that is visible to the application.

There is some evidence that the authors of MQTT Version 3.1.1 wanted the narrow interpretation, which only focusses on network problems: They talk about "data loss on some older TCP networks" and they don't forbid resending messages while the TCP connection is alive. Both of those changed in MQTT 5 (as has been pointed out by @HaraldGustafsson).

I would argue that the correct default behaviour would be to first call the handler and then puback the message. That way, we will handle it at least once (and maybe more than once if we are interrupted before the puback). Furthermore, the sender can use our puback messages to implement a sensible rate limit. While I strongly think that this is the correct default, it would be an incompatible change.

The argument is not so clear cut when the handler aborts with an exception: We would usually not puback and propagate the error. This will most likely lead to a reconnect and the message will be processed again. This is a good reaction in many cases, e.g. if handling failed due to temporary connection problems with the database. It wouldn't make much sense to lie and puback because the next message will run into the same problem. However, this will hamper any further progress if the exception is due to a persistent problem, e.g. one message contains an emoji and the application is not prepared to handle this. We would be better off skipping this message.

The approach taken by the PR is different: It doesn't change the default and it gives all the power to the application. It is also more powerful than simply changing the order of calls: The application may return from the callback and only later acknowledge the message. This is required for asyncio-mqtt, which puts the message into an asyncio queue to be handled later. Thus, the PR is necessary even if we decide to change the default behaviour (which I assume, we won't do). With that power, comes some responsibility: The application must make sure to always call ack() eventually. It is in a better position than the library because it can know which exceptions it can recover from. Still, this is no easy feat if the way to the usual ack() call involves several threads or tasks and has multiple failure points. For example, asyncio-mqtt will put messages into a bounded queue and it will drop messages if the queue is full.
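As an illustration of that responsibility, the following sketch shows one way an application might decide, per message, whether to call ack() after deferred processing. Everything here is an assumption made for the example: `TransientError`, `PermanentError`, `drain` and the `ack` callable are hypothetical, and the queue is a plain `queue.Queue` rather than anything asyncio-mqtt uses.

```python
import queue


class TransientError(Exception):
    """Hypothetical: worth retrying after a reconnect (e.g. database down)."""


class PermanentError(Exception):
    """Hypothetical: retrying cannot help (e.g. a malformed payload)."""


def drain(pending, process, ack):
    """Process queued (mid, payload) pairs; return the mids left unacked."""
    unacked = []
    while True:
        try:
            mid, payload = pending.get_nowait()
        except queue.Empty:
            return unacked
        try:
            process(payload)
        except TransientError:
            unacked.append(mid)  # withhold the ack so the message can come back
            continue
        except PermanentError:
            pass                 # skip the poison message, but still ack it
        ack(mid)


def process(payload):
    if payload == b"db-down":
        raise TransientError(payload)
    if payload == b"\xff":
        raise PermanentError(payload)


pending = queue.Queue(maxsize=10)  # bounded, as described above
for item in [(1, b"ok"), (2, b"\xff"), (3, b"db-down")]:
    pending.put_nowait(item)

acked = []
left = drain(pending, process, acked.append)
print(acked, left)
```

The design choice is that only the application can tell the two error classes apart, which is why the decision sits in `drain` and not in the library.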

Unfortunately, some parts of the MQTT 3.1.1 approach are still present in MQTT 5 (and there are good arguments for keeping it that way):

The non-normative part says that the first one is for the benefit of the subscriber (i.e. the application) but it wouldn't be unthinkable that a server takes advantage of that guarantee to optimize its tracking of unacknowledged messages. I think we can add this requirement to the help text and let the application deal with it. They are in a better position to do so.

For the second one: We either have to add a default on_message handler with if not self._auto_ack: self.ack(message.mid) or again put the onus onto the application. I'd say the latter is fine because if the application does add a handler, it has to deal with that anyway (we cannot know whether the handler failed to call ack() or plans to do so later).

petersilva commented 3 years ago

I would add that callbacks need to return quickly to avoid message loss (learned that the hard way, yup!) I think callbacks are synchronous with the library. My guess would be there are many applications where ensuring a message is properly received is more involved than just running the initial (abbreviated) callback.

martinscheffler commented 2 years ago

Just wanted to +1 this issue. I am really surprised that there is not a single python mqtt client library that lets me not send an ACK to the broker when storage to the database fails. I have a situation where lost messages can result in financial losses, so the chain of ACKs must be without gaps. Please please don't make me switch back to Java : (

petersilva commented 2 years ago

Just feel the need to point out later in that same section of the MQTT standard:

"By default, a Server MUST treat every Topic as an Ordered Topic when it is forwarding messages on Non‑shared Subscriptions. [MQTT-4.6.0-6]. A Server MAY provide an administrative or other mechanism to allow one or more Topics to not be treated as an Ordered Topic."

So the above stipulations about ordering apply to non-shared subscriptions. In my use case, all subscriptions are shared, so it does not apply, even in the standard. Also note that the next sentence gives permission for the server to permit out of order... without saying how to signal that to a client.

petersilva commented 2 years ago

fwiw... I'm also puzzled by the term "ordered Topic"... in a single connection, one could subscribe to multiple topics, either with multiple subscribe topics or with wildcards, it isn't clear to me that the broker is sending "an ordered Topic" at all. it is sending messages from many topics. It isn't clear to me that messages from different topics are considered part of the same "ordered Topic" or not.

petersilva commented 2 years ago

My meaning may be clearer thus... perhaps messages on topic A are 10x more expensive (in time) to process than topic B but 100x less frequent. So when you receive a message on A in a stream of B's, it makes sense to acknowledge 10 B's before the A you received, maximizing throughput. I read the spec as saying the combined flow on A and B is a single ordered Topic, so it would insist that the A be acknowledged while the B's wait.
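Under that strict reading, an application with manual acks would have to hold finished B's back until the earlier A is done. A small sketch of such a holding area, assuming an `ack` callable like the one discussed in this thread (`OrderedAcker` itself is a made-up helper, not part of any library):

```python
from collections import deque


class OrderedAcker:
    """Release acknowledgements strictly in the order messages arrived."""

    def __init__(self, ack):
        self._ack = ack
        self._pending = deque()  # mids in arrival order, not yet acknowledged
        self._done = set()       # mids finished but blocked behind an earlier one

    def received(self, mid):
        self._pending.append(mid)

    def finished(self, mid):
        self._done.add(mid)
        # Flush from the front while the oldest outstanding message is done.
        while self._pending and self._pending[0] in self._done:
            head = self._pending.popleft()
            self._done.discard(head)
            self._ack(head)


acked = []
acker = OrderedAcker(acked.append)
for mid in (1, 2, 3):   # 1 is the slow A, 2 and 3 are cheap B's
    acker.received(mid)
acker.finished(2)
acker.finished(3)
early = list(acked)     # acks released while the A is still being processed
acker.finished(1)
print(early, acked)
```

The cheap messages are processed early but their acks only leave once the expensive one ahead of them is done, which is exactly the throughput cost described above.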

SebastianBerchtold commented 10 months ago

I would be interested in this, as apparently many others are. Is there any reason to not merge the recently updated pull request #753?

PierreF commented 9 months ago

To my understanding this issue was fixed with release 1.6.0 in commit https://github.com/eclipse/paho.mqtt.python/commit/9a4941efd758ca4231d4f16a8d3951686e24cb92: the ack of a QoS = 1 message now occurs after the on_message callback.

If the issue isn't fixed, feel free to reopen this issue.

petersilva commented 9 months ago

I'm not sure if this is worth re-opening, but until #753 is merged, providing for application controlled manual acknowledgements, I don't think all the cases that people are interested in related to this issue are addressed.

The most common/obvious case is back-pressure... if something being used for persistence is temporarily unavailable, then we want to apply back-pressure to the broker, and stop receiving new messages. The calls to on_message are automatic, and the acknowledgement of messages happens after on_message is called without regard to success. Incoming queue of messages will be drained into the current consumer, regardless of the ability to process them.

I don't know if such problems have been addressed in more recent code, but I had lots of issues with message loss when my on_message processing got complicated. In my usage of the library, on_message had to be extremely lightweight to avoid message loss. So having on_message not return as a means of controlling acknowledgements has not been an option in the past.

This is especially problematic when using shared subscriptions where processing is OK on one participant, but broken in the other. Rather than having the working subscriber process all messages, the current method will share the messages evenly among the share participants, because the library will call on_message (which must return quickly) and then acknowledge all the messages regardless of the ability to process them.

There is no way for an application to apply back-pressure, that is, to get the source to wait while the consumer has some temporary issue, or for different participants to consume at different rates.
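To show what back-pressure through withheld acks would look like, here is a toy sender that keeps only a bounded number of QoS 1 messages unacknowledged. `ToyBroker` and its `window` are assumptions made for the sketch (MQTT 5's Receive Maximum is one limit of this kind); whether and how a particular broker limits in-flight messages is not something this thread establishes.

```python
from collections import deque


class ToyBroker:
    """Toy sender that keeps at most `window` messages unacknowledged."""

    def __init__(self, window):
        self.window = window
        self.backlog = deque()  # queued, not yet sent to the consumer
        self.in_flight = set()  # sent, awaiting a PUBACK
        self.delivered = []     # mids in the order they were sent

    def enqueue(self, mid):
        self.backlog.append(mid)
        self._pump()

    def puback(self, mid):
        self.in_flight.discard(mid)
        self._pump()

    def _pump(self):
        # Send only while there is room in the in-flight window.
        while self.backlog and len(self.in_flight) < self.window:
            mid = self.backlog.popleft()
            self.in_flight.add(mid)
            self.delivered.append(mid)


broker = ToyBroker(window=2)
for mid in range(1, 6):
    broker.enqueue(mid)
stalled = list(broker.delivered)  # the consumer has acknowledged nothing yet
broker.puback(1)                  # the consumer finishes one message
print(stalled, broker.delivered, list(broker.backlog))
```

A consumer that stops acknowledging stops receiving once the window is full, and each ack it does send releases exactly one more message.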

There should be no reason to debate the need for manual application level acknowledgements since other language bindings (e.g. Java) provide this same functionality. It has already been debated and accepted, and refusing to add it to the Python binding is just hobbling Python users when compared to users of other languages.

PierreF commented 8 months ago

Agree that manual ack could be added. I've closed the issue because the issue was about acking after on_message.

Side note on back pressure: I used to do back pressure by not returning from the on_message callback, which... "works". It works because we don't ack the message, but it will cause the client to disconnect due to not responding to keepalive, which is not perfect. Anyway #753 is merged.