Closed ijaychang closed 7 years ago
对于问题3
我有个想法,但是貌似不太行,比如继承自MessageExt然后我再加一个属性,比如叫 T msgBody,用来存放解析byte[] body 好的对象实例。但我现在还不知道,怎么把AbstractMQPushConsumer
@jaychang9
今天本来想加个AbstractTransactionMQProducer,后来发现要想发事务消息,还比较麻烦 ` protected LocalTransactionExecuter localTransactionExecuter;
protected TransactionCheckListener transactionCheckListener;`
必须得让TransactionMQProducer知道localTransactionExecuter与transactionCheckListener,才能发事务消息,而且transactionCheckListener必须是在调用transactionMQProducer.start()之前要设置好的,不知道@suclogger有没有考虑过这个问题。 所以很难自动生成transactionMQProducer实例
@jaychang9 目前的rocketmq事务消息的功能还不完整,所以暂不考虑实现。 ROCKETMQ-jira-issue
@suclogger 感谢及时回答 OK
我试了下,事务消息发送是可以的,也就是说能保证,本地事务与发送消息的一致性。
可能“没有实现broker超时回查producer本地事务的执行状态” 这个没有实现吧,因为这个目前也没法测。
@PostConstruct public void init() throws MQClientException, InterruptedException { transactionMQProducer = new TransactionMQProducer("transProducerGroup"); transactionMQProducer.setNamesrvAddr("127.0.0.1:9876"); transactionMQProducer.setInstanceName("transactionMQProducer"); transactionMQProducer.setTransactionCheckListener(new TransactionCheckListener() { @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { String key = msg.getKeys(); if(Integer.parseInt(key) % 2 == 0) { return LocalTransactionState.COMMIT_MESSAGE; }else{ return LocalTransactionState.ROLLBACK_MESSAGE; } } }); transactionMQProducer.start(); // 待生产者启动 Thread.sleep(2000L); }
`@GetMapping("sendTrans") public String sendTransaction() throws MQClientException, InterruptedException {
Demo demo = new Demo("jaychang","do something...");
Gson gson = new Gson();
Message message = new Message("TP_TRANS_DEMO","A", gson.toJson(demo).getBytes());
message.setKeys(RandomStringUtils.randomAlphanumeric(1));
transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
int random = RandomUtils.nextInt(0,4);
System.out.println("executeLocalTransactionBranch random = "+random);
if( random == 1 || random == 3) {
return LocalTransactionState.COMMIT_MESSAGE;
}else{
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
},demo);`
1.AbstractMQPushConsumer的dealMessage方法,感觉还是保留重试次数信息的日志打印比较好呢
// register default mq producer to spring context registerBean(DefaultMQProducer.class.getName(), producer);
3.还有那个问题就是将MessageExt对象转换成Map<String,Object> extMap,我觉得不是很合理。我个人建议还是直接把MessageExt就给使用者暴露出来吧 4.消息的轨迹这个特性挺不错的哈 5.还有我看到加了延迟发消息的特性,不错。不过我记得这个level是可以修改配置的,后续可以改成从namesrv取到配置好的level值,而不是将DELAY_ARRAY写死