Open youboyTizzyT opened 4 years ago
confirms.go:45
// confirm confirms one publishing, increments the expecting delivery tag, and
// removes bookkeeping for that delivery tag.
func (c *confirms) confirm(confirmation Confirmation) {
	delete(c.sequencer, c.expecting)
	c.expecting++
	for _, l := range c.listeners {
		l <- confirmation
	}
}
This blocks my process: Publish can no longer acquire the lock.
/*
Publish sends a Publishing from the client to an exchange on the server.
When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
because every declared queue gets an implicit route to the default exchange.
Since publishings are asynchronous, any undeliverable message will get returned
by the server. Add a listener with Channel.NotifyReturn to handle any
undeliverable message when calling publish with either the mandatory or
immediate parameters as true.
Publishings can be undeliverable when the mandatory flag is true and no queue is
bound that matches the routing key, or when the immediate flag is true and no
consumer on the matched queue is ready to accept the delivery.
This can return an error when the channel, connection or socket is closed. The
error or lack of an error does not indicate whether the server has received this
publishing.
It is possible for publishing to not reach the broker if the underlying socket
is shut down without pending publishing packets being flushed from the kernel
buffers. The easy way of making it probable that all publishings reach the
server is to always call Connection.Close before terminating your publishing
application. The way to ensure that all publishings reach the server is to add
a listener to Channel.NotifyPublish and put the channel in confirm mode with
Channel.Confirm. Publishing delivery tags and their corresponding
confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
*/
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
	if err := msg.Headers.Validate(); err != nil {
		return err
	}

	ch.m.Lock()
	defer ch.m.Unlock()

	if err := ch.send(&basicPublish{
		Exchange:   exchange,
		RoutingKey: key,
		Mandatory:  mandatory,
		Immediate:  immediate,
		Body:       msg.Body,
		Properties: properties{
			Headers:         msg.Headers,
			ContentType:     msg.ContentType,
			ContentEncoding: msg.ContentEncoding,
			DeliveryMode:    msg.DeliveryMode,
			Priority:        msg.Priority,
			CorrelationId:   msg.CorrelationId,
			ReplyTo:         msg.ReplyTo,
			Expiration:      msg.Expiration,
			MessageId:       msg.MessageId,
			Timestamp:       msg.Timestamp,
			Type:            msg.Type,
			UserId:          msg.UserId,
			AppId:           msg.AppId,
		},
	}); err != nil {
		return err
	}

	if ch.confirming {
		ch.confirms.Publish()
	}

	return nil
}
This is my code. The method SendSyncMsg is called every time there is a message to send.
func (k *mqProducer) SendSyncMsg(topic string, msg *massageStruct.Msg) (err error) {
	// Publish the message.
	body := serialization.SerializationMsg(msg)
	if err = k.syncCh.Publish(
		defaultExchangeNameSync, // exchange
		topic,                   // routing key
		false,                   // mandatory
		false,                   // immediate
		amqp.Publishing{
			ContentType: "application/octet-stream",
			Body:        body,
		}); err != nil {
		log.Error(err, "Failed to publish a message")
		return
	}

	// Wait up to 5 seconds for the confirmation; the timer is released on return.
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
	select {
	case confirm := <-k.confirm:
		if !confirm.Ack {
			err = fmt.Errorf("Push failed! topic=%s", topic)
		}
	case <-timer.C:
		err = fmt.Errorf("out of limit time! topic=%s", topic)
	}
	return
}
Please advise. Thank you very much.
You do want to avoid using channels in parallel, i.e. many goroutines issuing many publishes on the same channel. In this library I believe it's safe, but it's inefficient and blocking because of all the locking and unlocking. It's best to create a pool of channels/connections. I have a lot of examples in my repo for you to use!
Secondly, the channel creation part you are writing isn't included in the above code example.
Can you show the code of your channel creation?
To conduct a publish confirmation, you need to put the Channel in Confirm mode. Then subscribe to NotifyPublish by passing in a confirmation chan. Then Publish like you are currently doing. Then wait for the confirmation. Pretty much all errors from publishing kill the Channel, so you have to make a new RabbitMQ channel on error. An error can be caused by your Exchange or Queue/Topic being missing, so double-check that they exist on your server. I also believe there is a dead letter exchange involved in these events, and if that isn't wired up for routing, the message will never be written to disk and the server will not send back a confirmation, if I recall correctly.
This is my example of making a Channel for each Publish and using Confirmations. I don't recommend creating a channel per publish, as the churn is very hard on the RabbitMQ server, but it provides a clearer picture for you to then improve upon.
// PublishWithConfirmationTransient sends a single message to the address on the letter with confirmation capabilities on transient Channels.
// This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority.
// A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout time.Duration) {
	if timeout == 0 {
		timeout = pub.publishTimeOutDuration
	}

	for {
		// Has to use an Ackable channel for Publish Confirmations.
		channel := pub.ConnectionPool.GetTransientChannel()
		channel.Confirm(false)
		// Buffer of 1: only one publishing is outstanding on this channel at a time.
		confirms := make(chan amqp.Confirmation, 1)
		channel.NotifyPublish(confirms)

	Publish:
		timeoutAfter := time.After(timeout)
		err := channel.Publish(
			letter.Envelope.Exchange,
			letter.Envelope.RoutingKey,
			letter.Envelope.Mandatory,
			letter.Envelope.Immediate,
			amqp.Publishing{
				ContentType:  letter.Envelope.ContentType,
				Body:         letter.Body,
				Headers:      letter.Envelope.Headers,
				DeliveryMode: letter.Envelope.DeliveryMode,
			},
		)
		if err != nil {
			channel.Close()
			// Back off before retrying only when a positive interval is configured.
			if pub.sleepOnErrorInterval > 0 {
				time.Sleep(pub.sleepOnErrorInterval)
			}
			continue // Take it again! From the top!
		}

		// Wait for very next confirmation on this channel, which should be our confirmation.
		for {
			select {
			case <-timeoutAfter:
				// total timeout, handle it
				channel.Close()
				return
			case confirmation := <-confirms:
				if !confirmation.Ack {
					goto Publish // nack has occurred, republish/retry in this method
				}
				// Happy Path, publish was received by server and we didn't timeout client side.
				channel.Close()
				return
			default:
				// Sleep between confirmation checks.
				time.Sleep(time.Millisecond) // limits CPU spin up
			}
		}
	}
}
I do heavily reuse channels when it is feasible, and I have plenty of examples in my personal repo I linked above.
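To make the pooling suggestion more concrete, here is a minimal sketch of a fixed-size pool in which each goroutine borrows a channel for exclusive use instead of sharing one. It is not code from the library or from the repo mentioned above: `publisher`, `fakeChannel`, `channelPool`, `newChannelPool`, `with`, and `publishAll` are all hypothetical names, and `fakeChannel` stands in for `*amqp.Channel` so the sketch runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for the part of *amqp.Channel used here.
type publisher interface {
	Publish(routingKey string, body []byte) error
}

// fakeChannel counts publishes so the sketch runs without a RabbitMQ server.
type fakeChannel struct {
	id        int
	published int
}

func (f *fakeChannel) Publish(routingKey string, body []byte) error {
	if routingKey == "" {
		return errors.New("empty routing key")
	}
	f.published++
	return nil
}

// channelPool keeps idle channels in a buffered Go channel, so at most one
// goroutine holds a given publisher at any time.
type channelPool struct {
	idle chan publisher
}

// newChannelPool opens size publishers up front using the supplied constructor.
func newChannelPool(size int, open func(id int) publisher) *channelPool {
	p := &channelPool{idle: make(chan publisher, size)}
	for i := 0; i < size; i++ {
		p.idle <- open(i)
	}
	return p
}

// with borrows a publisher, runs fn, and hands the publisher back afterwards.
func (p *channelPool) with(fn func(publisher) error) error {
	ch := <-p.idle // blocks until a publisher is free
	defer func() { p.idle <- ch }()
	return fn(ch)
}

// publishAll publishes n messages from n goroutines and returns the failure count.
func publishAll(p *channelPool, n int) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		failures int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := p.with(func(ch publisher) error {
				return ch.Publish("topic", []byte("hello"))
			})
			if err != nil {
				mu.Lock()
				failures++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return failures
}

func main() {
	pool := newChannelPool(3, func(id int) publisher { return &fakeChannel{id: id} })
	fmt.Println("failed publishes:", publishAll(pool, 30))
}
```

With a real broker, a pool like this would presumably replace a channel that returned an error rather than put it back, since most publish errors kill the channel.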
Hello, I use amqp.Channel.Publish to send messages.
Phenomenon: my Go process can no longer publish messages.
What I think the reason is: looking at the code, one goroutine has the status "chan send" (blocked) at confirms.go:45, and this prevents my other goroutines from publishing.
Problem: can you tell me why confirms.go:45 cannot proceed? If I am using the library incorrectly, please tell me how to use it correctly. Thanks.
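One possible explanation for a goroutine stuck in "chan send" can be modeled with plain Go channels. The quoted confirm method sends to each listener with an ordinary blocking send, so if nothing is receiving on the listener channel (for example because the waiting side already timed out) and the channel has no free buffer slot, the send never completes. The sketch below is a model under that assumption, not the library's code: `confirmation`, `deliver`, and `deliveredWithin` are hypothetical names. Whether this matches the situation here depends on how the channel passed to NotifyPublish was created, which is not shown in the thread.

```go
package main

import (
	"fmt"
	"time"
)

// confirmation is a hypothetical stand-in for amqp.Confirmation.
type confirmation struct {
	tag uint64
	ack bool
}

// deliver models the loop in the quoted confirm method: one blocking send per listener.
func deliver(listeners []chan confirmation, c confirmation) {
	for _, l := range listeners {
		l <- c
	}
}

// deliveredWithin reports whether deliver returns within timeout when no
// goroutine ever receives from listener. If the send blocks, the helper
// goroutine is leaked, which is acceptable for a short demonstration.
func deliveredWithin(listener chan confirmation, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		deliver([]chan confirmation{listener}, confirmation{tag: 1, ack: true})
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	unbuffered := make(chan confirmation)  // no reader: the send blocks
	buffered := make(chan confirmation, 1) // one free slot: the send completes
	fmt.Println("unbuffered listener delivered:", deliveredWithin(unbuffered, 100*time.Millisecond))
	fmt.Println("buffered listener delivered:", deliveredWithin(buffered, 100*time.Millisecond))
}
```

If this is what is happening, giving the confirmation channel enough capacity for every outstanding publishing, or always draining it even after a timeout, should keep the sender from blocking.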