apache / incubator-seata

:fire: Seata is an easy-to-use, high-performance, open source distributed transaction solution.
https://seata.apache.org/
Apache License 2.0

Seata does not support DynamicDataSource #1511

Closed CarolineHuang5954 closed 5 years ago

CarolineHuang5954 commented 5 years ago

Ⅰ. Issue Description

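Our distributed-transaction application uses a dynamic data source. When operating on dynamic data sources A and B, the rollback log is inserted correctly into the undo_log table of the corresponding data source, but on global commit or rollback Seata does not route to the correct data source to process the rollback log and business data.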

Ⅱ. Describe what happened

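Setup:

  - Application 1: transaction participant; connects to the database through a dynamic data source and has a default data source.
  - Application 2: transaction initiator; has no data source configured, calls Application 1's interface and selects the data source dynamically via a parameter.
  - Data source A: business table person.
  - Data source B: business table user.
  - Data source C: the default data source; seata-server is configured to use it, and global_table, branch_table and lock_table live here.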

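Scenario 1: Application 2 operates on databases A and B by calling the Dubbo interface of Application 1 twice.

  1. Inserting a person succeeds and inserting a user succeeds.
  2. One undo_log row is inserted into each of data sources A and B.
  3. When the global transaction is committed, Application 1 reports an error: the undo_log table of data source C cannot be found (C has no undo_log table).
  4. The undo_log rows in data sources A and B are not deleted.

Scenario 2: Application 2 operates on databases A and B by calling the Dubbo interface of Application 1 twice.

  1. Inserting a person succeeds; inserting a user fails (primary key conflict).
  2. One undo_log row is inserted into data source A.
  3. When the global transaction is rolled back, Application 1 reports an error: the undo_log table of data source C cannot be found.
  4. One row is inserted into lock_table, but its resource_id points to data source C (I think it should be A).
  5. The global transaction rollback fails: A's undo log is not deleted, the row in the person table is not deleted, and the corresponding row in C's lock_table is not deleted either.

The stack trace below reports that the undo_log table of data source C cannot be found. If there is an exception, please attach the exception trace: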

2019-08-21 15:57:09.469  INFO 8568 --- [atch_RMROLE_8_8] io.seata.rm.AbstractRMHandler            : Branch committing: 172.16.68.197:8091:2020086258 2020086260 jdbc:mysql://172.16.68.118:3306/zhttest null
2019-08-21 15:57:09.469 ERROR 8568 --- [atch_RMROLE_7_8] druid.sql.Statement                      : {conn-10001, pstmt-20433} execute error. SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'zhttest.undo_log' doesn't exist
    at sun.reflect.GeneratedConstructorAccessor46.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
    at com.mysql.jdbc.Util.getInstance(Util.java:386)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1054)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4237)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4169)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2617)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2778)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2825)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2156)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2323)
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3188)
    at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_executeQuery(FilterEventAdapter.java:465)
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3185)
    at com.alibaba.druid.wall.WallFilter.preparedStatement_executeQuery(WallFilter.java:641)
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3185)
    at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_executeQuery(FilterEventAdapter.java:465)
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3185)
    at com.alibaba.druid.proxy.jdbc.PreparedStatementProxyImpl.executeQuery(PreparedStatementProxyImpl.java:181)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeQuery(DruidPooledPreparedStatement.java:228)
    at io.seata.rm.datasource.undo.UndoLogManager.undo(UndoLogManager.java:154)
    at io.seata.rm.datasource.DataSourceManager.branchRollback(DataSourceManager.java:185)
    at io.seata.rm.AbstractRMHandler.doBranchRollback(AbstractRMHandler.java:124)
    at io.seata.rm.AbstractRMHandler$2.execute(AbstractRMHandler.java:68)
    at io.seata.rm.AbstractRMHandler$2.execute(AbstractRMHandler.java:64)
    at io.seata.core.exception.AbstractExceptionHandler.exceptionHandleTemplate(AbstractExceptionHandler.java:117)
    at io.seata.rm.AbstractRMHandler.handle(AbstractRMHandler.java:64)
    at io.seata.rm.DefaultRMHandler.handle(DefaultRMHandler.java:63)
    at io.seata.core.protocol.transaction.BranchRollbackRequest.handle(BranchRollbackRequest.java:35)
    at io.seata.rm.AbstractRMHandler.onRequest(AbstractRMHandler.java:149)
    at io.seata.core.rpc.netty.RmMessageListener.handleBranchRollback(RmMessageListener.java:81)
    at io.seata.core.rpc.netty.RmMessageListener.onMessage(RmMessageListener.java:71)
    at io.seata.core.rpc.netty.AbstractRpcRemotingClient.dispatch(AbstractRpcRemotingClient.java:166)
    at io.seata.core.rpc.netty.AbstractRpcRemoting$3.run(AbstractRpcRemoting.java:371)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Ⅲ. Describe what you expected to happen

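  1. Since the undo log is inserted into the corresponding data source correctly, a global commit or rollback should also find that data source to process the rollback log (delete it directly, or delete it after applying it).
  2. Our current dynamic data source implementation is shown below; we hope it helps improve distributed transaction support for dynamic data sources.

At application startup, all data sources are registered manually through the beanFactory (they are queried from a data source service, which supports adding and changing them dynamically; there is no static configuration).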

        Map<Object, Object> targetDataSources = new HashMap<>();
        // Add the primary data source to the target data sources
        targetDataSources.put("dataSource", defaultDataSource);
        // Add the remaining data sources
        targetDataSources.putAll(dynamicDataSources);
        // Create the DynamicDataSource bean definition
        GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
        beanDefinition.setBeanClass(DynamicDataSource.class);
        beanDefinition.setSynthetic(true);
        MutablePropertyValues mpv = beanDefinition.getPropertyValues();
        mpv.addPropertyValue("defaultTargetDataSource", defaultDataSource);
        mpv.addPropertyValue("targetDataSources", targetDataSources);
        beanFactory.registerBeanDefinition("dataSource", beanDefinition);
        logger.info("Multiple data sources registered successfully");

SeataAutoConfig.java obtains the dataSource through the applicationContext:

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    @Primary
    public DataSourceProxy dataSourceProxy(DataSource dataSource ){
        return new DataSourceProxy(dataSource);
    }

An aspect on the DAO layer sets the data source dynamically by changing the thread-local variable dataSourceType:

public class DynamicDataSource extends AbstractRoutingDataSource {

    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    private static Map<Object, Object> dynamicDataSources = new HashMap<>();

    public static Map<String, DataSource> dynamicDataSourcesPublic = new HashMap<>();

    @Override
    protected Object determineCurrentLookupKey() {
        String dataSourceType = getDataSourceType();
        if (dataSourceType != null && !(dynamicDataSources.containsKey(dataSourceType) || dynamicDataSourcesPublic.containsKey(dataSourceType))){
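            logger.info("appKey " + dataSourceType + ": no data source configuration found, please check the data source configuration");
            throw new RuntimeException("appKey: " + dataSourceType + ": no data source configuration found, please check the data source configuration");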
        }
        if(!dynamicDataSources.containsKey(dataSourceType) && dynamicDataSourcesPublic.containsKey(dataSourceType)){
            dynamicDataSources.put(dataSourceType, dynamicDataSourcesPublic.get(dataSourceType));
            super.afterPropertiesSet();
        }
        return dataSourceType;
    }

    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }

    public static String getDataSourceType() {
        return contextHolder.get();
    }

    public static void clearDataSourceType() {
        contextHolder.remove();
    }

    @Override
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
         dynamicDataSources.putAll(targetDataSources);
        super.setTargetDataSources(dynamicDataSources);
        super.afterPropertiesSet();
    }
}

Ⅳ. Seata Version

    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-all</artifactId>
        <version>0.8.0</version>
    </dependency>

fescar-robot commented 5 years ago

Hi @CarolineHuang5954, we detect non-English characters in the issue. This comment is an auto translation from @fescar-robot to help other users to understand this issue. We encourage you to describe your issue in English which is more friendly to other users.

Seata does not support DynamicDataSource

Ⅰ. Issue Description

Currently, our distributed-transaction application uses dynamic data sources. When operating on dynamic data sources A and B, the rollback log is inserted correctly into the undo_log table of the corresponding data source, but when the distributed transaction is rolled back or committed, it is not routed to the corresponding data source to operate on the rollback log and business data.

Ⅱ. Describe what happened

Setup:

  - Application 1: transaction participant; connects to the database through a dynamic data source and has a default data source.
  - Application 2: transaction initiator; has no data source configured, calls Application 1's interface and selects the data source dynamically via a parameter.
  - Data source A: business table person.
  - Data source B: business table user.
  - Data source C: the default data source; seata-server is configured to use it, and global_table, branch_table and lock_table live here.

Scenario 1: Application 2 operates on databases A and B by calling the Dubbo interface of Application 1 twice.

  1. Inserting a person succeeds and inserting a user succeeds.
  2. One undo_log row is inserted into each of data sources A and B.
  3. When the global transaction is committed, Application 1 reports an error: the undo_log table of data source C cannot be found (C has no undo_log table).
  4. The undo_log rows in data sources A and B are not deleted.

Scenario 2: Application 2 operates on databases A and B by calling the Dubbo interface of Application 1 twice.

  1. Inserting a person succeeds; inserting a user fails (primary key conflict).
  2. One undo_log row is inserted into data source A.
  3. When the global transaction is rolled back, Application 1 reports an error: the undo_log table of data source C cannot be found.
  4. One row is inserted into lock_table, but its resource_id points to data source C (I think it should be A).
  5. The global transaction rollback fails: A's undo log is not deleted, the row in the person table is not deleted, and the corresponding row in C's lock_table is not deleted either.

Ⅲ. Describe what you expected to happen

  1. Since the undo log is inserted into the corresponding data source correctly, a global commit or rollback should also find that data source to process the rollback log (delete it directly, or delete it after applying it).
  2. Our current dynamic data source implementation is shown below; we hope it helps improve distributed transaction support for dynamic data sources.

At application startup, all data sources are registered manually through the beanFactory (they are queried from a data source service, which supports adding and changing them dynamically; there is no static configuration).

        Map<Object, Object> targetDataSources = new HashMap<>();
        // Add the primary data source to the target data sources
        targetDataSources.put("dataSource", defaultDataSource);
        // Add the remaining data sources
        targetDataSources.putAll(dynamicDataSources);
        // Create the DynamicDataSource bean definition
        GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
        beanDefinition.setBeanClass(DynamicDataSource.class);
        beanDefinition.setSynthetic(true);
        MutablePropertyValues mpv = beanDefinition.getPropertyValues();
        mpv.addPropertyValue("defaultTargetDataSource", defaultDataSource);
        mpv.addPropertyValue("targetDataSources", targetDataSources);
        beanFactory.registerBeanDefinition("dataSource", beanDefinition);
        logger.info("Multiple data sources registered successfully");

Get the dataSource through the applicationContext in SeataAutoConfig.java

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    @Primary
    public DataSourceProxy dataSourceProxy(DataSource dataSource ){
        return new DataSourceProxy(dataSource);
    }

An aspect on the DAO layer sets the data source dynamically by changing the thread-local variable dataSourceType

public class DynamicDataSource extends AbstractRoutingDataSource {

    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    private static Map<Object, Object> dynamicDataSources = new HashMap<>();

    public static Map<String, DataSource> dynamicDataSourcesPublic = new HashMap<>();

    @Override
    protected Object determineCurrentLookupKey() {
        String dataSourceType = getDataSourceType();
        if (dataSourceType != null && !(dynamicDataSources.containsKey(dataSourceType) || dynamicDataSourcesPublic.containsKey(dataSourceType))){
            logger.info("appKey " + dataSourceType + ": no data source configuration found, please check the data source configuration");
            throw new RuntimeException("appKey: " + dataSourceType + ": no data source configuration found, please check the data source configuration");
        }
        if(!dynamicDataSources.containsKey(dataSourceType) && dynamicDataSourcesPublic.containsKey(dataSourceType)){
            dynamicDataSources.put(dataSourceType, dynamicDataSourcesPublic.get(dataSourceType));
            super.afterPropertiesSet();
        }
        return dataSourceType;
    }

    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }

    public static String getDataSourceType() {
        return contextHolder.get();
    }

    public static void clearDataSourceType() {
        contextHolder.remove();
    }

    @Override
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
         dynamicDataSources.putAll(targetDataSources);
        super.setTargetDataSources(dynamicDataSources);
        super.afterPropertiesSet();
    }
}
helloworlde commented 5 years ago

I think your data source configuration is not quite correct: every data source should have a corresponding proxy data source that joins the distributed transaction. In your description, only the primary data source is proxied.

You can refer to the PR "Add multiple datasource module for SpringBoot", which integrates Seata with multiple data sources, and to helloworlde/SpringBoot-DynamicDataSource for more detail on dynamic data sources with Spring Boot.

archerForzmy commented 5 years ago

It's obvious that your dynamic data source routing rules need to be changed.

You should implement your own dynamic data sources according to seata's specifications.

Justice-love commented 5 years ago

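A dynamic data source is one kind of multi-data-source setup. From your description it does not look like the data source is actually being switched. Seata does support multiple data sources. First, every data source must be proxied by DataSourceProxy. Seata rolls back branch transactions in reverse order, and one branch transaction represents one local transaction. On rollback, the data source is looked up by the resourceId recorded when the branch transaction was registered: DataSourceProxy dataSourceProxy = get(resourceId); see the code below.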

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            if(JdbcConstants.ORACLE.equalsIgnoreCase(dataSourceProxy.getDbType())) {
                UndoLogManagerOracle.undo(dataSourceProxy, xid, branchId);
            }
            else if(JdbcConstants.MYSQL.equalsIgnoreCase(dataSourceProxy.getDbType())){
                UndoLogManager.undo(dataSourceProxy, xid, branchId);
            } else {
                throw new NotSupportYetException("DbType[" + dataSourceProxy.getDbType() + "] is not support yet!");
            }
        } catch (TransactionException te) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("branchRollback failed reason [{}]", te.getMessage());
            }
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

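This ensures that the correct undo log is found when rolling back across multiple data sources. So, judging from your error log, which reports that the undo_log table cannot be found, first confirm that your resourceIds are distinct, then check that every data source is proxied, and then check the undo_log table.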
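The lookup described above can be pictured with a toy model. This is only a sketch, not Seata code: `ResourceCacheSketch` and `FakeProxy` are hypothetical names, and the JDBC URL stands in for the resourceId. It shows that a rollback can only reach data sources that were registered individually.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of a resource cache keyed by resourceId. Hypothetical; not Seata code. */
final class ResourceCacheSketch {

    /** Hypothetical stand-in for a proxied data source; only the JDBC URL matters here. */
    static final class FakeProxy {
        final String jdbcUrl;

        FakeProxy(String jdbcUrl) {
            this.jdbcUrl = jdbcUrl;
        }
    }

    private final Map<String, FakeProxy> cache = new LinkedHashMap<>();

    /** Registers one proxy under its JDBC URL, which plays the role of the resourceId. */
    void register(FakeProxy proxy) {
        cache.put(proxy.jdbcUrl, proxy);
    }

    /** Rollback-time lookup: fails loudly when the resourceId was never registered. */
    FakeProxy lookup(String resourceId) {
        FakeProxy proxy = cache.get(resourceId);
        if (proxy == null) {
            throw new IllegalStateException("no proxy registered for " + resourceId);
        }
        return proxy;
    }

    /** Number of distinct resources known to the cache. */
    int size() {
        return cache.size();
    }
}
```

If only one wrapper around the routing data source is registered, the cache in this model holds a single entry, and a lookup for any other URL cannot succeed.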

Justice-love commented 5 years ago

Your error log looks strange: if the undo_log table did not exist, the error should already occur when building the undo image and inserting the undo log, not only at the rollback stage.

CarolineHuang5954 commented 5 years ago

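A dynamic data source is one kind of multi-data-source setup. From your description it does not look like the data source is actually being switched. Seata does support multiple data sources. First, every data source must be proxied by DataSourceProxy. Seata rolls back branch transactions in reverse order, and one branch transaction represents one local transaction. On rollback, the data source is looked up by the resourceId recorded when the branch transaction was registered: DataSourceProxy dataSourceProxy = get(resourceId); see the code below.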

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            if(JdbcConstants.ORACLE.equalsIgnoreCase(dataSourceProxy.getDbType())) {
                UndoLogManagerOracle.undo(dataSourceProxy, xid, branchId);
            }
            else if(JdbcConstants.MYSQL.equalsIgnoreCase(dataSourceProxy.getDbType())){
                UndoLogManager.undo(dataSourceProxy, xid, branchId);
            } else {
                throw new NotSupportYetException("DbType[" + dataSourceProxy.getDbType() + "] is not support yet!");
            }
        } catch (TransactionException te) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("branchRollback failed reason [{}]", te.getMessage());
            }
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

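This ensures that the correct undo log is found when rolling back across multiple data sources. So, judging from your error log, which reports that the undo_log table cannot be found, first confirm that your resourceIds are distinct, then check that every data source is proxied, and then check the undo_log table.

  1. There is no need to doubt the dynamic data source switching. Without Seata, using jdbcTemplate, the switching works; and as my description shows, both the business data and the undo log are inserted correctly. I switch data sources with an aspect on the DAO layer; I just did not paste the aspect code.

  2. I have also read the code you pasted, and this is my understanding. Inserting business data, inserting the undo log, and inserting the lock-table data are all routed to the correct data source, but rollback does not find the correct data source, because insert and rollback use different routing strategies. On insert, the undo log is written by flushUndoLogs in ConnectionProxy, which is correct. On rollback, a message arrives from seata-server (I could not find where this message originates), and the data source proxy is not used for routing at all: branchRollback does obtain a dataSourceProxy, but it takes it from dataSourceCache [screenshot]. dataSourceCache is a map, but only one resourceId is ever put into it [screenshot], so the map only ever has one entry. I am not sure whether my understanding is correct.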

CarolineHuang5954 commented 5 years ago

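Your error log looks strange: if the undo_log table did not exist, the error should already occur when building the undo image and inserting the undo log, not only at the rollback stage.

It is not that the undo log rows cannot be found; the table itself cannot be found. Inserting the undo log raises no error. When the data source is right, the insert works; when the data source is wrong, the table is not found and an error is raised. I think my scenario description above is already quite clear.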

CarolineHuang5954 commented 5 years ago

I think your data source configuration is not quite correct: every data source should have a corresponding proxy data source that joins the distributed transaction. In your description, only the primary data source is proxied.

You can refer to the PR "Add multiple datasource module for SpringBoot", which integrates Seata with multiple data sources, and to helloworlde/SpringBoot-DynamicDataSource for more detail on dynamic data sources with Spring Boot.

Thank you very much. I have read the code you provided, helloworlde/SpringBoot-DynamicDataSource. It differs from mine, but the underlying principle is the same: you use @Bean for Spring injection, whereas I use BeanFactory.registerBeanDefinition(). I wrote up my own understanding above; I hope you can find time to look at it and point out anything that is wrong.

Justice-love commented 5 years ago

Just a guess: you use DataSourceProxy as the proxy of DynamicDataSource, and DynamicDataSource refers to multiple physical data sources. If that is right, I think you should change this structure: DataSourceProxy should proxy each physical data source.

Justice-love commented 5 years ago

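Your error log looks strange: if the undo_log table did not exist, the error should already occur when building the undo image and inserting the undo log, not only at the rollback stage.

It is not that the undo log rows cannot be found; the table itself cannot be found. Inserting the undo log raises no error. When the data source is right, the insert works; when the data source is wrong, the table is not found and an error is raised. I think my scenario description above is already quite clear.

I mean that the table-not-exist error should already be raised at the build-undo-image stage, not only at the rollback stage.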

CarolineHuang5954 commented 5 years ago

Just a guess: you use DataSourceProxy as the proxy of DynamicDataSource, and DynamicDataSource refers to multiple physical data sources. If that is right, I think you should change this structure: DataSourceProxy should proxy each physical data source.

You're right, but I can't change this structure because of force majeure; we have to use the dynamic data source proxy to manage transactions. Would you please help extend this feature to support our complex scenario? :)

CarolineHuang5954 commented 5 years ago

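Your error log looks strange: if the undo_log table did not exist, the error should already occur when building the undo image and inserting the undo log, not only at the rollback stage.

It is not that the undo log rows cannot be found; the table itself cannot be found. Inserting the undo log raises no error. When the data source is right, the insert works; when the data source is wrong, the table is not found and an error is raised. I think my scenario description above is already quite clear.

I mean that the table-not-exist error should already be raised at the build-undo-image stage, not only at the rollback stage.

That's why I raised this issue: the data source was correct when the undo image was built, so there was no table-not-exist error; then rollback was routed to the wrong data source, so the error was reported.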
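One plausible reading of this symptom, offered as an assumption rather than a confirmed diagnosis: the stack trace shows the rollback running on a pooled RM thread, where a ThreadLocal lookup key set by a DAO-layer aspect on the business thread is not present, so a routing data source falls back to its default target. The sketch below reproduces only that thread-local effect; `ThreadLocalRoutingSketch` and the key names "A" and "C" are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch of thread-local routing; hypothetical names, no Spring or Seata involved. */
final class ThreadLocalRoutingSketch {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    /** Plays the role of the default data source (C in the issue). */
    static final String DEFAULT_KEY = "C";

    /** What a DAO-layer aspect would do before the business SQL runs. */
    static void select(String key) {
        KEY.set(key);
    }

    static void clear() {
        KEY.remove();
    }

    /** Mimics a routing data source: use the default when no key is set on this thread. */
    static String resolve() {
        String key = KEY.get();
        return key != null ? key : DEFAULT_KEY;
    }

    /** Resolves the key on a separate pooled thread, as a rollback callback would. */
    static String resolveOnWorkerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = ThreadLocalRoutingSketch::resolve;
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In this model, selecting "A" on the calling thread has no effect on the worker thread, which resolves to the default "C". That matches the pattern reported here: inserts hit the right database, while rollback hits the default one.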

CarolineHuang5954 commented 5 years ago

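A dynamic data source is one kind of multi-data-source setup. From your description it does not look like the data source is actually being switched. Seata does support multiple data sources. First, every data source must be proxied by DataSourceProxy. Seata rolls back branch transactions in reverse order, and one branch transaction represents one local transaction. On rollback, the data source is looked up by the resourceId recorded when the branch transaction was registered: DataSourceProxy dataSourceProxy = get(resourceId); see the code below.

This ensures that the correct undo log is found when rolling back across multiple data sources. So, judging from your error log, which reports that the undo_log table cannot be found, first confirm that your resourceIds are distinct, then check that every data source is proxied, and then check the undo_log table.

May I ask: in "on rollback, the data source is looked up by the resourceId recorded when the branch transaction was registered", where does this resourceId come from? Is it sent by seata-server, or read from global_table, branch_table or lock_table? If it is read from branch_table, when is it inserted, and is it inserted by the client application or by seata-server? I have not been able to debug this part, and I have never seen any data in global_table or branch_table. I would appreciate an answer.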

Justice-love commented 5 years ago

Just a guess: you use DataSourceProxy as the proxy of DynamicDataSource, and DynamicDataSource refers to multiple physical data sources. If that is right, I think you should change this structure: DataSourceProxy should proxy each physical data source.

You're right, but I can't change this structure because of force majeure; we have to use the dynamic data source proxy to manage transactions. Would you please help extend this feature to support our complex scenario? :)

I think it is hard to implement, because Seata executes rollback through the data source proxied by DataSourceProxy; this is how Seata is structured. So I think this is not a Seata error but a DynamicDataSource error: its routing rule is wrong for the undo SQL. I also think it is hard to build a router for the rollback SQL.

You can ask @slievrly for more suggestions.

Justice-love commented 5 years ago

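A dynamic data source is one kind of multi-data-source setup. From your description it does not look like the data source is actually being switched. Seata does support multiple data sources. First, every data source must be proxied by DataSourceProxy. Seata rolls back branch transactions in reverse order, and one branch transaction represents one local transaction. On rollback, the data source is looked up by the resourceId recorded when the branch transaction was registered: DataSourceProxy dataSourceProxy = get(resourceId); see the code below. This ensures that the correct undo log is found when rolling back across multiple data sources. So, judging from your error log, which reports that the undo_log table cannot be found, first confirm that your resourceIds are distinct, then check that every data source is proxied, and then check the undo_log table.

May I ask: in "on rollback, the data source is looked up by the resourceId recorded when the branch transaction was registered", where does this resourceId come from? Is it sent by seata-server, or read from global_table, branch_table or lock_table? If it is read from branch_table, when is it inserted, and is it inserted by the client application or by seata-server? I have not been able to debug this part, and I have never seen any data in global_table or branch_table. I would appreciate an answer.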

io.seata.rm.datasource.DataSourceProxy#init

ggndnn commented 5 years ago

@CarolineHuang5954 Please check DataSourceProxy.init; the resourceId is actually the jdbcUrl from the DataSource connection. I think you can use DynamicDataSource now; just make sure every DataSource in it is wrapped by DataSourceProxy.
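As a rough illustration of a URL-derived resource id: the sketch below assumes the id is the JDBC URL with any query string removed. That assumption is consistent with the parameter-free URL printed in the "Branch committing" log line, but the thread does not confirm the exact rule, and `ResourceIdSketch.resourceIdOf` is a hypothetical helper, not a Seata method.

```java
/** Hypothetical helper; assumes the resource id is the JDBC URL without its query string. */
final class ResourceIdSketch {

    /** Returns the part of the JDBC URL before the first '?', or the whole URL if there is none. */
    static String resourceIdOf(String jdbcUrl) {
        int queryStart = jdbcUrl.indexOf('?');
        return queryStart >= 0 ? jdbcUrl.substring(0, queryStart) : jdbcUrl;
    }
}
```

Under this assumption, two physical databases yield two distinct ids only when each one is wrapped separately, since a single wrapper around the routing data source sees just one connection URL at initialization.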

CarolineHuang5954 commented 5 years ago

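@CarolineHuang5954 Please check DataSourceProxy.init; the resourceId is actually the jdbcUrl from the DataSource connection. I think you can use DynamicDataSource now; just make sure every DataSource in it is wrapped by DataSourceProxy.

I know the resourceId is actually the jdbcUrl... What I want to ask is this [screenshot]: the resourceId is passed in as a parameter and used to obtain the dataSourceProxy. From reading the code, this parameter should be passed in when the transaction rollback is initiated. What I do not understand is where this parameter comes from; it should not be the init method, should it?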

huangjian888 commented 5 years ago

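@CarolineHuang5954 Please check DataSourceProxy.init; the resourceId is actually the jdbcUrl from the DataSource connection. I think you can use DynamicDataSource now; just make sure every DataSource in it is wrapped by DataSourceProxy.

I know the resourceId is actually the jdbcUrl... What I want to ask is this [screenshot]: the resourceId is passed in as a parameter and used to obtain the dataSourceProxy. From reading the code, this parameter should be passed in when the transaction rollback is initiated. What I do not understand is where this parameter comes from; it should not be the init method, should it?

new DataSourceProxy(each.getDataSource()): it is passed in at instantiation. Most likely your data source setup is wrong; please check it.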

ggndnn commented 5 years ago

@CarolineHuang5954 It is sent to the server when the branch is registered and sent back to the RM when the branch is rolled back; please have a look at DefaultCore.
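The round trip described here can be sketched with a toy coordinator. This is a simplified model under stated assumptions, not the DefaultCore implementation: `BranchRoundTripSketch` and its methods are hypothetical, and branch ids are simply counted up from 1.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy coordinator that remembers which resourceId each branch was registered with. */
final class BranchRoundTripSketch {
    private final Map<Long, String> resourceIdByBranch = new HashMap<>();
    private long nextBranchId = 1;

    /** RM to server: a branch is registered together with the resourceId it ran on. */
    long registerBranch(String resourceId) {
        long branchId = nextBranchId++;
        resourceIdByBranch.put(branchId, resourceId);
        return branchId;
    }

    /** Server to RM: the rollback request carries back the resourceId stored at registration. */
    String resourceIdForRollback(long branchId) {
        String resourceId = resourceIdByBranch.get(branchId);
        if (resourceId == null) {
            throw new IllegalArgumentException("unknown branch " + branchId);
        }
        return resourceId;
    }
}
```

The point of the model is that the server does not choose a resourceId on its own; it echoes whatever the RM reported at registration, so a wrong id at rollback implies a wrong id at registration.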

helloworlde commented 5 years ago

This issue comes from the data source routing strategy on rollback, so please pay attention to the routing. Seata does not currently support a custom strategy for switching data sources, so the easiest way to resolve this is to proxy all your data sources; then Seata can roll back normally.

There is no difference between creating the bean with @Bean and with beanFactory.registerBeanDefinition; either way, you need to add a proxy for the data source bean.

CarolineHuang5954 commented 5 years ago

This issue comes from the data source routing strategy on rollback, so please pay attention to the routing. Seata does not currently support a custom strategy for switching data sources, so the easiest way to resolve this is to proxy all your data sources; then Seata can roll back normally.

There is no difference between creating the bean with @Bean and with beanFactory.registerBeanDefinition; either way, you need to add a proxy for the data source bean.

Thanks for your answer. I think you are right: the rollback request was sent from seata-server, and the resourceId should be sent correctly from seata-server. But I have no idea how seata-server determines which resourceId to send. Could you please help me find out? In addition, as you said above, if Seata does not currently support a custom strategy for switching data sources, would you kindly provide a sample showing how to configure the data source proxy correctly? I would really appreciate it.

Justice-love commented 5 years ago

This issue comes from the data source routing strategy on rollback, so please pay attention to the routing. Seata does not currently support a custom strategy for switching data sources, so the easiest way to resolve this is to proxy all your data sources; then Seata can roll back normally. There is no difference between creating the bean with @Bean and with beanFactory.registerBeanDefinition; either way, you need to add a proxy for the data source bean.

Thanks for your answer. I think you are right: the rollback request was sent from seata-server, and the resourceId should be sent correctly from seata-server. But I have no idea how seata-server determines which resourceId to send. Could you please help me find out? In addition, as you said above, if Seata does not currently support a custom strategy for switching data sources, would you kindly provide a sample showing how to configure the data source proxy correctly? I would really appreciate it.

io.seata.rm.datasource.ConnectionProxy#register()

helloworlde commented 5 years ago

This issue comes from the data source routing strategy on rollback, so please pay attention to the routing. Seata does not currently support a custom strategy for switching data sources, so the easiest way to resolve this is to proxy all your data sources; then Seata can roll back normally. There is no difference between creating the bean with @Bean and with beanFactory.registerBeanDefinition; either way, you need to add a proxy for the data source bean.

Thanks for your answer. I think you are right: the rollback request was sent from seata-server, and the resourceId should be sent correctly from seata-server. But I have no idea how seata-server determines which resourceId to send. Could you please help me find out? In addition, as you said above, if Seata does not currently support a custom strategy for switching data sources, would you kindly provide a sample showing how to configure the data source proxy correctly? I would really appreciate it.

Just proxy every data source, and then Seata can route correctly on rollback. You can refer to the configuration file DataSourceProxyConfig.java for more detail.

CarolineHuang5954 commented 5 years ago

This issue comes from the data source routing strategy on rollback, so please pay attention to the routing. Seata does not currently support a custom strategy for switching data sources, so the easiest way to resolve this is to proxy all your data sources; then Seata can roll back normally. There is no difference between creating the bean with @Bean and with beanFactory.registerBeanDefinition; either way, you need to add a proxy for the data source bean.

Thanks for your answer. I think you are right: the rollback request was sent from seata-server, and the resourceId should be sent correctly from seata-server. But I have no idea how seata-server determines which resourceId to send. Could you please help me find out? In addition, as you said above, if Seata does not currently support a custom strategy for switching data sources, would you kindly provide a sample showing how to configure the data source proxy correctly? I would really appreciate it.

Just proxy every data source, and then Seata can route correctly on rollback. You can refer to the configuration file DataSourceProxyConfig.java for more detail.

Much appreciated. I finally resolved this problem by referring to your configuration class. I had failed to proxy all the data sources; I added some code and it works. So I am closing this issue. Thanks.
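The thread does not show the code that was added, so the following is only a guess at the general shape of "proxy every data source": wrap each entry of the target map before handing the map to the routing data source. `WrapAllTargetsSketch.wrapAll` is a hypothetical helper and deliberately generic, so it runs without Spring or Seata on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical helper: applies a wrapper to every target before routing is configured. */
final class WrapAllTargetsSketch {

    /** Returns a new map with the same keys and each value passed through the wrapper. */
    static Map<Object, Object> wrapAll(Map<Object, Object> targets, UnaryOperator<Object> wrapper) {
        Map<Object, Object> wrapped = new LinkedHashMap<>();
        for (Map.Entry<Object, Object> entry : targets.entrySet()) {
            wrapped.put(entry.getKey(), wrapper.apply(entry.getValue()));
        }
        return wrapped;
    }
}
```

In the registration code from the original post, the wrapper would presumably be something like `ds -> new DataSourceProxy((DataSource) ds)`, applied to `targetDataSources` and to `defaultDataSource`, with the routing DynamicDataSource itself left unwrapped. That wiring is an assumption based on the advice in this thread, not the reporter's confirmed fix.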