Azure / amqpnetlite

AMQP 1.0 .NET Library
Apache License 2.0
396 stars 141 forks source link

Modify MessageAnnotations with Apache Artemis #581

Closed ldwedari closed 1 month ago

ldwedari commented 3 months ago

I'm connecting to Apache Artemis to publish and consume messages. I would like to provide message annotations when calling receiver.Modify() as shown in test/Common/LinkTest.cs. But I don't see the annotations being added to the message. Does this works with Artemis? I guess that the messages are inmutable. If that is the case, why is it possible to call Modify() with additional annotations? Thanks.

xinchen10 commented 1 month ago

According to the spec, upon receiving the modified outcome, the source (which is the Artemis in this case) makes the message available again to the same or a different consumer (depending on the flags in the modified performative) and SHOULD apply the annotations field to the message.

Artemis does the first perfectly, but it does not seem to do the second at all. The message is sent to the consumer without any modification to annotations section. Message-annotations section is one of the mutable sections of a message.

From the library side the modified outcome is sent correctly. Given the following test code,

        public async Task BasicSendReceiveAsync()
        {
            string testName = "BasicSendReceiveAsync";

            Connection connection = await Connection.Factory.CreateAsync(this.testTarget.Address);
            Session session = new Session(connection);
            SenderLink sender = new SenderLink(session, "sender-" + testName, testTarget.Path);

            Message message = new Message();
            message.MessageAnnotations = new MessageAnnotations();
            message.MessageAnnotations.Map[(Symbol)"key0"] = "value0";
            message.Properties = new Properties() { MessageId = "msg0", GroupId = testName };
            message.ApplicationProperties = new ApplicationProperties();
            message.ApplicationProperties["sn"] = 100;
            await sender.SendAsync(message);

            ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, testTarget.Path);
            Message message0 = await receiver.ReceiveAsync();
            receiver.Modify(message0, true, true, new Fields() { { (Symbol)"key1", "value1" } });

            await connection.CloseAsync();

            connection = await Connection.Factory.CreateAsync(this.testTarget.Address);
            session = new Session(connection);
            ReceiverLink receiver2 = new ReceiverLink(session, "receiver2-" + testName, testTarget.Path);
            var message2 = await receiver2.ReceiveAsync();
            receiver2.Accept(message2);
            await connection.CloseAsync();
        }

The protocol communication is as follows.

[02:41:44.794] SEND AMQP 3 1 0 0
[02:41:44.815] RECV sasl-mechanisms(sasl-server-mechanisms:[PLAIN,ANONYMOUS])
[02:41:44.819] SEND sasl-init(mechanism:PLAIN,initial-response:...,hostname:localhost)
[02:41:44.821] RECV sasl-outcome(code:Ok)
[02:41:44.823] SEND AMQP 0 1.0.0
[02:41:44.824] SEND (ch=0) open(container-id:AMQPNetLite-4a53d2ac,host-name:localhost,max-frame-size:262144,channel-max:255)
[02:41:44.826] RECV AMQP 0 1 0 0
[02:41:44.828] RECV (ch=0) open(container-id:0d2a1d17-148e-11ef-a836-00155d040a05,max-frame-size:131072,channel-max:65535,idle-time-out:30000,offered-capabilities:[sole-connection-for-container,DELAYED_DELIVERY,SHARED-SUBS,ANONYMOUS-RELAY],properties:[product:apache-activemq-artemis,version:2.33.0])
[02:41:44.830] SEND (ch=0) begin(next-outgoing-id:4294967293,incoming-window:2048,outgoing-window:2048,handle-max:1023)
[02:41:44.835] SEND (ch=0) attach(name:sender-BasicSendReceiveAsync,handle:0,role:False,source:source(),target:target(address:q1),initial-delivery-count:0)
[02:41:44.859] RECV (ch=0) begin(remote-channel:0,next-outgoing-id:1,incoming-window:2147483647,outgoing-window:2147483647,handle-max:65535)
[02:41:44.861] RECV (ch=0) attach(name:sender-BasicSendReceiveAsync,handle:0,role:True,snd-settle-mode:Mixed,rcv-settle-mode:First,source:source(),target:target(address:q1))
[02:41:44.864] RECV (ch=0) flow(next-in-id:4294967293,in-window:2147483647,next-out-id:1,out-window:2147483647,handle:0,delivery-count:0,link-credit:1000)
[02:41:44.868] SEND (ch=0) transfer(handle:0,delivery-id:0,delivery-tag:00000001,message-format:0,settled:False,batchable:True) payload 76
[02:41:44.870] RECV (ch=0) disposition(role:True,first:0,last:0,settled:True,state:accepted())
[02:41:44.873] SEND (ch=0) attach(name:receiver-BasicSendReceiveAsync,handle:1,role:True,source:source(address:q1),target:target())
[02:41:44.875] SEND (ch=0) flow(next-in-id:1,in-window:2048,next-out-id:4294967294,out-window:2147483646,handle:1,link-credit:200,drain:False)
[02:41:44.876] RECV (ch=0) attach(name:receiver-BasicSendReceiveAsync,handle:1,role:False,snd-settle-mode:Mixed,rcv-settle-mode:First,source:source(address:q1),target:target(),incomplete-unsettled:False,initial-delivery-count:0)
[02:41:44.877] RECV (ch=0) transfer(handle:1,delivery-id:0,delivery-tag:00,message-format:0,settled:False) payload 76
[02:41:44.882] SEND (ch=0) disposition(role:True,first:0,settled:True,state:modified(delivery-failed:True,undeliverable-here:True,message-annotations:[key1:value1]))
[02:41:44.884] SEND (ch=0) close()
[02:41:44.885] RECV (ch=0) close()
[02:41:44.890] SEND AMQP 3 1 0 0
[02:41:44.892] RECV sasl-mechanisms(sasl-server-mechanisms:[PLAIN,ANONYMOUS])
[02:41:44.892] SEND sasl-init(mechanism:PLAIN,initial-response:...,hostname:localhost)
[02:41:44.892] RECV sasl-outcome(code:Ok)
[02:41:44.893] SEND AMQP 0 1.0.0
[02:41:44.893] SEND (ch=0) open(container-id:AMQPNetLite-4a53d2ac,host-name:localhost,max-frame-size:262144,channel-max:255)
[02:41:44.893] RECV AMQP 0 1 0 0
[02:41:44.893] RECV (ch=0) open(container-id:0d2a1d17-148e-11ef-a836-00155d040a05,max-frame-size:131072,channel-max:65535,idle-time-out:30000,offered-capabilities:[sole-connection-for-container,DELAYED_DELIVERY,SHARED-SUBS,ANONYMOUS-RELAY],properties:[product:apache-activemq-artemis,version:2.33.0])
[02:41:44.894] SEND (ch=0) begin(next-outgoing-id:4294967293,incoming-window:2048,outgoing-window:2048,handle-max:1023)
[02:41:44.894] SEND (ch=0) attach(name:receiver2-BasicSendReceiveAsync,handle:0,role:True,source:source(address:q1),target:target())
[02:41:44.894] SEND (ch=0) flow(in-window:2048,next-out-id:4294967293,out-window:2048,handle:0,link-credit:200,drain:False)
[02:41:44.896] RECV (ch=0) begin(remote-channel:0,next-outgoing-id:1,incoming-window:2147483647,outgoing-window:2147483647,handle-max:65535)
[02:41:44.896] RECV (ch=0) attach(name:receiver2-BasicSendReceiveAsync,handle:0,role:False,snd-settle-mode:Mixed,rcv-settle-mode:First,source:source(address:q1),target:target(),incomplete-unsettled:False,initial-delivery-count:0)
[02:41:44.898] RECV (ch=0) transfer(handle:0,delivery-id:0,delivery-tag:00,message-format:0,settled:False) payload 88
[02:41:44.899] SEND (ch=0) disposition(role:True,first:0,settled:True,state:accepted())
[02:41:44.899] SEND (ch=0) close()
[02:41:44.901] RECV (ch=0) close()

message2.MessageAnnotations map has only "key0":"value0". The "key1" in modified outcome was not applied by the broker.

The size of message2 is 88 bytes which is greater than the original (76 bytes), because the broker incremented delivery-count of the message upon receiving the modified outcome. {header(delivery-count:1)}. message0 has a null header.