Azure / go-amqp

AMQP 1.0 client library for Go.
https://github.com/Azure/go-amqp
MIT License

Encountering protocol errors from Azure IoTHub with Gateway v2 #326

Closed: mswartley closed this issue 3 months ago

mswartley commented 3 months ago

We started encountering the following error from the go-amqp library when sending messages to certain Azure IoTHub instances:

protocol error: received flow without next-incoming-id after session established

It took a while, but Azure support determined that the issue is related to the version of the Gateway component in IoTHub:

However, I have tested the Apache Qpid client against IoTHub instances running both Gateway v1 and Gateway v2 and did not encounter this error.

My test code is below, along with logs from a successful send to an IoTHub running Gateway v1 and a failed send to an IoTHub running Gateway v2. Note that the successful and failed logs come from two different IoTHub instances. Once the instance that returned the failure is downgraded to Gateway v1, the same code sends successfully.

Have you seen this error in any of your testing or integration with Azure IoTHub?

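package iothub_test // placeholder package name; the original snippet omitted the package clause

import (
    "context"
    "fmt"
    "log"
    "testing"

    "github.com/Azure/go-amqp"
)

func TestBadIoTHub(t *testing.T) {
    ctx := context.Background()

    // IoTHub endpoint and SAS credentials (signature redacted).
    host := "bad-iothub-001.azure-devices.net"
    uname := "iothubowner@sas.root.bad-iothub-001"
    sas := "SharedAccessSignature sr=bad-iothub-001.azure-devices.net&sig=[redacted]&se=1715651633&skn=iothubowner"

    // Authenticate with SASL PLAIN, using the SAS token as the password.
    connOpts := amqp.ConnOptions{
        SASLType: amqp.SASLTypePlain(uname, sas),
    }

    conn, err := amqp.Dial(ctx, fmt.Sprintf("amqps://%s", host), &connOpts)
    if err != nil {
        log.Fatal("Problem creating connection: ", err)
    }
    defer conn.Close()

    // Send Begin; the gateway replies with its own Begin, establishing the session.
    session, err := conn.NewSession(ctx, nil)
    if err != nil {
        log.Fatal("Problem creating session: ", err)
    }

    // Attach a sender link to the device-to-cloud events endpoint.
    sender, err := session.NewSender(ctx, "/devices/ABCDEF0000000001/messages/events", nil)
    if err != nil {
        log.Fatal("Problem creating sender: ", err)
    }

    // Against Gateway v2 this fails with "protocol error: received flow
    // without next-incoming-id after session established".
    if err = sender.Send(ctx, amqp.NewMessage([]byte("data")), nil); err != nil {
        log.Fatal("Problem sending message: ", err)
    }
    _ = sender.Close(ctx)
}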

Logs from a successful send to IoTHub with Gateway v1:

=== RUN   TestGoodIoTHub
11:58:50.000129 RX (negotiateSASL 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: SaslMechanisms{Mechanisms : [EXTERNAL MSSBCBS ANONYMOUS PLAIN]}}
11:58:50.000156 TX (ConnSASLPlain 0x140001e4480): Frame{Type: SASL, Channel: 0, Body: SaslInit{Mechanism : PLAIN, InitialResponse: ********, Hostname: }}
11:58:50.030596 RX (saslOutcome 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: SaslOutcome{Code : 0, AdditionalData: [87 101 108 99 111 109 101 33]}}
11:58:50.056228 TX (openAMQP 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : dLoIzmGy1-le7FNPK1c7alPRAsBA396tp04PpZ1cn3at_5VtGaF67g, Hostname: good-iothub-001.azure-devices.net, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 30s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:58:50.085598 RX (openAMQP 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : DeviceGateway_f05ecde2bdba41c2b49cad6fa103dca7, Hostname: localhost, MaxFrameSize: 65536, ChannelMax: 8191, IdleTimeout: 2m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:58:50.085716 TX (connWriter 0x140001e4480) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:58:50.116693 RX (connReader 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 262143, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:58:50.116830 TX (connWriter 0x140001e4480) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: 7vvkHUiY5WppbtzgZiRYjcXV0l6HrDKVdl0CN6r9GDTEtIklS5XOYA, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: /devices/ABCDEF0000000001/messages/events, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:58:50.143726 RX (connReader 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: 7vvkHUiY5WppbtzgZiRYjcXV0l6HrDKVdl0CN6r9GDTEtIklS5XOYA, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: /devices/ABCDEF0000000001/messages/events, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 1048576, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:58:50.143876 RX (connReader 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 0, DeliveryCount: 0, LinkCredit: 1000, Available: 0, Drain: false, Echo: false, Properties: map[]}}
11:58:50.143924 RX (Session 0x1400021cc30): link 7vvkHUiY5WppbtzgZiRYjcXV0l6HrDKVdl0CN6r9GDTEtIklS5XOYA attached, input handle 0, output handle 0
11:58:50.143981 TX (Sender 0x140003f4000) (pause): target: "/devices/ABCDEF0000000001/messages/events", link credit: 0, deliveryCount: 0
11:58:50.144000 TX (Sender 0x140003f4000) (enable): target: "/devices/ABCDEF0000000001/messages/events", link credit: 1000, deliveryCount: 0
11:58:50.144005 TX (Sender 0x140003f4000) (enable): target: "/devices/ABCDEF0000000001/messages/events", link credit: 999, deliveryCount: 1
11:58:50.144013 TX (connWriter 0x140001e4480) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: 0000000000000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 9}}
11:58:50.368107 RX (connReader 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}}
11:58:50.368190 TX (Sender 0x140003f4000) (enable): target: "/devices/ABCDEF0000000001/messages/events", link credit: 999, deliveryCount: 1
11:58:50.368197 TX (connWriter 0x140001e4480): Frame{Type: AMQP, Channel: 0, Body: Close{Error: *Error(nil)}}
11:58:50.368366 RX (connReader 0x140001e4480): terminal error: read tcp 172.16.104.249:58010->20.49.109.145:5671: use of closed network connection
--- PASS: TestGoodIoTHub (0.54s)
PASS

Logs from a failed send to IoTHub with Gateway v2:

=== RUN   TestBadIoTHub
11:46:02.909161 RX (negotiateSASL 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: SaslMechanisms{Mechanisms : [PLAIN ANONYMOUS MSSBCBS EXTERNAL]}}
11:46:02.909198 TX (ConnSASLPlain 0x140002b8000): Frame{Type: SASL, Channel: 0, Body: SaslInit{Mechanism : PLAIN, InitialResponse: ********, Hostname: }}
11:46:02.940925 RX (saslOutcome 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: SaslOutcome{Code : 0, AdditionalData: []}}
11:46:02.967802 TX (openAMQP 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : 0f_lmQbE4Y3uWM56QDJ_P4Row5CnTETZfZsh6xFxaeJJR62VudwtvQ, Hostname: bad-iothub-001.azure-devices.net, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 30s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:46:02.996538 RX (openAMQP 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : 4ddb645be6b4429ab34acadd19455fcb, Hostname: , MaxFrameSize: 65535, ChannelMax: 1024, IdleTimeout: 4m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:46:02.996654 TX (connWriter 0x140002b8000) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:46:03.024231 RX (connReader 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 4294967295, OutgoingWindow: 5000, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:46:03.024406 TX (connWriter 0x140002b8000) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: F0Lrmrd2uBP9kEcFSrg9yeOG6P6NagyMs5zuY2A1CGL3oVCUwWTyOQ, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: /devices/ABCDEF0000000001/messages/events, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:46:03.056799 RX (connReader 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: F0Lrmrd2uBP9kEcFSrg9yeOG6P6NagyMs5zuY2A1CGL3oVCUwWTyOQ, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: /devices/ABCDEF0000000001/messages/events, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 1048576, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:46:03.056878 RX (connReader 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: <nil>, IncomingWindow: 4294967295, NextOutgoingID: 0, OutgoingWindow: 5000, Handle: 0, DeliveryCount: 0, LinkCredit: 50, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
11:46:03.056882 RX (Session 0x140002388f0): link F0Lrmrd2uBP9kEcFSrg9yeOG6P6NagyMs5zuY2A1CGL3oVCUwWTyOQ attached, input handle 0, output handle 0
11:46:03.056901 TX (connWriter 0x140002b8000) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: End{Error: *Error{Condition: amqp:not-allowed, Description: next-incoming-id not set after session established, Info: map[]}}}
11:46:03.056969 TX (Sender 0x140002ca000) (pause): target: "/devices/ABCDEF0000000001/messages/events", link credit: 0, deliveryCount: 0
11:46:03.098032 RX (connReader 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: End{Error: *Error(nil)}}
2024/05/14 11:46:03 Problem sending message: protocol error: received flow without next-incoming-id after session established

jhendrixMSFT commented 3 months ago

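This is a known issue with the IoTHub v2 gateway. Per the AMQP 1.0 specification, next-incoming-id in a flow frame MUST be set once the peer has received the Begin frame for the session. In other words, after the session is established, the peer (IoTHub) must send flow frames with next-incoming-id set.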
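The field matters because the sending side uses it to work out how many more transfers the peer will accept. As I understand the session flow-control rules in the spec, the remote incoming window is next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(endpoint), with initial-outgoing-id substituted only when next-incoming-id is absent, which is legal only before the peer has seen our Begin. The sketch below illustrates that arithmetic; the `flow` type and `remoteIncomingWindow` function are hypothetical names for illustration, not go-amqp internals.

```go
package main

import "fmt"

// flow keeps only the two session-level flow fields used below. NextIncomingID
// is a pointer because the field is optional on the wire. Hypothetical type,
// not go-amqp's internal representation.
type flow struct {
	NextIncomingID *uint32
	IncomingWindow uint32
}

// remoteIncomingWindow returns how many more transfer frames the sender may emit
// before filling the peer's incoming window:
//
//	next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(endpoint)
//
// When next-incoming-id is absent, initial-outgoing-id(endpoint) stands in for it,
// which is only meaningful before the peer has seen our Begin frame.
func remoteIncomingWindow(f flow, initialOutgoingID, nextOutgoingID uint32) uint32 {
	base := initialOutgoingID
	if f.NextIncomingID != nil {
		base = *f.NextIncomingID
	}
	// Transfer IDs are 32-bit sequence numbers, so uint32 wraparound is intended.
	return base + f.IncomingWindow - nextOutgoingID
}

func main() {
	// Gateway v1 flow from the log: NextIncomingID 0, IncomingWindow 5000,
	// received before our first transfer (our next-outgoing-id is still 0).
	id := uint32(0)
	fmt.Println(remoteIncomingWindow(flow{NextIncomingID: &id, IncomingWindow: 5000}, 0, 0))

	// Same flow, but one transfer is already in flight: the window shrinks by one.
	fmt.Println(remoteIncomingWindow(flow{NextIncomingID: &id, IncomingWindow: 5000}, 0, 1))
}
```

This prints 5000 and then 4999. Without next-incoming-id, the sender cannot tell which of its in-flight transfers the peer's advertised window already accounts for.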

The following pair of Begin frames shows that the session has been established:

11:46:02.996654 TX (connWriter 0x140002b8000) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
11:46:03.024231 RX (connReader 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 4294967295, OutgoingWindow: 5000, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}

At this point, the peer must not send flow frames with an empty next-incoming-id, but it does just that.

11:46:03.056878 RX (connReader 0x140002b8000): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: <nil>, IncomingWindow: 4294967295, NextOutgoingID: 0, OutgoingWindow: 5000, Handle: 0, DeliveryCount: 0, LinkCredit: 50, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}

This is a protocol violation, so go-amqp ends the session with an amqp:not-allowed error and the pending send fails.

Unfortunately, I don't know when IoTHub will fix their AMQP implementation.

Given that this is a service-side issue, I'm closing this. Please ping back if you have further questions.