chirpstack / chirpstack-gateway-bridge

ChirpStack Gateway Bridge abstracts Packet Forwarder protocols into Protobuf or JSON over MQTT.
https://www.chirpstack.io
MIT License

Bottleneck for MQTT publish when using Qos > 0 #97

Closed lglenat closed 5 years ago

lglenat commented 5 years ago

Is this a bug or a feature request?

Limitation / Bug

What happened?

When using QoS 1 or 2, the uplink rate is limited to a few packets per second (around 10).

For a Gateway with a high concentration of nodes, or if the Gateway Bridge is installed in the cloud and handles messages for several gateways, this is an issue.

The bottle-necking is due to the following code in backend.go:

if token := b.conn.Publish(topic.String(), b.qos, false, bytes); token.Wait() && token.Error() != nil {
    return token.Error()
}

The token.Wait() call is blocking. With QoS 0 it returns without any delay, but with QoS 1 it blocks until the PUBACK is received for that message, which can take up to a few hundred milliseconds depending on network latency.

I don't know whether token.Wait() should be called at all, since no provision is made to retry publishing the message if it fails anyway. From what I understood reading the code, if the MQTT connection drops, the Paho client will re-publish messages queued with QoS 1 or 2 upon reconnection. Maybe this is enough and removes the need for token.Wait().

Or message publishing needs to be parallelized so that one message does not block the others while waiting in token.Wait(). In that case, maybe token.WaitTimeout() should be used instead.

I'd be happy working on a PR to fix this if needed, but I am not familiar enough with Go yet to code anything complex.

What version are you using?

Latest (2.6.1)

How can your issue be reproduced?

Have a lot of uplink packets going through the bridge (a few dozen). Note: the UDP packets are queued at the socket level in the network layer, so they are not always dropped immediately; they are just not forwarded to the Network Server as fast as they should be, and will eventually be dropped once the socket queue is full.

brocaar commented 5 years ago

You're right!

I think the changes are minimal, see the runV2 and runV3 functions in root_run.go. Example:

    go func() {
        for rxPacket := range gw.RXPacketChan() {
            if err := pubsub.PublishGatewayRX(rxPacket.RXInfo.MAC, rxPacket); err != nil {
                log.WithError(err).Error("publish uplink message error")
            }
        }
    }()

I think this needs to be changed to (untested):

    go func() {
        for rxPacket := range gw.RXPacketChan() {
            go func(rxPacket gw.RXPacketByte) {
                if err := pubsub.PublishGatewayRX(rxPacket.RXInfo.MAC, rxPacket); err != nil {
                    log.WithError(err).Error("publish uplink message error")
                }
            }(rxPacket)
        }
    }()

Would you be able to update these for loops (there are a couple in runV2 and runV3) and do some testing?

lglenat commented 5 years ago

I will have a look in the next few days. Yes I'll do throughput tests to make sure it's a workable solution. I'll keep you posted.

brocaar commented 5 years ago

@lglenat have you already had the time to look into this?

lglenat commented 5 years ago

@brocaar finally yes. See the attached PR.