nanomq / nanomq

An ultra-lightweight and blazing-fast Messaging broker/bus for IoT edge & SDV
https://nanomq.io
MIT License
1.6k stars 193 forks source link

QoS 1 messages lost when bridging connection is down a few seconds #1873

Open thedmi opened 2 months ago

thedmi commented 2 months ago

Describe the bug

We're running NanoMQ in a bridged setup where topics prefixed with cloud/ are forwarded to our cloud MQTT broker (managed EMQX cloud instance). In case the network connection dies, there is a period during which messages are lost, even though they specify QoS 1. Specifically, messages are lost while the connection has not yet been discovered to be down. Once the bridge connection is found to be down, messages are properly stored and then transmitted once the connection is restored.

Setup:

Publisher - NanoMQ - EMQX Cloud - Consumer

Expected behavior

QoS 1 messages that reach NanoMQ while the bridge connection is nonfunctional are stored. Later, when the bridge connection becomes available again, these messages are transmitted.

Actual Behavior

Messages are lost. The following log entry is logged:

WARN  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:1166 mqtt_ctx_cancel_send: Warning : msg 18 lost due to timeout!

To Reproduce

  1. Start setup with all connections working
  2. Disconnect network between NanoMQ and EMQX Cloud
  3. Within a few seconds, publish a message from Publisher to NanoMQ that matches the bridge forward config
  4. Wait until the NanoMQ logs indicate that the bridge connection is down
  5. Restore the network connection between NanoMQ and EMQX Cloud

Environment Details

JaylinYu commented 2 months ago

Thanks for your detailed issue elaboration. This is not a bug, but an internal design by far, and we are considering to expose more params for user to optimize.

The log WARN /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:1166 mqtt_ctx_cancel_send: Warning : msg 18 lost due to timeout! is produced by a newly added feature: called timeout cancel of bridging aio.Which means the maximum wait time for each QoS msg acknowledgment. But it doesn't actually mean the msg is lost; just means it is tired of waiting for the ack from the remote broker. Before 0.22.4, there is no such feature, you will only see bridging to xxxxx aio busy! msg lost! Ctx: xx. because we set 3 seconds wait time for each bridging aio here: nng_aio_set_timeout(node->bridge_aio[index], 3000); (broker.c line 289).

To explain further, also give you a hints about how to finely tuned it: QoS msg will go into the inflight window (max_send_queue_len) first if you are dealing with a busy network, then silently wait for its time to come. During waiting, It will be dropped only if the inflight window is full and new QoS msg keep comming. Once it exceeds the max wait time of QoS Ack, it will be excluded from a hashmap (QoS awaiting queue) which is different from inflight window. but it only means client will not try to resend it anymore.

Therefore, by current internal design, bridging client(nanosdk) will only resend the msg within the max timeout period, by default is 3 seconds, will expose it to be configurable. only log like ctx is already cached! drop msg and pipe is busy and lmq is full indicates a msg lost.

JaylinYu commented 2 months ago

Fundamentally speaking, it is a duplicate with https://github.com/nanomq/nanomq/issues/1864. Will bring back the QoS msg retry and expose more configurable params of bridging in 0.22.8

thedmi commented 2 months ago

@JaylinYu Thanks for the insights. Could you clarify which configuration I need to ensure QoS 1 messages are always retried in broker connections? I still don't understand where the error is in my configuration.

JaylinYu commented 2 months ago

The QoS msg retry is not enabled yet. For now, you can just increase max_send_queue_len

now you can have a try on 0.22.7 with https://nanomq.io/docs/en/latest/config-description/bridges.html#mqtt-bridges-cache

thedmi commented 2 months ago

Thanks for your feedback and the suggested config change. I now retried with 0.22.7 and max_send_queue_len set, but the message is still lost. Some new error logs "resending..." / "actually resending..." appeared, but the message is never transmitted after the connection is reestablished.

broker-1  | 2024-08-26 07:43:10 [33] INFO  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:664 mqtt_timer_cb: Send pingreq (sock0x952a18)(10000ms)
broker-1  | 2024-08-26 07:43:20 [32] INFO  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:664 mqtt_timer_cb: Send pingreq (sock0x952a18)(10000ms)
broker-1  | 2024-08-26 07:43:30 [33] INFO  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:664 mqtt_timer_cb: Send pingreq (sock0x952a18)(10000ms)
broker-1  | 2024-08-26 07:43:40 [27] INFO  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:664 mqtt_timer_cb: Send pingreq (sock0x952a18)(10000ms)

### DISCONNECT HERE ###

broker-1  | 2024-08-26 07:43:45 [33] ERROR /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:682 mqtt_timer_cb: resending QoS msg 5
broker-1  | 2024-08-26 07:43:45 [33] ERROR /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:687 mqtt_timer_cb: actually resending QoS msg 5
broker-1  | 2024-08-26 07:43:50 [29] ERROR /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:682 mqtt_timer_cb: resending QoS msg 5
broker-1  | 2024-08-26 07:43:50 [29] ERROR /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:687 mqtt_timer_cb: actually resending QoS msg 5

### RECONNECT HERE ###

broker-1  | 2024-08-26 07:43:54 [43] INFO  /nanomq/nng/src/mqtt/transport/tls/mqtt_tls.c:667 mqtts_tcptran_pipe_recv_cb: aio result Connection shutdown
broker-1  | 2024-08-26 07:43:54 [43] WARN  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:784 mqtt_recv_cb: MQTT client recv error 136!
broker-1  | 2024-08-26 07:43:54 [43] INFO  /nanomq/nanomq/apps/broker.c:341 server_cb: RECV aio result: 31
broker-1  | 2024-08-26 07:43:54 [47] INFO  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:636 mqtt_timer_cb: Timer aio error!
broker-1  | 2024-08-26 07:43:54 [43] INFO  /nanomq/nanomq/apps/broker.c:354 server_cb: bridge connection closed with reason 31
broker-1  | 
broker-1  | 2024-08-26 07:43:54 [43] INFO  /nanomq/nanomq/apps/broker.c:341 server_cb: RECV aio result: 7
broker-1  | 2024-08-26 07:43:54 [50] WARN  /nanomq/nanomq/bridge.c:1085 bridge_tcp_disconnect_cb: bridge client disconnected! RC [139] 
broker-1  | 
broker-1  | 2024-08-26 07:43:54 [26] WARN  /nanomq/nng/src/mqtt/transport/tls/mqtt_tls.c:1606 mqtts_tcptran_ep_connect: reconnect in 792
broker-1  | 2024-08-26 07:43:55 [40] INFO  /nanomq/nng/src/mqtt/transport/tls/mqtt_tls.c:488 mqtts_tcptran_pipe_nego_cb: Set max packet size as 1048576
broker-1  | 2024-08-26 07:43:55 [40] INFO  /nanomq/nanomq/bridge.c:1040 bridge_tcp_connect_cb: Bridge [tls+mqtt-tcp://REDACTED:8883] connected! RC [0]
broker-1  | 2024-08-26 07:43:55 [40] INFO  /nanomq/nanomq/bridge.c:1052 bridge_tcp_connect_cb: Bridge client subscribed topic REDACTED (qos 1 rap 0 rh 0).
broker-1  | 2024-08-26 07:43:55 [34] INFO  /nanomq/nanomq/apps/broker.c:361 server_cb: bridge client is connected!
broker-1  | 2024-08-26 07:43:55 [40] INFO  /nanomq/nng/src/core/dialer.c:403 dialer_connect_cb: dialer connect cb rv0 (nil)
broker-1  | 2024-08-26 07:43:55 [43] INFO  /nanomq/nanomq/bridge.c:310 send_callback: bridge: subscribe aio result 0
broker-1  | 2024-08-26 07:43:55 [43] INFO  /nanomq/nanomq/bridge.c:312 send_callback: bridge: suback code 1 
broker-1  | 2024-08-26 07:44:05 [33] INFO  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:664 mqtt_timer_cb: Send pingreq (sock0x952a18)(10000ms)
broker-1  | 2024-08-26 07:44:15 [48] INFO  /nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:664 mqtt_timer_cb: Send pingreq (sock0x952a18)(10000ms)
(continues...)
JaylinYu commented 1 month ago

Hi @thedmi Thanks for your feedback. sorry for my miss. Those new error logs are for debugging purposes, which means it tries to resend msgs. But, the un-acked QoS msgs are indeed lost if bridging recreates a new TCP connection. This is why you see no msg is re-transmitted after the connection is restored. 0.22.7 only resend qos msg within one connection lifecycle.

Now I have move the msg cache to socket level in 0.22.8, you shall be able to see msg retransmission after a bridging connection is restored.

sevi-l commented 1 month ago

I'm using version 0.22.8-Full. It seems this bug still exists.

2024-09-11 12:10:54 [17] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:666: Send pingreq (sock0x6440768aff28)(10000ms)
2024-09-11 12:10:57 [49] WARN  /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:679: nni aio recv error!! Connection shutdown
2024-09-11 12:10:57 [49] WARN  /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:963: tcptran_pipe_recv_cb: recv_error rv: 139
2024-09-11 12:11:04 [39] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 10
2024-09-11 12:11:09 [26] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 10
2024-09-11 12:11:11 [52] WARN  /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:307: nego aio error: Connection shutdown
2024-09-11 12:11:11 [52] ERROR /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:465: connect nego error rv:(31)
2024-09-11 12:11:13 [73] WARN  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:1175: Warning : QoS action of msg 10 is canceled due to timeout!

message 10 is never sent out, even if the bridge comes back online. Also, does it make sense to attempt sending if the connection to the bridge is lost?

in the example it is a qos 2 message

JaylinYu commented 1 month ago

I'm using version 0.22.8-Full. It seems this bug still exists.


2024-09-11 12:10:54 [17] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:666: Send pingreq (sock0x6440768aff28)(10000ms)

2024-09-11 12:10:57 [49] WARN  /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:679: nni aio recv error!! Connection shutdown

2024-09-11 12:10:57 [49] WARN  /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:963: tcptran_pipe_recv_cb: recv_error rv: 139

2024-09-11 12:11:04 [39] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 10

2024-09-11 12:11:09 [26] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 10

2024-09-11 12:11:11 [52] WARN  /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:307: nego aio error: Connection shutdown

2024-09-11 12:11:11 [52] ERROR /home/runner/work/nanomq/nanomq/nng/src/sp/transport/mqtt/broker_tcp.c:465: connect nego error rv:(31)

2024-09-11 12:11:13 [73] WARN  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:1175: Warning : QoS action of msg 10 is canceled due to timeout!

message 10 is never sent out, even if the bridge comes back online. Also, does it make sense to attempt sending if the connection to the bridge is lost?

in the example it is a qos 2 message

It says loud and clear, msg 10 is canceled. Plz read through Docs and set timeout interval correspondingly.

https://nanomq.io/docs/en/latest/config-description/bridges.html#mqtt-bridges-cache

does it make sense to attempt sending if the connection to the bridge is lost?

Not sure what you mean. if you prefer not to resend the msg after bridge is lost. I disabled the QoS msg retry before due to same reason as this. But it cannot satisfy everyone. Or you think your example is trying to resend msg While bridge channel is disconnected? This is not true, your log only says local clients are closing connections. Not the bridge.

sevi-l commented 1 month ago

I analyzed the problem further. Since we need a broker that can survive short/long network disconnection, I'm breaking the network connection during the test.

  1. NanoMQ running on an Iot device. Bridge is connected to a central broker over the network, messages flow directly from localhost into nanoMQ and are forwarded via the bridge over the network config:

    bridges.mqtt.emqx1 = {
    server = "mqtt-tcp://central_broker:1884"    # MQTT server address
    proto_ver = 4                           # MQTT protocol version
    clientid = "bridge_client"              # Client ID for the bridge
    keepalive = "10s"                       # Ping interval for the bridge
    clean_start = false                     # Clean start flag for the bridge
    
    forwards = [                            # Topics that need to be forwarded to the remote MQTT server
    {
      remote_topic = ""                   # empty string forwards topics with same topic name
      local_topic = "#"
    }
    ]  
    
    max_parallel_processes = 2              # Maximum number of parallel processes for handling outstanding requests
    max_send_queue_len = 32                 # Maximum number of message send queue length
    max_recv_queue_len = 128                # Maximum number of message receive queue length
    
    resend_interval = 100 
    resend_wait = 20000
    cancel_timeout  = 60000
    }
  2. Network connection to the central broker (via bridge) is interrupted. Messages from localhost are still flowing in, but cannot be forwarded to the bridge. The keepalive is at 10 seconds (see config). When a message arrives while the bridge is still unaware that the connection to the server is lost, it immediately tries to resend the messages over the unavailable bridge connection.
    resending QoS msg 10 per documentation, this should be preventable by setting resend_wait to a value higher than keepalive, however it does not have an impact. it starts immediately.

  3. With a large cancel_timeout the bridge is stuck in a resend loop and never finds out that the central broker is not reachable, neither by pinging it (which it does not while retrying the messages) nor by recognizing that connection which is used to transmit the message is closed.

sevi-l commented 1 month ago

Setting the 'cancel_timeout' to 60 seconds.

This is the behavior: The bridge does not recognize that the connection has been lost while trying to resend, once the messages are cancelled, it resumes pinging and recognizes the disconnect. At this point the messages are gone and not resent upon reconnect

2024-09-13 07:43:35 [36] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:666: Send pingreq (sock0x5905115d0138)(10000ms)
2024-09-13 07:43:45 [39] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:666: Send pingreq (sock0x5905115d0138)(10000ms)
2024-09-13 07:43:55 [44] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:666: Send pingreq (sock0x5905115d0138)(10000ms)
2024-09-13 07:44:00 [16] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:05 [19] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:10 [17] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:15 [20] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:20 [11] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:25 [22] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:30 [31] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:35 [27] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:40 [30] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:45 [35] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:50 [36] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:55 [29] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:689: resending QoS msg 1
2024-09-13 07:44:58 [57] WARN  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:1175: Warning : QoS action of msg 1 is canceled due to timeout!
2024-09-13 07:44:59 [63] WARN  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:1175: Warning : QoS action of msg 2 is canceled due to timeout!
2024-09-13 07:45:00 [65] WARN  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:1175: Warning : QoS action of msg 3 is canceled due to timeout!
2024-09-13 07:45:05 [42] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:666: Send pingreq (sock0x5905115d0138)(10000ms)
2024-09-13 07:45:15 [49] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:666: Send pingreq (sock0x5905115d0138)(10000ms)
2024-09-13 07:45:20 [43] WARN  /home/runner/work/nanomq/nanomq/nng/src/mqtt/protocol/mqtt/mqtt_client.c:648: MQTT Timeout and disconnect
JaylinYu commented 1 month ago

Thanks for your testing. There is indeed a bug in resend_wait. I forget to set a timestamp for each bridging msg at the first place.

However, resend action will not block the health checker, even in a crazy resend pace, which shall be the outcome by setting resend_interval = 100, but not present in your logs. That is confusing. Anyway, I have fixed the resend_wait bug, plz have a try on it.

sevi-l commented 1 month ago

thank you for the fix for resend_wait, I'll try it once it is released.

for the test you see above, I've set meaningful values so the log is not getting huge

resend_interval = 5000 cancel_timeout = 60000 keepalive = "10s"

so 5 seconds between retries, 60 seconds total retry, ping should happen every 10 seconds.

JaylinYu commented 1 month ago

now it has been merged and released