wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
805 stars 128 forks source link

get message in a variable like github.com/streadway/amqp #132

Closed parsibox closed 1 year ago

parsibox commented 1 year ago

hi i want get message in a variable like github.com/streadway/amqp i need to get a batch size of message if exist for example i want get 100 message but on that time onlye 60 message exist it return 60 message to me and i want 1 second and retry again to get 100 message ,.... is it possible? can you write an example?

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "PeerToPeer_0", // name
        true,           // durable
        false,          // delete when unused
        false,          // exclusive
        false,          // no-wait
        nil,            // arguments
    )
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.Qos(100, 0, false)
    if err != nil {
        log.Fatalf("Failed to set QoS: %v", err)
    }
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatalf("Failed to register a consumer: %v", err)
    }
    //  counter := 0
    for {
        //  fmt.Printf("halghe: %d\n", counter)
        for i := 0; i <= 100; i++ {
            select {
            case msg, ok := <-msgs:
                if !ok {
                    // Channel is closed
                    break
                }

                fmt.Printf("Received message %d : %s\n", i, string(msg.Body))

                // Acknowledge the message
                err = msg.Ack(false)
                if err != nil {
                    panic(err)
                }
            default:
                if i <= 100 {
                    fmt.Printf("sabr %d", i)
                    //time.Sleep(time.Second)
                }
            }
        }
        //  counter++
    }
}
wagslane commented 1 year ago

You can set the prefetch to 100 and the concurrency to 100

parsibox commented 1 year ago

i mean i get in once not in seprate handler