Open wingedfox opened 7 years ago
Does this not work for you?
const c = require('amqp10').Constants;
let receiver = await client.createReceiver('some.address', {
attach: { receiverSettleMode: c.receiverSettleMode.settleOnDisposition },
});
receiver.on('message', message => {
this.modify(message, { undeliverableHere: true });
});
Nope, this does not work. Message simply disappears (durable=false) or falls to DLQ (durable=true).
@wingedfox that would seem to be an implementation detail of ActiveMQ itself then, we allow you to do everything the AMQP 1.0 spec allows us to do. So you have full control over disposition here, if there are any other options that ActiveMQ supports then that would be something you'd have to check on that side. An interesting note is that the modified
disposition allows for a few other properties that might help you:
(from the spec:) Field Details delivery-failed count the transfer as an unsuccessful delivery attempt If the delivery-failed flag is set, any messages modified MUST have their delivery-count incremented.
undeliverable-here prevent redelivery If the undeliverable-here is set, then any messages released MUST NOT be redelivered to the modifying link endpoint.
message-annotations message attributes Map containing attributes to combine with the existing message-annotations held in the message’s header section. Where the existing message-annotations of the message contain an entry with the same key as an entry in this field, the value in this field associated with that key replaces the one in the existing headers; where the existing message-annotations has no such value, the value in this map is added.
First of all, please update the docs and default policy, because you define senderSettleMode
and receiverSettleMode
but framing takes value from sndSettleMode
and rcvSettleMode
.
Take a look at
https://github.com/noodlefrenzy/node-amqp10/blob/master/lib/policies/policy.js#L143
https://github.com/noodlefrenzy/node-amqp10/blob/master/lib/frames.js#L224
Probably I'm doing something wrong, but I have the following framing sequence:
amqp10:link undefined:0: Transitioning from DETACHED to ATTACHING due to sendAttach +7ms
amqp10:link attach CH=1, Handle=0 +2ms
amqp10:framing sending frame: @attach(18) [name='amq.test_99f95ae3-1081-4569-9b8b-a055a7dc7155' handle=0 role=false sndSettleMode=0 rcvSettleMode=0 source=@source(40) [address='localhost' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} distributionMode=null filter={} defaultOutcome=null outcomes=null capabilities=null] target=@target(41) [address='amq.test' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} capabilities=null] unsettled={} incompleteUnsettled=false initialDeliveryCount=1 maxMessageSize=0 offeredCapabilities=null desiredCapabilities=null properties={}] +1ms
amqp10:trace raw: [0000009e02000001005312c0910ea12d616d712e746573745f39396639356165332d313038312d343536392d396238622d613035356137646337313535434250005000005328c0260ba1096c6f63616c686f737443a30b73657373696f6e2d656e644342c1010040c10100404040005329c01f07a108616d712e7465737443a30b73657373696f6e2d656e644342c1010040c10100425201444040c10100] +0ms
amqp10:connection Rx: 0000008e02000000005312c08107a12d616d712e746573745f39396639356165332d313038312d343536392d396238622d613035356137646337313535434150025000005328c02308a1096c6f63616c686f737443a30b73657373696f6e2d656e644342c1010040c10100005329c01e06a108616d712e7465737443a30b73657373696f6e2d656e644342c101000000002402000000005313c017075201707fffffff5201707fffffff43520170000003e8 +1ms
amqp10:framing received frame: @attach(18) [name='amq.test_99f95ae3-1081-4569-9b8b-a055a7dc7155' handle=0 role=true sndSettleMode=2 rcvSettleMode=0 source=@source(40) [address='localhost' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} distributionMode=null filter={} defaultOutcome=null outcomes=null capabilities=null] target=@target(41) [address='amq.test' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} capabilities=null] unsettled={} incompleteUnsettled=false initialDeliveryCount=null maxMessageSize=0 offeredCapabilities=null desiredCapabilities=null properties={}] +2ms
amqp10:trace raw: [005312c08107a12d616d712e746573745f39396639356165332d313038312d343536392d396238622d613035356137646337313535434150025000005328c02308a1096c6f63616c686f737443a30b73657373696f6e2d656e644342c1010040c10100005329c01e06a108616d712e7465737443a30b73657373696f6e2d656e644342c10100] +0ms
amqp10:link amq.test_99f95ae3-1081-4569-9b8b-a055a7dc7155:0: Transitioning from ATTACHING to ATTACHED due to attachReceived +1ms
amqp10:link amq.test_99f95ae3-1081-4569-9b8b-a055a7dc7155: attached CH=[1=>0], Handle=[0=>0] +0ms
amqp10:framing received frame: @flow(19) [nextIncomingId=1 incomingWindow=2147483647 nextOutgoingId=1 outgoingWindow=2147483647 handle=0 deliveryCount=1 linkCredit=1000 available=null drain=false echo=false properties={}] +0ms
amqp10:trace raw: [005313c017075201707fffffff5201707fffffff43520170000003e8] +0ms
amqp10:session New Incoming Window (known id): 2147483647 = 1 + 2147483647 - 1 +1ms
amqp10:link:sender setting credits (1001,2147483647) +0ms
amqp10:link:sender canSend(1001,2147483647) = true +0ms
amqp10:link:sender sending: { behavior: 2 } +0ms
amqp10:framing sending frame: @transfer(20) [handle=0 deliveryId=1 deliveryTag=<Buffer 31> messageFormat=0 settled=false more=false rcvSettleMode=0 state=null resume=false aborted=false batchable=false] +2ms
amqp10:trace raw: [0000004202000001005314c0100b435201a00131434242500040424242005373c00e0d40404040404040404040404040005377c10d02a1086265686176696f725202] +0ms
amqp10:connection Rx: 0000001802000000005315c00b0541520152014100532445 +3ms
amqp10:framing received frame: @disposition(21) [role=true first=1 last=1 settled=true state=@accepted(36) [] batchable=false] +1ms
amqp10:trace raw: [005315c00b0541520152014100532445] +0ms
So, even if I ask to use 'unsettled' and have no consumers on queue, ActiveMQ returns settled message. Do you have an idea on what's going wrong?
Trying the same with RabbitMQ I've got "rcv-settle-mode second not supported", which at least describes its' accepted(@36) behaviour. Probably ActiveMQ does not support receiver side disposition as well...
@wingedfox thanks for pointing out the inconsistencies with the settle modes, that has been fixed in f3c46177c2991984e3af7e77799e5f7987137dc3. We actually had another bug related to setting the settled
field in a TransferFrame, where it was never actually checking the sender settle mode.
What appears to be happening in your case is that "auto settle" is used as the receiver settle mode:
amqp10:framing sending frame: @transfer(20) [handle=0 deliveryId=1 deliveryTag=<Buffer 31> messageFormat=0 settled=false more=false rcvSettleMode=0 state=null resume=false aborted=false batchable=false] +2ms
In this case we want rcvSettleMode
to be 1
, which is defined as "The receiver will only settle after sending the disposition to the sender and receiving a disposition indicating settlement of the delivery from the sender."
I'll push a patch release right now, could you please try again with the updated code? (v3.5.1)
Please take a look at my question on activemq mailing list http://activemq.2283324.n4.nabble.com/How-to-setup-policy-for-client-side-message-acceptance-td4724961.html
I send attach frame with senderSettleMode=0 receiverSettleMode=1
, but get back attach frame with senderSettleMode=2 receiverSettleMode=0
, which leads to accept(36)
in sender callback even if no consumers registered.
@mbroadst checked with 3.5.1, properties are updated and work as expected, but there's no success with the link attachment
amqp10:framing => @attach(18) [name='test_659b77bb-8579-48c4-8db3-b6abbe9d6ad8' handle=0 role=false sndSettleMode=0 rcvSettleMode=1 source=@source(40) [address='localhost' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} distributionMode=null filter={} defaultOutcome=null outcomes=null capabilities=null] target=@target(41) [address='test' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} capabilities=null] unsettled={} incompleteUnsettled=false initialDeliveryCount=1 maxMessageSize=0 offeredCapabilities=null desiredCapabilities=null properties={}] +1ms
amqp10:trace raw: [0000009602000001005312c0890ea129746573745f36353962373762622d383537392d343863342d386462332d623661626265396436616438434250005001005328c0260ba1096c6f63616c686f737443a30b73657373696f6e2d656e644342c1010040c10100404040005329c01b07a1047465737443a30b73657373696f6e2d656e644342c1010040c10100425201444040c10100] +0ms
amqp10:trace Rx: 0000008602000000005312c07907a129746573745f36353962373762622d383537392d343863342d386462332d623661626265396436616438434150025000005328c02308a1096c6f63616c686f737443a30b73657373696f6e2d656e644342c1010040c10100005329c01a06a1047465737443a30b73657373696f6e2d656e644342c101000000002402000000005313c017075201707fffffff5201707fffffff43520170000003e8 +15ms
amqp10:framing <= @attach(18) [name='test_659b77bb-8579-48c4-8db3-b6abbe9d6ad8' handle=0 role=true sndSettleMode=2 rcvSettleMode=0 source=@source(40) [address='localhost' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} distributionMode=null filter={} defaultOutcome=null outcomes=null capabilities=null] target=@target(41) [address='test' durable=0 expiryPolicy='session-end' timeout=0 dynamic=false dynamicNodeProperties={} capabilities=null] unsettled={} incompleteUnsettled=false initialDeliveryCount=null maxMessageSize=0 offeredCapabilities=null desiredCapabilities=null properties={}] +1ms
amqp10:trace raw: [005312c07907a129746573745f36353962373762622d383537392d343863342d386462332d623661626265396436616438434150025000005328c02308a1096c6f63616c686f737443a30b73657373696f6e2d656e644342c1010040c10100005329c01a06a1047465737443a30b73657373696f6e2d656e644342c10100] +1ms
@wingedfox yeah that looks like ActiveMQ isn't respecting the options you are sending on attach 🤔
Hi,
Is there a way to tune up redelivery policy? I have a need to detect whether consumer can process message or no and modify message with
undeliverableHere
flag in latter case. I useRenewOnSettle
policy for manual message acceptance, but modified messages are not getting redelivered to another consumer but instanty disappear, even if I mark specific messages durable they just fall to DLQ and remain there until manual processing.ActiveMQ provides Redelivery Policy settings, but it is not clear how to combine it with the provided policies.