amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

Issue and question - Detached receivers / draining credits #412

Open kbfirebreather opened 8 months ago

kbfirebreather commented 8 months ago

Issue: Can't ack a message on a detached receiver.

I'm trying to allow messages that are being consumed (delivered but not yet ack'd) to finish processing during server shutdown. I attempted to detach the receiver while it was processing, and then wait for it to complete before closing the queues / connection to Artemis. It appears, though, that calling ack() in a detached state doesn't actually acknowledge the message, and the next consumer on the queue picks up the message that was being processed. Is acknowledging a message while in a detached state supported?

Question:

If not, I would like to configure the consumers to stop consuming messages during shutdown, while it waits for pre-shutdown consuming to complete. How can I set the credits for the receiver to 0 so Artemis doesn't attempt to send any messages during shutdown? I see on the receiver interface there is a drain_credit() function. Is that what I could call to prevent the consumer from picking up new messages?

grs commented 8 months ago

Can you attach a simple reproducer? Or else a protocol trace/log of the interaction? Acknowledgement should be allowed after the detaching of the receiver. Whether the broker handles that is another question though. It may consider unacked messages at the point the receiver is closed as unconsumed.

kbfirebreather commented 8 months ago

When you say protocol trace/log, do you mean from rhea, or my broker? (ActiveMQ in this case)

grs commented 8 months ago

Either should do. The key thing is to determine what the actual interaction is.

kbfirebreather commented 8 months ago

Okay, turned on rhea debug logging, and scaled my service to two so there were two consumers on the queue.

Here are the logs from the service that consumed the message originally, showing the message being consumed, a sleep statement being executed, and then the SIGTERM being sent in the middle of the sleep.

[vagrant@k3sdev (default) /opt/service/kubernetes ]> kubectl logs -f service-alerts-dbd9d8cc5-sgkx8 --since=10s
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] read 440 bytes
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] got frame of size 440
Mon, 16 Oct 2023 13:31:20 GMT rhea:raw [connection-1] RECV: 440 000001b802000000005314c00905520243a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:20 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:20Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Received SIGTERM event. Shutting down.
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): closing context
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Shutting down Messenger
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): queue.alerts Detaching queue to wait for consuming messages while shutting down
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Waiting for consumers to complete, or timeout to be reached
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 -> detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] SENT: 23 0000001702000000005316d00000000700000002520242
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): server closed
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 16 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 16
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 16 0000001002000000005316c003015202
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Session got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Connection got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [cd669231-d8cf-b04c-aa6e-0bd01fbc508f] Container got event: receiver_close

Here are the logs from the other service that is consuming from the queue. It picks up the message as soon as the first service detaches from the queue, which occurs at 13:31:26.

[vagrant@k3sdev (default) ~ ]> kubectl logs -f service-alerts-dbd9d8cc5-dhrjk --since=10s
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 13:31:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 13:31:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 441 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 441
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 441 000001b902000000005314c00a0552025201a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_id":1,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-dhrjk): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:27 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:27 GMT rhea:raw [connection-1] SENT: 8 0000000802000000

grs commented 8 months ago

Does anything else happen on the first process after the receiver_close? My understanding of your issue is that you want to acknowledge the message received after that point. Is that happening here? (There is no disposition in that first log, but neither is there a session close or connection close, so from an AMQP perspective it is incomplete).

My suspicion is that the broker will consider any messages unacknowledged at the point the receiver is detached to be available for redelivery. In that case, sending an acknowledgement after the detach is not going to have any effect anyway. (I can't confirm this without knowing what, if anything, happens with the first process after the detach.)

Regarding the other question, at present you can only successfully drain the credit if you have automatic credit management disabled. There is an example of that here: https://github.com/amqp/rhea/blob/main/examples/drain.js.
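
As a rough illustration of the draining approach (a sketch, not the code in examples/drain.js), the helper below asks the broker to drain outstanding credit and waits until the receiver reports that it is drained. It assumes the receiver was opened with `credit_window: 0`, that `drain_credit()` is available on the receiver as mentioned above, and that the receiver emits a `receiver_drained` event. The name `stopIncoming` and the timeout value are hypothetical.

```javascript
'use strict';

// Hypothetical helper (not part of rhea): ask the broker to use up or hand
// back any outstanding credit, then resolve once the receiver reports that
// it has been drained.
// Assumption: `receiver` is a rhea receiver opened with credit_window: 0.
function stopIncoming(receiver, timeoutMs = 5000) {
    return new Promise((resolve, reject) => {
        // Give up if the broker never answers the drain request.
        const timer = setTimeout(() => {
            receiver.removeListener('receiver_drained', onDrained);
            reject(new Error('timed out waiting for receiver_drained'));
        }, timeoutMs);

        function onDrained() {
            clearTimeout(timer);
            resolve();
        }

        // Register the listener before requesting the drain so the
        // notification cannot be missed.
        receiver.once('receiver_drained', onDrained);
        receiver.drain_credit();
    });
}
```

In a shutdown handler this would be awaited first, so that no new messages arrive, and the receiver would only be closed after every message already being processed has been accepted.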

kbfirebreather commented 8 months ago

Ah yeah, I accidentally cut off some logging at the end. I recreated it, and these are the rhea logs from the service that was shutting down.

The message is acked, then the queue is closed:

[2023-10-16T15:35:50Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-vzjn5): Closing queues
Mon, 16 Oct 2023 15:35:50 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:50 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445

These are the logs from the second service that got the same message

Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445
Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> flow#13 {"next_incoming_id":1,"incoming_window":2048,"outgoing_window":4294967295,"handle":2,"delivery_count":1,"link_credit":15}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 39 0000002702000000005313d00000001700000007520170000008004370ffffffff52025201520f
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 15:36:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 15:36:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000

It does look like this particular consumer is disabling automatic credit management, but I'm not sure about the rest of the services. That is good to know!

Edit: Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?

grs commented 8 months ago

Ok, the extra logs confirm that the broker is redelivering any unacknowledged message at the point that the receiver is closed. So acknowledgements sent after that are ignored.
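
For anyone reading the traces in this thread, the `rhea:raw` lines carry each frame in hex, and the frame kind can be checked by hand. An AMQP 1.0 frame starts with an 8-byte header (4-byte size, 1-byte data offset in 4-byte words, 1-byte type, 2-byte channel), followed by a described performative. The sketch below decodes just that much. `describeFrame` is a hypothetical helper, and it only handles the short `00 53 xx` descriptor encoding that appears in these logs.

```javascript
'use strict';

// Performative descriptor codes from the AMQP 1.0 transport specification.
const PERFORMATIVES = {
    0x10: 'open', 0x11: 'begin', 0x12: 'attach', 0x13: 'flow',
    0x14: 'transfer', 0x15: 'disposition', 0x16: 'detach',
    0x17: 'end', 0x18: 'close'
};

// Hypothetical helper: decode the frame header and the performative name
// from a hex string as printed on a rhea:raw log line.
function describeFrame(hex) {
    const buf = Buffer.from(hex, 'hex');
    const size = buf.readUInt32BE(0);    // total frame size in bytes
    const doff = buf.readUInt8(4);       // data offset, in 4-byte words
    const type = buf.readUInt8(5);       // 0 = AMQP frame, 1 = SASL frame
    const channel = buf.readUInt16BE(6);
    const body = buf.subarray(doff * 4);

    let performative = 'empty';          // header only: a heartbeat frame
    if (body.length >= 3 && body[0] === 0x00 && body[1] === 0x53) {
        // 0x00 marks a described type, 0x53 a one-byte ulong descriptor.
        performative = PERFORMATIVES[body[2]] || 'unknown';
    } else if (body.length > 0) {
        performative = 'unknown';        // longer descriptor forms not handled
    }
    return { size, doff, type, channel, performative };
}

console.log(describeFrame('0000001702000000005316d00000000700000002520242'));
console.log(describeFrame('0000001c02000000005315d00000000c000000054143434100532445'));
```

The first string is the 23-byte frame from the first log and decodes as `detach`. The second is the 28-byte frame from the later log and decodes as `disposition`, matching the `rhea:frames` lines next to them.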

kbfirebreather commented 8 months ago

Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?

grs commented 8 months ago

At present you can only set that on creation of the receiver.
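
A minimal sketch of setting this up when the receiver is created, assuming rhea's `credit_window` and `autoaccept` receiver options and the `add_credit()` method. The helper name `openManualReceiver` and its `initialCredit` parameter are hypothetical, and `connection` is assumed to be an already opened rhea connection.

```javascript
'use strict';

// Hypothetical helper (not part of rhea): open a receiver whose credit is
// managed by the application from the moment it is created.
function openManualReceiver(connection, source, initialCredit = 1) {
    const receiver = connection.open_receiver({
        source: source,
        credit_window: 0,   // 0 turns off automatic credit management
        autoaccept: false   // the message handler accepts deliveries itself
    });

    // Issue the first credit once the link is open. After that the
    // application decides when to call add_credit() again.
    receiver.once('receiver_open', () => {
        receiver.add_credit(initialCredit);
    });

    return receiver;
}
```

With this arrangement the application would add credit again after each message it finishes, and simply stop adding credit once shutdown begins, for example `openManualReceiver(connection, 'queue.alerts')` for the queue seen in the logs above.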