QNJR-GROUP / EasyTransaction

A distribute transaction solution(分布式事务) unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation), reliable message, compensate and so on;
Apache License 2.0
2.36k stars 809 forks source link

EasyTransMsgInitializer 消息消费,如果业务代码抛异常,框架代码空指针问题 #139

Closed andres818 closed 4 years ago

andres818 commented 4 years ago

Describe the bug EasyTransMsgInitializer类中, 消息消费,如果业务代码抛异常(如下代码),在catch中捕获后,return easyTransResult 中的setValue方法未设值则为空,

private void wrapToFilter(final BusinessProvider<?> handler) {

    Class<? extends EasyTransRequest<?, ?>> requestClass = ReflectUtil.getRequestClass(handler);

    EasyTransFilter easyTransFilter = new EasyTransFilter() {
        @Override
        public EasyTransResult invoke(EasyTransFilterChain filterChain, Map<String, Object> header, EasyTransRequest<?, ?> request) {
            EasyTransResult easyTransResult = new EasyTransResult();

            try {
                easyTransResult.setValue(((MessageBusinessProvider<?>) handler).consume(request));
                if (easyTransResult.getValue() == EasyTransConsumeAction.CommitMessage) {
                    logger.info("EasyTrans message consume Success:" + request);
                } else {
                    logger.warn("EasyTrans message consume later:" + request);
                }
            } catch (Throwable e) {
                easyTransResult.setException(e);
                logger.error("EasyTrans message consume Exception Occour:" + request, e);
            }
            return easyTransResult;
        }
    };
    mapHandler.put(requestClass, easyTransFilter);
}

在这个代码中, @Override public EasyTransConsumeAction consume(Map<String,Object> header, EasyTransRequest<?, ?> message) {

    EasyTransFilter easyTransFilter = getFilter(message);
    BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(message.getClass());
    //记不起当时改成applicationName并加上注解的原因了,改成直接从businessIdentifer取。
    //已知的一个与之前的区别是,多个服务共享一个数据库的话,不支持多个服务都消费同一个消息,若出现该情况,只有一个服务会成功消费。因为幂等表里暂时并没有按消费者区分幂等记录,后续可调整完善。

// EasyTransFilterChain filterChain = filterChainFactory.getDefaultFilterChain(applicationName/should use consumer's appId/, businessIdentifer.busCode(), EasyTransFilterChain.MESSAGE_BUSINESS_FLAG); EasyTransFilterChain filterChain = filterChainFactory.getDefaultFilterChain(businessIdentifer.appId(), businessIdentifer.busCode(), EasyTransFilterChain.MESSAGE_BUSINESS_FLAG); filterChain.addFilter(consumeStatusCheckFilter); filterChain.addFilter(easyTransFilter); EasyTransResult result = filterChain.invokeFilterChain(header,message);

    EasyTransConsumeAction consumeResult = (EasyTransConsumeAction) result.getValue();
    if(consumeResult == null){
        result.setValue(EasyTransConsumeAction.ReconsumeLater);
    }

    if(result.getException() != null && result.getException().getClass() != NeedToReconsumeLaterException.class){
        logger.error("Consume Error!",result.getException());
    }

    return consumeResult;
}

设值result.setValue(EasyTransConsumeAction.ReconsumeLater); 最终返回值consumeResult还是为空 EasyTransConsumeAction consumeResult = (EasyTransConsumeAction) result.getValue(); if(consumeResult == null){ result.setValue(EasyTransConsumeAction.ReconsumeLater); }

则在OnsEasyTransMsgConsumerImpl类中,Action.valueOf(consume.name());空指针异常consume为null @Override public void subscribe(String topic, Collection tag, final EasyTransMsgListener listener) {

    consumer.subscribe(topic, getAliTagsString(tag), new MessageListener() {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Override
        public Action consume(Message message, ConsumeContext context) {
            Map userProperties = message.getUserProperties();
            int headerLen = Integer.parseInt(userProperties.get(OnsEasyTransMsgPublisherImpl.HEADER_LEN).toString());

// int messageLen = Integer.parseInt(userProperties.get(OnsEasyTransMsgPublisherImpl.MESSAGE_LEN).toString()); byte[] combined = message.getBody(); byte[] headerBytes = Arrays.copyOfRange(combined, 0, headerLen); byte[] messageBytes = Arrays.copyOfRange(combined, headerLen, combined.length);

            EasyTransConsumeAction consume = listener.consume((Map) serializer.deserialize(headerBytes),(EasyTransRequest<?, ?>) serializer.deserialize(messageBytes));
            return Action.valueOf(consume.name());
        }});
}
skyesx commented 4 years ago

嗯,问题存在,但不影响正确性,抛出异常后依然会进行重试。能否帮忙PR修正这个问题?