I have an app that receives messages at a rate of 1000 pps. With auto ack enabled, the app can handle this rate, but the container's memory keeps growing and is not returned to the OS until the app or the channel is closed.
To avoid this memory issue I switched to manual ack and tried various prefetch counts (1/2/200/500/1000), but every QoS value limits the application's consumption rate to around 250 pps.
Can someone suggest a better way to handle messages at around 1000 pps while avoiding the memory issue?
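	deliveries, err := w.channel.Consume(w.qName, w.cTag, opts.autoAck, false, false, false, nil)
	if err != nil {
		// Assumes errorChan carries error values, as with emptyBodyError below.
		errorChan <- err
		return
	}
	for {
		select {
		case d := <-deliveries:
			// An empty body is treated as fatal: reject the message and stop the worker.
			if len(d.Body) == 0 {
				if !opts.autoAck {
					d.Nack(false, false)
				}
				errorChan <- emptyBodyError
				closeWorker(w)
				return
			}
			handler(d)
			// With manual ack, acknowledge as soon as the handler returns.
			if !opts.autoAck {
				d.Ack(false)
			}
		}
	}
}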
Basically, the handler puts each message I receive on a different channel, and I ack the message immediately afterwards.
I have also tried running the handler function as a goroutine, but this behaved like the auto-ack scenario and memory started growing again.
Can someone help me understand why manual ack reduces my throughput by a factor of four, and what design changes, if any, are needed on my app's side to resolve this?
This kind of "re-dispatch" is the right approach if your consumers do not naturally keep up even with manual acknowledgements and a prefetch limit. Memory and CPU profiling will help narrow the bottleneck down.
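One way to start on the profiling suggested above is to sample the runtime's heap statistics and capture a heap profile while the consumer is running. The sketch below uses only the standard library (`runtime`, `runtime/pprof`); the helper names `heapStats` and `heapProfile` are made up for illustration and are not part of the app or of the amqp client. Comparing `HeapInuse` with `HeapReleased` over time may help tell a real leak apart from memory that the Go runtime is holding but has not yet handed back to the OS.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"runtime/pprof"
)

// heapStats (hypothetical helper) reports how many heap bytes are in use
// and how many bytes the runtime has already released back to the OS.
func heapStats() (inUse, released uint64) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.HeapInuse, m.HeapReleased
}

// heapProfile (hypothetical helper) captures a heap profile in pprof format.
func heapProfile() ([]byte, error) {
	var buf bytes.Buffer
	runtime.GC() // run a collection first so the profile reflects live objects
	if err := pprof.WriteHeapProfile(&buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	inUse, released := heapStats()
	fmt.Printf("heap in use: %d bytes, released to OS: %d bytes\n", inUse, released)

	prof, err := heapProfile()
	if err != nil {
		fmt.Println("heap profile failed:", err)
		return
	}
	fmt.Printf("captured heap profile of %d bytes\n", len(prof))
}
```

The profile bytes can be written to a file and inspected with `go tool pprof`. Calling `heapStats` periodically from the consumer, for example once per second, gives a rough picture of whether the growth is live data or idle heap.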
// handler forwards the delivery to pktChan so it can be processed elsewhere.
func handler(d amqp.Delivery, pktChan chan amqp.Delivery) {
	pktChan <- d
}