minghuaw / fe2o3-amqp

A rust implementation of the AMQP1.0 protocol based on serde and tokio.
MIT License
64 stars 7 forks source link

Attach frame decode != Attach frame encode well enough for Azure ServiceBus WCF Relay #257

Closed brandonros closed 3 months ago

brandonros commented 3 months ago
use fe2o3_amqp::types::performatives::Attach;
use serde_amqp::read::SliceReader;

/// Reproduction case: feed the whole captured hex dump, starting at byte 0,
/// to the AMQP deserializer and print the resulting `Result<Attach, _>`.
///
/// `serde_path_to_error` wraps the deserializer so that a failure is reported
/// together with the path at which it occurred.
fn main() {
    let captured = hex::decode("000002000000005312D0000002810000000EA13252656C61794C696E6B5F37396333313465652D343966312D343438622D626232662D3837373734353536626137333A6F757443424040005328C00C0B4040404040404040404040005329C06C07A16373623A2F2F72722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E65742F52525252525252525252525252525252522F39306637393164612D646331652D343963392D613535632D6466643861643433393865652F404040404040404040404040D1000001B80000000AA311636F6D2E6D6963726F736F66743A737774A1DA5368617265644163636573735369676E61747572652073723D6874747025336125326625326672722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E6574253266525252525252525252525252525252525225326639306637393164612D646331652D343963392D613535632D646664386164343339386565253266267369673D7876414169527A5541786D4C393164563367427665693175444C78424C51373833543761302532626A75706A772533642673653D3137323233393731383926736B6E3D5252524170705633A31A636F6D2E6D6963726F736F66743A636C69656E742D6167656E74A118536572766963654275732F332E302E35313237322E32303BA31B636F6D2E6D6963726F736F66743A64796E616D69632D72656C617941A31B636F6D2E6D6963726F736F66743A6C697374656E65722D74797065A11152656C61796564436F6E6E656374696F6EA319636F6D2E6D6963726F736F66743A747261636B696E672D6964A12436363264323630342D663535662D343961642D613964652D666339633333663266353763").unwrap();
    let mut de = serde_amqp::de::Deserializer::new(SliceReader::new(&captured));
    let outcome: Result<Attach, _> = serde_path_to_error::deserialize(&mut de);
    println!("{:?}", outcome);
}

image

$ cargo run
   Compiling scratchpad v0.1.0 (/Users/brandon/Desktop/scratchpad)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.38s
     Running `/Users/brandon/.cargo/target/debug/scratchpad`
Err(Error { path: Path { segments: [] }, original: InvalidFormatCode })
minghuaw commented 3 months ago

It doesn't look like this is a valid AMQP 1.0 encoded value. The first 4 bytes of the decoded hex are 0, 0, 2, 91

The first byte 0 indicates that it's a DescribedType. Then the second byte should be the format code for the descriptor, which is expected to be either a Symbol (with the format code 0xa3 or 0xb3) or Ulong (with the format code 0x80, 0x53, or 0x44). However, as stated in the spec (http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html)

Descriptor values other than symbolic (symbol) or numeric (ulong) are, while not syntactically invalid, reserved - this includes numeric types other than ulong.

I have seen Microsoft define custom types that use reserved values in the SDKs of their products. For example, Service Bus and Event Hubs define a message format that is not defined in the spec.

brandonros commented 3 months ago

Thank you for the response.

I made one mistake. The first 2 bytes are a u16 that represents the length. If those are skipped, are we any closer to decoding it?

brandonros commented 3 months ago

This "works" and "fe2o3-amqp" does not. Which is fine + expected like you said given Microsoft custom format.

// Encode an integer in the range 0..255 as exactly two lowercase hex digits.
// Anything outside that range is rejected: it would otherwise yield three or
// more hex digits and silently corrupt the packet once passed to
// Buffer.from(..., 'hex').
const encodeU8 = (value) => {
    if (!Number.isInteger(value) || value < 0 || value > 0xff) {
        throw new RangeError(`encodeU8: value does not fit in one byte: ${value}`)
    }
    return value.toString(16).padStart(2, '0')
}

/**
 * Build a length-prefixed AMQP 1.0 ATTACH frame for a relay sender link.
 *
 * The frame is assembled by hand. All list/map size fields are computed from
 * the actual content, so the frame stays well-formed when the target address
 * or the SAS token changes length (the token's percent-encoded signature does
 * not have a fixed length).
 *
 * @returns {Buffer} 4-byte big-endian frame size followed by the frame itself
 */
const encodeAttachPacket = () => {
    const linkName = `RelayLink_${uuidv4()}:out`
    const target = 'sb://...' // very odd non amqps:// target
    const resourceUri = 'http://...'
    const signingKey = '...'
    const policyName = '...'
    const listenerType = 'RelayedConnection'
    const clientAgent = 'ServiceBus/3.0.51272.20;'
    const dynamicRelay = [0x41] // true
    const token = generateSasToken(
        resourceUri,
        signingKey,
        policyName,
        60
    )
    // NOTE(review): this prints the signing key and the SAS token; remove or
    // redact before using outside of local debugging.
    console.log({
        linkName,
        target,
        resourceUri,
        signingKey,
        policyName,
        token
    })
    const trackingId = uuidv4()
    const SWT_TOKEN_HEADER_NAME = 'com.microsoft:swt'
    const CLIENT_AGENT_HEADER_NAME = 'com.microsoft:client-agent'
    const DYNAMIC_RELAY_HEADER_NAME = 'com.microsoft:dynamic-relay'
    const LISTENER_TYPE_HEADER_NAME = 'com.microsoft:listener-type'
    const TRACKING_ID_HEADER_NAME = 'com.microsoft:tracking-id'

    // AMQP format codes used below.
    const NULL = 0x40
    const STR8 = 0xa1 // str32 is 0xb1
    const SYM8 = 0xa3 // sym32 is 0xb3
    const LIST8 = 0xc0
    const LIST32 = 0xd0
    const MAP8 = 0xc1
    const MAP32 = 0xd1

    // `count` consecutive null values.
    const nulls = (count) => Buffer.alloc(count, NULL)

    // Variable-width value (string/symbol): format code, length, UTF-8 bytes.
    // Uses the 1-byte length form when the payload fits, otherwise the 4-byte
    // form, whose format code is the 8-bit code + 0x10.
    const encodeVariable = (code8, text) => {
        const payload = Buffer.from(text)
        if (payload.length <= 0xff) {
            return Buffer.concat([Buffer.from([code8, payload.length]), payload])
        }
        return Buffer.concat([
            Buffer.from([code8 + 0x10]),
            uInt32ToBEBytes(payload.length),
            payload,
        ])
    }
    const encodeString = (text) => encodeVariable(STR8, text)
    const encodeSymbol = (text) => encodeVariable(SYM8, text)

    // Compound value (list/map): format code, size, count, body. The size
    // field covers the count field plus the body. `wide` forces the 32-bit
    // form even when the 8-bit form would fit.
    const encodeCompound = (code8, code32, count, body, wide = false) => {
        if (!wide && count <= 0xff && body.length + 1 <= 0xff) {
            return Buffer.concat([Buffer.from([code8, body.length + 1, count]), body])
        }
        return Buffer.concat([
            Buffer.from([code32]),
            uInt32ToBEBytes(body.length + 4),
            uInt32ToBEBytes(count),
            body,
        ])
    }

    // source: described list (descriptor 0x28) with all 11 fields null.
    const source = Buffer.concat([
        Buffer.from('005328', 'hex'),
        encodeCompound(LIST8, LIST32, 11, nulls(11)),
    ])

    // target: described list (descriptor 0x29): address + 6 null fields.
    const targetValue = Buffer.concat([
        Buffer.from('005329', 'hex'),
        encodeCompound(LIST8, LIST32, 7, Buffer.concat([
            encodeString(target),
            nulls(6),
        ])),
    ])

    // properties: map of 5 entries; the map count is keys + values = 10.
    const properties = encodeCompound(MAP8, MAP32, 10, Buffer.concat([
        encodeSymbol(SWT_TOKEN_HEADER_NAME),
        encodeString(token),

        encodeSymbol(CLIENT_AGENT_HEADER_NAME),
        encodeString(clientAgent),

        encodeSymbol(DYNAMIC_RELAY_HEADER_NAME),
        Buffer.from(dynamicRelay),

        encodeSymbol(LISTENER_TYPE_HEADER_NAME),
        encodeString(listenerType),

        encodeSymbol(TRACKING_ID_HEADER_NAME),
        encodeString(trackingId),
    ]), true)

    // The 14 fields of the attach performative, in order.
    const attachFields = Buffer.concat([
        encodeString(linkName),        // name
        Buffer.from([0x43]),           // handle = 0 (uint0)
        Buffer.from([0x42]),           // role = false (sender)
        nulls(2),                      // snd-settle-mode, rcv-settle-mode
        source,
        targetValue,
        nulls(6),                      // unsettled .. desired-capabilities
        properties,
    ])

    const packet = Buffer.concat([
        Buffer.from('02000000', 'hex'), // doff = 2, type = 0, channel = 0
        Buffer.from('005312', 'hex'),   // described type, descriptor 0x12 (attach)
        encodeCompound(LIST8, LIST32, 14, attachFields, true),
    ])
    // The frame size field counts its own 4 bytes as well.
    const packetLength = packet.length + 4
    const packetLengthBytes = uInt32ToBEBytes(packetLength)
    return Buffer.concat([
        packetLengthBytes,
        packet,
    ])
}
minghuaw commented 3 months ago

Thank you for the response.

I made one mistake. The first 2 bytes are a u16 that represents the length. If those are skipped, are we any closer to decoding it?

2 doesn't really match any format code though.

This "works" and "fe2o3-amqp" does not.

I'll take a look at the code example later today

minghuaw commented 3 months ago

Technically 2 could be useful if it is part of the start of a frame. The doff byte for AMQP 1.0 will always be 2. Let me take a look later today

minghuaw commented 3 months ago

I just saw that you have updated the hex code. Just by the look of it, it seems to be closer to a valid AMQP 1.0 value

minghuaw commented 3 months ago

@brandonros the first 6 bytes are all part of the frame header, and the remaining can be decoded

use fe2o3_amqp_types::performatives::Attach;

/// Decode the captured bytes as an `Attach` performative, skipping the six
/// leading bytes, and print both the hex dump of the decoded slice and the
/// resulting struct.
fn main() {
    let captured = hex::decode("000002000000005312D0000002810000000EA13252656C61794C696E6B5F37396333313465652D343966312D343438622D626232662D3837373734353536626137333A6F757443424040005328C00C0B4040404040404040404040005329C06C07A16373623A2F2F72722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E65742F52525252525252525252525252525252522F39306637393164612D646331652D343963392D613535632D6466643861643433393865652F404040404040404040404040D1000001B80000000AA311636F6D2E6D6963726F736F66743A737774A1DA5368617265644163636573735369676E61747572652073723D6874747025336125326625326672722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E6574253266525252525252525252525252525252525225326639306637393164612D646331652D343963392D613535632D646664386164343339386565253266267369673D7876414169527A5541786D4C393164563367427665693175444C78424C51373833543761302532626A75706A772533642673653D3137323233393731383926736B6E3D5252524170705633A31A636F6D2E6D6963726F736F66743A636C69656E742D6167656E74A118536572766963654275732F332E302E35313237322E32303BA31B636F6D2E6D6963726F736F66743A64796E616D69632D72656C617941A31B636F6D2E6D6963726F736F66743A6C697374656E65722D74797065A11152656C61796564436F6E6E656374696F6EA319636F6D2E6D6963726F736F66743A747261636B696E672D6964A12436363264323630342D663535662D343961642D613964652D666339633333663266353763").unwrap();

    // Everything from offset 6 onwards is the encoded performative.
    let performative = &captured[6..];
    println!("{:x?}", performative);

    let attach = serde_amqp::from_slice::<Attach>(performative).unwrap();
    println!("{:?}", attach);
}

This gives the following

Attach { 
    name: "RelayLink_79c314ee-49f1-448b-bb2f-87774556ba73:out", 
    handle: Handle(0), 
    role: Sender, 
    snd_settle_mode: Mixed, 
    rcv_settle_mode: First, 
    source: Some(Source { 
        address: None, 
        durable: None, 
        expiry_policy: SessionEnd, 
        timeout: 0, 
        dynamic: false, 
        dynamic_node_properties: None, 
        distribution_mode: None, 
        filter: None, 
        default_outcome: None, 
        outcomes: None, 
        capabilities: None 
    }), 
    target: Some(Target(Target { 
        address: Some("sb://rr-relay-eastus.servicebus.windows.net/RRRRRRRRRRRRRRRRR/90f791da-dc1e-49c9-a55c-dfd8ad4398ee/"), 
        durable: None, 
        expiry_policy: SessionEnd, 
        timeout: 0, 
        dynamic: false, 
        dynamic_node_properties: None, 
        capabilities: None 
    })), 
    unsettled: None, 
    incomplete_unsettled: false, 
    initial_delivery_count: None, 
    max_message_size: None, 
    offered_capabilities: None, 
    desired_capabilities: None, 
    properties: Some(OrderedMap({
        Symbol("com.microsoft:swt"): String("SharedAccessSignature sr=http%3a%2f%2frr-relay-eastus.servicebus.windows.net%2fRRRRRRRRRRRRRRRRR%2f90f791da-dc1e-49c9-a55c-dfd8ad4398ee%2f&sig=xvAAiRzUAxmL91dV3gBvei1uDLxBLQ783T7a0%2bjupjw%3d&se=1722397189&skn=RRRAppV3"), 
        Symbol("com.microsoft:client-agent"): String("ServiceBus/3.0.51272.20;"), 
        Symbol("com.microsoft:dynamic-relay"): Bool(true), 
        Symbol("com.microsoft:listener-type"): String("RelayedConnection"), 
        Symbol("com.microsoft:tracking-id"): String("662d2604-f55f-49ad-a9de-fc9c33f2f57c")
    })) 
}
minghuaw commented 3 months ago

You could actually use the FrameDecoder if you get rid of the first two bytes and start with the 2

use bytes::BytesMut;
use fe2o3_amqp::frames::amqp::FrameDecoder;
use tokio_util::codec::Decoder;

/// Decode the captured bytes as a whole AMQP frame with `FrameDecoder` and
/// print the decoded frame.
fn main() {
    let captured = hex::decode("000002000000005312D0000002810000000EA13252656C61794C696E6B5F37396333313465652D343966312D343438622D626232662D3837373734353536626137333A6F757443424040005328C00C0B4040404040404040404040005329C06C07A16373623A2F2F72722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E65742F52525252525252525252525252525252522F39306637393164612D646331652D343963392D613535632D6466643861643433393865652F404040404040404040404040D1000001B80000000AA311636F6D2E6D6963726F736F66743A737774A1DA5368617265644163636573735369676E61747572652073723D6874747025336125326625326672722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E6574253266525252525252525252525252525252525225326639306637393164612D646331652D343963392D613535632D646664386164343339386565253266267369673D7876414169527A5541786D4C393164563367427665693175444C78424C51373833543761302532626A75706A772533642673653D3137323233393731383926736B6E3D5252524170705633A31A636F6D2E6D6963726F736F66743A636C69656E742D6167656E74A118536572766963654275732F332E302E35313237322E32303BA31B636F6D2E6D6963726F736F66743A64796E616D69632D72656C617941A31B636F6D2E6D6963726F736F66743A6C697374656E65722D74797065A11152656C61796564436F6E6E656374696F6EA319636F6D2E6D6963726F736F66743A747261636B696E672D6964A12436363264323630342D663535662D343961642D613964652D666339633333663266353763").unwrap();

    // Drop the two leading length bytes so the buffer begins at the DOFF byte
    let mut buf = BytesMut::from(&captured[2..]);

    // First unwrap: decode error; second unwrap: `None` (no complete frame).
    let frame = FrameDecoder {}.decode(&mut buf).unwrap().unwrap();

    println!("{:?}", frame);
}
brandonros commented 3 months ago

Ok. Now if you turn around and encode the decoded struct, does it encode 1:1?

The problem I'm facing is, unless I have my Sender config wrong, Microsoft ServiceBus does not like the encoded version out of fe2o3-amqp on their server. Let me look

@minghuaw

brandonros commented 3 months ago

Like this:

use fe2o3_amqp_types::performatives::Attach;

/// Round-trip check: decode the captured `Attach` (skipping the six leading
/// bytes), print it, then re-encode it with `serde_amqp` and print the result
/// as hex so it can be compared against the original capture.
fn main() {
    let captured = hex::decode("000002000000005312D0000002810000000EA13252656C61794C696E6B5F37396333313465652D343966312D343438622D626232662D3837373734353536626137333A6F757443424040005328C00C0B4040404040404040404040005329C06C07A16373623A2F2F72722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E65742F52525252525252525252525252525252522F39306637393164612D646331652D343963392D613535632D6466643861643433393865652F404040404040404040404040D1000001B80000000AA311636F6D2E6D6963726F736F66743A737774A1DA5368617265644163636573735369676E61747572652073723D6874747025336125326625326672722D72656C61792D6561737475732E736572766963656275732E77696E646F77732E6E6574253266525252525252525252525252525252525225326639306637393164612D646331652D343963392D613535632D646664386164343339386565253266267369673D7876414169527A5541786D4C393164563367427665693175444C78424C51373833543761302532626A75706A772533642673653D3137323233393731383926736B6E3D5252524170705633A31A636F6D2E6D6963726F736F66743A636C69656E742D6167656E74A118536572766963654275732F332E302E35313237322E32303BA31B636F6D2E6D6963726F736F66743A64796E616D69632D72656C617941A31B636F6D2E6D6963726F736F66743A6C697374656E65722D74797065A11152656C61796564436F6E6E656374696F6EA319636F6D2E6D6963726F736F66743A747261636B696E672D6964A12436363264323630342D663535662D343961642D613964652D666339633333663266353763").unwrap();

    // Everything from offset 6 onwards is the encoded performative.
    let performative = &captured[6..];
    println!("{:x?}", performative);

    let attach = serde_amqp::from_slice::<Attach>(performative).unwrap();
    println!("{:?}", attach);

    // Serialize the decoded struct again and dump it as hex.
    let reencoded = serde_amqp::to_vec(&attach).unwrap();
    println!("{}", hex::encode(&reencoded));
}
brandonros commented 3 months ago

image

What the heck are they doing with these extra bytes and how hard would it be to support it...

minghuaw commented 3 months ago

It may not encode one to one because some AMQP 1.0 implementations have chosen to ignore the variable-length encoding of some types (e.g. a u64 value that fits in a single byte can be encoded in just two bytes: one format code and one value byte).

If your use case is Service Bus, I would recommend azservicebus though. I am also the author of that crate, and it is built on top of fe2o3-amqp. I do remember that Microsoft used a custom message format somewhere.

brandonros commented 3 months ago

This is even worse unfortunately. My use case is we have a 12+ year old .NET Framework app tied to WCF that can't easily be migrated, and we want to extend it to be able to interact with other services.

aka, It might use AzureServiceBus, but a very specific ".NET WCF Azure Relay" thing that is legacy/not heavily used anymore.

Not your problem obviously, thanks for the support. I would appreciate it if you could point me in the right direction, add anything, or think of an easy fix that would make the attach packet that comes out of fe2o3-amqp "compatible" with the format Azure is looking for with regard to relay.

minghuaw commented 3 months ago

I'm not familiar with the relay thing, so I'll need some time to look at the code you provided. I'll try to get back to you by tomorrow morning and let you know whether it can be supported and how hard that would be

minghuaw commented 3 months ago

Just a positive note though, since it can be decoded with AMQP 1.0, I do think there's a high possibility that it could be supported without too much effort

minghuaw commented 3 months ago

Problem Diagnosis

After taking a closer look at the encoded value, I can confirm that the differences are indeed caused by some values not encoded in their shortest possible form.

All the remaining fields are encoded the same. Microsoft has multiple open source implementations of the AMQP 1.0 protocol (even for dotnet alone). So this might be caused by an old version of the implementation or by differences between those implementations.

Potential Solutions

I think the solution depends on whether the decoder can handle the cases mentioned above (ie. List0 and ignoring trailing Nulls), and I think a simple experiment could probably help. For the experiment, I think you could try creating the dotnet buffer with the bytes encoded by fe2o3-amqp and see whether that Attach is accepted.

Option 1

If the experiment is successful, then you should be able to just attach the sender as usual with the necessary properties. The following code should create a sender link that has the necessary properties

use fe2o3_amqp::{link::SenderAttachError, session::SessionHandle, Sender};
use fe2o3_amqp_types::{definitions::{Fields, ReceiverSettleMode, SenderSettleMode}, messaging::{Source, Target}};
use serde_amqp::{primitives::Symbol, Value};

async fn build_sender(session: &mut SessionHandle<()>) -> Result<Sender, SenderAttachError> {
    let mut properties = Fields::new();
    properties.insert(Symbol::from(SWT_TOKEN_HEADER_NAME), Value::from(SWT_TOKEN_HEADER_VALUE));
    properties.insert(Symbol::from(CLIENT_AGENT_HEADER_NAME), Value::from(CLIENT_AGENT_HEADER_VALUE));
    properties.insert(Symbol::from(DYNAMIC_RELAY_HEADER_NAME), Value::from(DYNAMIC_RELAY_HEADER_VALUE));
    properties.insert(Symbol::from(LISTENER_TYPE_HEADER_NAME), Value::from(LISTENER_TYPE_HEADER_VALUE));
    properties.insert(Symbol::from(TRACKING_ID_HEADER_NAME), Value::from(TRACKING_ID_HEADER_VALUE));

    Sender::builder()
        .name(NAME)
        .sender_settle_mode(SenderSettleMode::Mixed)
        .receiver_settle_mode(ReceiverSettleMode::First)
        .source(Source::default())
        .target(Target::builder().address(TARGET_ADDRESS).build())
        .properties(properties)
        .attach(session).await
}

Option 2

If the experiment fails, i.e. the decoder on the server side cannot handle List0 and ignoring trailing Nulls, since the problem is only caused by the encoding of Source and Target, you could probably try manually setting the last field of Source and Target to a non-Null value. The last field of both Source and Target is capabilities, which is of type Option<Array<Symbol>>. Setting that field to Some(Array(Vec::new())) should then make that field non-Null but an empty Array, and this can be accomplished with the following code.

use fe2o3_amqp::{link::SenderAttachError, session::SessionHandle, Sender};
use fe2o3_amqp_types::{definitions::{Fields, ReceiverSettleMode, SenderSettleMode}, messaging::{Source, Target}};
use serde_amqp::{primitives::{Array, Symbol}, Value};

async fn build_sender_empty_caps(session: &mut SessionHandle<()>) -> Result<Sender, SenderAttachError> {
    let source = Source::builder()
        .capabilities(Array(Vec::new())) // Use an empty Array to avoid the default value, which is None
        .build();
    let target = Target::builder()
        .address(TARGET_ADDRESS)
        .capabilities(Array(Vec::new())) // Use an empty Array to avoid the default value, which is None
        .build();
    let mut properties = Fields::new();
    properties.insert(Symbol::from(SWT_TOKEN_HEADER_NAME), Value::from(SWT_TOKEN_HEADER_VALUE));
    properties.insert(Symbol::from(CLIENT_AGENT_HEADER_NAME), Value::from(CLIENT_AGENT_HEADER_VALUE));
    properties.insert(Symbol::from(DYNAMIC_RELAY_HEADER_NAME), Value::from(DYNAMIC_RELAY_HEADER_VALUE));
    properties.insert(Symbol::from(LISTENER_TYPE_HEADER_NAME), Value::from(LISTENER_TYPE_HEADER_VALUE));
    properties.insert(Symbol::from(TRACKING_ID_HEADER_NAME), Value::from(TRACKING_ID_HEADER_VALUE));

    Sender::builder()
        .name(NAME)
        .sender_settle_mode(SenderSettleMode::Mixed)
        .receiver_settle_mode(ReceiverSettleMode::First)
        .source(source)
        .target(target)
        .properties(properties)
        .attach(session).await
}

Worst Case Option

In the worst case scenario, we may need to create a fork and remove the code in the macro that ignores trailing nulls, and change the Serializer to no longer encode an empty list as List0 but as a list of nulls

brandonros commented 3 months ago

Option 1:

Brandons-Laptop:azure_relay_rs brandon 2024-08-01 11:28:55 $ cargo run --bin remoter_amqp
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::transport] SEND proto_header = ProtocolHeader { id: Sasl, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::transport] RECV incoming_header = ProtocolHeader { id: Sasl, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::connection::builder] received = Mechanisms(SaslMechanisms { sasl_server_mechanisms: Array([Symbol("MSSBCBS"), Symbol("PLAIN"), Symbol("ANONYMOUS"), Symbol("EXTERNAL")]) })
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::connection::builder] sending = Init(SaslInit { mechanism: Symbol("EXTERNAL"), initial_response: None, hostname: Some("redacted.servicebus.windows.net") })
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::connection::builder] received = Outcome(SaslOutcome { code: Ok, additional_data: Some([87, 101, 108, 99, 111, 109, 101, 33]) })
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::transport] SEND proto_header = ProtocolHeader { id: Amqp, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::transport] RECV proto_header = ProtocolHeader { id: Amqp, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::connection] SEND frame = Frame { channel: 0, body: Open(Open { container_id: "RelayConnection_4b4c910e-9ff7-4223-9556-c525317c02ec", hostname: Some("redacted.servicebus.windows.net"), max_frame_size: MaxFrameSize(65536), channel_max: ChannelMax(8191), idle_time_out: None, outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }) }
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::connection] RECV frame = Open { container_id: "f1d464ed50564d059135a00f9f5b42fe_G6", hostname: None, max_frame_size: MaxFrameSize(65536), channel_max: ChannelMax(4999), idle_time_out: Some(120000), outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }
[2024-08-01T15:29:04Z DEBUG fe2o3_amqp::connection::engine] AllocateSession
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::connection::engine] SEND channel = 0, frame = Begin(Begin { remote_channel: None, next_outgoing_id: 1, incoming_window: 5000, outgoing_window: 5000, handle_max: Handle(262143), offered_capabilities: None, desired_capabilities: None, properties: None })
[2024-08-01T15:29:04Z TRACE fe2o3_amqp::connection::engine] RECV frame=Frame { channel: 0, body: Begin(Begin { remote_channel: Some(0), next_outgoing_id: 1, incoming_window: 5000, outgoing_window: 5000, handle_max: Handle(255), offered_capabilities: None, desired_capabilities: None, properties: None }) }
[2024-08-01T15:29:05Z INFO  remoter_amqp] sender sender_name = RelayLink_bf8d9bba-813e-4303-993b-db36a25052b3:out relay_url = sb://redacted.servicebus.windows.net/redacted/90f791da-dc1e-49c9-a55c-dfd8ad4398ee/ properties = OrderedMap({Symbol("com.microsoft:swt"): String("SharedAccessSignature sr=http%3a%2f%2fredacted.servicebus.windows.net%2fredacted%2f90f791da-dc1e-49c9-a55c-dfd8ad4398ee%2f&sig=redacted&se=1722529743&skn=redacted"), Symbol("com.microsoft:client-agent"): String("ServiceBus/3.0.51272.20;"), Symbol("com.microsoft:dynamic-relay"): Bool(true), Symbol("com.microsoft:listener-type"): String("RelayedConnection"), Symbol("com.microsoft:tracking-id"): String("886160bd-daf4-4ef4-bd8c-d4500840d68c")})
[2024-08-01T15:29:05Z TRACE fe2o3_amqp::session::engine] control: AllocateLink
[2024-08-01T15:29:05Z TRACE fe2o3_amqp::connection::engine] SEND channel = 0, frame = Attach(Attach { name: "RelayLink_bf8d9bba-813e-4303-993b-db36a25052b3:out", handle: Handle(0), role: Sender, snd_settle_mode: Mixed, rcv_settle_mode: First, source: Some(Source { address: None, durable: None, expiry_policy: SessionEnd, timeout: 0, dynamic: false, dynamic_node_properties: None, distribution_mode: None, filter: None, default_outcome: None, outcomes: None, capabilities: None }), target: Some(Target(Target { address: Some("sb://redacted.servicebus.windows.net/redacted/90f791da-dc1e-49c9-a55c-dfd8ad4398ee/"), durable: None, expiry_policy: SessionEnd, timeout: 0, dynamic: false, dynamic_node_properties: None, capabilities: None })), unsettled: None, incomplete_unsettled: false, initial_delivery_count: Some(0), max_message_size: None, offered_capabilities: None, desired_capabilities: None, properties: Some(OrderedMap({Symbol("com.microsoft:swt"): String("SharedAccessSignature sr=http%3a%2f%2fredacted.servicebus.windows.net%2fredacted%2f90f791da-dc1e-49c9-a55c-dfd8ad4398ee%2f&sig=redacted&se=1722529743&skn=redacted"), Symbol("com.microsoft:client-agent"): String("ServiceBus/3.0.51272.20;"), Symbol("com.microsoft:dynamic-relay"): Bool(true), Symbol("com.microsoft:listener-type"): String("RelayedConnection"), Symbol("com.microsoft:tracking-id"): String("886160bd-daf4-4ef4-bd8c-d4500840d68c")})) })
[2024-08-01T15:29:05Z TRACE fe2o3_amqp::connection::engine] RECV frame=Frame { channel: 0, body: Attach(Attach { name: "RelayLink_bf8d9bba-813e-4303-993b-db36a25052b3:out", handle: Handle(0), role: Receiver, snd_settle_mode: Mixed, rcv_settle_mode: First, source: None, target: None, unsettled: None, incomplete_unsettled: false, initial_delivery_count: None, max_message_size: None, offered_capabilities: None, desired_capabilities: None, properties: None }) }
[2024-08-01T15:29:05Z TRACE fe2o3_amqp::connection::engine] RECV frame=Frame { channel: 0, body: Detach(Detach { handle: Handle(0), closed: true, error: Some(Error { condition: AmqpError(NotAllowed), description: Some("The scheme of the Target or Source address URL in the ATTACH frame ('sb') is invalid. Only 'amqp' and 'amqps' schemes are supported. TrackingId:f1d464ed50564d059135a00f9f5b42fe_G6, SystemTracker:gateway7, Timestamp:2024-08-01T15:29:05"), info: None }) }) }
[2024-08-01T15:29:05Z ERROR fe2o3_amqp::link::builder] attach_error = IncomingTargetIsNone
[2024-08-01T15:29:05Z TRACE fe2o3_amqp::session] RECV frame = Detach { handle: Handle(0), closed: true, error: Some(Error { condition: AmqpError(NotAllowed), description: Some("The scheme of the Target or Source address URL in the ATTACH frame ('sb') is invalid. Only 'amqp' and 'amqps' schemes are supported. TrackingId:f1d464ed50564d059135a00f9f5b42fe_G6, SystemTracker:gateway7, Timestamp:2024-08-01T15:29:05"), info: None }) }
[2024-08-01T15:29:05Z DEBUG fe2o3_amqp::link] Sending detach: Detach { handle: Handle(0), closed: true, error: None }
[2024-08-01T15:29:05Z TRACE fe2o3_amqp::link] RECV detach = Detach { handle: Handle(0), closed: true, error: Some(Error { condition: AmqpError(NotAllowed), description: Some("The scheme of the Target or Source address URL in the ATTACH frame ('sb') is invalid. Only 'amqp' and 'amqps' schemes are supported. TrackingId:f1d464ed50564d059135a00f9f5b42fe_G6, SystemTracker:gateway7, Timestamp:2024-08-01T15:29:05"), info: None }) }
thread 'main' panicked at src/remoter_amqp.rs[2024-08-01T15:29:05Z TRACE fe2o3_amqp::connection::engine] SEND channel = 0, frame = Detach(Detach { handle: Handle(0), closed: true, error: None })
:110:10:
called `Result::unwrap()` on an `Err` value: RemoteClosedWithError(Error { condition: AmqpError(NotAllowed), description: Some("The scheme of the Target or Source address URL in the ATTACH frame ('sb') is invalid. Only 'amqp' and 'amqps' schemes are supported. TrackingId:f1d464ed50564d059135a00f9f5b42fe_G6, SystemTracker:gateway7, Timestamp:2024-08-01T15:29:05"), info: None })
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Option 2:

$ cargo run --bin remoter_amqp
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::transport] SEND proto_header = ProtocolHeader { id: Sasl, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::transport] RECV incoming_header = ProtocolHeader { id: Sasl, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::connection::builder] received = Mechanisms(SaslMechanisms { sasl_server_mechanisms: Array([Symbol("MSSBCBS"), Symbol("PLAIN"), Symbol("ANONYMOUS"), Symbol("EXTERNAL")]) })
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::connection::builder] sending = Init(SaslInit { mechanism: Symbol("EXTERNAL"), initial_response: None, hostname: Some("redacted.servicebus.windows.net") })
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::connection::builder] received = Outcome(SaslOutcome { code: Ok, additional_data: Some([87, 101, 108, 99, 111, 109, 101, 33]) })
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::transport] SEND proto_header = ProtocolHeader { id: Amqp, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::transport] RECV proto_header = ProtocolHeader { id: Amqp, major: 1, minor: 0, revision: 0 }
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::connection] SEND frame = Frame { channel: 0, body: Open(Open { container_id: "RelayConnection_9d601b0f-152c-4422-a07d-6943e6c1252b", hostname: Some("redacted.servicebus.windows.net"), max_frame_size: MaxFrameSize(65536), channel_max: ChannelMax(8191), idle_time_out: None, outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }) }
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::connection] RECV frame = Open { container_id: "6135d0191e2847c3bb041c8a58e878f0_G33", hostname: None, max_frame_size: MaxFrameSize(65536), channel_max: ChannelMax(4999), idle_time_out: Some(120000), outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }
[2024-08-01T15:32:48Z DEBUG fe2o3_amqp::connection::engine] AllocateSession
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::connection::engine] SEND channel = 0, frame = Begin(Begin { remote_channel: None, next_outgoing_id: 1, incoming_window: 5000, outgoing_window: 5000, handle_max: Handle(262143), offered_capabilities: None, desired_capabilities: None, properties: None })
[2024-08-01T15:32:48Z TRACE fe2o3_amqp::connection::engine] RECV frame=Frame { channel: 0, body: Begin(Begin { remote_channel: Some(0), next_outgoing_id: 1, incoming_window: 5000, outgoing_window: 5000, handle_max: Handle(255), offered_capabilities: None, desired_capabilities: None, properties: None }) }
[2024-08-01T15:32:49Z INFO  remoter_amqp] sender sender_name = RelayLink_fe8fceb4-366b-411f-888d-695b176bdf86:out relay_url = sb://redacted.servicebus.windows.net/redacted/90f791da-dc1e-49c9-a55c-dfd8ad4398ee/ properties = OrderedMap({Symbol("com.microsoft:swt"): String("SharedAccessSignature sr=http%3a%2f%2fredacted.servicebus.windows.net%2fredacted%2f90f791da-dc1e-49c9-a55c-dfd8ad4398ee%2f&sig=redacted&se=1722529968&skn=redacted"), Symbol("com.microsoft:client-agent"): String("ServiceBus/3.0.51272.20;"), Symbol("com.microsoft:dynamic-relay"): Bool(true), Symbol("com.microsoft:listener-type"): String("RelayedConnection"), Symbol("com.microsoft:tracking-id"): String("cef42a05-2ea1-422c-b6e1-6de70bfa75bf")})
[2024-08-01T15:32:49Z TRACE fe2o3_amqp::session::engine] control: AllocateLink
[2024-08-01T15:32:49Z TRACE fe2o3_amqp::connection::engine] SEND channel = 0, frame = Attach(Attach { name: "RelayLink_fe8fceb4-366b-411f-888d-695b176bdf86:out", handle: Handle(0), role: Sender, snd_settle_mode: Mixed, rcv_settle_mode: First, source: Some(Source { address: None, durable: None, expiry_policy: SessionEnd, timeout: 0, dynamic: false, dynamic_node_properties: None, distribution_mode: None, filter: None, default_outcome: None, outcomes: None, capabilities: Some(Array([])) }), target: Some(Target(Target { address: Some("sb://redacted.servicebus.windows.net/redacted/90f791da-dc1e-49c9-a55c-dfd8ad4398ee/"), durable: None, expiry_policy: SessionEnd, timeout: 0, dynamic: false, dynamic_node_properties: None, capabilities: Some(Array([])) })), unsettled: None, incomplete_unsettled: false, initial_delivery_count: Some(0), max_message_size: None, offered_capabilities: None, desired_capabilities: None, properties: Some(OrderedMap({Symbol("com.microsoft:swt"): String("SharedAccessSignature sr=http%3a%2f%2fredacted.servicebus.windows.net%2fredacted%2f90f791da-dc1e-49c9-a55c-dfd8ad4398ee%2f&sig=redacted&se=1722529968&skn=redacted"), Symbol("com.microsoft:client-agent"): String("ServiceBus/3.0.51272.20;"), Symbol("com.microsoft:dynamic-relay"): Bool(true), Symbol("com.microsoft:listener-type"): String("RelayedConnection"), Symbol("com.microsoft:tracking-id"): String("cef42a05-2ea1-422c-b6e1-6de70bfa75bf")})) })
[2024-08-01T15:32:49Z TRACE fe2o3_amqp::connection::engine] RECV frame=Frame { channel: 0, body: End(End { error: None }) }
[2024-08-01T15:32:49Z TRACE fe2o3_amqp::connection::engine] RECV frame=Frame { channel: 0, body: Close(Close { error: Some(Error { condition: AmqpError(InternalError), description: Some("An AMQP error occurred (condition='amqp:internal-error')."), info: None }) }) }
[2024-08-01T15:32:49Z ERROR fe2o3_amqp::connection::engine] RemoteClosedWithError(Error { condition: AmqpError(InternalError), description: Some("An AMQP error occurred (condition='amqp:internal-error')."), info: None })
[2024-08-01T15:32:49Z DEBUG fe2o3_amqp::connection::engine] Stopped
[2024-08-01T15:32:49Z TRACE fe2o3_amqp::session] RECV end = End { error: None }
[2024-08-01T15:32:49Z ERROR fe2o3_amqp::session::engine] IllegalConnectionState
[2024-08-01T15:32:49Z DEBUG fe2o3_amqp::session::engine] Stopped
[2024-08-01T15:32:49Z ERROR fe2o3_amqp::link::builder] attach_error = IllegalSessionState
thread 'main' panicked at src/remoter_amqp.rs:118:10:
called `Result::unwrap()` on an `Err` value: IllegalSessionState
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Worst case option:

we may need to create a fork and remove the code in the macro that ignores trailing nulls, and change the Serializer to no longer encode an empty list as List0 but as a list of nulls

Can you link to me on GitHub where this is? I'm down to try it (or if you can create a branch, I'll pull and try)

Thank you for your help on this. It is above and beyond. Do not want to eat too much of your time.

NBFS:

My next challenge in parallel to this is once I have it working (hacky with node.js hardcoded Buffers at the moment as a workaround for lack of a real encoder/parser), I need to decode and encode NBFS

[MC-NBFX]: .NET Binary Format: XML Data Structure and [MC-NBFS]: .NET Binary Format: SOAP Data Structure

Any experience with this?

https://github.com/khoad/msbingo/blob/master/nbfs/soap_dictionary.go

minghuaw commented 3 months ago

Can you link to me on GitHub where this is? I'm down to try it (or if you can create a branch, I'll pull and try)

I don't think this would be helpful at this moment because it doesn't seem to be an error or bug associated with the protocol, but rather the server rejected the link for some other reason. I'll just point to the code in case you want to give a try anyway,

But I think the root cause is something else, and changing this code is highly unlikely to fix the problem.

minghuaw commented 3 months ago

What's interesting about option 1 is the error message

The scheme of the Target or Source address URL in the ATTACH frame ('sb') is invalid. Only 'amqp' and 'amqps' schemes are supported.

I remember seeing current Azure SB SDK using sb in the source/target scheme, and I think it's actually how azservicebus does it as well, which works fine. I might be wrong since I haven't looked at that code for a long time. Let me double check that.

Meanwhile, I think maybe it's worthwhile to try changing the scheme to "amqp" or "amqps"?

minghuaw commented 3 months ago

I remember seeing current Azure SB SDK using sb in the source/target scheme, and I think it's actually how azservicebus does it as well, which works fine.

@brandonros I was wrong about this. Even though the endpoint segment in a service bus connection string uses a "sb" scheme, the "sb" scheme is implicitly changed to "amqps" when establishing the connection (see here), and "amqps" is also used when referring to entities (ie. senders, receivers) on the server (see here). So I think going with option 1 and changing the scheme from "sb" to "amqps" has a higher chance to solve this problem

minghuaw commented 3 months ago

[MC-NBFX]: .NET Binary Format: XML Data Structure and [MC-NBFS]: .NET Binary Format: SOAP Data Structure

Unfortunately, I haven't worked with these formats in rust. However, given that SOAP is a relatively simple protocol encoded as xml, I think it wouldn't be too hard to parse it with an xml crate

brandonros commented 3 months ago

image

What the heck are they doing with these extra bytes and how hard would it be to support it...

@minghuaw

Unless I'm wrong, isn't this proof that the .NET AzureRelay client uses sb://?

Two ideas:

  1. Can we add log::trace! for when it writes the binary payload over the wire

  2. I'll just fork this library and add a really custom out of the serializer thing for this. I appreciate your time. Let me close the issue and not burn more of your time. This was just for any other poor soul who will land on this from Google.

To clarify: yes, I tried a lot of stuff, exactly what you said. I wish it worked. I do not see it as a reflection of this library/the encoder at all. This is probably code from 2009 on their end... not sure.

It does not help that it's under the guise of "service bus" when it is like some fringe deprecated legacy "actually not Azure service bus feature"

If you drop me your email I'd love to show you what I came up with, I think you'll find it funny if you are interested.

minghuaw commented 3 months ago

Unless I'm wrong, isn't this proof that the .NET AzureRelay client uses sb://?

@brandonros True, but you will find their more recent SDKs doing what I mentioned (implicitly changing sb to amqps).

This is probably code from 2009

If this code is that old, the whole AMQP 1.0 protocol was probably still a draft, and things could have changed a lot from that version to what ended up in the final version.

If you drop me your email I'd love to show you what I came up with, I think you'll find it funny if you are interested.

My email is public under my github profile. I'll just paste it here: michael.wu1107@gmail.com

brandonros commented 3 months ago

Kind of interesting...

impl Encoder<Frame> for FrameEncoder {
    type Error = Error;

    fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
        use serde_amqp::ser::Serializer;

        match item.body {
            FrameBody::Open(performative) => {
                write_header(dst, item.channel);
                let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)
            }
            FrameBody::Begin(performative) => {
                write_header(dst, item.channel);
                let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)
            }
            FrameBody::Attach(performative) => {
                write_header(dst, item.channel);

                match performative.role {
                    fe2o3_amqp_types::definitions::Role::Sender => {
                        dst.put(&*hex::decode("005312D0000002810000000E").unwrap());

                        dst.put_u8(0xA1);
                        dst.put_u8(performative.name.len() as u8);
                        dst.put(performative.name.as_bytes());

                        dst.put(&*hex::decode("43424040005328C00C0B4040404040404040404040005329C06C07").unwrap());

                        let target_archetype = match performative.target {
                            Some(target_archetype) => target_archetype,
                            None => todo!(),
                        };
                        let target = match target_archetype.as_ref() {
                            fe2o3_amqp_types::messaging::TargetArchetype::Target(target) => target,
                        };
                        let target_address = target.address.as_ref().unwrap();
                        dst.put_u8(0xA1);
                        dst.put_u8(target_address.len() as u8);
                        dst.put(target_address.as_bytes());

                        dst.put(&*hex::decode("404040404040404040404040D1000001B80000000A").unwrap());

                        if let Some(properties) = performative.properties {
                            for (symbol, value) in properties {
                                let key = &symbol.0;

                                dst.put_u8(0xA3);
                                dst.put_u8(key.len() as u8);
                                dst.put(key.as_bytes());

                                match value {
                                    serde_amqp::Value::Described(_) => todo!(),
                                    serde_amqp::Value::Null => todo!(),
                                    serde_amqp::Value::Bool(value) => {
                                        if value == true {
                                            dst.put_u8(0x41);
                                        } else {
                                            todo!()
                                        }
                                    },
                                    serde_amqp::Value::Ubyte(_) => todo!(),
                                    serde_amqp::Value::Ushort(_) => todo!(),
                                    serde_amqp::Value::Uint(_) => todo!(),
                                    serde_amqp::Value::Ulong(_) => todo!(),
                                    serde_amqp::Value::Byte(_) => todo!(),
                                    serde_amqp::Value::Short(_) => todo!(),
                                    serde_amqp::Value::Int(_) => todo!(),
                                    serde_amqp::Value::Long(_) => todo!(),
                                    serde_amqp::Value::Float(_) => todo!(),
                                    serde_amqp::Value::Double(_) => todo!(),
                                    serde_amqp::Value::Decimal32(_) => todo!(),
                                    serde_amqp::Value::Decimal64(_) => todo!(),
                                    serde_amqp::Value::Decimal128(_) => todo!(),
                                    serde_amqp::Value::Char(_) => todo!(),
                                    serde_amqp::Value::Timestamp(_) => todo!(),
                                    serde_amqp::Value::Uuid(_) => todo!(),
                                    serde_amqp::Value::Binary(_) => todo!(),
                                    serde_amqp::Value::String(value) => {
                                        dst.put_u8(0xA1);
                                        dst.put_u8(value.len() as u8);
                                        dst.put(value.as_bytes());
                                    },
                                    serde_amqp::Value::Symbol(_) => todo!(),
                                    serde_amqp::Value::List(_) => todo!(),
                                    serde_amqp::Value::Map(_) => todo!(),
                                    serde_amqp::Value::Array(_) => todo!(),
                                };
                            }   
                        }
                    },
                    fe2o3_amqp_types::definitions::Role::Receiver => todo!(),
                }

                println!("{}", hex::encode(&dst));

                /*let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)*/
                Ok(())
            }
            FrameBody::Flow(performative) => {
                write_header(dst, item.channel);
                let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)
            }
            FrameBody::Transfer {
                performative,
                payload,
            } => self.encode_transfer(dst, item.channel, performative, payload),
            FrameBody::Disposition(performative) => {
                write_header(dst, item.channel);
                let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)
            }
            FrameBody::Detach(performative) => {
                write_header(dst, item.channel);
                let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)
            }
            FrameBody::End(performative) => {
                write_header(dst, item.channel);
                let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)
            }
            FrameBody::Close(performative) => {
                write_header(dst, item.channel);
                let mut serializer = Serializer::from(dst.writer());
                performative.serialize(&mut serializer)
            }
            FrameBody::Empty => {
                write_header(dst, item.channel);
                Ok(())
            }
        }
        .map_err(Into::into)
    }
}

The payload looks what I would expect and I still get the complaint about sb://

Meaning... it has to be one of these?

start_send: item = Frame { channel: 0, body: Open(Open { container_id: "RelayConnection_a310b071-20f5-402e-8149-83e37f82ca0a", hostname: Some("redacted.servicebus.windows.net"), max_frame_size: MaxFrameSize(10000), channel_max: ChannelMax(1fff), idle_time_out: None, outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }) }
start_send: encoded_item = 02000000005310c06704a13452656c6179436f6e6e656374696f6e5f61333130623037312d323066352d343032652d383134392d383365333766383263613061a12672722d72656c61792d7272727272722e736572766963656275732e77696e646f77732e6e65747000010000601fff

start_send: item = Frame { channel: 0, body: Begin(Begin { remote_channel: None, next_outgoing_id: 1, incoming_window: 1388, outgoing_window: 1388, handle_max: Handle(3ffff), offered_capabilities: None, desired_capabilities: None, properties: None }) }
start_send: encoded_item = 02000000005311c0130540520170000013887000001388700003ffff

start_send: item = Frame { channel: 0, body: Empty }
start_send: encoded_item = 02000000

thoughts?

Edit: I think I found it and I need your help. It's the open packet. When we are encoding it, it isn't overriding the url with .hostname()?

    let mut connection = Connection::builder()
            .container_id(container_id)
            .hostname("redacted-redacted-redacted-relay.servicebus.windows.net") // extra -relay
            .tls_connector(tls_connector)
            .alt_tls_establishment(true)
            .max_frame_size(65536)
            .channel_max(8191)
            .sasl_profile(SaslProfile::External)
            .open(connection_string.as_str())
            .await
            .unwrap();

Would you expect that to encode in the Open frame?