apache / rocketmq-spring

Apache RocketMQ Spring Integration
https://rocketmq.apache.org/
Apache License 2.0
2.12k stars 899 forks

the message was consumed twice because of the same instanceName #99

Closed xiaotanxingchen closed 5 years ago

xiaotanxingchen commented 5 years ago

BUG REPORT I recently ran into a problem where a message was consumed twice. I deployed three instances of the same application, two of them on the same machine. When I send a message, it is consumed twice by the instances on the shared machine. The instance on the other machine behaves correctly.

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
</dependency>

`@Service
@Transactional
public class TestMqBusinessImpl implements ITestMqBusiness {

private static Logger logger = LoggerFactory.getLogger(TestMqBusinessImpl.class);

@Autowired
IBaseService bs;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/** 
 * {@inheritDoc}   
 * @see org.aurora.mq.business.ITestMqBusiness#get() 
 */
@Override
public void get() {
    Message msg1 = MessageBuilder.withPayload("test".getBytes()).build();
    Student st = new Student();
    st.setName("Zhang San");
    TransactionSendResult ret = rocketMQTemplate.sendMessageInTransaction("my-group1", "test-topic:tag1", msg1, st);

    System.out.println(ret.getSendStatus());

}

private void test(Object a){
    Student st = (Student)a;
    System.out.println(st.getName());

}

// Define transaction listener with the annotation @RocketMQTransactionListener
@RocketMQTransactionListener(txProducerGroup ="my-group1")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            test(arg);
        } catch (Exception e) {
            logger.error("Business logic failed while sending the message; rolling back the message", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        logger.info("checkLocalTransaction");
        return RocketMQLocalTransactionState.COMMIT;
    }
}

}`

`@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumerGroup1", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY)
public class MyConsumer1 implements RocketMQListener<MessageExt> {

/** 
  * {@inheritDoc}   
  * @see org.apache.rocketmq.spring.core.RocketMQListener#onMessage(java.lang.Object) 
  */
@Override
public void onMessage(MessageExt message) {
    // TODO Auto-generated method stub
    System.out.println("consumer1:" + message.getMsgId());
}

}`


xiaotanxingchen commented 5 years ago

The cause is that the consumers on the same machine have the same instanceName.

RongtongJin commented 5 years ago

@xiaotanxingchen For now, you can work around this issue by implementing RocketMQPushConsumerLifecycleListener and setting a unique instanceName in prepareStart. Contributions that fix this properly are welcome.
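
One possible way to produce the unique name for that workaround is sketched below. This is only an illustration, not code from rocketmq-spring: the class `UniqueInstanceName` and its method `next()` are hypothetical names. It combines the JVM runtime name (often `pid@hostname`, though that format is not guaranteed) with a random UUID, on the assumption that the process ID alone may not be enough to tell two consumers apart, for example when both run as PID 1 in separate containers.

```java
import java.lang.management.ManagementFactory;
import java.util.UUID;

/** Hypothetical helper: builds an instance name that should differ per consumer. */
public final class UniqueInstanceName {

    private UniqueInstanceName() {
    }

    public static String next() {
        // Often "pid@hostname" on common JVMs; treated here as an opaque hint only.
        String jvmName = ManagementFactory.getRuntimeMXBean().getName();
        // Keep only letters and digits so the name contains no separator characters.
        String safeJvmName = jvmName.replaceAll("[^A-Za-z0-9]", "_");
        // The random suffix is an assumption-driven safeguard against identical PIDs.
        String suffix = UUID.randomUUID().toString().replace("-", "");
        return safeJvmName + "_" + suffix;
    }
}
```

With this helper, a listener such as `MyConsumer1` could also implement `RocketMQPushConsumerLifecycleListener` and call `consumer.setInstanceName(UniqueInstanceName.next())` inside `prepareStart(DefaultMQPushConsumer consumer)`. Note that every call returns a new name, so it should be called once per consumer.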

ShannonDing commented 5 years ago

link #107