absmach / export

Mainflux Export service that sends messages from one Mainflux cloud to another via MQTT
10 stars 11 forks source link

Consume goroutine in some cases never process channel messages #29

Closed pricelessrabbit closed 4 years ago

pricelessrabbit commented 4 years ago

Hi all. I found a very strange behaviour in export service if adding multiple routes. Seems that sometimes the workers goroutines remains waiting also if there is data in the channel. This issue appears randomly, but i managed to reproduce it with workers = 1 and GOMAXPROCS= 1 (but the issue can appear also with default settings). This is my setup

[[routes]]
mqtt_topic = "channels/8f99dca5-7e83-4d57-bc4f-c3356d4e73d7/messages"
nats_topic = "measures"
subtopic = ""
type = "default"
workers = 1

[[routes]]
mqtt_topic = "channels/8f99dca5-7e83-4d57-bc4f-c3356d4e73d7/messages"
nats_topic = "measures2"
subtopic = ""
type = "default"
workers = 1

i also have an external process that publishes on nats "measures" topic.

Seems a strange concurrency issue, but i cannot understand it. Tried some workarounds to better understand the issue that works:

for _, r := range e.consumers {
        //_, err := nc.ChanQueueSubscribe(r.NatsTopic, exportGroup, r.Messages)
        _, err := nc.QueueSubscribe(r.NatsTopic, exportGroup,func(msg *nats.Msg){
            r.Messages <- msg
        })
        if err != nil {
            e.logger.Error(fmt.Sprintf("Failed to subscribe to NATS %s: %s", r.NatsTopic, err))
        }
        for i := 0; i < r.Workers; i++ {
            go r.Consume()
        }
    }