rabbitmq / rabbitmq-amqp1.0

AMQP 1.0 support for RabbitMQ
https://www.rabbitmq.com/
Other
93 stars 20 forks source link

rabbitmq-server with 1.0 plugin does not seem to honour flow control at all #67

Closed grs closed 5 years ago

grs commented 5 years ago

In AMQP 1.0, transfers should not be made unless the receiving peer has granted credit. Using 3.6.16 of the 1.0 plugin, rabbitmq appears not to honour this.

To reproduce using the Apache Qpid proton python client, run the following against a queue containing messages ('examples' by default, so can use the simple_send.py example[1] to fill it up):

from __future__ import print_function
import optparse
from proton.handlers import MessagingHandler
from proton.reactor import Container

class Recv(MessagingHandler):
    def __init__(self, url):
        super(Recv, self).__init__(prefetch=0)
        self.url = url

    def on_start(self, event):
        event.container.create_receiver(self.url)

    def on_message(self, event):
        print(event.message)

parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address from which messages are received (default %default)")
opts, args = parser.parse_args()

try:
    Container(Recv(opts.address)).run()
except KeyboardInterrupt: pass

This should never receive any messages as it does not issue credit.

Using PN_TRACE_FRM=1 when running that you see the following trace (queue populated with simple_send.py -m 5:

$ PN_TRACE_FRM=1 ./no_flow.py
[0x5626994241f0]:  -> SASL
[0x5626994241f0]:  <- SASL
[0x5626994241f0]:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN, :AMQPLAIN]]
[0x5626994241f0]:0 -> @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@localhost.localdomain"]
[0x5626994241f0]:0 <- @sasl-outcome(68) [code=0]
[0x5626994241f0]:  -> AMQP
[0x5626994241f0]:0 -> @open(16) [container-id="d560cae9-d147-472c-9af4-da0278e4336e", hostname="localhost", channel-max=32767]
[0x5626994241f0]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
[0x5626994241f0]:0 -> @attach(18) [name="d560cae9-d147-472c-9af4-da0278e4336e-examples", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
[0x5626994241f0]:  <- AMQP
[0x5626994241f0]:0 <- @open(16) [container-id="rabbit@localhost.localdomain", channel-max=32767, idle-time-out=60000, properties={:"cluster_name"="rabbit@localhost.localdomain", :copyright="Copyright (C) 2007-2018 Pivotal Software, Inc.", :information="Licensed under the MPL.  See http://www.rabbitmq.com/", :platform="Erlang/OTP 19.3.6.10", :product="RabbitMQ", :version="3.6.16"}]
[0x5626994241f0]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=65535, outgoing-window=65535, handle-max=4294967295]
[0x5626994241f0]:0 <- @attach(18) [name="d560cae9-d147-472c-9af4-da0278e4336e-examples", handle=0, role=false, snd-settle-mode=0, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false, default-outcome=@released(38) [], outcomes=@PN_SYMBOL[:"amqp:accepted:list", :"amqp:rejected:list", :"amqp:released:list"]], initial-delivery-count=0]
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x01", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x01\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x02", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x02\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=2, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x03", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x03\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=3, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x04", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x04\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=4, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x05", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x05\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x05"
Message{first_acquirer=1, id=1, body={"sequence"=1}}
Message{first_acquirer=1, id=2, body={"sequence"=2}}
Message{first_acquirer=1, id=3, body={"sequence"=3}}
Message{first_acquirer=1, id=4, body={"sequence"=4}}
Message{first_acquirer=1, id=5, body={"sequence"=5}}
[0x5626994241f0]:0 -> @disposition(21) [role=true, first=0, last=4, settled=true, state=@accepted(36) []]

As you can see, the client never send a flow performative, meaning no credit has been granted. The server therefore shoul not send transfer performatives.

[1] https://gitbox.apache.org/repos/asf?p=qpid-proton.git;a=blob;f=python/examples/simple_send.py;h=7717a16577c940e8fd9ad8650771f52f6e81ba8c;hb=HEAD

kjnilsson commented 5 years ago

Hi. Thanks for the report.

This was addressed in https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/43

Please update to RabbitMQ 3.7.x to take advantage of this fix.