streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

channel.Publish lose data #361

Closed zhaoziming closed 5 years ago

zhaoziming commented 6 years ago

I publish 100,000 messages to the queue, but only 99,000+ messages show up in RabbitMQ's control panel. I have tested this many times and the problem occurs almost every time, unless I use time.Sleep to avoid it. With PHP this does not happen. What is the reason? Here is my code:

    package main

    import (
        "bytes"
        "fmt"
        "log"
        //"time"

        "github.com/streadway/amqp"
    )

    const (
        base_url string = "http://abc.test.com/v3/place/detail?key=%s&id=%s&myid=%d"
        mqurl    string = "amqp://root:fang.com@10.1.192.159:5672/citymap_poi"
    )

    var key_arr = []string{
        "7518a26a0097b5d4018e992a5b065359",
        "4e6dd02e1330885e656eab8c40aa6028",
        "99a6652e43cc3f3fc0f03064dd90b3de",
    }

    var key_arr_length = len(key_arr)

    var conn *amqp.Connection
    var channel *amqp.Channel
    var count = 0

    func main() {
        sender()
    }

    func sender() {
        for i := 0; i < 100000; i++ {
            send(1, "ABCDEFGHIJ")
            //time.Sleep(time.Second/10000)
        }
    }

    func send(id int, gdid string) {
        url := fmt.Sprintf(base_url, key_arr[id%key_arr_length], gdid, id)

        if channel == nil {
            mqConnect()
        }

        err := channel.Publish("queue_url", "key_1", false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(url),
        })
        failOnErr(err, "")
    }

    func mqConnect() {
        var err error
        conn, err = amqp.Dial(mqurl)
        failOnErr(err, "failed to connect to rabbitmq")

        channel, err = conn.Channel()
        failOnErr(err, "failed to open a channel")
    }

    func BytesToString(b *[]byte) *string {
        s := bytes.NewBuffer(*b)
        r := s.String()
        return &r
    }

    func failOnErr(err error, msg string) {
        if err != nil {
            log.Fatalf("%s:%s", msg, err)
            panic(fmt.Sprintf("%s:%s", msg, err))
        }
    }

bombsimon commented 6 years ago

I think you should slim down your example and go back to basics. A lot of code isn't relevant to your question and some of it isn't even used in your example. Also, put all of it in formatting tags and use the preview to ensure readability.

If you look at the actual test code from the RabbitMQ tutorial you can see that they're using defer conn.Close() to gracefully close the connection. The reason for this is to drain the connection before exiting the application and ensure that all messages have been published to the broker.

Change your main to this, so the program waits for the connection to be drained before it exits:

    func main() {
        sender()
        conn.Close()
    }

vidmed commented 5 years ago

The problem is not about gracefully closing the connection. I have the same problem. Issue https://github.com/streadway/amqp/issues/370 also describes loss of data.

bombsimon commented 5 years ago

@vidmed I don't think they're related. The code above in this issue is resolved by waiting for the connection to drain (which is also why the author got it to work with time.Sleep). I'm not denying there are other issues, but as long as you don't lose your connection the code above works perfectly fine.

In my opinion this issue should be closed.

michaelklishin commented 5 years ago

As mentioned in https://github.com/streadway/amqp/issues/370, publishing without publisher confirms is fire-and-forget (and entirely asynchronous).