apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0
21.14k stars 11.65k forks source link

使用 Docker Compose 部署 2 主 2 从（2m-2s）并加入 Proxy 组件后，当其中一个 Master Broker 宕机时，部分消息会被一直重复消费 #8126

Open muzhi9018 opened 4 months ago

muzhi9018 commented 4 months ago

Before Creating the Bug Report

Runtime platform environment

宿主机：Win 11 23H2；Docker 容器基础镜像：ubuntu:22.04

RocketMQ version

RocketMQ 版本：5.1.4；RocketMQ 客户端版本：5.0.6（rocketmq-client-java）。客户端 Maven 坐标如下：

<!-- RocketMQ Java client (rocketmq-client-java) used by GrpcProducer and GrpcConsumer.
     5.0.6 is the version the problem was reproduced with; a maintainer suggested
     re-testing with the latest release. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.6</version>
</dependency>

JDK Version

Zulu JRE 17.0.11

Describe the Bug

使用 Docker Compose 部署 2 个 Master Broker 和 2 个 Slave Broker；当其中一个 Master Broker 宕机后，会有部分消息被一直重复消费。

Steps to Reproduce

Dockerfile 如下

FROM java:17.0.11


# RocketMQ release this image packages (informational; not referenced below)
ARG version=5.1.4


# Runtime user name
ARG USER_NAME="rocketmq"
# Runtime user id
ARG USER_ID="1000"
# Runtime group name
ARG GROUP_NAME="rocketmq"
# Runtime group id
ARG GROUP_ID="1000"


# Create the group and user before copying the distribution so the COPY steps
# can assign ownership directly. A `chown -R` in a RUN after COPY rewrites every
# copied file into a new layer, roughly doubling the size of the distribution
# inside the image.
RUN groupadd -g "$GROUP_ID" "$GROUP_NAME" \
  && useradd -d /rocketmq -u "$USER_ID" -g "$GROUP_NAME" -l -m -s /bin/bash "$USER_NAME"

COPY --chown=${USER_ID}:${GROUP_ID} benchmark /rocketmq/benchmark
COPY --chown=${USER_ID}:${GROUP_ID} bin /rocketmq/bin
COPY --chown=${USER_ID}:${GROUP_ID} conf /rocketmq/conf
COPY --chown=${USER_ID}:${GROUP_ID} lib /rocketmq/lib
COPY --chown=${USER_ID}:${GROUP_ID} LICENSE /rocketmq/LICENSE
COPY --chown=${USER_ID}:${GROUP_ID} NOTICE /rocketmq/NOTICE

# Ensure the launcher scripts are executable whatever mode they had in the
# build context. Only the small bin/ directory is touched here.
RUN chmod -R +x /rocketmq/bin

# RocketMQ home environment variables
ENV ROCKETMQ_HOME=/rocketmq
ENV PATH=$ROCKETMQ_HOME/bin:$PATH


# Run as the unprivileged user created above
USER $USER_NAME

# Working directory; compose entrypoints such as `sh mqbroker` resolve scripts relative to it
WORKDIR /rocketmq/bin

ENV JAVA_OPT=""
ENV JAVA_OPT_EXT=""


# NameServer port
EXPOSE 9876

# Broker ports
EXPOSE 10909 10911 10912


# Placeholder command; the compose services override the entrypoint
CMD ["dummy"]

docker-compose.yaml 如下

# Two master brokers, two slave brokers, a NameServer, a proxy and the dashboard.
#
# Host directories are rooted at ROCKETMQ_DOCKER_HOME. When that variable is
# unset they fall back to D:/WorkSpace/rocketmq/docker-home, the original path.
# `muzhi-cloud-net` is declared external, so it must exist before `up`.
services:
  rocket-nameserver:
    image: apache/rocketmq:5.1.4
    container_name: rocket-nameserver
    # Quoted on purpose: a bare `no` is the boolean false under YAML 1.1 rules,
    # while the Compose restart policy is the string "no".
    restart: "no"
    stop_grace_period: 5s
    networks:
      - muzhi-cloud-net
    # ports:
    #   - "9876:9876"
    env_file:
      - "conf/nameserver.env"
    entrypoint: ["sh", "mqnamesrv"]

  broker-master-a:
    image: apache/rocketmq:5.1.4
    container_name: broker-master-a
    restart: "no"
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-master-a/store:/rocketmq/store"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-master-a/logs:/rocketmq/logs"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/conf/broker-master-a.conf:/rocketmq/conf/broker.conf"
    # ports:
    #   - "10909:10909"
    #   - "10911:10911"
    #   - "10912:10912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]

  broker-slave-a01:
    image: apache/rocketmq:5.1.4
    container_name: broker-slave-a01
    restart: "no"
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-slave-a01/store:/rocketmq/store"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-slave-a01/logs:/rocketmq/logs"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/conf/broker-slave-a01.conf:/rocketmq/conf/broker.conf"
    # ports:
    #   - "11909:11909"
    #   - "11911:11911"
    #   - "11912:11912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]

  broker-master-b:
    image: apache/rocketmq:5.1.4
    container_name: broker-master-b
    restart: "no"
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-master-b/store:/rocketmq/store"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-master-b/logs:/rocketmq/logs"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/conf/broker-master-b.conf:/rocketmq/conf/broker.conf"
    # ports:
    #   - "12909:12909"
    #   - "12911:12911"
    #   - "12912:12912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]

  broker-slave-b01:
    image: apache/rocketmq:5.1.4
    container_name: broker-slave-b01
    restart: "no"
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-slave-b01/store:/rocketmq/store"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/broker-slave-b01/logs:/rocketmq/logs"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/conf/broker-slave-b01.conf:/rocketmq/conf/broker.conf"
    # ports:
    #   - "13909:13909"
    #   - "13911:13911"
    #   - "13912:13912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]

  rocket-proxy:
    image: apache/rocketmq:5.1.4
    container_name: rocket-proxy
    restart: "no"
    stop_grace_period: 5s
    depends_on:
      - broker-master-a
      - broker-master-b
    networks:
      - muzhi-cloud-net
    volumes:
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/rocket-proxy/logs:/rocketmq/logs"
      - "${ROCKETMQ_DOCKER_HOME:-D:/WorkSpace/rocketmq/docker-home}/conf/rmq-proxy.json:/rocketmq/conf/rmq-proxy.json"
    # The only published ports: clients outside the Docker network reach the
    # cluster through the proxy.
    ports:
      - "9080:9080"
      - "9081:9081"
    entrypoint: ["sh", "mqproxy", "-pc", "/rocketmq/conf/rmq-proxy.json"]

  rocketmq-dashboard:
    image: rocketmq/dashboard:1.0.0
    container_name: rocketmq-dashboard
    restart: "no"
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    # NOTE(review): no NameServer address is passed to the dashboard here;
    # confirm it is configured elsewhere (image defaults / JAVA_OPTS).
    ports:
      - "8180:8180"

networks:
  muzhi-cloud-net:
    external: true

Master Broker a 配置文件如下 broker-master-a.conf

# Master (brokerId 0) of broker group muzhi-broker-a in cluster DefaultCluster.
namesrvAddr = rocket-nameserver:9876

listenPort = 10911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-a

brokerId = 0

brokerIP1 = broker-master-a

brokerIP2 = broker-master-a

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

# Root directory for all store data. Set explicitly so everything the broker
# persists lands on the /rocketmq/store volume instead of relying on the
# broker's built-in default location.
storePathRootDir = /rocketmq/store

storePathCommitLog = /rocketmq/store/commitlog/

# NOTE(review): confirm this key is recognized by the broker version in use;
# the consume queue directory may instead be derived from storePathRootDir.
storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

# Commit log file size in bytes (1 GiB). Properties values are plain strings,
# so an arithmetic expression such as `1024 * 1024 * 1024` (previously also
# with a stray `a`) cannot be parsed as an integer. The store setting is
# spelled `mappedFileSizeCommitLog`.
mappedFileSizeCommitLog = 1073741824

Slave Broker a01 配置文件如下 broker-slave-a01.conf

# Slave (brokerId 1) of broker group muzhi-broker-a in cluster DefaultCluster.
namesrvAddr = rocket-nameserver:9876

listenPort = 11911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-a

brokerId = 1

brokerIP1 = broker-slave-a01

brokerIP2 = broker-slave-a01

brokerRole = SLAVE

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

# Root directory for all store data. Set explicitly so everything the broker
# persists lands on the /rocketmq/store volume instead of relying on the
# broker's built-in default location.
storePathRootDir = /rocketmq/store

storePathCommitLog = /rocketmq/store/commitlog/

# NOTE(review): confirm this key is recognized by the broker version in use;
# the consume queue directory may instead be derived from storePathRootDir.
storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

# Commit log file size in bytes (1 GiB). Properties values are plain strings,
# so an arithmetic expression such as `1024 * 1024 * 1024` cannot be parsed
# as an integer. The store setting is spelled `mappedFileSizeCommitLog`.
mappedFileSizeCommitLog = 1073741824

Master Broker b 配置文件如下 broker-master-b.conf

# Master (brokerId 0) of broker group muzhi-broker-b in cluster DefaultCluster.
namesrvAddr = rocket-nameserver:9876

listenPort = 12911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-b

brokerId = 0

brokerIP1 = broker-master-b

brokerIP2 = broker-master-b

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

# Root directory for all store data. Set explicitly so everything the broker
# persists lands on the /rocketmq/store volume instead of relying on the
# broker's built-in default location.
storePathRootDir = /rocketmq/store

storePathCommitLog = /rocketmq/store/commitlog/

# NOTE(review): confirm this key is recognized by the broker version in use;
# the consume queue directory may instead be derived from storePathRootDir.
storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

# Commit log file size in bytes (1 GiB). Properties values are plain strings,
# so an arithmetic expression such as `1024 * 1024 * 1024` cannot be parsed
# as an integer. The store setting is spelled `mappedFileSizeCommitLog`.
mappedFileSizeCommitLog = 1073741824

Slave Broker b01 配置文件如下 broker-slave-b01.conf

# Slave (brokerId 1) of broker group muzhi-broker-b in cluster DefaultCluster.
namesrvAddr = rocket-nameserver:9876

listenPort = 13911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-b

brokerId = 1

brokerIP1 = broker-slave-b01

brokerIP2 = broker-slave-b01

brokerRole = SLAVE

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

# Root directory for all store data. Set explicitly so everything the broker
# persists lands on the /rocketmq/store volume instead of relying on the
# broker's built-in default location.
storePathRootDir = /rocketmq/store

storePathCommitLog = /rocketmq/store/commitlog/

# NOTE(review): confirm this key is recognized by the broker version in use;
# the consume queue directory may instead be derived from storePathRootDir.
storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

# Commit log file size in bytes (1 GiB). Properties values are plain strings,
# so an arithmetic expression such as `1024 * 1024 * 1024` cannot be parsed
# as an integer. The store setting is spelled `mappedFileSizeCommitLog`.
mappedFileSizeCommitLog = 1073741824

代理组件配置文件如下 rmq-proxy.json

{
  "namesrvAddr": "rocket-nameserver:9876",
  "rocketMQClusterName": "DefaultCluster",
  "remotingListenPort": 9080,
  "grpcServerPort": 9081
}

正常启动消息生产者和消息消费者。消息生产者代码如下：

package com.muzhi.rocketmq;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.Scanner;

/**
 * Interactive producer: reads whitespace-separated tokens from standard input and
 * sends each one as a message to topic {@code muzhi-cloud-test} (tag {@code test-message}).
 *
 * <p>Typing {@code shutdown} stops the loop, as does end of input; in both cases the
 * producer and the scanner are closed.
 */
public class GrpcProducer {

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String endpoints = "192.168.2.150:9080";
        String topic = "muzhi-cloud-test";
        String tag = "test-message";

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .enableSsl(false)
                .build();

        Random random = new Random();
        DateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");

        // try-with-resources closes both resources on "shutdown" and on end of input.
        // Previously the producer was never closed at EOF.
        try (Producer producer = provider.newProducerBuilder()
                     .setClientConfiguration(clientConfiguration)
                     .setTopics(topic)
                     .build();
             Scanner scanner = new Scanner(System.in)) {

            while (scanner.hasNext()) {
                String next = scanner.next();
                if (next.equals("shutdown")) {
                    // Stop here. The old code closed everything but kept looping: it tried to
                    // send "shutdown" through the closed producer and then called hasNext()
                    // on the closed scanner.
                    break;
                }

                // Key = send timestamp (yyyyMMddHHmmss) + a 6-digit random suffix, so a message
                // can be matched between producer and consumer output.
                Message message = provider.newMessageBuilder()
                        .setTopic(topic)
                        .setTag(tag)
                        .setKeys(format.format(System.currentTimeMillis()) + (random.nextInt(900000) + 100000))
                        .setBody(next.getBytes(StandardCharsets.UTF_8))
                        .build();

                final SendReceipt sendReceipt = producer.send(message);
                System.out.printf("消息发送成功 id: %s%n", sendReceipt.getMessageId());
            }
        }
    }
}

消息消费者代码如下

package com.muzhi.rocketmq;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

/**
 * Push consumer for topic {@code muzhi-cloud-test} (tag {@code test-message}) in consumer
 * group {@code muzhi-cloud-group}, connected through the proxy endpoint.
 *
 * <p>Every delivery is printed with the broker-assigned message id, the message keys and
 * the delivery attempt, so a redelivered message can be told apart from a new one.
 */
public class GrpcConsumer {

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String endpoints = "192.168.2.150:9080";
        String consumerGroup = "muzhi-cloud-group";
        String topic = "muzhi-cloud-test";
        String tag = "test-message";

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .enableSsl(false)
                .build();

        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    String body = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                    // The old output labelled the keys as "id". Print the real message id, the keys
                    // and the delivery attempt (expected to increase when a message is redelivered).
                    System.out.printf("MQ 客户端收到消息 id: %s; keys: %s; attempt: %d; body: %s; %n",
                            messageView.getMessageId(), messageView.getKeys(),
                            messageView.getDeliveryAttempt(), body);
                    return ConsumeResult.SUCCESS;
                })
                .build();

        // A close() placed after Thread.sleep(Long.MAX_VALUE) is effectively never reached,
        // so close the consumer when the JVM shuts down (e.g. Ctrl+C) instead.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                pushConsumer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }));

        Thread.sleep(Long.MAX_VALUE);
    }
}

连续发送多条消息，中途关闭 broker-master-a 后，已经发送的部分消息被一直重复消费；从消费者控制台可以清楚地看到消息被重复消费。

image

dashboard

image

后续重新启动 broker-master-a 后，消费恢复正常。

What Did You Expect to See?

配置文件中已将 slaveReadEnable 设置为 false，希望 Master 宕机之后消费仍能正常进行，消费者不会一直重复消费同一批消息。

What Did You See Instead?

部分消息一直重复消费

Additional Context

No response

caigy commented 4 months ago

Could you also print message id in your consumer?

muzhi9018 commented 4 months ago

Could you also print message id in your consumer?

The one on the left is the producer console, which prints the message id; the one on the right is the consumer console, which prints the message key.

Willhow-Gao commented 4 months ago

Try using the latest version to see if it can be reproduced