rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/

AMQP 0-9-1: Message size and transaction storage length handling #8870

Open ikavgo opened 1 year ago

ikavgo commented 1 year ago

Describe the bug

1) Since the message size check happens inside rabbit_channel, after the AMQP publish message has already been assembled, it's easy to push RabbitMQ up to the memory watermark by sending arbitrarily big messages. In that case the network and the reader are racing against GC.

https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbit_common/src/rabbit_command_assembler.erl#L103

2) There is a similar issue (as with STOMP) with transactions: methods are just stored in the transaction, and this can fill memory up to the watermark too.

https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbit/src/rabbit_channel.erl#L1338
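To illustrate the second point, here is a minimal Python sketch of an uncommitted-publish buffer with a byte cap. It is a toy model, not the Erlang code in rabbit_channel.erl: `BoundedTxBuffer`, `TransactionLimitExceeded` and the 5 MiB limit are all hypothetical names and values chosen for the example. Without the check in `publish`, `pending` would simply keep growing until something else (such as the memory watermark) intervenes.

```python
class TransactionLimitExceeded(Exception):
    """Raised when buffered transaction content would exceed the cap."""


class BoundedTxBuffer:
    """Toy model of a channel's uncommitted-publish storage (hypothetical)."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.pending = []        # bodies published since tx.select / last commit
        self.pending_bytes = 0   # running total of buffered body bytes

    def publish(self, body: bytes):
        # Refuse to buffer once the running total would pass the cap,
        # instead of growing without bound.
        new_total = self.pending_bytes + len(body)
        if new_total > self.max_bytes:
            raise TransactionLimitExceeded(
                f"{new_total} bytes would exceed limit of {self.max_bytes}"
            )
        self.pending.append(body)
        self.pending_bytes = new_total

    def commit(self):
        # Hand the buffered messages over and reset the accounting.
        committed, self.pending = self.pending, []
        self.pending_bytes = 0
        return committed


buf = BoundedTxBuffer(max_bytes=5 * 1024 * 1024)  # 5 MiB cap, arbitrary
body = b"x" * (2 * 1024 * 1024)                   # 2 MiB, as in the repro
buf.publish(body)
buf.publish(body)
try:
    buf.publish(body)   # a third publish would bring the total to 6 MiB
    rejected = False
except TransactionLimitExceeded:
    rejected = True
```

Tracking a running byte total rather than a message count matters here, because a handful of very large messages can do as much damage as millions of small ones.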

What's interesting is that the published counter increases even though messages go into a transaction or can be rejected because of their size:


    State0 = maybe_increase_global_publishers(State),
    rabbit_global_counters:messages_received(amqp091, 1),
    check_msg_size(Content, MaxMessageSize, GCThreshold),

This is visible in the "Queued messages" graph on the overview page.
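The ordering effect in the snippet above can be reproduced with a small Python model. This is only a sketch under assumptions: `Counters`, `handle_publish_count_first` and `handle_publish_check_first` are hypothetical stand-ins, and the issue does not say whether counting rejected messages is intentional.

```python
class MessageTooLarge(Exception):
    """Raised when a body exceeds the configured maximum size."""


class Counters:
    """Hypothetical stand-in for a global 'messages received' counter."""

    def __init__(self):
        self.messages_received = 0


def check_msg_size(body, max_message_size):
    if len(body) > max_message_size:
        raise MessageTooLarge(f"{len(body)} > {max_message_size}")


def handle_publish_count_first(counters, body, max_size):
    # Same order as the snippet: count first, then validate.
    counters.messages_received += 1
    check_msg_size(body, max_size)


def handle_publish_check_first(counters, body, max_size):
    # Alternative order: only count messages that pass validation.
    check_msg_size(body, max_size)
    counters.messages_received += 1


def try_publish(handler, counters, body, max_size):
    """Return True if the publish was accepted, False if it was rejected."""
    try:
        handler(counters, body, max_size)
        return True
    except MessageTooLarge:
        return False


count_first, check_first = Counters(), Counters()
oversized = b"x" * 32
accepted_a = try_publish(handle_publish_count_first, count_first, oversized, 16)
accepted_b = try_publish(handle_publish_check_first, check_first, oversized, 16)
print(count_first.messages_received, check_first.messages_received)  # prints: 1 0
```

Both handlers reject the oversized body; only the first one leaves a trace in the counter.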

Reproduction steps

Of course, the message sizes and iteration counts are exaggerated.

For message size, here we are running against GC:

import random
import string

import pika

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

# Deliberately huge 2 GiB body (exaggerated to make the effect obvious)
body = ''.join(random.choices(string.ascii_letters, k=2*1024*1024*1024))

for n in range(100000000):
    # A new connection and channel are opened for every publish and never closed
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_publish('',
                          'test',
                          body,
                          pika.BasicProperties(content_type='text/plain',
                                               delivery_mode=pika.DeliveryMode.Transient))

For transaction:

import random
import string

import pika

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

connection = pika.BlockingConnection(parameters)

channel = connection.channel()

# Put the channel into transaction mode; nothing below ever commits or rolls back
channel.tx_select()

# 2 MiB body, published repeatedly inside the same open transaction
body = ''.join(random.choices(string.ascii_letters, k=2*1024*1024))

for n in range(100000000):
    channel.basic_publish('',
                          'test',
                          body,
                          pika.BasicProperties(content_type='text/plain',
                                               delivery_mode=pika.DeliveryMode.Transient))

What's interesting here is that, depending on the setup, RabbitMQ is either killed during the test or hits the memory watermark. In the management UI I don't see anything changing except the published messages counter and total memory, so if this leaks slowly it is going to be hard to find.

Expected behavior

Maybe we should add a size check right into the assembler and also limit the transaction storage size, or use hmax and a binary limit, similar to #8868.
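As a rough illustration of the first suggestion: in AMQP 0-9-1 the content header frame carries the declared body size before any body frames arrive, so an assembler can in principle reject an oversized message before buffering anything. The Python sketch below models that idea. `EarlyCheckAssembler`, `on_header` and `on_body` are hypothetical names and do not correspond to functions in rabbit_command_assembler.erl.

```python
class MessageTooLarge(Exception):
    """Raised when a message exceeds the configured maximum size."""


class EarlyCheckAssembler:
    """Toy content assembler that enforces the size limit up front."""

    def __init__(self, max_message_size):
        self.max_message_size = max_message_size
        self.expected = None   # declared body size, set by on_header
        self.chunks = []       # body frame payloads received so far
        self.received = 0      # bytes received so far

    def on_header(self, declared_body_size):
        # Reject based on the declared size, before any payload is buffered.
        if declared_body_size > self.max_message_size:
            raise MessageTooLarge(
                f"declared {declared_body_size} > limit {self.max_message_size}"
            )
        self.expected = declared_body_size
        self.chunks = []
        self.received = 0

    def on_body(self, payload: bytes):
        """Buffer one body frame; return the full body once complete."""
        if self.expected is None:
            raise RuntimeError("body frame received before content header")
        # Also guard against a client sending more than it declared.
        if self.received + len(payload) > self.expected:
            raise MessageTooLarge("more body bytes than declared in header")
        self.chunks.append(payload)
        self.received += len(payload)
        if self.received == self.expected:
            body = b"".join(self.chunks)
            self.expected, self.chunks, self.received = None, [], 0
            return body
        return None


assembler = EarlyCheckAssembler(max_message_size=16)

assembler.on_header(10)
first = assembler.on_body(b"hello")    # incomplete, nothing returned yet
second = assembler.on_body(b"world")   # complete body returned

try:
    assembler.on_header(2 * 1024 * 1024 * 1024)  # 2 GiB, as in the repro
    early_reject = False
except MessageTooLarge:
    early_reject = True
```

The second check in `on_body` covers a client whose body frames exceed what the header declared, which a header-only check would miss.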


ikavgo commented 1 year ago

Maybe the UI could show the transaction length on the channel page, just in case :man_shrugging: