absmach / export

Mainflux Export service that sends messages from one Mainflux cloud to another via MQTT

Workers get stuck when waiting for Paho Token #31

Open pricelessrabbit opened 3 years ago

pricelessrabbit commented 3 years ago

STR

expected

actual


It seems the issue is in the Token.Wait() call used here

When a worker publishes to MQTT with QoS = 2, Paho waits for reconnection before sending the message, so the worker gets stuck in the Wait() call and can't handle further messages.

I managed to fix it using Token.WaitTimeout

token := e.mqtt.Publish(topic, byte(e.cfg.MQTT.QoS), e.cfg.MQTT.Retain, payload)
// Wait at most 3 seconds so a disconnected broker cannot block the worker.
sentInTime := token.WaitTimeout(3 * time.Second)
if !sentInTime {
    e.logger.Warn(fmt.Sprintf("MQTT message in topic %s sending timeout exceed: persisted if QOS >= 1", topic))
    return nil
}
if token.Error() != nil {
    e.logger.Error(fmt.Sprintf("Failed to publish to topic %s", topic))
    return token.Error()
}
return nil

That way, when the wait exceeds 3 seconds, the worker becomes available again.

mteodor commented 3 years ago

@PricelessRabbit Thank you. Could you try this?

go func() {
    token := e.mqtt.Publish(topic, byte(e.cfg.MQTT.QoS), e.cfg.MQTT.Retain, payload)
    if token.Wait() && token.Error() != nil {
        e.logger.Error(fmt.Sprintf("Failed to publish to topic %s", topic))
    }
}()
return nil

pricelessrabbit commented 3 years ago

Hi @mteodor, I tried both the async goroutine and WaitTimeout, and both solutions work. With the goroutine, though, I'm a bit worried about long disconnections: a 1-day disconnection at 1 message per second would leave about 86k goroutines waiting. Is that OK?

mteodor commented 3 years ago

@PricelessRabbit Then a combination of both seems the best solution. What do you think? Could you make a PR for this?

pricelessrabbit commented 3 years ago

I have this in production in my fork now and it works as expected. If it looks OK to you, I'll open a PR. Please tell me if I should change the logging or anything else. Thanks.

go func() {
    token := e.mqtt.Publish(topic, byte(e.cfg.MQTT.QoS), e.cfg.MQTT.Retain, payload)
    publishedInTime := token.WaitTimeout(3 * time.Second)
    if publishedInTime && token.Error() != nil {
        e.logger.Error(fmt.Sprintf("Failed to publish to topic %s", topic))
        return
    }
    if !publishedInTime {
        e.logger.Warn(fmt.Sprintf("Message in topic %s is taking a long time to be published", topic))
    }
}()

mteodor commented 3 years ago

Yes, please send this PR; we'll propose changes through review.