go-stomp / stomp

Go language library for STOMP protocol
Apache License 2.0
307 stars 93 forks source link

Read stops after 18 messages on queue #131

Open bugrakocabay opened 6 months ago

bugrakocabay commented 6 months ago

Hello,

I have an ActiveMQ server running on docker. I am sending messages to queue in a for loop to experiment but after 18 messages the client stops reading the messages. What could be the reason for this?

Docker: docker run -e ARTEMIS_USER=myUser -e ARTEMIS_PASSWORD=myPass --name mycontainer -it -p 61616:61616 -p 8161:8161 apache/activemq-artemis:latest-alpine

Message producer:

func connectActiveMQ() (*stomp.Conn, error) {
    var err error
    netConn, err := net.DialTimeout("tcp", "localhost:61616", 10*time.Second)
    if err != nil {
        return nil, err
    }

    stompConn, err := stomp.Connect(netConn, stomp.ConnOpt.Login("myUser", "myPass"))
    if err != nil {
        return nil, err
    }

    _, err = stompConn.Subscribe("test", stomp.AckAuto)
    if err != nil {
        log.Printf("Failed to subscribe to ActiveMQ: %s", err)
        return nil, err
    }

    return stompConn, nil

func main() {
        conn, err := connectActiveMQ()
    if err != nil {
        log.Fatalf("Failed to connect to ActiveMQ: %s", err)
    }
    for j := 0; j < 100; j++ {
        go func() {
            err = conn.Send("test", "text/plain", []byte(strconv.Itoa(j+1)), stomp.SendOpt.NoContentLength, stomp.SendOpt.Header("persistent", "true"))
            if err != nil {
                log.Printf("Failed to send message: %s", err)
            }
        }()
        log.Printf("Sent message: %s", j)
    }
}

Message consumer:

func connectActiveMQ() (*stomp.Subscription, error) {
    var err error
    netConn, err := net.DialTimeout("tcp", "localhost:61616", 10*time.Second)
    if err != nil {
        return nil, err
    }

    stompConn, err := stomp.Connect(netConn, stomp.ConnOpt.Login("myUser", "myPass"))
    if err != nil {
        return nil, err
    }

    c, err := stompConn.Subscribe("test", stomp.AckAuto)
    if err != nil {
        return nil, err
    }

    return c, nil
}

func main() {
    c, err := connectActiveMQ()
    if err != nil {
        log.Fatalf("Failed to connect to ActiveMQ: %s", err)
    }

    go func() {
        for {
            m, err := c.Read()
            if err != nil {
                log.Printf("Failed to read message from ActiveMQ: %s", err)
            }
            log.Printf("Received message: %s", m.Body)
        }
    }()

    signalChannel := setupSignalHandling()
    waitForShutdown(signalChannel)
}
worg commented 5 months ago

I'll try to make time this weekend to look into this