rabbitmq / amqp091-go

An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Messages don't do ack #229

Closed KoNekoD closed 10 months ago

KoNekoD commented 10 months ago

Describe the bug

I use a single amqp instance for many consumers, and my messages are not being acked.

type Instance struct {
    Conn         *amqp.Connection
    Ch           *amqp.Channel
    errorChecker error_checker2.ErrorCheckerInterface
}

func (i *Instance) CreateAndConsume(
    name string,
    executeTask func(message []byte) error,
) error {
    //i_one, _ := NewInstanceWithoutDI(i.errorChecker)
    //_ = i_one.Ch.ExchangeDelete(name, false, false)
    //
    //i_two, _ := NewInstanceWithoutDI(i.errorChecker)
    //_ = i_two.Ch.QueueUnbind(name, name, name, nil)
    //
    //i_three, _ := NewInstanceWithoutDI(i.errorChecker)
    //_, _ = i_three.Ch.QueuePurge(name, false)

    q, err := i.Ch.QueueDeclare(name, true, false, false, false, nil)
    if err != nil {
        return errors.Wrap(err, "Q")
    }

    err = i.Ch.ExchangeDeclare(name, "fanout", true, false, false, false, nil)
    if err != nil {
        return errors.Wrap(err, "E")
    }

    err = i.Ch.QueueBind(name, name, name, false, nil)
    if err != nil {
        return errors.Wrap(err, "B")
    }

    msgs, err := i.Ch.Consume(q.Name, "", false, false, false, false, nil)
    if err != nil {
        return errors.Wrap(err, "R")
    }

    var wg sync.WaitGroup

    wg.Add(1)

    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        for d := range msgs {
            executeTaskErr := executeTask(d.Body)
            if i.errorChecker.ErrorFound(executeTaskErr) {
                nackErr := d.Nack(false, true)
                i.errorChecker.Check(nackErr)
            } else {
                ackErr := d.Ack(false)
                i.errorChecker.Check(ackErr)
            }
        }
    }(&wg)

    wg.Wait()

    return errors.New("This method should not stop")
}

Reproduction steps

See the code above.

Expected behavior

Unacked: 0

Additional context

No response

KoNekoD commented 10 months ago

image

KoNekoD commented 10 months ago

But I run only 1 consumer for message handling.

KoNekoD commented 10 months ago

I've narrowed it down: new messages are delivered to the consumer immediately, even though the previous one hasn't been acked yet. The consumer needs to be prevented from taking new messages from the queue until the current one is done.
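
The behavior asked for in the last comment is usually handled with a consumer prefetch limit set through `Qos` before `Consume` is called. Below is a minimal sketch of that idea, not a confirmed fix for this issue. The `qosSetter` interface, the `limitInFlight` helper and the `fakeChannel` type are hypothetical names introduced here so the example runs without a broker. The only library fact relied on is that `*amqp.Channel` in amqp091-go has a `Qos(prefetchCount, prefetchSize int, global bool) error` method.

```go
package main

import (
	"errors"
	"fmt"
)

// qosSetter is a hypothetical one-method interface made up for this sketch.
// *amqp.Channel from github.com/rabbitmq/amqp091-go satisfies it through its
// Qos(prefetchCount, prefetchSize int, global bool) error method.
type qosSetter interface {
	Qos(prefetchCount, prefetchSize int, global bool) error
}

// limitInFlight asks the broker to keep at most n unacknowledged deliveries
// outstanding. It has to run before Consume, and it only has an effect when
// the consumer uses manual acknowledgements (autoAck=false).
func limitInFlight(ch qosSetter, n int) error {
	if n < 1 {
		// A prefetch count of 0 means "no limit", which is the opposite
		// of what this helper is for, so it is rejected here.
		return errors.New("prefetch count must be at least 1")
	}
	// prefetchSize=0 sets no byte limit. With RabbitMQ, global=false applies
	// the count to each consumer on the channel separately.
	if err := ch.Qos(n, 0, false); err != nil {
		return fmt.Errorf("set prefetch to %d: %w", n, err)
	}
	return nil
}

// fakeChannel is a stand-in that records the Qos arguments it receives,
// so the sketch can be run without a RabbitMQ server.
type fakeChannel struct {
	prefetchCount int
	prefetchSize  int
	global        bool
	calls         int
}

func (f *fakeChannel) Qos(prefetchCount, prefetchSize int, global bool) error {
	f.prefetchCount = prefetchCount
	f.prefetchSize = prefetchSize
	f.global = global
	f.calls++
	return nil
}

func main() {
	ch := &fakeChannel{}
	if err := limitInFlight(ch, 1); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("prefetchCount=%d prefetchSize=%d global=%v\n",
		ch.prefetchCount, ch.prefetchSize, ch.global)
}
```

In the reported code this would presumably be called as `limitInFlight(i.Ch, 1)` just before `i.Ch.Consume(...)`, assuming `i.Ch` is a `*amqp.Channel`. With a prefetch count of 1 the broker should hold back the next message until the current delivery has been acked or nacked.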