Azure / go-amqp

AMQP 1.0 client library for Go.
https://github.com/Azure/go-amqp
MIT License

Error sending to queue #292

Closed. alex3dm closed this issue 1 year ago

alex3dm commented 1 year ago

Hello. When sending to the queue under load, I get this error: received disposition frame with unknown link handle 1. I connect to Artemis MQ using go-amqp v0.19.1. The error comes from your code in session.go: https://github.com/Azure/go-amqp/blob/968a4440db46b6734d75df988442321e1b413749/session.go#L391

jhendrixMSFT commented 1 year ago

Can you please upgrade to v1.0.0 and see if the problem persists?

alex3dm commented 1 year ago

After updating to v1.0.0 the problem remained. I rewrote the code so that it no longer caches the session, and it now works perfectly.

Before:

sender, err := producer.ctx.Session.NewSender(context.Background(), dest.GetQueueName(), nil)
....
err = sender.Send(context.Background(), textMsgImpl.AmqMessage, &amqp.SendOptions{})

After:

session, err := producer.ctx.client.NewSession(context.Background(), &amqp.SessionOptions{})
.....
sender, err := session.NewSender(context.Background(), dest.GetQueueName(), nil)
....
err = sender.Send(context.Background(), textMsgImpl.AmqMessage, &amqp.SendOptions{})

Why does this problem occur when the session is cached?

jhendrixMSFT commented 1 year ago

Thanks for trying.

How does session caching work in your app?

alex3dm commented 1 year ago

The application creates a Sender object, which opens one connection. A Session is created through this connection. The SendMessage method of the Sender object sends a message to Artemis. Link to a diagram.

jhendrixMSFT commented 1 year ago

I see. So if I understand correctly, there's a single Sender object that contains the *amqp.Conn and *amqp.Session which is presumably long-lived and used across multiple HTTP handler goroutines. Does each goroutine create a new *amqp.Sender? Is Sender.Close() called after sending the message?

alex3dm commented 1 year ago

Yes, that's right. After creating the *amqp.Sender, I close it with defer Sender.Close(). At a load of up to 20 TPS it works correctly. Above 20 TPS the following error appears: received disposition frame with unknown link handle 1

jhendrixMSFT commented 1 year ago

I'm writing a test for this. Below is what I came up with, but it doesn't reproduce the error. How does this compare to your program? Anything I should change?

func TestMultipleSendersSharedSession(t *testing.T) {
    if localBrokerAddr == "" {
        t.Skip()
    }

    checkLeaks := leaktest.Check(t)

    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    client, err := amqp.Dial(ctx, localBrokerAddr, nil)
    cancel()
    require.NoError(t, err)

    ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
    session, err := client.NewSession(ctx, nil)
    cancel()
    require.NoError(t, err)

    wg := &sync.WaitGroup{}
    for senders := 0; senders < 100; senders++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            sender, err := session.NewSender(context.Background(), "TestMultipleSendersSharedSession", nil)
            require.NoError(t, err)
            for msgCount := 0; msgCount < 1000; msgCount++ {
                ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
                err = sender.Send(ctx, amqp.NewMessage([]byte("test")), nil)
                cancel()
                require.NoError(t, err)
            }
            ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
            require.NoError(t, sender.Close(ctx))
            cancel()
        }()
    }

    wg.Wait()
    require.NoError(t, client.Close())
    checkLeaks()
}

alex3dm commented 1 year ago

This version is closer to my program. The message size is also 10 KB.
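
The test in this comment still sends the four-byte body []byte("test"). A minimal sketch of building a 10 KB body follows, assuming "10 kb" means 10 * 1024 bytes; newPayload and payloadSize are names made up for this illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// payloadSize is an assumption: "10 kb" is read here as 10 * 1024 bytes.
const payloadSize = 10 * 1024

// newPayload returns a byte slice of exactly size bytes, filled with 'x'.
func newPayload(size int) []byte {
	return bytes.Repeat([]byte{'x'}, size)
}

func main() {
	body := newPayload(payloadSize)
	fmt.Println("payload bytes:", len(body))
}
```

The resulting slice could be passed to amqp.NewMessage in place of []byte("test") in the test.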

func TestMultipleSendersSharedSession(t *testing.T) {
  if localBrokerAddr == "" {
    t.Skip()
  }

  checkLeaks := leaktest.Check(t)

  ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  client, err := amqp.Dial(ctx, localBrokerAddr, nil)
  cancel()
  require.NoError(t, err)

  session, err := client.NewSession(context.Background(), nil)
  require.NoError(t, err)

  wg := &sync.WaitGroup{}
  for senders := 0; senders < 100; senders++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      for msgCount := 0; msgCount < 1000; msgCount++ {
        sender, err := session.NewSender(context.Background(), "TestMultipleSendersSharedSession", nil)
        require.NoError(t, err)
        err = sender.Send(context.Background(), amqp.NewMessage([]byte("test")), nil)
        require.NoError(t, err)
        sender.Close(context.Background())
        time.Sleep(time.Millisecond*100)
      }
    }()
  }

  wg.Wait()
  require.NoError(t, session.Close(context.Background()))
  require.NoError(t, client.Close())
  checkLeaks()
}

alex3dm commented 1 year ago

leaktest.Check(t): which library does this come from?

jhendrixMSFT commented 1 year ago

That comes from github.com/fortytw2/leaktest

jhendrixMSFT commented 1 year ago

Thanks for the suggestions, I can repro this now. Will investigate and get back to you.

jhendrixMSFT commented 1 year ago

Thanks for helping with the repro. I have a fix for this. Your workaround is also fine.

alex3dm commented 1 year ago

When will version 1.0.1 be released?

jhendrixMSFT commented 1 year ago

Hopefully today, just waiting on a review.

jhendrixMSFT commented 1 year ago

v1.0.1 with the fix is out, thanks for the bug report.