ce-rust / cerk

CloudEvents Router with a Microkernel
Apache License 2.0

MQTT Port: Support for QoS 1/2 #71

Open linuxbasic opened 3 years ago

linuxbasic commented 3 years ago

The MQTT Port currently supports only QoS 0 (at-most-once delivery). The reason is that the MQTT library we use does not allow delaying the PUBACK message until the message has been processed (see https://github.com/eclipse/paho.mqtt.c/issues/522).
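To illustrate why the timing of PUBACK matters, here is a minimal sketch in plain Rust. It does not use any MQTT library: `DemoBroker`, `deliver`, `ack`, and `receive_one` are hypothetical names made up for this example. The idea it models is that a message may be acknowledged only after it has been forwarded successfully; otherwise it stays pending and can be redelivered.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory stand-in for a broker session with QoS 1 semantics:
/// a message stays queued until the receiver acknowledges it.
struct DemoBroker {
    pending: VecDeque<String>,
}

impl DemoBroker {
    fn new(messages: &[&str]) -> Self {
        DemoBroker {
            pending: messages.iter().map(|m| m.to_string()).collect(),
        }
    }

    /// Hands out the oldest unacknowledged message without removing it.
    fn deliver(&self) -> Option<String> {
        self.pending.front().cloned()
    }

    /// Stands in for PUBACK: only now may the broker forget the message.
    fn ack(&mut self) {
        self.pending.pop_front();
    }
}

/// Forwards one message and acknowledges it only if forwarding succeeded.
/// Returns true if a message was acknowledged.
fn receive_one<F>(broker: &mut DemoBroker, mut forward: F) -> bool
where
    F: FnMut(&str) -> Result<(), String>,
{
    match broker.deliver() {
        Some(msg) => {
            if forward(msg.as_str()).is_ok() {
                broker.ack();
                true
            } else {
                // No ack: the message stays pending and can be redelivered.
                false
            }
        }
        None => false,
    }
}
```

If `forward` returns an error, the message stays in `pending`. A library that sends PUBACK before the handler has finished cannot give this guarantee.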

We tried to patch the C and Rust libraries (see https://github.com/eclipse/paho.mqtt.c/compare/master...ce-rust:feat/ack-after-processing and https://github.com/eclipse/paho.mqtt.rust/compare/master...ce-rust:feat/ack-after-processing-0.8.0). This patch allowed us to block in the message callback until the message was successfully delivered over another port.

But then we ran into another problem: the C library uses the same threads and mutex for all connections. If there are multiple MQTT Ports, blocking in the message handler of one connection also blocks the other connections, which results in a deadlock.
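The effect can be reproduced with a small model in plain Rust. This is a sketch under assumptions, not the actual structure of the C library: `Job` and `shared_thread_stalls` are hypothetical names, and a single worker thread stands in for the threads shared by all connections. The handler for an incoming message waits for a delivery confirmation that can only be processed by the same thread. A timeout is used so that the example terminates instead of hanging.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Work items handled by the single shared dispatcher thread.
enum Job {
    /// Incoming message on one connection; its handler blocks until the
    /// other connection confirms delivery.
    Incoming { confirm_rx: mpsc::Receiver<()> },
    /// Delivery confirmation of the other connection; it has to run on the
    /// same dispatcher thread.
    Confirm { confirm_tx: mpsc::Sender<()> },
}

/// Returns true if the blocking handler never saw the confirmation
/// within `timeout`.
fn shared_thread_stalls(timeout: Duration) -> bool {
    let (job_tx, job_rx) = mpsc::channel::<Job>();
    let (done_tx, done_rx) = mpsc::channel::<bool>();

    let dispatcher = thread::spawn(move || {
        for job in job_rx {
            match job {
                Job::Incoming { confirm_rx } => {
                    // Blocking handler: nothing else runs on this thread
                    // while it waits.
                    let delivered = confirm_rx.recv_timeout(timeout).is_ok();
                    done_tx.send(delivered).unwrap();
                }
                Job::Confirm { confirm_tx } => {
                    // Too late: the handler has already given up.
                    let _ = confirm_tx.send(());
                }
            }
        }
    });

    let (confirm_tx, confirm_rx) = mpsc::channel();
    job_tx.send(Job::Incoming { confirm_rx }).unwrap();
    job_tx.send(Job::Confirm { confirm_tx }).unwrap();
    drop(job_tx); // lets the dispatcher loop end once both jobs are handled

    let delivered = done_rx.recv().unwrap();
    dispatcher.join().unwrap();
    !delivered
}
```

Calling `shared_thread_stalls(Duration::from_millis(50))` reports a stall, because the confirmation is queued behind the handler that is waiting for it. Without the timeout, the handler would wait forever.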

We then tried to fork the process before opening the connections, so that the different ports would not share the same threads and mutex. We tried to implement that in the port crate (see https://github.com/ce-rust/cerk/tree/feat/mqtt-port-subprocess), but unfortunately, forking the process at that stage of the router lifecycle results in unexpected behavior (see https://thorstenball.com/blog/2014/10/13/why-threads-cant-fork/).

Possible solutions:

  1. find an MQTT library that supports delaying the PUBACK
  2. write an MQTT library that supports delaying the PUBACK
  3. implement a Scheduler component for the router that is based on processes instead of threads (requires major changes in cerk, e.g. BrokerEvent must be serializable)
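To make the serialization requirement of option 3 concrete: events that cross a process boundary have to be turned into bytes and back. The following is a minimal sketch in plain Rust with a hand-written encoding. `DemoEvent` is a hypothetical, much-simplified stand-in and not the actual `BrokerEvent` type of cerk.

```rust
/// Hypothetical, much-simplified stand-in for an event passed between
/// router components.
#[derive(Debug, PartialEq)]
enum DemoEvent {
    Init,
    IncomingMessage { port_id: String, payload: Vec<u8> },
}

impl DemoEvent {
    /// Encodes the event as: tag byte, then for messages a big-endian u32
    /// length of the port id, the port id bytes, and the payload.
    fn encode(&self) -> Vec<u8> {
        match self {
            DemoEvent::Init => vec![0],
            DemoEvent::IncomingMessage { port_id, payload } => {
                let mut out = vec![1];
                out.extend_from_slice(&(port_id.len() as u32).to_be_bytes());
                out.extend_from_slice(port_id.as_bytes());
                out.extend_from_slice(payload);
                out
            }
        }
    }

    /// Decodes an event; returns None for malformed input.
    fn decode(bytes: &[u8]) -> Option<DemoEvent> {
        let (&tag, rest) = bytes.split_first()?;
        match tag {
            0 if rest.is_empty() => Some(DemoEvent::Init),
            1 => {
                if rest.len() < 4 {
                    return None;
                }
                let (len_bytes, rest) = rest.split_at(4);
                let len = u32::from_be_bytes([
                    len_bytes[0],
                    len_bytes[1],
                    len_bytes[2],
                    len_bytes[3],
                ]) as usize;
                if rest.len() < len {
                    return None;
                }
                let (id, payload) = rest.split_at(len);
                Some(DemoEvent::IncomingMessage {
                    port_id: String::from_utf8(id.to_vec()).ok()?,
                    payload: payload.to_vec(),
                })
            }
            _ => None,
        }
    }
}
```

The encoded bytes could then be sent over a pipe or socket between the scheduler process and a port process. In practice a serialization crate would likely replace the hand-written encoding. The point here is only that every variant must have a byte representation.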

Lazzaretti commented 3 years ago

Current workaround: use the cerk_port_mqtt_mosquitto port