Open · flxo opened this issue 5 years ago
Yeah, this is a pain point. I'm thinking about how to fix it, but every method I come up with has some disadvantage.
The existing design idea is that your receiver should always be faster than your sender, and it's the user's responsibility to make sure that this holds. Having an error helps a lot, but I'm not sure of the design yet. Any input will help :).
We could block the channel's sender asynchronously, but the contention would propagate all the way back to the TCP socket.
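To illustrate the trade-off, here is a minimal sketch using `std::sync::mpsc::sync_channel` rather than rumqtt's actual crossbeam channel (the function name and parameters are hypothetical): a blocking `send` on a full bounded queue never loses messages, but it stalls the producer, which in rumqtt's case is the event loop feeding off the TCP socket.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

// Producer fills a bounded queue; a slow consumer drains it. With a
// blocking `send`, the producer (the event loop, in rumqtt's case)
// stalls whenever the queue is full.
fn drain_with_backpressure(n: u32, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);

    let producer = thread::spawn(move || {
        for i in 0..n {
            // Blocks once `capacity` items are queued.
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });

    let mut received = Vec::new();
    for v in rx {
        // Simulate a slow application consumer.
        thread::sleep(Duration::from_millis(10));
        received.push(v);
    }

    producer.join().unwrap();
    received
}

fn main() {
    // No message is lost, but the producer was repeatedly blocked.
    assert_eq!(drain_with_backpressure(5, 2), vec![0, 1, 2, 3, 4]);
    println!("all 5 notifications delivered");
}
```

Nothing is dropped here, which is exactly the appeal of backpressure; the cost is that the stall reaches whatever the producer thread was otherwise doing.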
The correct way to fix the problem of lost messages is to enable the acknowledgement handshake between the broker and the client (subscription with QoS 1 or QoS 2).
When subscribing with QoS 1 or QoS 2, the broker sends only a limited number of messages to the client; see the `max_inflight_messages` setting [https://mosquitto.org/man/mosquitto-conf-5.html]. Further messages are not sent until the client has acknowledged the outstanding ones. To achieve this, the bug "Disable auto acks" [https://github.com/AtherEnergy/rumqtt/issues/101] would have to be fixed.
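For reference, the broker-side limit mentioned above is a single line in `mosquitto.conf`; the value below is only illustrative:

```
# mosquitto.conf -- cap the number of outstanding QoS 1/2 messages
# per client. Further messages are held back by the broker until
# earlier ones are acknowledged (0 disables the limit).
max_inflight_messages 20
```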
If I remember correctly, the 'in flight' feature is only for publishers. The broker doesn't know about an 'in flight' setting of a subscriber; otherwise the `Connect` or `Subscribe` packet would need a field for telling the broker about it. So a subscribed client has to surrender to whatever the broker does.
@tekjar What's wrong with applying backpressure on the broker connection (in the QoS 1/2 case)? Then it's up to the broker to (maybe) do the same to publishers. Not sure if the MQTT spec says anything about that.
If the client drops the notification handle, rumqtt could stop pushing notifications to the notification channel `tx`. This would be more or less sugar, since an application that subscribes should also take care of receiving.
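Detecting the dropped handle is cheap, because a send into a channel whose receiver is gone fails. A minimal sketch with `std::sync::mpsc` (crossbeam's `send` reports the same condition; the helper name is hypothetical):

```rust
use std::sync::mpsc::sync_channel;

// Returns true when the send failed because the receiver was dropped --
// the signal rumqtt could use to stop pushing notifications.
fn receiver_gone(msg: u32) -> bool {
    let (tx, rx) = sync_channel::<u32>(4);
    drop(rx); // the application dropped its notification handle
    tx.send(msg).is_err()
}

fn main() {
    assert!(receiver_gone(1));
    println!("send after receiver drop fails: detected");
}
```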
I think nothing other than `Publish` messages is currently reported, right? A little off-topic, but notifications about the connection state might also be interesting to the app.
> What's wrong with applying backpressure on the broker connection (in the QoS 1/2 case)? Then it's up to the broker to (maybe) do the same to publishers. Not sure if the MQTT spec says anything about that.
I thought about this. It seems correct to push the backpressure to the broker instead of failing in rumqtt. I think the broker drops new publishes to this client in that case (need to verify the spec).
We could use a tokio/futures channel to context-switch the event loop when the notification queue is full, but we'd lose the flexibility of crossbeam `select!`.
Hi @flxo. The `develop` branch now disconnects the client if the notification receiver doesn't keep up with incoming notifications. Could you check whether this solves your issue? We could also explore `poll_fn`, which reads from the network stream only when the crossbeam channel has space, but I'm not sure about the underlying polling behavior (e.g. the crossbeam rx can't wake up the network polling).
The current method seemed like an option with minimal impact to me.
NOTE: The notification channel capacity is very low now (10). You might want to tweak that.
Under higher load, incoming notifications are silently dropped (unless logging is configured): the `try_send` in `handle_notification` returns a `Full` error.
https://github.com/AtherEnergy/rumqtt/blob/7a07a00b779d172b14639161b0d99df7b284335f/src/client/connection.rs#L491-L499
The consumer has no way to detect or handle this. I recently did some load testing and stumbled across it. If it were changed to a blocking `send`, the whole event loop might get blocked, which is probably not a good idea; and in that case the notifications would still have to be handled.
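The failure mode is easy to reproduce in isolation. This sketch uses `std::sync::mpsc` in place of rumqtt's crossbeam channel (both report a `Full` variant from `try_send` on a bounded queue); the helper name and capacity are illustrative, with capacity 10 mirroring the current notification channel size:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Fill a bounded channel, then show that further `try_send` calls
// report `Full` instead of blocking -- the point at which rumqtt
// currently drops the notification on the floor.
fn overflow_count(capacity: usize, attempts: usize) -> usize {
    let (tx, _rx) = sync_channel::<usize>(capacity); // keep `_rx` alive
    let mut dropped = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => {}
            // Each `Full` here is a silently lost notification.
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    dropped
}

fn main() {
    // With nobody draining the channel, 5 of 15 sends overflow.
    assert_eq!(overflow_count(10, 15), 5);
    println!("dropped notifications: 5");
}
```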