wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
768 stars 125 forks source link

Consumer closed while Fetching Messages #108

Closed sultanfariz closed 1 year ago

sultanfariz commented 1 year ago

I have implemented a consumer that's supposed to fetch many messages. However, the consumer connection seems to be always closed even before i ack/nack the message. For my case, I expect it to be always idle. The code for the RabbitMQ consumer looks like this:

        consumer, err := rabbitmq.NewConsumer(
        p.conn,
        func(d rabbitmq.Delivery) rabbitmq.Action {
            var data map[string]interface{}
            err := json.Unmarshal(d.Body, &data)
            if err != nil {
                return rabbitmq.NackRequeue
            }

            // set type assertion for id, createdAt, and updatedAt
            var id float64
            var ok bool
            if x, found := data["Id"]; found {
                if id, ok = x.(float64); !ok {
                    fmt.Println("id is not float64")
                    return rabbitmq.NackRequeue
                }
            } else {
                fmt.Println("id is not found")
                return rabbitmq.NackRequeue
            }
            // set stream response
            post := &postPB.Post{
                Id:      int32(id),
                Title:   data["Title"].(string),
                Content: data["Content"].(string),
                Topic:   data["Topic"].(string),
            }

            response := &postPB.SubscribePostByTopicResponse{
                Meta: &postPB.GenericResponse{
                    Status:  "success",
                    Message: "success",
                },
                Post: post,
            }
            if err := stream.Send(response); err != nil {
                fmt.Println("error sending stream: ", err)
                return rabbitmq.NackRequeue
            }
            return rabbitmq.Ack
        },
        "grpc_queue",
        rabbitmq.WithConsumerOptionsRoutingKey("grpc_queue"),
        rabbitmq.WithConsumerOptionsExchangeName("events"),
        rabbitmq.WithConsumerOptionsExchangeDeclare,
    )

I need to make the consumer connection always idle and fetch as many messages as possible without closing the connection. Please suggest a solution to this issue.

sultanfariz commented 1 year ago

I'd like to inform that this issue is no longer valid. It turns out that the issue occurred because I forgot to put the main thread blocking code, which caused the consumer to exit immediately after consuming the first message. Thanks :).