apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

Check message size before message compression #7758

Open Aresxue opened 9 months ago

Aresxue commented 9 months ago

Runtime platform environment

All

RocketMQ version

All

JDK Version

All

Describe the Bug

The RocketMQ client performs both a message size check and message compression, but it checks the size before compressing. As a result, under the default configuration a 5 MB message that would compress to less than 4 MB is still rejected, and the only workaround is to raise the maxMessageSize parameter manually. This also means the client and the broker apply different semantics to the message size check. Please confirm whether this is a problem; I will submit a PR to fix it if necessary.
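To make the ordering issue concrete, here is a minimal sketch, not the RocketMQ client code. It assumes a 4 MB limit as in the default configuration mentioned above, uses the JDK's `java.util.zip.Deflater` as a stand-in for whatever compressor the client uses, and the class and method names (`SizeCheckOrder`, `passesCheckBeforeCompression`, `passesCheckAfterCompression`) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

public class SizeCheckOrder {
    // Assumed limit for illustration: 4 MB, as in the default configuration discussed in this issue.
    static final int MAX_MESSAGE_SIZE = 4 * 1024 * 1024;

    /** Compresses a body with the JDK Deflater (a stand-in for the client's compressor). */
    static byte[] compress(byte[] body) {
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION);
        deflater.setInput(body);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    /** Order described in the issue: the limit is applied to the uncompressed body. */
    static boolean passesCheckBeforeCompression(byte[] body, int maxSize) {
        return body.length <= maxSize;
    }

    /** Proposed order: compress first, then apply the limit to the bytes that would be sent. */
    static boolean passesCheckAfterCompression(byte[] body, int maxSize) {
        return compress(body).length <= maxSize;
    }

    public static void main(String[] args) {
        // 5 MB of zeros: larger than the limit when raw, far smaller once compressed.
        byte[] body = new byte[5 * 1024 * 1024];
        System.out.println("check before compression passes: "
            + passesCheckBeforeCompression(body, MAX_MESSAGE_SIZE));
        System.out.println("check after compression passes:  "
            + passesCheckAfterCompression(body, MAX_MESSAGE_SIZE));
    }
}
```

For this all-zero body the first check fails and the second passes. Real payloads compress less well, so whether a given 5 MB message fits under the limit depends on its content.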

Steps to Reproduce

Can be reproduced at will.

What Did You Expect to See?

If a 5 MB message compresses to less than 4 MB, it should pass the size check.

What Did You See Instead?

A 5 MB message that compresses to less than 4 MB fails the size check.

Additional Context

No response

zyhui98 commented 9 months ago

In 5.1.4, the size check for delay messages covers header + body, but only the body is sent to the broker. That is also a problem.

zyhui98 commented 9 months ago

@Aresxue Yes, I think this is a problem.

francisoliverlee commented 9 months ago

> In 5.1.4, the size check for delay messages covers header + body, but only the body is sent to the broker. That is also a problem.

I found some size checks here, but no check specific to delay messages. Could you please point to the code?

1. Client check

  1. Checked in Validators.checkMessage(msg, this.defaultMQProducer):
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
  2. Checked in DefaultMQProducer.batch():
        private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
            MessageBatch msgBatch;
            try {
                msgBatch = MessageBatch.generateFromList(msgs);
                for (Message message : msgBatch) {
                    Validators.checkMessage(message, this); // first check: each message in the batch
                    MessageClientIDSetter.setUniqID(message);
                    message.setTopic(withNamespace(message.getTopic()));
                }
                MessageClientIDSetter.setUniqID(msgBatch);
                // the encoded batch body is checked again by
                // Validators.checkMessage(msg, this.defaultMQProducer) when send() is called
                msgBatch.setBody(msgBatch.encode());
            } catch (Exception e) {
                throw new MQClientException("Failed to initiate the MessageBatch", e);
            }
            msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
            return msgBatch;
        }

2. The proxy checks the message body size in SendMessageActivity.SendMessageActivity()

image

3. The broker does not check the message size

image

4. The store checks its maxMessageSize config key in MessageExtEncoder.java, in two places.

image