chirpstack / chirpstack-gateway-bridge

ChirpStack Gateway Bridge abstracts Packet Forwarder protocols into Protobuf or JSON over MQTT.
https://www.chirpstack.io
MIT License

Subscriptions are not obtained upon Mqtt disconnects / or connection resets #183

Closed JohnRoesler closed 3 years ago

JohnRoesler commented 3 years ago

What happened?

I am seeing some unexpected behavior when connections to the MQTT broker (EMQX v4.1.0 in my case) are interrupted.

Inconsistently, following a connection error like:

time="2020-10-22T17:11:09Z" level=error msg="mqtt: connection error" error="read tcp IP:38074->IP:31709: read: connection reset by peer"

time="2020-10-22T18:36:01Z" level=error msg="mqtt: connection error" error=EOF

time="2020-10-22T18:36:17Z" level=error msg="mqtt: connection error" error="write tcp IP:33194->IP:31709: write: broken pipe"

the per-gateway topic subscriptions on the MQTT broker are not re-established. With debug and paho logging enabled, I could see the bridge attempt to re-add the subscriptions, but it would log this line once and then never log it for the second gateway, and it would not obtain either subscription.

What did you expect?

I would expect the subscriptions to be re-obtained. This may be a bug with the paho client.

I am currently testing gateway bridge at master + paho client at master.

Your Environment

Component           Version
Application Server  v?.?.?
Network Server
Gateway Bridge      master
Chirpstack API
Geolocation
Concentratord

brocaar commented 3 years ago

@JohnRoesler it might be valid that you only see https://github.com/brocaar/chirpstack-gateway-bridge/blob/master/internal/integration/mqtt/backend.go#L179, because of https://github.com/brocaar/chirpstack-gateway-bridge/blob/master/internal/integration/mqtt/backend.go#L184.

Thus when the gateway is already in b.gateways, then it does not trigger https://github.com/brocaar/chirpstack-gateway-bridge/blob/master/internal/integration/mqtt/backend.go#L191.

However, a re-connect should also trigger this function: https://github.com/brocaar/chirpstack-gateway-bridge/blob/1062b1d7f980c4695c62284afd9b12987d9d9a37/internal/integration/mqtt/backend.go#L333.

This function is registered here: https://github.com/brocaar/chirpstack-gateway-bridge/blob/1062b1d7f980c4695c62284afd9b12987d9d9a37/internal/integration/mqtt/backend.go#L129.

Thus, on every (re)connect, the ChirpStack Gateway Bridge will iterate over b.gateways and should subscribe to each individual gateway topic.
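
The re-subscribe-on-connect behaviour described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the bridge's actual code: the `subscriber` interface, the `fakeClient` type and the string gateway IDs are assumptions made for the example. Only the `gateway/ID/command/#` topic pattern is taken from the logs in this thread.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

// subscriber is a hypothetical stand-in for the MQTT client.
type subscriber interface {
    Subscribe(topic string) error
}

// backend tracks the gateways that should have a command subscription.
type backend struct {
    mu       sync.RWMutex
    gateways map[string]struct{}
    conn     subscriber
}

// onConnected is meant to run after every (re)connect. It walks the known
// gateways and subscribes to each command topic again, returning the IDs
// of the gateways whose subscription failed.
func (b *backend) onConnected() []string {
    // Copy the IDs so the lock is not held while talking to the broker.
    b.mu.RLock()
    ids := make([]string, 0, len(b.gateways))
    for id := range b.gateways {
        ids = append(ids, id)
    }
    b.mu.RUnlock()
    sort.Strings(ids) // deterministic order for the example

    var failed []string
    for _, id := range ids {
        topic := fmt.Sprintf("gateway/%s/command/#", id)
        if err := b.conn.Subscribe(topic); err != nil {
            failed = append(failed, id)
        }
    }
    return failed
}

// fakeClient records the topics it was asked to subscribe to.
type fakeClient struct{ topics []string }

func (f *fakeClient) Subscribe(topic string) error {
    f.topics = append(f.topics, topic)
    return nil
}

func main() {
    c := &fakeClient{}
    b := &backend{
        gateways: map[string]struct{}{"gw-1": {}, "gw-2": {}},
        conn:     c,
    }
    failed := b.onConnected()
    fmt.Println(c.topics, len(failed))
}
```

Copying the IDs before subscribing is a design choice of this sketch: it keeps the map lock short-lived, so a slow or failing broker call cannot block other users of the map.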

JohnRoesler commented 3 years ago

Would it be best to continue the discussion here or on the forum? https://forum.chirpstack.io/t/subscriber-for-event-ack-disconnected/9412/3

JohnRoesler commented 3 years ago

@brocaar I am seeing this again...

time="2020-11-04T06:12:30Z" level=debug msg="integration/mqtt: set gateway subscription called" gateway_id=ID subscribe=true
time="2020-11-04T06:12:34Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="addr:38513" protocol_version=2 type=PullData
time="2020-11-04T06:12:34Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="10.244.2.1:38513" protocol_version=2 type=PullACK
time="2020-11-04T06:12:34Z" level=debug msg="integration/mqtt: set gateway subscription called" gateway_id=ID subscribe=true
time="2020-11-04T06:12:41Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="addr:4346" protocol_version=2 type=PullData
time="2020-11-04T06:12:41Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="addr:4346" protocol_version=2 type=PullACK
time="2020-11-04T06:12:41Z" level=debug msg="integration/mqtt: set gateway subscription called" gateway_id=ID subscribe=true
time="2020-11-04T06:12:42Z" level=info msg="integration/mqtt: connected to mqtt broker"
time="2020-11-04T06:12:42Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/ID/command/#"
time="2020-11-04T06:12:44Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="addr:38513" protocol_version=2 type=PullData
time="2020-11-04T06:12:44Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="addr:38513" protocol_version=2 type=PullACK

What's odd here is that we don't see a connection error, only that it connected again. We can see it attempt to subscribe to the gateway, but the "set gateway subscription" calls never start occurring again.

JohnRoesler commented 3 years ago

It seems that when the connection is closed in an unexpected manner by either our EMQX broker or something in between - that the state of the subscriptions is hung up somewhere. When it tries to resubscribe, it never makes it past the first of two gateways that it should subscribe to and never errors.

JohnRoesler commented 3 years ago

When the connection gets terminated in an undesirable way, like reset by peer, the subscriptions are blocked in some way - whether that's a bug with the paho auto reconnect logic or something in the gateway bridge I'm not sure yet.

As a temporary fix for the issue I am seeing (I am also investigating from the MQTT broker side why the connection resets are occurring), I set auto reconnect to false and handle creating a new connection in the connection lost handler.

internal/integration/mqtt/backend.go

package mqtt

// ...

    b.clientOpts.SetAutoReconnect(!conf.Integration.MQTT.RecreateClientOnConnLost)

// ...

func (b *Backend) onConnectionLost(c paho.Client, err error) {
    mqttDisconnectCounter().Inc()
    log.WithError(err).Error("mqtt: connection error")

    // With paho's auto reconnect disabled, tear down the old connection
    // and create a new one ourselves.
    if !b.clientOpts.AutoReconnect {
        b.disconnect()
        b.connectLoop()
    }
}

brocaar commented 3 years ago

Please note that one side-effect of setting SetAutoReconnect to false is that the paho library does not seem to persist messages when using QoS > 0 until they can be published.

@JohnRoesler have you already tried updating the paho library to the latest version to see if that makes any difference?

JohnRoesler commented 3 years ago

Yes I already tried updating to the latest version of paho, same issue occurred.

And yes, I am aware of the side effect that we would lose QoS > 0 messages with auto reconnect off. We are sending all confirmed messages, so if the bridge loses connection we’re out of luck anyway.

brocaar commented 3 years ago

Thanks for the confirmation. So far I haven't been able to reproduce this with Mosquitto (I restarted Mosquitto, killed it, and pulled the ethernet plug out of my gateway), but tomorrow I'll try with EMQX. Maybe there is a slight difference in the way the handshake is performed?

To make sure my environment is as close to yours as possible, did you make any modifications to this config?

    # Clean session
    #
    # Set the "clean session" flag in the connect message when this client
    # connects to an MQTT broker. By setting this flag you are indicating
    # that no messages saved by the broker for this client should be delivered.
    clean_session=true

    # Client ID
    #
    # Set the client id to be used by this client when connecting to the MQTT
    # broker. A client id must be no longer than 23 characters. When left blank,
    # a random id will be generated. This requires clean_session=true.
    client_id=""

JohnRoesler commented 3 years ago

Yes, clean_session=true, and we set the client_id to a location identifier plus the epoch seconds at which the bridge started: <location-seconds>.
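
For illustration, a client ID of that shape could be built as below. This is a sketch only: the `clientID` helper, the `site-a` location and the `-` separator are assumptions, and the length check mirrors the 23-character limit stated in the config comment quoted above.

```go
package main

import (
    "fmt"
    "time"
)

// maxClientIDLen is the limit mentioned in the config comment.
const maxClientIDLen = 23

// clientID joins a location identifier and a start time in epoch seconds.
// It returns an error when the result would exceed the limit.
func clientID(location string, startedUnix int64) (string, error) {
    id := fmt.Sprintf("%s-%d", location, startedUnix)
    if len(id) > maxClientIDLen {
        return "", fmt.Errorf("client id %q has %d characters, limit is %d",
            id, len(id), maxClientIDLen)
    }
    return id, nil
}

func main() {
    id, err := clientID("site-a", time.Now().Unix())
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println(id)
}
```

Because epoch seconds currently take ten digits, this scheme leaves roughly twelve characters for the location part before the limit is hit.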

I am trying to find a way to trigger a connection reset from the emqx broker to recreate this scenario - no luck on that front yet.

JohnRoesler commented 3 years ago

Captured some additional logging using the code change above.

We can see one occasion where it worked as I'd expect (disconnect, then connect) and another where it lost the connection and didn't reconnect.

Properly reconnects

time="2020-11-09T14:33:32Z" level=debug msg="integration/mqtt: set gateway subscription called" gateway_id=id subscribe=true
time="2020-11-09T14:33:32Z" level=debug msg="[pinger]   ping check 1.9998006" module=mqtt
time="2020-11-09T14:33:33Z" level=debug msg="[pinger]   ping check 2.999755597" module=mqtt
time="2020-11-09T14:33:33Z" level=debug msg="[pinger]   keepalive sending ping" module=mqtt
time="2020-11-09T14:33:33Z" level=debug msg="[net]      startIncoming Received Message" module=mqtt
time="2020-11-09T14:33:33Z" level=debug msg="[net]      startIncomingComms: got msg on ibound" module=mqtt
time="2020-11-09T14:33:33Z" level=debug msg="[net]      startIncomingComms: received pingresp" module=mqtt
time="2020-11-09T14:33:33Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-09T14:33:34Z" level=debug msg="[pinger]   ping check 0.999821607" module=mqtt
time="2020-11-09T14:33:35Z" level=debug msg="[pinger]   ping check 1.999794159" module=mqtt
time="2020-11-09T14:33:36Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:42266" protocol_version=2 type=PullData
time="2020-11-09T14:33:36Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:42266" protocol_version=2 type=PullACK
time="2020-11-09T14:33:36Z" level=debug msg="integration/mqtt: set gateway subscription called" gateway_id=id subscribe=true
time="2020-11-09T14:33:36Z" level=debug msg="[pinger]   ping check 2.999790178" module=mqtt
time="2020-11-09T14:33:36Z" level=debug msg="[pinger]   keepalive sending ping" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      incoming complete" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms: got msg on ibound" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms: ibound complete" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms goroutine complete" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T14:33:37Z" level=error msg="[client]   Connect comms goroutine - error triggered EOF" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   internalConnLost called" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   stopCommsWorkers called" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[router]   matchAndDispatch exiting" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[pinger]   keepalive stopped" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   startCommsWorkers output redirector finnished" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      outgoing comms stopping" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   internalConnLost waiting on workers" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startComms closing outError" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   stopCommsWorkers waiting for workers" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   stopCommsWorkers waiting for comms" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   comms goroutine done" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   stopCommsWorkers done" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   internalConnLost workers stopped" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[msgids]   cleaned up" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   internalConnLost complete" module=mqtt
time="2020-11-09T14:33:37Z" level=error msg="mqtt: connection error" error=EOF
time="2020-11-09T14:33:37Z" level=warning msg="[client]   Disconnect() called but not connected (disconnected/reconnecting)" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   stopCommsWorkers called" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   stopCommsWorkers done (not running)" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   Connect()" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[store]    memorystore initialized" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   about to write new connect msg" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   socket connected to broker" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   Using MQTT 3.1.1 protocol" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      connect started" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      received connack" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   startCommsWorkers called" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   client is connected/reconnected" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      incoming started" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms started" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      outgoing started" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startComms started" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   startCommsWorkers done" module=mqtt
time="2020-11-09T14:33:37Z" level=warning msg="[store]    memorystore wiped" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   exit startClient" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms: inboundFromStore complete" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[pinger]   keepalive starting" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T14:33:37Z" level=info msg="integration/mqtt: connected to mqtt broker"
time="2020-11-09T14:33:37Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/id/command/#"
time="2020-11-09T14:33:37Z" level=debug msg="[client]   enter Subscribe" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [gateway/id/command/#]" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   sending subscribe message, topic: gateway/id/command/#" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[client]   exit Subscribe" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      obound priority msg to write, type *packets.SubscribePacket" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncoming Received Message" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms: got msg on ibound" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[store]    memorystore del: message 1 was deleted" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms: received suback, id: 1" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      startIncomingComms: granted qoss [0]" module=mqtt
time="2020-11-09T14:33:37Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-09T14:33:37Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/id/command/#"

Lack of reconnection

time="2020-11-09T17:01:28Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/id/command/#"
time="2020-11-09T17:01:28Z" level=debug msg="[client]   enter Subscribe" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 2 topics: [gateway/id/command/#]" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   sending subscribe message, topic: gateway/id/command/#" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   exit Subscribe" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      obound priority msg to write, type *packets.SubscribePacket" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:1716" protocol_version=2 type=PushData
time="2020-11-09T17:01:28Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:44171" protocol_version=2 type=PushData
time="2020-11-09T17:01:28Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:1716" protocol_version=2 type=PushACK
time="2020-11-09T17:01:28Z" level=info msg="integration/mqtt: publishing event" event=up qos=0 topic=gateway/id/event/up uplink_id=5ad776ee-042e-4b07-a3d7-1287be108bed
time="2020-11-09T17:01:28Z" level=debug msg="[client]   enter Publish" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   sending publish message, topic: gateway/id/event/up" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      obound msg to write 0" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      obound wrote msg, id: 0" module=mqtt
time="2020-11-09T17:01:28Z" level=info msg="integration/mqtt: publishing event" event=up qos=0 topic=gateway/id/event/up uplink_id=bffa1996-a73d-4fd1-8053-628de48eb4a7
time="2020-11-09T17:01:28Z" level=debug msg="[client]   enter Publish" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   sending publish message, topic: gateway/id/event/up" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:44171" protocol_version=2 type=PushACK
time="2020-11-09T17:01:28Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      obound msg to write 0" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      obound wrote msg, id: 0" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      startIncoming Received Message" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      startIncomingComms: got msg on ibound" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      startIncomingComms: received pingresp" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      startIncomingComms: got msg on ibound" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-09T17:01:28Z" level=error msg="[client]   Connect comms goroutine - error triggered EOF" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   internalConnLost called" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   stopCommsWorkers called" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      incoming complete" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   internalConnLost waiting on workers" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   stopCommsWorkers waiting for workers" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   startCommsWorkers output redirector finnished" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      startIncomingComms: ibound complete" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      startIncomingComms goroutine complete" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[router]   matchAndDispatch exiting" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[pinger]   keepalive stopped" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   stopCommsWorkers waiting for comms" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      outgoing comms stopping" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[net]      startComms closing outError" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   comms goroutine done" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   stopCommsWorkers done" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   internalConnLost workers stopped" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[msgids]   cleaned up" module=mqtt
time="2020-11-09T17:01:28Z" level=debug msg="[client]   internalConnLost complete" module=mqtt
time="2020-11-09T17:01:28Z" level=error msg="mqtt: connection error" error=EOF
time="2020-11-09T17:01:28Z" level=error msg="integration/mqtt: subscribe gateway error" error="subscribe topic error: connection lost before Subscribe completed" gateway_id=id
time="2020-11-09T17:01:29Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/id/command/#"
time="2020-11-09T17:01:29Z" level=debug msg="[client]   enter Subscribe" module=mqtt
time="2020-11-09T17:01:29Z" level=error msg="integration/mqtt: subscribe gateway error" error="subscribe topic error: not Connected" gateway_id=id
time="2020-11-09T17:01:29Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:62082" protocol_version=2 type=PullData
time="2020-11-09T17:01:30Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/id/command/#"
time="2020-11-09T17:01:30Z" level=debug msg="[client]   enter Subscribe" module=mqtt
time="2020-11-09T17:01:30Z" level=error msg="integration/mqtt: subscribe gateway error" error="subscribe topic error: not Connected" gateway_id=id
time="2020-11-09T17:01:31Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/id/command/#"
time="2020-11-09T17:01:31Z" level=debug msg="[client]   enter Subscribe" module=mqtt

JohnRoesler commented 3 years ago

I believe the issue I am seeing is that the backend gets locked by the call to SetGatewaySubscription from the forwarder.go file, which is trying to set the gateway subscriptions, then the connection to the broker is lost. The SetGatewaySubscription then enters an infinite for loop and continuously fails to subscribe because the broker isn't connected. In the case of the code I was testing above, we never see the disconnect() and connect() funcs trigger because they can't obtain the lock on the backend.

I am going to do some testing with the following additions of IsConnected checks to the SetGatewaySubscription func:

// SetGatewaySubscription (un)subscribes the given gateway.
func (b *Backend) SetGatewaySubscription(subscribe bool, gatewayID lorawan.EUI64) error {
    if !b.conn.IsConnected() {
        return errors.New("mqtt not connected")
    }
    b.Lock()
    defer b.Unlock()

    log.WithFields(log.Fields{
        "gateway_id": gatewayID,
        "subscribe":  subscribe,
    }).Debug("integration/mqtt: set gateway subscription called")

    _, ok := b.gateways[gatewayID]
    if ok == subscribe {
        return nil
    }

    for {
        if !b.conn.IsConnected() {
            return errors.New("mqtt not connected")
        }

        if subscribe {
            // ...

I wrote a small program to show that Go will not detect this locking behavior as a deadlock; the loop simply keeps spinning forever:

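package main

import (
    "log"
    "sync"
    "time"
)

type my struct {
    sync.RWMutex
    val int
}

// task1 takes the lock and then loops until val becomes positive.
// Nothing else can change val while the lock is held, so it never returns.
func task1(m *my, locked chan<- struct{}) {
    m.Lock()
    defer m.Unlock()
    close(locked) // signal that the lock is now held
    for {
        if m.val > 0 {
            break
        }
        log.Println("sleeping for 5 seconds")
        time.Sleep(5 * time.Second)
    }
}

func main() {
    m := my{}
    locked := make(chan struct{})
    go task1(&m, locked)
    <-locked // wait until task1 holds the lock, so the outcome does not depend on scheduling

    log.Println("trying to access locked val")
    // Blocks forever: task1 never releases the lock. The runtime reports no
    // deadlock because task1 keeps sleeping and waking rather than blocking.
    m.Lock()
    defer m.Unlock()
    test := m.val
    log.Println(test)
}

brocaar commented 3 years ago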

FYI: I have just merged in some other improvements. During another project we found some bottlenecks in how the channels were set up. These channels have been removed, and callbacks executed in goroutines are now used. This does not solve this issue, however. I'm currently testing various scenarios and might have found the potential issue, and I'm testing some modifications.

Note that in the original implementation, the mutex might not be the issue. While SetGatewaySubscription would hold the lock until the (un)subscribe is completed, this function should finish once re-connected. This means that onConnected will be able to acquire the lock only after SetGatewaySubscription has been completed, but that is fine. The potential race is with connect, as this also tries to acquire a lock. So if SetGatewaySubscription is not able to (un)subscribe because the client is disconnected, then connect is blocked forever and there is a deadlock.

I'm going to make some modifications and let you know as soon as I have something to test with :)

brocaar commented 3 years ago

@JohnRoesler I'm looking forward to your feedback on https://github.com/brocaar/chirpstack-gateway-bridge/commit/6cc9cafd4b1f18aa5ab3fbf394ac61ab8e042ab6 (https://github.com/brocaar/chirpstack-gateway-bridge/tree/subscribe_refactor). Could you try to reproduce your issue with these changes?

brocaar commented 3 years ago

I have pushed another commit. I forgot to init the new map variable :man_facepalming: (and the test failed)

JohnRoesler commented 3 years ago

@brocaar I will get this deployed and monitor it. It will likely take a week before I would be comfortable saying it's good given it hasn't reproduced with any regularity! With how the timing needed to line up perfectly for the mutex to lock up. Ah well, thank you very much for the quick work and I'll report updates as I have them!

brocaar commented 3 years ago

Thanks @JohnRoesler for testing this!

With how the timing needed to line up perfectly for the mutex to lock up.

For this reason, I have split up the mutex into different variables. It was used both for connection purposes and to guard against concurrent access to the gateways map.
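
The idea of splitting the lock can be sketched as below. This is not the code from the linked commit; the type and field names (`backend`, `connMux`, `gatewaysMux`, `connected`) are assumptions chosen for the example. The point it illustrates is that a failed subscribe returns without holding the connection lock, so a reconnect is never stuck behind a retry loop.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

type backend struct {
    connMux   sync.Mutex // guards the connection state only
    connected bool

    gatewaysMux sync.RWMutex    // guards the gateways map only
    gateways    map[string]bool // value: currently subscribed?
}

// setConnected is what a connect or connection-lost handler would call.
// On (re)connect every gateway is marked as not subscribed, so that it
// gets subscribed again.
func (b *backend) setConnected(up bool) {
    b.connMux.Lock()
    b.connected = up
    b.connMux.Unlock()

    if up {
        b.gatewaysMux.Lock()
        for id := range b.gateways {
            b.gateways[id] = false
        }
        b.gatewaysMux.Unlock()
    }
}

// subscribe records a subscription, but fails fast when disconnected
// instead of retrying while holding a lock.
func (b *backend) subscribe(id string) error {
    b.connMux.Lock()
    up := b.connected
    b.connMux.Unlock()
    if !up {
        return errors.New("mqtt not connected")
    }

    b.gatewaysMux.Lock()
    b.gateways[id] = true
    b.gatewaysMux.Unlock()
    return nil
}

func main() {
    b := &backend{gateways: map[string]bool{}}
    fmt.Println(b.subscribe("gw-1")) // returns an error: not connected yet
    b.setConnected(true)
    fmt.Println(b.subscribe("gw-1")) // returns nil
}
```

A real implementation would still have to cope with the connection dropping between the connected check and the actual subscribe call; the sketch only shows how the two locks are kept apart.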

andryyy commented 3 years ago

I deployed an EMQX cluster after I read about it in this thread and will test it, too. :)

JohnRoesler commented 3 years ago

@brocaar yes I saw the addition of mutexes and I thought it looked quite good.

@andryyy thank you for testing also!

JohnRoesler commented 3 years ago

I am still seeing the issue of gateway bridges not keeping/re-obtaining their subscriptions, but not in the same way now.

Things to note about my config: I have clean_session=true and am using qos=0.

Successful reconnect and re-subscribe

time="2020-11-13T17:55:04Z" level=error msg="mqtt: connection error" error=EOF
time="2020-11-13T17:55:04Z" level=debug msg="[client]   enter reconnect" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   about to write new connect msg" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   socket connected to broker" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   Using MQTT 3.1.1 protocol" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      connect started" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      received connack" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   client is reconnected" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      incoming started" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[pinger]   keepalive starting" module=mqtt
time="2020-11-13T17:55:04Z" level=info msg="integration/mqtt: connected to mqtt broker"
time="2020-11-13T17:55:04Z" level=debug msg="[net]      logic started" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      outgoing started" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-13T17:55:04Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/gateway-id1/command/#"
time="2020-11-13T17:55:04Z" level=debug msg="[client]   enter Subscribe" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 0 topics: [gateway/gateway-id1/command/#]" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   exit Subscribe" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      obound priority msg to write, type *packets.SubscribePacket" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      Received Message" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      logic got msg on ibound" module=mqtt
time="2020-11-13T17:55:04Z" level=warning msg="[store]    memorystore del: message 1 not found" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      received suback, id: 1" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      granted qoss [0]" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-13T17:55:04Z" level=info msg="integration/mqtt: subscribing to topic" qos=0 topic="gateway/gateway-id1/command/#"
time="2020-11-13T17:55:04Z" level=debug msg="[client]   enter Subscribe" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 0 topics: [gateway/gateway-id1/command/#]" module=mqtt
time="2020-11-13T17:55:04Z" level=debug msg="[client]   exit Subscribe" module=mqtt

Unsuccessful - reconnect, but no attempt to re-subscribe

time="2020-11-13T19:05:05Z" level=error msg="mqtt: connection error" error="read tcp ip:48750->ip:1883: read: connection reset by peer"
time="2020-11-13T19:05:05Z" level=debug msg="[client]   enter reconnect" module=mqtt
time="2020-11-13T19:05:05Z" level=debug msg="[client]   about to write new connect msg" module=mqtt
time="2020-11-13T19:05:05Z" level=debug msg="[client]   socket connected to broker" module=mqtt
time="2020-11-13T19:05:05Z" level=debug msg="[client]   Using MQTT 3.1.1 protocol" module=mqtt
time="2020-11-13T19:05:05Z" level=debug msg="[net]      connect started" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="[net]      received connack" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="[client]   client is reconnected" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="[net]      incoming started" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="[pinger]   keepalive starting" module=mqtt
time="2020-11-13T19:05:06Z" level=info msg="integration/mqtt: connected to mqtt broker"
time="2020-11-13T19:05:06Z" level=debug msg="[net]      logic started" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="[net]      logic waiting for msg on ibound" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="[net]      outgoing started" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="[net]      outgoing waiting for an outbound message" module=mqtt
time="2020-11-13T19:05:06Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:20513" protocol_version=2 type=PushData
time="2020-11-13T19:05:06Z" level=info msg="integration/mqtt: publishing event" event=stats qos=0 stats_id=6ba5cba6-702b-4f77-a968-e5727d7cccd5 topic=gateway/gateway-id1/event/stats

I'm trying to trace through why this might happen since the gatewaysSubscribed is emptied in the onconnected handler.
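To make the expected behaviour concrete, here is a minimal sketch of the reconcile pattern under discussion. It is not the actual backend.go code: the `subscriber` interface, `backend` struct, `reconcile` method and `recordingClient` are hypothetical names used for illustration, and only the `gateways` / `gatewaysSubscribed` idea and the topic format are taken from this thread. The desired set is kept separate from the set subscribed on the current connection, and the latter is cleared on connect so everything is subscribed again.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical stand-in for the MQTT client (not the paho API).
type subscriber interface {
	Subscribe(topic string) error
}

// backend keeps the desired state apart from the state of the current connection.
type backend struct {
	mu                 sync.Mutex
	client             subscriber
	gateways           map[string]struct{} // gateways that should be subscribed
	gatewaysSubscribed map[string]struct{} // gateways subscribed on this connection
}

func newBackend(c subscriber) *backend {
	return &backend{
		client:             c,
		gateways:           make(map[string]struct{}),
		gatewaysSubscribed: make(map[string]struct{}),
	}
}

// setGatewaySubscription only records the desired state; reconcile acts on it.
func (b *backend) setGatewaySubscription(id string, subscribe bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if subscribe {
		b.gateways[id] = struct{}{}
	} else {
		delete(b.gateways, id)
	}
}

// onConnected forgets what was subscribed on the previous connection,
// then subscribes every desired gateway again.
func (b *backend) onConnected() {
	b.mu.Lock()
	b.gatewaysSubscribed = make(map[string]struct{})
	b.mu.Unlock()
	b.reconcile()
}

// reconcile subscribes each desired gateway that is not yet subscribed.
// A failed Subscribe leaves the gateway unmarked, so the next call retries it.
// The unsubscribe direction is omitted to keep the sketch short.
func (b *backend) reconcile() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for id := range b.gateways {
		if _, ok := b.gatewaysSubscribed[id]; ok {
			continue
		}
		if err := b.client.Subscribe(fmt.Sprintf("gateway/%s/command/#", id)); err != nil {
			continue
		}
		b.gatewaysSubscribed[id] = struct{}{}
	}
}

// recordingClient records Subscribe calls so the behaviour can be observed.
type recordingClient struct{ topics []string }

func (r *recordingClient) Subscribe(topic string) error {
	r.topics = append(r.topics, topic)
	return nil
}

func main() {
	c := &recordingClient{}
	b := newBackend(c)
	b.setGatewaySubscription("gateway-id1", true)
	b.reconcile()   // first subscribe
	b.onConnected() // simulated reconnect: subscribes again
	fmt.Println(len(c.topics)) // prints 2
}
```

With this shape, a reconnect that is not followed by a subscribe attempt would point at the gateway missing from the desired set rather than at the reconnect handler itself.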

JohnRoesler commented 3 years ago

I changed my config to clean_session=false and saw one case of a bridge not reconnecting.

Never reconnects

time="2020-11-14T08:10:36Z" level=error msg="[net]      incoming stopped with error EOF" module=mqtt
time="2020-11-14T08:10:36Z" level=debug msg="[pinger]   ping check 17.199184392" module=mqtt
time="2020-11-14T08:10:36Z" level=error msg="[pinger]   pingresp not received, disconnecting" module=mqtt
time="2020-11-14T08:10:36Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:18599" protocol_version=2 type=PullData
time="2020-11-14T08:10:36Z" level=debug msg="integration/mqtt: set gateway subscription" gateway_id=gateway-1 subscribe=true
time="2020-11-14T08:10:36Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:45275" protocol_version=2 type=PushACK
time="2020-11-14T08:10:36Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:20792" protocol_version=2 type=PushACK
time="2020-11-14T08:10:36Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:18599" protocol_version=2 type=PullACK
time="2020-11-14T08:10:36Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:45275" protocol_version=2 type=PushACK
time="2020-11-14T08:10:36Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:45275" protocol_version=2 type=PushACK
time="2020-11-14T08:10:36Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:18599" protocol_version=2 type=PullACK
time="2020-11-14T08:10:36Z" level=info msg="integration/mqtt: publishing event" event=up qos=0 topic=gateway/gateway-1/event/up uplink_id=88905364-87af-46a6-83e6-6e767ef13741
time="2020-11-14T08:10:36Z" level=debug msg="[client]   enter Publish" module=mqtt
time="2020-11-14T08:10:36Z" level=debug msg="[client]   sending publish message, topic: gateway/gateway-1/event/up" module=mqtt
time="2020-11-14T08:10:36Z" level=info msg="integration/mqtt: publishing event" event=up qos=0 topic=gateway/gateway-2/event/up uplink_id=eb85eb2c-123e-4039-b4e8-9dd5158df8e0
time="2020-11-14T08:10:36Z" level=debug msg="[client]   enter Publish" module=mqtt
time="2020-11-14T08:10:36Z" level=info msg="integration/mqtt: publishing event" event=up qos=0 topic=gateway/gateway-2/event/up uplink_id=153b55c9-3bf3-4cc3-85ef-620f4a170702
time="2020-11-14T08:10:36Z" level=debug msg="[client]   enter Publish" module=mqtt
time="2020-11-14T08:10:36Z" level=debug msg="[client]   sending publish message, topic: gateway/gateway-2/event/up" module=mqtt
time="2020-11-14T08:10:36Z" level=debug msg="[client]   sending publish message, topic: gateway/gateway-2/event/up" module=mqtt
time="2020-11-14T08:10:40Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:13232" protocol_version=2 type=PullData
time="2020-11-14T08:10:40Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:13232" protocol_version=2 type=PullACK
time="2020-11-14T08:10:40Z" level=debug msg="integration/mqtt: set gateway subscription" gateway_id=gateway-2 subscribe=true
time="2020-11-14T08:10:41Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:18599" protocol_version=2 type=PullData
time="2020-11-14T08:10:41Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:18599" protocol_version=2 type=PullACK
time="2020-11-14T08:10:41Z" level=debug msg="integration/mqtt: set gateway subscription" gateway_id=gateway-1 subscribe=true
time="2020-11-14T08:10:49Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:45275" protocol_version=2 type=PushData
time="2020-11-14T08:10:49Z" level=info msg="integration/mqtt: publishing event" event=stats qos=0 stats_id=45459540-89d0-4b61-8ba8-47d394f6108a topic=gateway/gateway-2/event/stats

brocaar commented 3 years ago

I'm trying to trace through why this might happen since the gatewaysSubscribed is emptied in the onconnected handler.

Could you check if https://github.com/brocaar/chirpstack-gateway-bridge/blob/subscribe_refactor/internal/integration/mqtt/backend.go#L39 does contain your gateway ID? If not, then the issue might not be related to the MQTT (re)subscribe, but the registration of your Gateway ID.

Unsuccessful - reconnect, but no attempt to re-subscribe

Do you have the full log from the time the subscription was created, when a PullData frame was received?

Please note: the GatewayID is added when a PullData frame is received from the UDP packet-forwarder: https://github.com/brocaar/chirpstack-gateway-bridge/blob/subscribe_refactor/internal/backend/semtechudp/backend.go#L335.

It is set to true here: https://github.com/brocaar/chirpstack-gateway-bridge/blob/subscribe_refactor/internal/backend/semtechudp/registry.go#L64, and when no PullData frame has been received from the UDP packet-forwarder for a while, it is set to false here: https://github.com/brocaar/chirpstack-gateway-bridge/blob/subscribe_refactor/internal/backend/semtechudp/registry.go#L84.

If your gateway did not send a PullData frame for a while, or the ChirpStack Gateway Bridge was unable to receive one, then it is expected that the ChirpStack Gateway Bridge will drop the MQTT subscription.

You should periodically see messages like the following in your logs:

time="2020-11-14T08:10:41Z" level=debug msg="backend/semtechudp: received udp packet from gateway" addr="ip:18599" protocol_version=2 type=PullData
time="2020-11-14T08:10:41Z" level=debug msg="backend/semtechudp: sending udp packet to gateway" addr="ip:18599" protocol_version=2 type=PullACK
time="2020-11-14T08:10:41Z" level=debug msg="integration/mqtt: set gateway subscription" gateway_id=gateway-1 subscribe=true

JohnRoesler commented 3 years ago

Hey @brocaar, I'm ready to say the change is working well. There have been a couple of odd issues as noted above, but nothing in the past 7 days. The combination of clean_session=false and your subscription handling updates has us humming along now without issue. I look forward to seeing this in a release 😄

JohnRoesler commented 3 years ago

@brocaar fancy releasing this soon?