amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

Messages are not further received after 2046 unaccepted messages #425

Open radoslawlandowski opened 2 months ago

radoslawlandowski commented 2 months ago

Hello,

(First of all, I'm not sure whether I'm misusing the rhea library or my broker setup is incorrect.)

One ActiveMQ broker, one consumer, durable subscription. We have large groups of messages, up to 10000 messages. Each message is around 1000 bytes long. We accept all messages at once only when the last message in the group has arrived.

I set up the broker locally. When I send 10000 messages in the same group, no message is received after message 2047 (or the broker stops sending them). I've been trying various broker settings, but none of them changes the situation, so I'd like to understand whether this is a rhea library issue.

We'd like to receive 10000 messages within a single group.

Receiver setup code:

        this.connection!.open_receiver({
            name: listenerName,
            autoaccept: false,
            source: { address: address, durable: 2, expiry_policy: 'never' },
        })
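The "accept everything once the last message of the group arrives" pattern described above can be sketched as follows. This is only an illustration under assumptions: `GroupBuffer` and `Acceptable` are hypothetical names, not part of rhea, and how the last message of a group is recognised is application-specific, so it is passed in as a flag. The only thing assumed about a delivery is that it has an `accept()` method, as rhea deliveries do.

```typescript
// Minimal shape needed from a delivery. rhea's delivery objects expose
// accept(); nothing else about them is assumed here.
interface Acceptable {
  accept(): void;
}

// Hypothetical helper (not part of rhea): holds the unsettled deliveries of
// one message group and accepts them together once the group is complete.
class GroupBuffer<D extends Acceptable> {
  private pending: D[] = [];

  // Returns true if this delivery completed the group and all buffered
  // deliveries were accepted; false if the group is still incomplete.
  add(delivery: D, isLast: boolean): boolean {
    this.pending.push(delivery);
    if (!isLast) {
      return false;
    }
    for (const d of this.pending) {
      d.accept();
    }
    this.pending = [];
    return true;
  }

  // Number of deliveries received but not yet accepted.
  get unsettled(): number {
    return this.pending.length;
  }
}

// Possible wiring with a rhea receiver opened with autoaccept: false
// (isLastInGroup is a hypothetical application function):
//
//   const buffer = new GroupBuffer<Acceptable>();
//   receiver.on('message', (context) => {
//     buffer.add(context.delivery, isLastInGroup(context.message));
//   });
```

With this approach every message of the group stays unsettled until the end, so a 10000-message group needs 10000 unsettled deliveries in flight at once, which is the situation in which the freeze is observed.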

Broker log statement at the moment message delivery freezes

DEBUG | queue://queue, subscriptions=1, memory=0%, size=10001, pending=0 toPageIn: 200, force:false, Inflight: 2400, pagedInMessages.size 2400, pagedInPendingDispatch.size 0, enqueueCount: 0, dequeueCount: 0, memUsage:4725600, maxPageSize:200

The last rhea events around the freeze

rhea:frames [connection-2]:0 <- transfer#14 {"delivery_id":2046,"delivery_tag":{"type":"Buffer","data":[7,254]}} <Buffer 00 53 70 d0 00 00 00 05 00 00 00 01 41 00 53 73 d0 00 00 00 4c 00 00 00 03 a1 24 61 32 61 36 30 66 66 37 2d 62 33 39 64 2d 34 31 34 34 2d 62 37 66 32 ... 811 more bytes> +33ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: true } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 } +30ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: 'a2a60ff7-b39d-4144-b7f2-0571a149c7b5' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 'queue' } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 } +1ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'Map32', typecode: 209, width: 4, category: 3, create: [Function (anonymous)] { typecode: 209 } }, value: [ Typed { type: [TypeDesc], value: 'identifier' }, Typed { type: [TypeDesc], value: 'abc' } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 } +1ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"mail":"123@123.it","code":"0000002047"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 } +2ms
rhea:events [connection-2] Link got event: message +58ms
rhea:events [connection-2] Session got event: message +1ms
rhea:events [connection-2] Connection got event: message +0ms
rhea:frames [connection-1]:0 -> empty +3s
rhea:raw [connection-1] SENT: 8 0000000802000000 +3s
rhea:frames [connection-2]:0 -> empty +2s
rhea:raw [connection-2] SENT: 8 0000000802000000 +2s
rhea:frames [connection-1]:0 -> empty +5s
rhea:raw [connection-1] SENT: 8 0000000802000000 +5s
rhea:frames [connection-2]:0 -> empty +2s
rhea:raw [connection-2] SENT: 8 0000000802000000 +2s
rhea:frames [connection-1]:0 -> empty +5s
rhea:raw [connection-1] SENT: 8 0000000802000000 +5s
rhea:frames [connection-2]:0 -> empty +2s
rhea:raw [connection-2] SENT: 8 0000000802000000 +2s

Best regards, thank you for that great lib!

grs commented 2 months ago

That is broker-specific behaviour, what ActiveMQ terms 'prefetch': https://activemq.apache.org/components/classic/documentation/what-is-the-prefetch-limit-for

Also relevant may be https://activemq.apache.org/components/classic/documentation/amqp but you may need to ask on the ActiveMQ forum for details on how to control the prefetch per client over AMQP.

radoslawlandowski commented 2 months ago

Thank you for your help!

I've been playing around with the configuration you mentioned, and also consulted an AWS advisor, but broker-side configuration does not solve the problem for me :( . The advisor noticed that, apart from broker-side configuration, one can modify the client-side connection URL in the following way:

"ssl://b-995b1978-0142-4749-9f59-92e9e0623b46-1.mq.eu-west-1.amazonaws.com:61617?jms.prefetchPolicy.all=5000"

But I understand that rhea has no notion of a parameter like jms.prefetchPolicy.all?

grs commented 2 months ago

Rhea does not have a parameter jms.prefetchPolicy.all because it is a library for the AMQP protocol. However, I suspect that there is a way to represent that over the AMQP protocol, and if you can determine that (my advice would be to ask the ActiveMQ user group at Apache), then rhea should be able to comply with that.
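For context, the flow-control mechanism that AMQP 1.0 itself defines is link credit: the receiver grants the sender a number of messages it may transfer. In rhea this is exposed through the `credit_window` receiver option (a value of 0 turns off automatic credit) and `receiver.add_credit(n)`. The sketch below shows one way to manage credit by hand. `ManualCredit` and `CreditSink` are hypothetical names, not part of rhea, and the thread does not establish whether ActiveMQ's prefetch follows link credit or whether this lifts the limit seen here, so treat it as an experiment rather than a fix.

```typescript
// Minimal shape needed from a receiver. rhea receivers expose
// add_credit(n); nothing else is assumed here.
interface CreditSink {
  add_credit(credit: number): void;
}

// Hypothetical helper (not part of rhea): tracks how much credit is still
// outstanding and tops it back up to a fixed target on request.
class ManualCredit {
  private outstanding = 0;
  private readonly receiver: CreditSink;
  private readonly target: number;

  constructor(receiver: CreditSink, target: number) {
    this.receiver = receiver;
    this.target = target;
  }

  // Call once per received message: each transfer consumes one credit.
  onMessage(): void {
    if (this.outstanding > 0) {
      this.outstanding -= 1;
    }
  }

  // Grants enough credit to bring the outstanding amount back to the
  // target. Returns the amount granted (0 if already at the target).
  topUp(): number {
    const grant = this.target - this.outstanding;
    if (grant <= 0) {
      return 0;
    }
    this.receiver.add_credit(grant);
    this.outstanding = this.target;
    return grant;
  }
}

// Possible wiring (assumption: receiver opened with credit_window: 0 and
// autoaccept: false; 10000 is the group size mentioned in the report):
//
//   const credit = new ManualCredit(receiver, 10000);
//   receiver.on('receiver_open', () => credit.topUp());
//   receiver.on('message', () => credit.onMessage());
```

Granting the whole group size up front is the simplest policy; topping up in smaller batches as messages arrive would bound memory use on the client at the cost of more flow frames.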