hyper-systems / ocaml-mqtt

MQTT client for OCaml
https://hyper-systems.github.io/ocaml-mqtt

Passing client to on_message? #21

Open SkyWriter opened 1 month ago

SkyWriter commented 1 month ago

Hey, thanks for the wonderful work of picking up the abandoned repository! I'm really enjoying coding one of my hobby projects with OCaml.

So here's the thought:

module C = Mqtt_client

let sub_example () =
  let on_message ~topic payload = Lwt_io.printlf "%s: %s" topic payload in
  let%lwt () = Lwt_io.printl "Starting subscriber..." in
  let%lwt client = C.connect ~on_message ~id:"client-1" ~port [ host ] in
  C.subscribe [ ("topic-1", C.Atmost_once) ] client

The current API forces you to provide the on_message handler at connection time, which seems like a reasonable way to do it. However, I find myself in a situation where I need to use the client inside the on_message handler, e.g. to reply.

What do you think of adding an argument named client to the on_message handler function? This way one won't have to resort to storing client somewhere in the global state.
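To make the proposal concrete, here is a minimal sketch of a handler that receives the client and replies through it. It does not use the real library: Fake_client, its publish stub, and deliver are hypothetical stand-ins invented for illustration, and the handler returns unit rather than unit Lwt.t to keep the sketch free of dependencies.

```ocaml
(* Hypothetical stand-in for Mqtt_client: it records publishes instead of
   talking to a broker. The real handlers return unit Lwt.t, not unit. *)
module Fake_client = struct
  type t = { mutable sent : (string * string) list }

  let create () = { sent = [] }

  (* Stub publish: remember the (topic, payload) pair. *)
  let publish ~topic payload t = t.sent <- (topic, payload) :: t.sent
end

(* Proposed shape: the client arrives as an argument, so replying needs
   no global reference. *)
let on_message client ~topic payload =
  Fake_client.publish ~topic:(topic ^ "/reply") ("echo: " ^ payload) client

(* Simulates the library delivering one incoming message to the handler. *)
let deliver client ~on_message ~topic payload = on_message client ~topic payload

let () =
  let client = Fake_client.create () in
  deliver client ~on_message ~topic:"topic-1" "hello";
  List.iter
    (fun (topic, payload) -> Printf.printf "%s: %s\n" topic payload)
    client.Fake_client.sent
```

With this shape the handler can reply without a global ref, since the library hands it the client it was registered on.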

If you agree with the concept I might take a stab at implementing it.

rizo commented 1 month ago

Hey! I think it's a good addition.

It would also make on_message consistent with on_disconnect and on_error, both of which get the client as the first non-labeled[^1] argument. It would also scale nicely to adding other hooks like on_publish, on_subscribe, etc.

The only downside is the fact that it introduces a breaking change, but I do think it's (1) worth it and (2) unavoidable if we want to follow the design described above.

Your contribution for this would be very welcome! :)

[^1]: My preference would be to have an unlabeled client argument because (1) the explicit type makes the argument fully unambiguous and (2) when people don't need the client they can do fun _c ~topic ... instead of fun ~client:_ ~topic ....
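The two calling conventions from the footnote can be compared side by side. This is only a sketch: the client type is a hypothetical placeholder for Mqtt_client.t, and the handlers return a string instead of unit Lwt.t so the result is easy to inspect.

```ocaml
(* Hypothetical placeholder for Mqtt_client.t. *)
type client = Client

(* Unlabeled variant: the client is the first positional argument. *)
type unlabeled_handler = client -> topic:string -> string -> string

(* Labeled variant: the client is passed as ~client. *)
type labeled_handler = client:client -> topic:string -> string -> string

(* Ignoring an unlabeled client takes a plain wildcard pattern. *)
let unlabeled : unlabeled_handler =
 fun _c ~topic payload -> Printf.sprintf "%s: %s" topic payload

(* Ignoring a labeled client requires spelling out the label. *)
let labeled : labeled_handler =
 fun ~client:_ ~topic payload -> Printf.sprintf "%s: %s" topic payload

let () =
  print_endline (unlabeled Client ~topic:"topic-1" "a");
  print_endline (labeled ~client:Client ~topic:"topic-1" "b")
```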

rizo commented 1 month ago

The implementation should be fairly straightforward, but, if you decide to implement it, don't hesitate to ask for help or pointers.

SkyWriter commented 1 month ago

I'm pretty sure I'm going to approach it in my spare time.

There's one problem, though: when I went down the path of storing the client instance in a mutable ref after connecting and then using it to send messages from the on_message handler, I stumbled upon a deadlock.

The reason for it seems to be the following.

When messages are processed the ordinary way every Atleast_once message is acknowledged by our library (https://github.com/hyper-systems/ocaml-mqtt/blob/ee6f43c284b35e7d5f83012fd97daffa99302581/lib/mqtt_client/Mqtt_client.ml#L90).

However, when I publish from my on_message handler with Atleast_once semantics, our library, in turn, awaits yet another ack (https://github.com/hyper-systems/ocaml-mqtt/blob/ee6f43c284b35e7d5f83012fd97daffa99302581/lib/mqtt_client/Mqtt_client.ml#L306), this time from the server. So here we are, both stuck waiting for an ack.

I got around this by scheduling the publish for later by means of Lwt.async. I'm somewhat of an OCaml newbie, so I'm not sure if that's the right way to approach things, but it worked.

All in all, it seems that exposing client in that indeterminate state is unsafe, albeit useful. So what I think we could do is change the return type of on_message from unit Lwt.t to (Mqtt_client.t -> unit Lwt.t) option Lwt.t to return an optional action to perform right after we finish handling the message.
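A rough model of that idea, without Lwt or the real library, might look like the following. The client record, record, handle_incoming, and events are hypothetical names made up for this sketch, and it assumes that "right after we finish handling the message" means after the incoming message has been acknowledged.

```ocaml
(* Hypothetical stand-in for the client; `log` records events, newest first. *)
type client = { mutable log : string list }

let record c event = c.log <- event :: c.log

(* Proposed handler shape, minus Lwt: return an optional follow-up action
   instead of using the client directly. *)
type handler = topic:string -> string -> (client -> unit) option

let on_message : handler =
 fun ~topic payload ->
  if topic = "ping" then
    Some (fun c -> record c ("publish pong for " ^ payload))
  else None

(* Simulated receive path: run the handler, acknowledge the incoming
   message, and only then run the follow-up action, if there is one. *)
let handle_incoming c ~(on_message : handler) ~topic payload =
  let follow_up = on_message ~topic payload in
  record c ("ack " ^ topic);
  match follow_up with
  | Some action -> action c
  | None -> ()

(* Events in the order they happened. *)
let events c = List.rev c.log

let () =
  let c = { log = [] } in
  handle_incoming c ~on_message ~topic:"ping" "1";
  List.iter print_endline (events c)
```

In this model the handler never touches the client while the incoming message is still unacknowledged, which is the state the deadlock came from.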

Regretfully this still makes on_message different from on_disconnect and on_error in terms of signature.

What are your thoughts?

P.S. Sorry for the long read!