Azure / go-amqp

AMQP 1.0 client library for Go.
https://github.com/Azure/go-amqp
MIT License

Closing sender/receiver hangs #310

Closed AdminBenni closed 11 months ago

AdminBenni commented 11 months ago

We are connecting to a local RabbitMQ instance. Sending and receiving work as expected, but the test hangs as soon as it tries to close the sender/receiver.

package main

import (
    "context"
    "fmt"
    "testing"

    "github.com/Azure/go-amqp"
)

func TestManual(t *testing.T) {
    ctx := context.TODO()

    // Check each error before registering the matching deferred Close,
    // so a failed call never leaves a nil value to be closed.
    conn, err := amqp.Dial(ctx, "amqp://guest:guest@localhost:5672/", nil)
    if err != nil {
        t.Fatal(err)
    }
    // Deferred calls run in reverse order: receiver, sender, session, conn.
    defer func() {
        fmt.Println("Closing Conn")
        if err := conn.Close(); err != nil {
            t.Fatal(err)
        }
    }()

    session, err := conn.NewSession(ctx, nil)
    if err != nil {
        t.Fatal(err)
    }
    defer func() {
        fmt.Println("Closing Session")
        if err := session.Close(ctx); err != nil {
            t.Fatal(err)
        }
    }()

    sender, err := session.NewSender(ctx, "test-target", nil)
    if err != nil {
        t.Fatal(err)
    }
    defer func() {
        fmt.Println("Closing Sender")
        if err := sender.Close(ctx); err != nil {
            t.Fatal(err)
        }
    }()

    msg := &amqp.Message{
        Data: [][]byte{[]byte("my awesome payload")},
    }
    if err := sender.Send(ctx, msg, nil); err != nil {
        t.Fatal(err)
    }

    receiver, err := session.NewReceiver(ctx, "test-target", nil)
    if err != nil {
        t.Fatal(err)
    }
    defer func() {
        fmt.Println("Closing Receiver")
        // This is the call that never returns.
        if err := receiver.Close(ctx); err != nil {
            t.Fatal(err)
        }
    }()

    received, err := receiver.Receive(ctx, nil)
    if err != nil {
        t.Fatal(err)
    }
    if err := receiver.AcceptMessage(ctx, received); err != nil {
        t.Fatal(err)
    }
    fmt.Printf("%s\n", received.Data[0])
}

Here is the debug log from running TestManual:

=== RUN   TestManual
11:23:40.640728 RX (negotiateSASL 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: SaslMechanisms{Mechanisms : [ANONYMOUS AMQPLAIN PLAIN]}}
11:23:40.641814 TX (ConnSASLPlain 0xc000002180): Frame{Type: SASL, Channel: 0, Body: SaslInit{Mechanism : PLAIN, InitialResponse: ********, Hostname: }}
11:23:40.642877 RX (saslOutcome 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: SaslOutcome{Code : 0, AdditionalData: []}}
11:23:40.643409 TX (openAMQP 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : Pkj63ORDWQlJ9JNq9jbPxkidqaNpyR5aZ5vPiiBCSdRYDADKsB357A, Hostname: localhost, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 30s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.644477 RX (openAMQP 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : rabbit@rabbitmq, Hostname: , MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 1m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[cluster_name:rabbit@rabbitmq copyright:Copyright (c) 2007-2023 VMware, Inc. or its affiliates. information:Licensed under the MPL 2.0. Website: https://rabbitmq.com platform:Erlang/OTP 25.3.2.5 product:RabbitMQ version:3.12.4]}}
11:23:40.644477 TX (Conn 0xc000002180): cleaning up 0 abandoned sessions
11:23:40.644477 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.644477 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.644477 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.646061 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 65535, OutgoingWindow: 65535, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.646061 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 65535, OutgoingWindow: 65535, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.646571 TX (Session 0xc0000ac340): cleaning up 0 abandoned links
11:23:40.646609 TX (Session 0xc0000ac340): 0, Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.646609 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.646609 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.646609 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.646609 TX (link 0xc0001ba000): mux frame to Session (0xc0000ac340): Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.648199 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.648199 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.648199 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: <nil>, LinkCredit: 65536, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
11:23:40.648199 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: <nil>, LinkCredit: 65536, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
11:23:40.648199 RX (Session 0xc0000ac340): Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.648199 RX (Session 0xc0000ac340): mux frame to link (0xc0001ba000): 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Attach{Name: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.648199 RX (Session 0xc0000ac340): Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: <nil>, LinkCredit: 65536, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.648199 RX (Session 0xc0000ac340): flow - remoteOutgoingWindow: 65535 remoteIncomingWindow: 65535 nextOutgoingID: 0
11:23:40.648199 RX (Session 0xc0000ac340): mux frame to link (0xc0001ba000): 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: <nil>, LinkCredit: 65536, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.648199 TX (Sender 0xc0001ba000) (pause): target: "test-target", link credit: 0, deliveryCount: 0
11:23:40.648199 RX (Sender 0xc0001ba000): Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: <nil>, LinkCredit: 65536, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.648199 TX (Sender 0xc0001ba000) (enable): target: "test-target", link credit: 65536, deliveryCount: 0
11:23:40.648199 TX (Sender 0xc0001ba000): mux transfer to Session: 0, Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: 0000000000000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 23}
11:23:40.648199 TX (Sender 0xc0001ba000): link: 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, link credit: 65535
11:23:40.648199 TX (Sender 0xc0001ba000) (enable): target: "test-target", link credit: 65535, deliveryCount: 1
11:23:40.648199 TX (Session 0xc0000ac340): 0, Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: 0000000000000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 23}
11:23:40.648199 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: 0000000000000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 23}
11:23:40.648199 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: 0000000000000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 23}}
11:23:40.648199 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: 0000000000000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 23}}
11:23:40.648730 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Disposition{Role: Receiver, First: 0, Last: 0, Settled: true, State: Accepted, Batchable: false}}
11:23:40.648730 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Disposition{Role: Receiver, First: 0, Last: 0, Settled: true, State: Accepted, Batchable: false}}
11:23:40.648730 RX (Session 0xc0000ac340): Disposition{Role: Receiver, First: 0, Last: 0, Settled: true, State: Accepted, Batchable: false}
11:23:40.648730 RX (Session 0xc0000ac340): mux frame to link (0xc0001ba000): 1SCL-40mHT3pArPwArVtg4QLBNFz8ETQvz3R2uxhoJtaM3vhKJnRjg, Disposition{Role: Receiver, First: 0, Last: 0, Settled: true, State: Accepted, Batchable: false}

11:23:40.648730 RX (Sender 0xc0001ba000): Disposition{Role: Receiver, First: 0, Last: 0, Settled: true, State: Accepted, Batchable: false}
11:23:40.648730 TX (Sender 0xc0001ba000) (enable): target: "test-target", link credit: 65535, deliveryCount: 1
11:23:40.648730 TX (Session 0xc0000ac340): cleaning up 0 abandoned links
11:23:40.648730 TX (link 0xc00023e000): mux frame to Session (0xc0000ac340): Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.648730 TX (Session 0xc0000ac340): 0, Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.649262 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.649262 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.649262 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.650368 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Sender, SenderSettleMode: unsettled, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Released Outcomes: [amqp:accepted:list amqp:rejected:list amqp:released:list amqp:modified:list], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.650368 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Sender, SenderSettleMode: unsettled, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Released Outcomes: [amqp:accepted:list amqp:rejected:list amqp:released:list amqp:modified:list], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:23:40.650368 RX (Session 0xc0000ac340): Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Sender, SenderSettleMode: unsettled, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Released Outcomes: [amqp:accepted:list amqp:rejected:list amqp:released:list amqp:modified:list], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.650897 RX (Session 0xc0000ac340): mux frame to link (0xc00023e000): qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Attach{Name: qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Handle: 1, Role: Sender, SenderSettleMode: unsettled, ReceiverSettleMode: <nil>, Source: source{Address: test-target, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Released Outcomes: [amqp:accepted:list amqp:rejected:list amqp:released:list amqp:modified:list], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:40.650897 TX (Session 0xc0000ac340): 0, Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.650897 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.650897 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
11:23:40.650897 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
11:23:40.650897 TX (Receiver 0xc00023e000): mux frame to Session (0xc0000ac340): 0, Flow{NextIncomingID: <nil>, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.651968 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: 2, Drain: false, Echo: false, Properties: map[]}}
11:23:40.651968 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: 2, Drain: false, Echo: false, Properties: map[]}}
11:23:40.651968 RX (Session 0xc0000ac340): Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: 2, Drain: false, Echo: false, Properties: map[]}
11:23:40.651968 RX (Session 0xc0000ac340): flow - remoteOutgoingWindow: 65535 remoteIncomingWindow: 65534 nextOutgoingID: 1
11:23:40.651968 RX (Session 0xc0000ac340): mux frame to link (0xc00023e000): qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: 2, Drain: false, Echo: false, Properties: map[]}
11:23:40.651968 RX (Receiver 0xc00023e000): Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: 2, Drain: false, Echo: false, Properties: map[]}
11:23:40.652503 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Transfer{Handle: 1, DeliveryID: 0, DeliveryTag: 0000000000000001, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 65}}
11:23:40.652503 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Transfer{Handle: 1, DeliveryID: 0, DeliveryTag: 0000000000000001, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 65}}
11:23:40.652503 RX (Session 0xc0000ac340): Transfer{Handle: 1, DeliveryID: 0, DeliveryTag: 0000000000000001, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 65}
11:23:40.652503 RX (Session 0xc0000ac340): mux frame to link (0xc00023e000): qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Transfer{Handle: 1, DeliveryID: 0, DeliveryTag: 0000000000000001, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 65}
11:23:40.652503 RX (Receiver 0xc00023e000): Transfer{Handle: 1, DeliveryID: 0, DeliveryTag: 0000000000000001, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 65}
11:23:40.652503 RX (Receiver 0xc00023e000): add unsettled delivery ID 0
11:23:40.652503 RX (Receiver 0xc00023e000) link qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ - deliveryCount: 1, linkCredit: 0, len(messages): 1
11:23:40.652503 RX (Receiver 0xc00023e000): received delivery ID 0
11:23:40.652503 RX (Receiver 0xc00023e000) (pause): source: "test-target", inflight: 0, linkCredit: 0, deliveryCount: 1, messages: 0, unsettled: 1, settlementCount: 0, settleMode: first
11:23:40.652503 TX (link 0xc00023e000): mux frame to Session (0xc0000ac340): Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}
11:23:40.652503 RX (Receiver 0xc00023e000) (pause): source: "test-target", inflight: 0, linkCredit: 0, deliveryCount: 1, messages: 0, unsettled: 1, settlementCount: 0, settleMode: first
11:23:40.652503 TX (Session 0xc0000ac340): 0, Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}
11:23:40.652503 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}
11:23:40.652503 TX (Receiver 0xc00023e000): mux txDisposition Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}
11:23:40.652503 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}}
11:23:40.652503 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}}
my awesome payload
Closing Receiver
11:23:40.652503 RX (Receiver 0xc00023e000) (auto): source: "test-target", inflight: 0, linkCredit: 0, deliveryCount: 1, messages: 0, unsettled: 0, settlementCount: 1, settleMode: first
11:23:40.652503 RX (Receiver 0xc00023e000) (flow): source: "test-target", inflight: 0, curLinkCredit: 0, newLinkCredit: 1, drain: false, deliveryCount: 1, messages: 0, unsettled: 0, settlementCount: 1, settleMode: first
11:23:40.652503 TX (Receiver 0xc00023e000): mux frame to Session (0xc0000ac340): 0, Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.652503 TX (Session 0xc0000ac340): 0, Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.653037 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
11:23:40.653037 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
11:23:40.653037 TX (Session 0xc0000ac340): 0, Detach{Handle: 1, Closed: true, Error: *Error(nil)}
11:23:40.653037 TX (link 0xc00023e000): mux frame to Session (0xc0000ac340): Detach{Handle: 1, Closed: true, Error: *Error(nil)}
11:23:40.653037 TX (Session 0xc0000ac340) mux frame to Conn (0xc000002180): Detach{Handle: 1, Closed: true, Error: *Error(nil)}
11:23:40.653037 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
11:23:40.653037 TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: true, Error: *Error(nil)}}
11:23:40.653037 TX (Conn 0xc000002180): mux frame to connWriter: Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: true, Error: *Error(nil)}}
11:23:40.700253 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 1, OutgoingWindow: 65534, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: 1, Drain: false, Echo: false, Properties: map[]}}
11:23:40.700253 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 1, OutgoingWindow: 65534, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: 1, Drain: false, Echo: false, Properties: map[]}}
11:23:40.700490 RX (Session 0xc0000ac340): Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 1, OutgoingWindow: 65534, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: 1, Drain: false, Echo: false, Properties: map[]}
11:23:40.700490 RX (Session 0xc0000ac340): flow - remoteOutgoingWindow: 65534 remoteIncomingWindow: 65534 nextOutgoingID: 1
11:23:40.700490 RX (Session 0xc0000ac340): mux frame to link (0xc00023e000): qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 1, OutgoingWindow: 65534, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: 1, Drain: false, Echo: false, Properties: map[]}
11:23:40.700490 RX (Receiver 0xc00023e000): Flow{NextIncomingID: 1, IncomingWindow: 65534, NextOutgoingID: 1, OutgoingWindow: 65534, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: 1, Drain: false, Echo: false, Properties: map[]}
11:23:40.700490 RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: false, Error: *Error(nil)}}
11:23:40.700490 RX (connReader 0xc000002180): mux frame to Session (0xc0000ac340): Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: false, Error: *Error(nil)}}
11:23:40.700490 RX (Session 0xc0000ac340): Detach{Handle: 1, Closed: false, Error: *Error(nil)}
11:23:40.700490 RX (Session 0xc0000ac340): mux frame to link (0xc00023e000): qeXi1jtfOX1SiwNAkJ-gQAeIslwiKXVToBuO4kKJHtPGBFUiEhJojQ, Detach{Handle: 1, Closed: false, Error: *Error(nil)}
11:23:40.700490 RX (Receiver 0xc00023e000): Detach{Handle: 1, Closed: false, Error: *Error(nil)}
11:23:40.700490 TX (link 0xc00023e000) close error already pending, discarding *Error{Condition: amqp:not-implemented, Description: non-closing detach not supported: Detach{Handle: 1, Closed: false, Error: *Error(nil)}, Info: map[]}
11:23:55.645331 RX (connReader 0xc000002180): received keep-alive frame
11:24:10.646208 RX (connReader 0xc000002180): received keep-alive frame
11:24:10.646415 TX (connWriter 0xc000002180): sending keep-alive frame
11:24:25.647351 RX (connReader 0xc000002180): received keep-alive frame
11:24:40.647136 TX (connWriter 0xc000002180): sending keep-alive frame
11:24:40.648136 RX (connReader 0xc000002180): received keep-alive frame

==== WE MANUALLY KILL THE PROCESS HERE ====

Process finished with the exit code 1
jhendrixMSFT commented 11 months ago

Thanks for the detailed bug report, it's super helpful.

I believe that RabbitMQ is in violation of the AMQP 1.0 spec, specifically section 2.6.6.

Note that one peer MAY send a closing detach while its partner is sending a non-closing detach. In this case, the partner MUST signal that it has closed the link by reattaching and then sending a closing detach.

Here you can see that go-amqp sent the Detach performative to RabbitMQ indicating it's closing the link.

TX (connWriter 0xc000002180) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: true, Error: *Error(nil)}}

However, RabbitMQ responds with a non-closing Detach performative.

RX (connReader 0xc000002180): Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: false, Error: *Error(nil)}}

A non-closing Detach is OK in itself (although I don't think the two Detach performatives were sent simultaneously), but per section 2.6.6, RabbitMQ MUST then reattach the link and send a closing Detach performative.

Are there any logs on the RabbitMQ side that might indicate why it didn't close the link? In the meantime, you'll want to pass a context.Context with a deadline/timeout when calling Close on the sender/receiver to avoid the hang.

AdminBenni commented 11 months ago

Thanks so much for the quick response. Thanks to your insight we have already switched to ActiveMQ and it works without issue.

As for the RabbitMQ logs, unfortunately, even at the highest verbosity setting, they don't tell us much.

2023-09-13 16:02:23.457469+00:00 [info] <0.534.0> Server startup complete; 5 plugins started.
2023-09-13 16:02:23.457469+00:00 [info] <0.534.0>  * rabbitmq_prometheus
2023-09-13 16:02:23.457469+00:00 [info] <0.534.0>  * rabbitmq_amqp1_0
2023-09-13 16:02:23.457469+00:00 [info] <0.534.0>  * rabbitmq_management
2023-09-13 16:02:23.457469+00:00 [info] <0.534.0>  * rabbitmq_management_agent
2023-09-13 16:02:23.457469+00:00 [info] <0.534.0>  * rabbitmq_web_dispatch
2023-09-13 16:02:23.457568+00:00 [debug] <0.534.0> Marking RabbitMQ as running
2023-09-13 16:02:23.457593+00:00 [debug] <0.534.0> Change boot state to `ready`
2023-09-13 16:02:23.457635+00:00 [debug] <0.134.0> Boot state/systemd: notifying of state `ready`
2023-09-13 16:02:23.588281+00:00 [debug] <0.9.0> Time to start RabbitMQ: 4707285 us
2023-09-13 16:02:38.221522+00:00 [info] <0.684.0> accepting AMQP connection <0.684.0> (172.18.0.1:55032 -> 172.18.0.2:5672)
2023-09-13 16:02:38.222850+00:00 [debug] <0.684.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2023-09-13 16:02:38.222921+00:00 [info] <0.684.0> AMQP 1.0 connection <0.684.0>: user 'guest' authenticated
2023-09-13 16:02:38.224060+00:00 [debug] <0.684.0> AMQP 1.0 connection.open frame: hostname = localhost, extracted vhost = /, idle_timeout = 60000
2023-09-13 16:02:38.225625+00:00 [warning] <0.691.0> AMQP 0-9-1 client call timeout was 70000 ms, is updated to a safe effective value of 130000 ms
2023-09-13 16:02:38.230534+00:00 [debug] <0.695.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2023-09-13 16:02:38.231800+00:00 [debug] <0.684.0> AMQP 1.0 opened channel = {0,<0.691.0>}
2023-09-13 16:02:38.231850+00:00 [info] <0.684.0> AMQP 1.0 connection <0.684.0>: user 'guest' authenticated and granted access to vhost '/'
2023-09-13 16:02:38.615953+00:00 [debug] <0.736.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2023-09-13 16:02:38.624706+00:00 [debug] <0.744.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
jhendrixMSFT commented 11 months ago

Glad you found a solution. I'm going to close this issue, then, given that RabbitMQ is doing the wrong thing.