valinurovam / garagemq

AMQP message broker implemented with golang
MIT License
230 stars · 40 forks

[POSSIBLE BUG] - Publisher freezes when publishing without any delay #104

Open sujit-baniya opened 8 months ago

sujit-baniya commented 8 months ago

I'm using BadgerDB as the persistence store. When publishing around 500K persistent messages for testing, without any delay between publishes, the publisher freezes.

package main

import (
    "bytes"
    "context"
    "fmt"
    "log"
    "time"

    amqp "github.com/oarkflow/amqp/amqp091"

    grabbit "github.com/oarkflow/amqp"
)

func OnPubReattempting(name string, retry int) bool {
    log.Printf("callback_redo: {%s} retry count {%d}", name, retry)
    return true // continue retrying
}

// OnNotifyPublish CallbackNotifyPublish
func OnNotifyPublish(confirm amqp.Confirmation, ch *grabbit.Channel) {
    log.Printf("callback: publish confirmed status [%v] from queue [%s]\n", confirm.Ack, ch.Queue())
}

// OnNotifyReturn CallbackNotifyReturn
func OnNotifyReturn(_ amqp.Return, ch *grabbit.Channel) {
    log.Printf("callback: publish returned from queue [%s]\n", ch.Queue())
}

func PublishMsg(publisher *grabbit.Publisher, start, end int) {
    message := amqp.Publishing{DeliveryMode: amqp.Persistent} // runs for some messages, then freezes
    // message := amqp.Publishing{}
    message.Headers = map[string]any{
        "next-queue": "I'm loving it",
    }
    data := make([]byte, 0, 64)
    buff := bytes.NewBuffer(data)

    for i := start; i < end; i++ {
        // <-time.After(1 * time.Millisecond)
        buff.Reset()
        buff.WriteString(fmt.Sprintf("test number %04d", i))
        message.Body = buff.Bytes()
        log.Println("going to send:", buff.String())

        if err := publisher.Publish(message); err != nil {
            log.Println("publishing failed with: ", err)
        }
    }
}

func main() {
    ctxMaster, ctxCancel := context.WithCancel(context.TODO())
    defer ctxCancel() // release the context when main returns (go vet: lostcancel)
    conn := grabbit.NewConnection("amqp://guest:guest@localhost:5672", amqp.Config{}, grabbit.WithConnectionCtx(ctxMaster))
    pubOpt := grabbit.DefaultPublisherOptions()
    pubOpt.WithKey("workload").WithContext(ctxMaster).WithConfirmationsCount(20)

    topos := make([]*grabbit.TopologyOptions, 0, 8)
    topos = append(topos, &grabbit.TopologyOptions{
        Name:          "workload",
        IsDestination: true,
        Durable:       true,
        Declare:       true,
    })
    publisher := grabbit.NewPublisher(conn, pubOpt,
        grabbit.WithChannelCtx(ctxMaster),
        grabbit.WithChannelTopology(topos),
        grabbit.OnChannelRecovering(OnPubReattempting),
        grabbit.OnPublishSuccess(OnNotifyPublish),
        grabbit.OnPublishFailure(OnNotifyReturn),
    )
    if !publisher.AwaitAvailable(30*time.Second, 1*time.Second) {
        log.Println("publisher not ready yet")
        ctxCancel()
        return
    }

    PublishMsg(publisher, 0, 500000)
}

Screenshot

It seems the Channel is closed before all messages are published.

valinurovam commented 8 months ago

It looks like the issue is here:

log.Printf("callback: publish confirmed status [%v] from queue [%s]\n", confirm.Ack, ch.Queue())

The channel.Queue method takes a lock:

func (ch *Channel) Queue() string {
    ch.baseChan.mu.RLock()
    defer ch.baseChan.mu.RUnlock()

    return ch.queue
}

If you replace ch.Queue() with a plain string, for example "queueName", everything works.

valinurovam commented 8 months ago

It looks like the lock/unlock in ch.Queue is unnecessary and causes some performance degradation.

sujit-baniya commented 8 months ago

@valinurovam Thank you for the response.

So you mean that if I remove the lock, it might work?

Question: is the lock necessary there? I don't see any operations that could cause a deadlock.

valinurovam commented 8 months ago

Looks like a deadlock somewhere. If you change the method like this:

// Queue returns the active (as indicated by [IsDestination] option in topology options) queue name.
// Useful for finding the server assigned name.
func (ch *Channel) Queue() string {
    fmt.Println("try to get lock... ")
    ch.baseChan.mu.RLock()
    fmt.Println(" locked.")
    defer func() {
        fmt.Println(" unlocked.\n")
        ch.baseChan.mu.RUnlock()
    }()

    return ch.queue
}

Logs

try to get lock... 
2023/12/29 19:11:40 going to send: test number 1623
 locked.
 unlocked.

callback: publish confirmed status [true] from queue [workload]
try to get lock... 
2023/12/29 19:11:40 going to send: test number 1624
 locked.
 unlocked.

callback: publish confirmed status [true] from queue [workload]
2023/12/29 19:11:40 going to send: test number 1625
2023/12/29 19:11:40 going to send: test number 1626
2023/12/29 19:11:40 going to send: test number 1627
2023/12/29 19:11:40 going to send: test number 1628
try to get lock... 
2023/12/29 19:11:40 going to send: test number 1629
 locked.
 unlocked.

callback: publish confirmed status [true] from queue [workload]
try to get lock...