ntex-rs / ntex-mqtt

MQTT Client/Server framework for v5 and v3.1.1 protocols
Apache License 2.0
318 stars 68 forks source link

Issues with keep-alive after updating from 0.9 to 0.12 #161

Closed svanharmelen closed 11 months ago

svanharmelen commented 11 months ago

Hello 👋🏻

We've been using this crate for a little while now and thought it was time to update from 0.9 to 0.12. But ever since we did, we noticed our clients are reconnecting every 15 seconds. To make sure the "problem" still exists in the latest version we pulled this repository and used the crate from a local path, but this still gives the same disconnects.

We did some tests and enabled TRACE logging which shows us the following interesting bits:

2023-12-08T12:17:18Z TRACE] Starting mqtt v5 handshake
[2023-12-08T12:17:18Z TRACE] Keep alive 1: 10 // We've added this line to see if the keep-alive of the client was read correctly
[2023-12-08T12:17:18Z INFO ] new connection from client-id=be6b355e-2723-4dc5-814a-ab76e8bbd503
[2023-12-08T12:17:18Z TRACE] Sending: ConnectAck {
        session_present: false,
        reason_code: Success,
        session_expiry_interval_secs: None,
        receive_max: 65535,
        max_qos: AtLeastOnce,
        max_packet_size: None,
        assigned_client_id: None,
        topic_alias_max: 32,
        retain_available: true,
        wildcard_subscription_available: true,
        subscription_identifiers_available: true,
        shared_subscription_available: true,
        server_keepalive_sec: None,
        response_info: None,
        server_reference: None,
        auth_method: None,
        auth_data: None,
        reason_string: None,
        user_properties: [],
    }
[2023-12-08T12:17:18Z TRACE] Keep alive 2: 15 - None // This is again a line we added ourselves for additional debugging
[2023-12-08T12:17:18Z TRACE] Connection handshake succeeded
[2023-12-08T12:17:18Z TRACE] Connection handler is created, starting dispatcher
[2023-12-08T12:17:18Z DEBUG] Start keep-alive timer Seconds(15)

Now after this snippet you see a lot of publish packages coming in (we are testing this with a test client which continuously sends data) and then after 15 seconds we see this part:

[2023-12-08T12:19:29Z TRACE] Keep-alive error, stopping dispatcher
[2023-12-08T12:19:29Z TRACE] Dispatch v5 packet: DispatchItem::KeepAliveTimeout

And in between the publish messages we saw 2 of these:

[2023-12-08T12:19:16Z TRACE] Dispatch v5 packet: DispatchItem::Item((PingRequest, 0))
...
[2023-12-08T12:19:29Z TRACE] Dispatch v5 packet: DispatchItem::Item((PingRequest, 0))

The client we are using to test this with, didn't change after we updated this crate from 0.9 to 0.12 so it's either caused by some (default) config that is changed, updated or introduced in 0.12, or its some kind of bug.

I will continue to debug this one myself as well, but I'm hoping this might sound familiar or sound like something you may have seen before? Any suggestions that might push us in the right direction in order to resolve the disconnects are very much appreciated.

Thanks! Sander

fafhrd91 commented 11 months ago

do you use 0.12.14? could you create reproducible example, I cannot reproduce with 0.12.14

svanharmelen commented 11 months ago

Yes, I'm using 0.12.14. If I switch back to 0.12.13 it works as expected so I'm currently looking at the changes between those versions. Being able to share a reproducible example will take some effort as I would need to extract a lot of bits and pieces from our code base to compile a standalone example.

So I'm trying to see if I can understand/follow the flag logic you tweaked and maybe spot something in there. Which I think I did... Let me open a PR do I can show you my suggested changes and if needed discuss them.

svanharmelen commented 11 months ago

@fafhrd91 please see #162 for a possible fix (or at least a starting point towards a fix 😉)

fafhrd91 commented 11 months ago

do you use client or client and server?

svanharmelen commented 11 months ago

I only use the server and am testing with the Paho MQTT client (in this case the one written in Go: github.com/eclipse/paho.golang)

fafhrd91 commented 11 months ago

could you post configuration for your mqttserver

svanharmelen commented 11 months ago

This is the main setup:

        let server = Server::build()
            .bind("mqtt", "127.0.0.1:8883, move |_| {
                chain_factory(ntex_tls::rustls::Acceptor::new(tls_acceptor.clone()))
                    .map_err(|_| MqttError::Service(ServerError {}))
                    .and_then(
                        MqttServer::new()
                            .v3(v3::MqttServer::new({
                                let session = session.clone();
                                move |handshake: v3::Handshake| {
                                    handshake_v3(handshake, session.clone())
                                }
                            })
                            .control(control_service_factory_v3())
                            .inflight(MQTT_MAX_IN_FLIGHT)
                            .inflight_size(MQTT_MAX_IN_FLIGHT_SIZE)
                            .publish(fn_factory_with_config(
                                |session: v3::Session<ClientSession>| {
                                    Ready::Ok::<_, ServerError>(fn_service(move |req| {
                                        publish_v3(session.clone(), req)
                                    }))
                                },
                            )))
                            .v5(v5::MqttServer::new({
                                let session = session.clone();
                                move |handshake: v5::Handshake| {
                                    handshake_v5(handshake, session.clone())
                                }
                            })
                            .control(control_service_factory_v5())
                            .max_inflight_size(MQTT_MAX_IN_FLIGHT_SIZE)
                            .receive_max(MQTT_MAX_IN_FLIGHT)
                            .publish(fn_factory_with_config(
                                |session: v5::Session<ClientSession>| {
                                    Ready::Ok::<_, ServerError>(fn_service(move |req| {
                                        publish_v5(session.clone(), req)
                                    }))
                                },
                            ))),
                    )
            })?

And these are the control factories:

pub fn control_service_factory_v3() -> impl ServiceFactory<
    v3::ControlMessage<ServerError>,
    v3::Session<ClientSession>,
    Response = v3::ControlResult,
    Error = ServerError,
    InitError = ServerError,
> {
    fn_factory_with_config(|_: v3::Session<ClientSession>| {
        Ready::Ok(fn_service(move |control| match control {
            v3::ControlMessage::Error(e) => Ready::Ok(e.ack()),
            v3::ControlMessage::ProtocolError(e) => Ready::Ok(e.ack()),
            v3::ControlMessage::Ping(p) => Ready::Ok(p.ack()),
            v3::ControlMessage::Disconnect(d) => Ready::Ok(d.ack()),
            v3::ControlMessage::Subscribe(mut s) => {
                s.iter_mut().for_each(|mut s| s.confirm(s.qos()));
                Ready::Ok(s.ack())
            }
            v3::ControlMessage::Unsubscribe(s) => Ready::Ok(s.ack()),
            v3::ControlMessage::Closed(c) => Ready::Ok(c.ack()),
            v3::ControlMessage::PeerGone(c) => Ready::Ok(c.ack()),
        }))
    })
}

pub fn control_service_factory_v5() -> impl ServiceFactory<
    v5::ControlMessage<ServerError>,
    v5::Session<ClientSession>,
    Response = v5::ControlResult,
    Error = ServerError,
    InitError = ServerError,
> {
    fn_factory_with_config(|_: v5::Session<ClientSession>| {
        Ready::Ok(fn_service(move |control| match control {
            v5::ControlMessage::Auth(a) => Ready::Ok(a.ack(v5::codec::Auth::default())),
            v5::ControlMessage::Error(e) => {
                Ready::Ok(e.ack(v5::codec::DisconnectReasonCode::UnspecifiedError))
            }
            v5::ControlMessage::ProtocolError(e) => Ready::Ok(e.ack()),
            v5::ControlMessage::Ping(p) => Ready::Ok(p.ack()),
            v5::ControlMessage::Disconnect(d) => Ready::Ok(d.ack()),
            v5::ControlMessage::Subscribe(mut s) => {
                s.iter_mut().for_each(|mut s| s.confirm(s.options().qos));
                Ready::Ok(s.ack())
            }
            v5::ControlMessage::Unsubscribe(s) => Ready::Ok(s.ack()),
            v5::ControlMessage::Closed(c) => Ready::Ok(c.ack()),
            v5::ControlMessage::PeerGone(c) => Ready::Ok(c.ack()),
        }))
    })
}

I cannot share the handles and publish functions without first having to refactor them... Does this help already?

fafhrd91 commented 11 months ago

I will need some time for investigation. do you set keep-alive in handshake handler?

svanharmelen commented 11 months ago

do you set keep-alive in handshake handler? not sure what that means, can you show me what that would look like?

Our handler hasn't changed between versions, so did some options change that I have to set?

fafhrd91 commented 11 months ago

I am about this field ConnectAck::server_keepalive_sec, but I see you don't set it. this could the problem, I will try to reproduce the problem

svanharmelen commented 11 months ago

I just tested with this in our handler, but it gives the same results:

    let keep_alive = handshake.packet().keep_alive;
    Ok(handshake.ack(session).keep_alive(keep_alive))
fafhrd91 commented 11 months ago

could you also post initial Connect packet

svanharmelen commented 11 months ago

Yes:

[2023-12-08T16:49:05Z INFO ] handshake packet: Connect {
        clean_start: true,
        keep_alive: 10,
        session_expiry_interval_secs: 0,
        auth_method: None,
        auth_data: None,
        request_problem_info: true,
        request_response_info: false,
        receive_max: None,
        topic_alias_max: 0,
        user_properties: [],
        max_packet_size: None,
        last_will: None,
        client_id: "be6b355e-2723-4dc5-814a-ab76e8bbd503",
        username: Some(
            "username",
        ),
        password: Some(
            b"password",
        ),
    }
fafhrd91 commented 11 months ago

i found bug, will prepare fix by tomorrow

svanharmelen commented 11 months ago

Ah, cool! Curious to see (and understand) the root cause and the fix 🙂

Thanks for your help so far!

fafhrd91 commented 11 months ago

0.12.15 is released, should fix this issue

svanharmelen commented 11 months ago

Thanks @fafhrd91! Just tested v0.12.15 and things work as expected again 🎉