apache / incubator-seata

Seata is an easy-to-use, high-performance, open source distributed transaction solution.
https://seata.apache.org/
Apache License 2.0

saga exception: java.lang.NullPointerException: Cannot invoke "io.seata.saga.engine.StateMachineEngine.forward(String, java.util.Map)" #6603

Closed. itisreal-dot closed this issue 4 weeks ago.

itisreal-dot commented 4 weeks ago

Ⅰ. Issue Description

An exception is thrown when I run the Saga samples. Here is my configuration; can you help me find what is wrong?

@Configuration
public class StateMachineEngineConfig {
    @Autowired
    private DataSource dataSource;

    @Value("${seata.application-id}")
    private String applicationId;

    @Value("${seata.tx-service-group}")
    private String txServiceGroup;

    @Bean
    public DbStateMachineConfig dbStateMachineConfig() {
        DbStateMachineConfig stateMachineConfig = new DbStateMachineConfig();
        stateMachineConfig.setDataSource(dataSource);
        stateMachineConfig.setResources(new String[]{"statelang/*.json"});
        stateMachineConfig.setEnableAsync(true);
        stateMachineConfig.setThreadPoolExecutor(threadExecutor());

        stateMachineConfig.setApplicationId(applicationId);
        stateMachineConfig.setTxServiceGroup(txServiceGroup);
        return stateMachineConfig;
    }

    @Bean
    public ProcessCtrlStateMachineEngine stateMachineEngine() {
        ProcessCtrlStateMachineEngine processCtrlStateMachineEngine = new ProcessCtrlStateMachineEngine();
        processCtrlStateMachineEngine.setStateMachineConfig(dbStateMachineConfig());
        return processCtrlStateMachineEngine;
    }

    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder() {
        StateMachineEngineHolder engineHolder = new StateMachineEngineHolder();
        engineHolder.setStateMachineEngine(stateMachineEngine());
        return engineHolder;
    }

    @Bean
    public ThreadPoolExecutor threadExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size
        executor.setCorePoolSize(1);
        // Maximum pool size
        executor.setMaxPoolSize(20);
        // Queue capacity
        executor.setQueueCapacity(99999);
        // Name prefix for threads in the pool
        executor.setThreadNamePrefix("SAGA_ASYNC_EXE_");

        // Rejection policy: how new tasks are handled once the pool has reached its max size
        // CALLER_RUNS: run the task on the caller's thread instead of a new thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the executor
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }
}

JSON config:

{
  "Name": "reduceInventoryAndBalance",
  "Comment": "reduce inventory then reduce balance in a transaction",
  "StartState": "ReduceInventory",
  "Version": "0.0.1",
  "States": {
    "ReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceInventory",
      "Next": "ChoiceState",
      "Input": [
        "$.[businessKey]",
        "$.[count]"
      ],
      "Output": {
        "reduceInventoryResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      }
    },
    "ChoiceState": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[reduceInventoryResult] == true",
          "Next": "ReduceBalance"
        }
      ],
      "Default": "Fail"
    },
    "ReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceBalance",
      "Input": [
        "$.[businessKey]",
        "$.[amount]",
        {
          "throwException": "$.[mockReduceBalanceFail]"
        }
      ],
      "Output": {
        "compensateReduceBalanceResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },
      "Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger"
        }
      ],
      "Next": "Succeed"
    },
    "CompensateReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "compensateReduce",
      "Input": [
        "$.[businessKey]"
      ]
    },
    "CompensateReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "compensateReduce",
      "Input": [
        "$.[businessKey]"
      ]
    },
    "CompensationTrigger": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "ErrorCode": "PURCHASE_FAILED",
      "Message": "purchase failed"
    }
  }
}
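
Read as a flow, the definition above runs ReduceInventory, branches on its result in ChoiceState, and then runs ReduceBalance; an exception caught there routes to CompensationTrigger and then to Fail. The following is a minimal plain-Java sketch of that routing only. It is not Seata's engine: the class `SagaFlowSketch` and its `route` method are hypothetical names, the two boolean parameters stand in for the service results, and what CompensationTrigger actually executes is left out. An exception thrown by ReduceInventory (which has no Catch entry) is not modeled either.

```java
import java.util.ArrayList;
import java.util.List;

public class SagaFlowSketch {

    /**
     * Returns the state names visited, following only the "Next", "Choices",
     * "Default" and "Catch" links written in the JSON definition.
     *
     * @param inventoryReduced hypothetical result of inventoryAction.reduce
     * @param balanceThrows    whether balanceAction.reduce throws a Throwable
     */
    static List<String> route(boolean inventoryReduced, boolean balanceThrows) {
        List<String> visited = new ArrayList<>();
        visited.add("ReduceInventory");
        visited.add("ChoiceState");
        if (!inventoryReduced) {
            // "[reduceInventoryResult] == true" does not match, so "Default" applies.
            visited.add("Fail");
            return visited;
        }
        visited.add("ReduceBalance");
        if (balanceThrows) {
            // "Catch" on java.lang.Throwable routes to the compensation trigger.
            visited.add("CompensationTrigger");
            visited.add("Fail");
        } else {
            visited.add("Succeed");
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(route(true, false));
        System.out.println(route(true, true));
        System.out.println(route(false, false));
    }
}
```

The second call corresponds to setting `mockReduceBalanceFail` in the start parameters, assuming the sample's balance action throws when that flag is true.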

Main snippet:

@SpringBootApplication(scanBasePackages = "org.example.seata.saga", exclude = {SeataAutoConfiguration.class})
public class LocalSagaTransactionStarter {

    public static void main(String[] args) {

        ConfigurableApplicationContext applicationContext = SpringApplication.run(LocalSagaTransactionStarter.class, args);

        StateMachineEngine stateMachineEngine = (StateMachineEngine)applicationContext.getBean("stateMachineEngine");

        transactionCommittedDemo(stateMachineEngine);

        transactionCompensatedDemo(stateMachineEngine);

        new ApplicationKeeper(applicationContext).keep();
    }
    // other code omitted

}

Ⅱ. Describe what happened

If there is an exception, please attach the exception trace:

11:04:14.708 |-INFO  [main] gine.tm.DefaultSagaTransactionalTemplate:257 -| Global Transaction Clients are initialized. 
11:04:14.718 |-INFO  [main] ServiceLoader$InnerEnhancedServiceLoader:524 -| Load compatible class io.seata.saga.engine.pcext.StateRouterInterceptor
11:04:14.730 |-INFO  [main] ServiceLoader$InnerEnhancedServiceLoader:524 -| Load compatible class io.seata.saga.engine.pcext.StateHandlerInterceptor
11:04:14.745 |-INFO  [main] ServiceLoader$InnerEnhancedServiceLoader:524 -| Load compatible class io.seata.saga.statelang.parser.JsonParser
11:04:14.812 |-INFO  [main] ServiceLoader$InnerEnhancedServiceLoader:524 -| Load compatible class io.seata.saga.statelang.validator.Rule
11:04:14.867 |-INFO  [main] ine.repo.impl.StateMachineRepositoryImpl:137 -| StateMachine[reduceInventoryAndBalance] is already exist a same version
11:04:15.266 |-INFO  [rpcDispatch_RMROLE_1_1_16] processor.client.RmBranchCommitProcessor:56  -| rm client handle branch commit process:BranchCommitRequest{xid='10.254.15.30:8091:1333589134566866083', branchId=-1, branchType=SAGA, resourceId='local-saga-sample#default_tx_group', applicationData='CommitRetrying'}
11:04:15.266 |-INFO  [rpcDispatch_RMROLE_1_2_16] processor.client.RmBranchCommitProcessor:56  -| rm client handle branch commit process:BranchCommitRequest{xid='192.168.226.206:8091:1333589055465450145', branchId=-1, branchType=SAGA, resourceId='local-saga-sample#default_tx_group', applicationData='CommitRetrying'}
11:04:15.267 |-INFO  [rpcDispatch_RMROLE_1_3_16] processor.client.RmBranchCommitProcessor:56  -| rm client handle branch commit process:BranchCommitRequest{xid='192.168.226.206:8091:1333589055465447983', branchId=-1, branchType=SAGA, resourceId='local-saga-sample#default_tx_group', applicationData='CommitRetrying'}
11:04:15.268 |-INFO  [rpcDispatch_RMROLE_1_4_16] processor.client.RmBranchCommitProcessor:56  -| rm client handle branch commit process:BranchCommitRequest{xid='192.168.226.206:8091:1333589055465447425', branchId=-1, branchType=SAGA, resourceId='local-saga-sample#default_tx_group', applicationData='CommitRetrying'}
11:04:15.270 |-INFO  [rpcDispatch_RMROLE_1_3_16]    org.apache.seata.rm.AbstractRMHandler:96  -| Branch committing: 192.168.226.206:8091:1333589055465447983 -1 local-saga-sample#default_tx_group CommitRetrying
11:04:15.270 |-INFO  [rpcDispatch_RMROLE_1_2_16]    org.apache.seata.rm.AbstractRMHandler:96  -| Branch committing: 192.168.226.206:8091:1333589055465450145 -1 local-saga-sample#default_tx_group CommitRetrying
11:04:15.270 |-INFO  [rpcDispatch_RMROLE_1_4_16]    org.apache.seata.rm.AbstractRMHandler:96  -| Branch committing: 192.168.226.206:8091:1333589055465447425 -1 local-saga-sample#default_tx_group CommitRetrying
11:04:15.270 |-INFO  [rpcDispatch_RMROLE_1_1_16]    org.apache.seata.rm.AbstractRMHandler:96  -| Branch committing: 10.254.15.30:8091:1333589134566866083 -1 local-saga-sample#default_tx_group CommitRetrying
11:04:15.271 |-ERROR [rpcDispatch_RMROLE_1_4_16]     io.seata.saga.rm.SagaResourceManager:112 -| StateMachine forward failed, xid: 192.168.226.206:8091:1333589055465447425
java.lang.NullPointerException: Cannot invoke "io.seata.saga.engine.StateMachineEngine.forward(String, java.util.Map)" because the return value of "io.seata.saga.rm.StateMachineEngineHolder.getStateMachineEngine()" is null
    at io.seata.saga.rm.SagaResourceManager.branchCommit(SagaResourceManager.java:89) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.rm.AbstractRMHandler.doBranchCommit(AbstractRMHandler.java:98) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.rm.AbstractRMHandler$1.execute(AbstractRMHandler.java:54) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.rm.AbstractRMHandler$1.execute(AbstractRMHandler.java:50) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.core.exception.AbstractExceptionHandler.exceptionHandleTemplate(AbstractExceptionHandler.java:127) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.rm.AbstractRMHandler.handle(AbstractRMHandler.java:50) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.rm.DefaultRMHandler.handle(DefaultRMHandler.java:61) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.core.protocol.transaction.BranchCommitRequest.handle(BranchCommitRequest.java:35) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.rm.AbstractRMHandler.onRequest(AbstractRMHandler.java:150) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.core.rpc.processor.client.RmBranchCommitProcessor.handleBranchCommit(RmBranchCommitProcessor.java:63) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.core.rpc.processor.client.RmBranchCommitProcessor.process(RmBranchCommitProcessor.java:58) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at org.apache.seata.core.rpc.netty.AbstractNettyRemoting.lambda$processMessage$2(AbstractNettyRemoting.java:280) ~[seata-all-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.106.Final.jar:4.1.106.Final]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
11:04:15.271 |-ERROR [rpcDispatch_RMROLE_1_1_16]     io.seata.saga.rm.SagaResourceManager:112 -| StateMachine forward failed, xid: 10.254.15.30:8091:1333589134566866083

itisreal-dot commented 4 weeks ago

The problem is fixed. It was caused by an error in my configuration, so I am closing this issue.
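
For readers who hit the same trace: the exception message says that `StateMachineEngineHolder.getStateMachineEngine()` returned null at the moment `forward` was called. The thread does not say what the configuration error was, so the sketch below only illustrates the general failure mode, a holder that is read before anything has registered an engine in it, and a fail-fast check that reports this more clearly. Everything in it (`EngineHolderSketch`, `Engine`, `Holder`) is a hypothetical stand-in and not Seata's API.

```java
public class EngineHolderSketch {

    /** Hypothetical stand-in for an engine; not Seata's StateMachineEngine. */
    interface Engine {
        String forward(String xid);
    }

    /** Hypothetical holder whose static slot stays null until something sets it. */
    static final class Holder {
        private static Engine engine;

        static void set(Engine e) {
            engine = e;
        }

        /** Fails fast with a descriptive message instead of a bare NullPointerException. */
        static Engine require() {
            if (engine == null) {
                throw new IllegalStateException("engine has not been registered in the holder");
            }
            return engine;
        }
    }

    public static void main(String[] args) {
        try {
            // Nothing has been registered yet, so this call is rejected.
            Holder.require().forward("xid-1");
        } catch (IllegalStateException e) {
            System.out.println("before registration: " + e.getMessage());
        }
        // After registration the same call goes through.
        Holder.set(xid -> "forwarded " + xid);
        System.out.println(Holder.require().forward("xid-1"));
    }
}
```

When debugging a similar null holder, a reasonable first check is whether the holder class that the failing code reads is the same class, in the same package, that the configuration populated.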