vernemq / vernemq

A distributed MQTT message broker based on Erlang/OTP. Built for high quality & Industrial use cases. The VerneMQ mission is active & the project maintained. Thank you for your support!
https://vernemq.com
Apache License 2.0

Lost messages when client abnormal disconnection #768

Closed boboxiaodd closed 6 years ago

boboxiaodd commented 6 years ago

Environment

Expected behavior

Actual behaviour

I shut down all network connectivity on the client. The logs show the client as still online. I then publish some messages to the client (QoS 2). The logs show the client going offline after 1 minute.

When the client's network comes back, it does not receive the offline messages.

ioolkos commented 6 years ago

Hi @boboxiaodd thanks a lot for opening the issue! I'm not 100% sure I understand the actual steps you're going through. As a first question, do you do the following 3 things to ensure that VerneMQ will store offline messages for you?

boboxiaodd commented 6 years ago

@ioolkos clean_session = false Qos = 2

If I kill the client process, the client is disconnected immediately (vmq-admin session ...). If I then send messages to the client and the client wakes up, it receives all messages.

But when the client disconnects abnormally, offline messages published within the first minute are lost.

larshesel commented 6 years ago

Can you show us how to reproduce this with the mosquitto_pub/mosquitto_sub commands and list the complete list of steps how to do it?

boboxiaodd commented 6 years ago

@larshesel @ioolkos keepalive = 60s, clean_session = false, QoS = 2. This is my test 1:

00:00 Shut down all network connectivity on the client
00:00 vmq-admin session show reports this client as online
00:00 publish msg1 to client
00:01 publish msg2 to client
00:02 publish msg3 to client
...
00:10 Re-enable the client's network; the client does not receive msg1, msg2, msg3

This is wrong.

This is my test 2:

00:00 Shut down all network connectivity on the client
00:00 vmq-admin session show reports this client as online
00:00 publish msg1 to client
00:01 publish msg2 to client
00:02 publish msg3 to client
....
01:00 console shows: @vmq_mqtt_fsm:connected:418 client ..... stopped due to keepalive expired
01:01 publish msg4 to client
01:02 publish msg5 to client
...
01:10 Re-enable the client's network; the client receives all of msg1 - msg5

This is correct.

ioolkos commented 6 years ago

@boboxiaodd thanks, I can't really conclude what you are doing unfortunately. 2 things to note:

what do you mean by " 00:00 Shutdown client all network connect"?

boboxiaodd commented 6 years ago

@ioolkos "Shutdown client all network connect" means: the client is a phone, and I put the phone in airplane mode, or turn off Wi-Fi and the mobile network.

The server cannot detect this network disconnect. Is the keepalive expiry the only trigger for VerneMQ to store offline messages?
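On the keepalive question above: the MQTT 3.1.1 specification allows a server to drop a client once one and a half times the keepalive interval has passed without any packet from that client. The following is a minimal Python sketch of that rule only. The function names are hypothetical, and it makes no claim about how VerneMQ schedules its own timer.

```python
from typing import Optional


def keepalive_deadline(last_packet_at: float, keepalive_s: int) -> Optional[float]:
    """Time after which a broker may consider a silent client gone.

    Follows the MQTT 3.1.1 rule of 1.5 x keepalive after the last packet
    received from the client. A keepalive of 0 disables the check.
    """
    if keepalive_s == 0:
        return None
    return last_packet_at + 1.5 * keepalive_s


def is_expired(now: float, last_packet_at: float, keepalive_s: int) -> bool:
    """True if the client has been silent for longer than the grace period."""
    deadline = keepalive_deadline(last_packet_at, keepalive_s)
    return deadline is not None and now > deadline
```

Under this rule, a phone that silently drops off the network with ka: 60 can still be listed as online for up to 90 seconds after its last packet, which would fit the window in which test 1 reconnects.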

ioolkos commented 6 years ago

@boboxiaodd ok, thanks, got what you mean. For test 1, try to determine what the Client lib does. Are you sure it's setting up a new connection? Does the client do correct session state handling related to QoS 2? (can you tell us what Client lib it is?)

larshesel commented 6 years ago

You could also try to trace the client-id of the client which is supposed to receive the messages using vmq-admin trace client client-id=<theclientid>. That might shed some light on what's happening in the two scenarios.

boboxiaodd commented 6 years ago

@ioolkos @larshesel

this is test1 trace log ..

vmq-admin trace client client-id=c_41671194_744869294

#client is phone , setting phone in Airplane mode ,or close wifi and Mobile network#
#send msg1 , msg2 to client#
<7251.31844.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d0, q2, r0, m2, "u_41671194/message") with payload: msg1
<7251.31844.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d0, q2, r0, m3, "u_41671194/message") with payload: msg2

#client open wifi and Mobile network#
#client  reconnect to vernemq#
New session with PID <7251.32209.3> found for client "c_41671194_744869294"
<7251.32209.3> MQTT RECV: CID: "c_41671194_744869294" CONNECT(c: c_41671194_744869294, v: 4, u: 41671194, p: f26b4f027d5c0d64237e1e47489c941c, cs: 0, ka: 60)
<7251.32209.3> Calling auth_on_register({{58,37,40,29},64891},{[],<<"c_41671194_744869294">>},41671194,f26b4f027d5c0d64237e1e47489c941c,false) 
<7251.32209.3> Hook returned "ok"
<7251.32209.3> MQTT SEND: CID: "c_41671194_744869294" CONNACK(sp: 1, rc: 0)

#client don't recv msg1 and msg2#

this is test2 trace log:

vmq-admin trace client client-id=c_41671194_744869294
#client is phone , setting phone in Airplane mode ,or close wifi and Mobile network#
#send msg1 , msg2 to client#
<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d0, q2, r0, m3, "u_41671194/message") with payload: msg1
<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d0, q2, r0, m4, "u_41671194/message") with payload: msg2

#maybe retry publish msg#
<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m3, "u_41671194/message") with payload: msg1
<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m4, "u_41671194/message") with payload:msg2

<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m3, "u_41671194/message") with payload: msg1
<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m4, "u_41671194/message") with payload:msg2

<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m3, "u_41671194/message") with payload: msg1
<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m4, "u_41671194/message") with payload: msg2

<7251.32345.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m3, "u_41671194/message") with payload: msg1

#keepalive expired stop client#
<7251.32345.3> Trace session for c_41671194_744869294 stopped

#client open wifi , and reconnect to vernemq#
New session with PID <7251.32414.3> found for client "c_41671194_744869294"
<7251.32414.3> MQTT RECV: CID: "c_41671194_744869294" CONNECT(c: c_41671194_744869294, v: 4, u: 41671194, p: f26b4f027d5c0d64237e1e47489c941c, cs: 0, ka: 60)
<7251.32414.3> Calling auth_on_register({{58,37,40,29},25398},{[],
                                                               <<"c_41671194_744869294">>},41671194,f26b4f027d5c0d64237e1e47489c941c,false) 
<7251.32414.3> Hook returned "ok"

<7251.32414.3> MQTT SEND: CID: "c_41671194_744869294" CONNACK(sp: 1, rc: 0)

#offline msg1 , msg2 recv..
<7251.32414.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m2, "u_41671194/message") with payload: msg2
<7251.32414.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m1, "u_41671194/message") with payload: msg1
<7251.32414.3> MQTT RECV: CID: "c_41671194_744869294" PUBREC(m1)
<7251.32414.3> MQTT SEND: CID: "c_41671194_744869294" PUBREL(m1)
<7251.32414.3> MQTT RECV: CID: "c_41671194_744869294" PUBREC(m2)
<7251.32414.3> MQTT SEND: CID: "c_41671194_744869294" PUBREL(m2)
<7251.32414.3> MQTT RECV: CID: "c_41671194_744869294" PUBCOMP(m1)
<7251.32414.3> MQTT RECV: CID: "c_41671194_744869294" PUBCOMP(m2)

The only difference between the two tests is whether the keepalive expired before the client reconnected.
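To compare traces like the two above without reading them line by line, the PUBLISH frames that were never acknowledged can be pulled out per session PID. This is a rough Python sketch that assumes the line layout printed by vmq-admin trace in this thread. The regular expression and the function name are illustrative, not part of any VerneMQ tooling.

```python
import re

# Matches lines such as:
# <7251.32414.3> MQTT SEND: CID: "c_1" PUBLISH(d1, q2, r0, m2, "topic") with payload: x
LINE = re.compile(
    r'^<(?P<pid>[\d.]+)> MQTT (?P<dir>SEND|RECV): CID: "[^"]*" '
    r'(?P<type>[A-Z]+)\((?P<args>.*?)\)'
)


def unacked_publishes(trace_lines):
    """Return {(pid, msg_id)} of PUBLISH frames sent by the broker that got
    no PUBACK or PUBREC back from the client on the same session PID."""
    sent, acked = set(), set()
    for line in trace_lines:
        m = LINE.match(line.strip())
        if not m:
            continue  # comments, hook calls, session start/stop lines
        # Only look for the message id before the quoted topic.
        head = m.group("args").split('"')[0]
        mid = re.search(r"\bm(\d+)", head)
        if not mid:
            continue
        key = (m.group("pid"), int(mid.group(1)))
        if m.group("dir") == "SEND" and m.group("type") == "PUBLISH":
            sent.add(key)
        elif m.group("dir") == "RECV" and m.group("type") in ("PUBACK", "PUBREC"):
            acked.add(key)
    return sent - acked


# Excerpts copied from the two traces in this thread.
TEST1 = [
    '<7251.31844.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d0, q2, r0, m2, "u_41671194/message") with payload: msg1',
    '<7251.31844.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d0, q2, r0, m3, "u_41671194/message") with payload: msg2',
    '<7251.32209.3> MQTT SEND: CID: "c_41671194_744869294" CONNACK(sp: 1, rc: 0)',
]
TEST2_RECONNECT = [
    '<7251.32414.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m2, "u_41671194/message") with payload: msg2',
    '<7251.32414.3> MQTT SEND: CID: "c_41671194_744869294" PUBLISH(d1, q2, r0, m1, "u_41671194/message") with payload: msg1',
    '<7251.32414.3> MQTT RECV: CID: "c_41671194_744869294" PUBREC(m1)',
    '<7251.32414.3> MQTT RECV: CID: "c_41671194_744869294" PUBREC(m2)',
]

print(sorted(unacked_publishes(TEST1)))
print(sorted(unacked_publishes(TEST2_RECONNECT)))
```

For test 1 this reports both publishes on the old session PID as unacknowledged, while the reconnect part of test 2 reports none.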

chriswue commented 6 years ago

Out of interest I tried this:

On host1:

u@host1:~/$ mosquitto_sub -h host2 -q 1 -c -i "my_client" -t "foo"

Then unplug network cable on host1

On host2:

u@host2:~/$ mosquitto_pub -q 1 -t "foo" -m "1"
u@host2:~/$ mosquitto_pub -q 1 -t "foo" -m "2"
u@host2:~/$ mosquitto_pub -q 1 -t "foo" -m "3"

u@host2:~/$ mosquitto_sub -q 1 -c -i "my_client" -t "foo"
1
2
3

Tracelog:

u@host2:~/$ docker exec broker vmq-admin trace client client-id="my_client"
No sessions found for client "my_client"
New session with PID <7251.473.0> found for client "my_client"
<7251.473.0> MQTT RECV: CID: "my_client" CONNECT(c: my_client, v: 3, u: undefined, p: undefined, cs: 0, ka: 60)
<7251.473.0> MQTT SEND: CID: "my_client" CONNACK(sp: 1, rc: 0)
<7251.473.0> MQTT RECV: CID: "my_client" SUBSCRIBE(m1) with topics:
    q:1, t: "foo"
<7251.473.0> Calling auth_on_subscribe(undefined,{[],<<"my_client">>}) with topics:
    q:1, t: "foo"
<7251.473.0> Hook returned "ok"

# here the network cable get unplugged from host1 on which the client is running
# publish 3 messages with the network not connected

<7251.473.0> MQTT SEND: CID: "my_client" SUBACK(m1, qt[1])
<7251.473.0> MQTT SEND: CID: "my_client" PUBLISH(d0, q1, r0, m1, "foo") with payload:
    1
<7251.473.0> MQTT SEND: CID: "my_client" PUBLISH(d0, q1, r0, m2, "foo") with payload:
    2
<7251.473.0> MQTT SEND: CID: "my_client" PUBLISH(d0, q1, r0, m3, "foo") with payload:
    3

# here client reconnects (from a different host since network to the original host is still down)

New session with PID <7251.509.0> found for client "my_client"
<7251.509.0> MQTT RECV: CID: "my_client" CONNECT(c: my_client, v: 3, u: undefined, p: undefined, cs: 0, ka: 60)
<7251.473.0> Trace session for my_client stopped
<7251.509.0> MQTT SEND: CID: "my_client" CONNACK(sp: 1, rc: 0)
<7251.509.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q1, r0, m3, "foo") with payload:
    3
<7251.509.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q1, r0, m2, "foo") with payload:
    2
<7251.509.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q1, r0, m1, "foo") with payload:
    1
<7251.509.0> MQTT RECV: CID: "my_client" SUBSCRIBE(m1) with topics:
    q:1, t: "foo"
<7251.509.0> Calling auth_on_subscribe(undefined,{[],<<"my_client">>}) with topics:
    q:1, t: "foo"
<7251.509.0> Hook returned "ok"
<7251.509.0> MQTT SEND: CID: "my_client" SUBACK(m1, qt[1])
<7251.509.0> MQTT RECV: CID: "my_client" PUBACK(m1)
<7251.509.0> MQTT RECV: CID: "my_client" PUBACK(m2)
<7251.509.0> MQTT RECV: CID: "my_client" PUBACK(m3)

The queued messages get delivered without the keep-alive expiring in between. Above is QoS level 1, but repeated with level 2 yields the same result. Also I noticed mosquitto defaults to protocol v3 (3.1) - repeating the exercise with v4 (3.1.1) again yields the same result.

Only other difference I can see: In my experiment, once the client re-connects, there is this in the log:

Trace session for my_client stopped

Logged for the previous client session immediately after the new session has started. Not sure if you removed that during copy-n-paste or if there is something missing. I'm using the current docker image which is 1.4.1

boboxiaodd commented 6 years ago

@chriswue I didn't see "Trace session for my_client stopped". Maybe the reason is that I set:

allow_multiple_sessions = on

chriswue commented 6 years ago

@boboxiaodd: What is queue_deliver_mode set to? The way I understand allow_multiple_sessions is that vernemq will distribute messages to both client sessions. In your first case the first session is still active since the keep alive hasn't expired. I suspect that the messages sent while only the first session was active are allocated to that session.

What happens in the first case after the keep-alive timeout - will the first session eventually disappear?

I think you should probably just disable that setting. From the docs:

Sometimes consumers get overwhelmed by the number of messages they receive. VerneMQ can loadbalance between multiple consumer instances subscribed to the same topic with the same ClientId.

Given that your clients are mobile phones this seems an unlikely use-case (since you probably can't push a huge amount of messages to a phone client anyway)

boboxiaodd commented 6 years ago

@chriswue queue_deliver_mode is default

#queue_deliver_mode = balance

In the first case, if the client reconnects to VerneMQ after the keepalive timeout, it can receive the offline messages.

I set allow_multiple_sessions = true in order to kick out the other device, because a client cannot tell why the server disconnected it.

larshesel commented 6 years ago

If you're using allow_multiple_sessions=on then I think the behaviour makes sense. In the case where the session expires you have only one session for the client id at a time. When the session expires the offline messages are stored in the queue. In the other case you have two sessions for the same client-id at the same time and the offline messages are likely still being held by the first session.
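The two cases described above can be illustrated with a toy Python model of one queue shared by all sessions of a client-id. It reproduces only the behaviour observed in this thread. The class, its methods and the "oldest session gets the message" shortcut are assumptions made for illustration and say nothing about VerneMQ's actual queue code.

```python
from collections import deque


class ToyQueue:
    """Toy per-client-id queue with several attachable sessions."""

    def __init__(self):
        self.sessions = {}      # session id -> list of in-flight messages
        self.offline = deque()  # messages kept while no session is attached

    def attach(self, sid):
        self.sessions[sid] = []
        # A lone (re)connecting session drains the offline queue.
        if len(self.sessions) == 1:
            while self.offline:
                self.sessions[sid].append(self.offline.popleft())

    def publish(self, msg):
        if not self.sessions:
            self.offline.append(msg)
        else:
            # Simplification: hand the message to the oldest session.
            oldest = next(iter(self.sessions))
            self.sessions[oldest].append(msg)

    def expire(self, sid):
        """Keepalive expiry of one session. Returns the messages that are lost."""
        inflight = self.sessions.pop(sid)
        if not self.sessions:
            # Last session gone: unacknowledged messages go to the offline queue.
            self.offline.extend(inflight)
            return []
        # Another session is still attached: the stale session's messages vanish.
        return inflight


# Test 2: the stale session expires first, then the client reconnects.
q = ToyQueue()
q.attach("old")
q.publish("msg1")
q.publish("msg2")
print(q.expire("old"))      # nothing lost
q.attach("new")
print(q.sessions["new"])    # offline messages delivered

# Test 1: the client reconnects while the stale session is still alive.
q = ToyQueue()
q.attach("old")
q.publish("msg1")
q.publish("msg2")
q.attach("new")
print(q.expire("old"))      # these messages are lost
```

In the model, the only thing separating the two outcomes is whether another session is attached at the moment the stale one expires.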

boboxiaodd commented 6 years ago

But when the new session connects before the old connection has triggered the keepalive timeout, the messages are lost.

chriswue commented 6 years ago

I can confirm that:

vmq trace:

# subscribed client
Starting trace for 1 existing sessions for client "my_client" with PIDs
    [<7251.446.0>]
<7251.446.0> MQTT RECV: CID: "my_client" PINGREQ()
<7251.446.0> MQTT SEND: CID: "my_client" PINGRESP()

# unplug network cable to client   
# publish some messages
<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d0, q2, r0, m1, "foo") with payload:
    1
<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d0, q2, r0, m2, "foo") with payload:
    2
<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d0, q2, r0, m3, "foo") with payload:
    3

# start new subscriber client with same client_id and same topic subscription

New session with PID <7251.532.0> found for client "my_client"
<7251.532.0> MQTT RECV: CID: "my_client" CONNECT(c: my_client, v: 3, u: undefined, p: undefined, cs: 0, ka: 60)
<7251.532.0> MQTT SEND: CID: "my_client" CONNACK(sp: 1, rc: 0)
<7251.532.0> MQTT RECV: CID: "my_client" SUBSCRIBE(m1) with topics:
    q:2, t: "foo"
<7251.532.0> Calling auth_on_subscribe(undefined,{[],<<"my_client">>}) with topics:
    q:2, t: "foo"
<7251.532.0> Hook returned "ok"
<7251.532.0> MQTT SEND: CID: "my_client" SUBACK(m1, qt[2])

# first attempt to deliver messages to stale session (not timed out yet)

<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q2, r0, m1, "foo") with payload:
    1
<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q2, r0, m2, "foo") with payload:
    2
<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q2, r0, m3, "foo") with payload:
    3

# second attempt to deliver messages to stale session (not timed out yet)

<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q2, r0, m1, "foo") with payload:
    1
<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q2, r0, m2, "foo") with payload:
    2
<7251.446.0> MQTT SEND: CID: "my_client" PUBLISH(d1, q2, r0, m3, "foo") with payload:
    3

# stale session has timed out due to keep alive expired:

<7251.446.0> Trace session for my_client stopped

# messages are obviously lost now 

<7251.532.0> MQTT RECV: CID: "my_client" PINGREQ()
<7251.532.0> MQTT SEND: CID: "my_client" PINGRESP()
<7251.532.0> MQTT RECV: CID: "my_client" PINGREQ()
<7251.532.0> MQTT SEND: CID: "my_client" PINGRESP()

One could argue that this is a bug. allow_multiple_sessions is a non-standard feature, so its exact behaviour is not defined by the standard, but the current implementation can easily lead to lost messages:

larshesel commented 6 years ago

Hi,

I think there's an issue there alright, though not sure exactly what the problem is yet. Everything works if I reconnect the client (same client-id) after the first has timed out - then the messages are delivered. If another client with the same client-id is online while the first is timing out then it (or a later client with the same client-id) won't receive the messages.

Anyway, that said, I'd recommend using shared subscriptions for client load-balancing instead of multiple-sessions as it's a standard feature (albeit backported from MQTTv5), is simpler to understand (as there's a spec for it) and scales better (each subscriber has its own queue process).
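For readers who have not used shared subscriptions: in MQTT v5 a subscriber joins a group by subscribing to a filter of the form $share/<group>/<topic filter>, and each subscriber connects with its own client-id. A small Python sketch of building and parsing such filters is shown here. The helper names and the group name "phones" are made up for illustration.

```python
from typing import Optional, Tuple


def shared_filter(group: str, topic_filter: str) -> str:
    """Build a shared-subscription filter: $share/<group>/<topic filter>."""
    # The MQTT v5 spec forbids '/', '+' and '#' in the share name.
    if not group or any(c in group for c in "/+#"):
        raise ValueError("group must be non-empty and contain no '/', '+' or '#'")
    return f"$share/{group}/{topic_filter}"


def parse_shared_filter(sub: str) -> Optional[Tuple[str, str]]:
    """Return (group, topic filter) for a shared subscription, else None."""
    parts = sub.split("/", 2)
    if len(parts) == 3 and parts[0] == "$share" and parts[1]:
        return parts[1], parts[2]
    return None


print(shared_filter("phones", "u_41671194/message"))
```

Each group member keeps its own session and queue, so a stale member timing out does not share in-flight state with the others the way two sessions under one client-id do.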

chriswue commented 5 years ago

@larshesel: Just as a last comment to this:

I think it would be good to at least update the documentation for the option with a clear warning: if it is used and multiple clients are connected, message loss is inevitable when one client becomes disconnected for some reason, and therefore this setup should only ever be used with QoS 0 messages.