QNJR-GROUP / EasyTransaction

A distribute transaction solution(分布式事务) unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation), reliable message, compensate and so on;
Apache License 2.0
2.36k stars 809 forks source link
base distributed-transaction saga tcc transaction

English Readme

中文

零、SEO

柔性事务,分布式事务,TCC,SAGA,可靠消息,最大努力交付消息,事务消息,补偿,全局事务,soft transaction, distribute transaction, compensation,自动补偿

本框架可一站式解决分布式SOA(包括微服务等)的事务问题。

一、由来 及 特性

这个框架是结合公司之前遇到的分布式事务场景以及 支付宝程立分享的一个PPT<大规模SOA系统的分布式事务处理>而设计实现。

本框架意在解决之前公司对于每个分布式事务场景中都自行重复设计 中间状态、幂等实现及重试逻辑 的状况。

采纳本框架后能解决现有已发现的所有分布式事务场景,减少设计开发设计工作量,提高开发效率,并统一保证事务实现的可靠性。

特性:

二、分布式事务场景及框架对应实现

分布式事务场景

关于如何选择分布式事务形态,更详细的可参考,请点赞以帮助SEO:https://www.cnblogs.com/skyesx/p/9697817.html

框架对应实现及基本原理

框架实现了上述所有事务场景的解决方案,并提供了统一易用的接口。以下介绍基本实现原理

无需分布式事务

对于此类事务,框架完全不介入,不执行一行额外代码

其他事务场景

框架的核心依赖是Spring的TransactionSynchronization,只要使用的事务管理器继承自AbstractPlatformTransactionManager都能使用本框架(基本上事务管理器都继承自本实现),在此之外,1.0.0版本之后,框架使用了SPRING BOOT的配置功能及JDK8的特性,因此SPRING BOOT及JDK8也是必选项

对于分布式事务,框架会在调用远程事务方法前,将对应的框架操作挂载到TransactionSynchronization中,如:

框架有后台线程负责CRASH恢复,其根据“在执行分布式服务调用前写入的WriteAheadLog获得可能已经调用的业务”以及“跟随业务一起提交的一条框架记录以确认的业务最终提交状态”来进行最终的CRASH具体操作(如TCC的Confirm或者Rollback)

框架对于幂等也有完整的实现(可选),框架能保证业务方法逻辑上只执行一遍(有可能执行多遍,但多次执行的方法会被回滚掉,因此,涉及不可回滚的外部资源时,业务程序需自行把控其幂等性)

框架对于方法间有调用关系依赖的也进行妥善的处理,例如基于传统补偿完成的最终一致性事务中

所有远程调用的结果都是以Future形式返回,这给框架的性能优化提供了空间,在第一次获取结果前,所有的日志都不会被写入所有远程方法都不会被调用。一旦业务程序尝试获取执行结果时,才会批量写入日志及后续并发调用远程方法。

如果业务程序没有尝试获取执行结果,框架COMMIT前会统一尝试GET一遍,对于所有远程方法一旦抛出了Exception,框架都会在最后commit前将业务回滚,而无论之前是否catch住了,这样能保证编程模型的简洁,方便写出正确易理解的代码。

三、使用简介

业务代码

组件已上传到中央仓库,对于业务代码可以直接加入以下内容到pom中

  <dependency>
    <groupId>com.yiqiniu.easytrans</groupId>
    <artifactId>easytrans-starter</artifactId>
    <version>1.4.3</version>
  </dependency>

Starter里包含了若干默认的组件实现:基于mysql的分布式事务日志存储,基于ribbon-rest的RPC实现,基于KAFKA的消息队列,若不需要或者要替换,可以EXCLUDE掉

业务发起者

对于业务发起者,框架只暴露一个接口

public interface EasyTransFacade {
/**
 * start easy Transaction
 * @param busCode appId+busCode+trxId is a global unique id,no matter this transaction commit or roll back
 * @param trxId see busCode
 */
void startEasyTrans(String busCode, String trxId);

/**
 * execute remote transaction
 * @param params
 * @return
 */
<P extends EasyTransRequest<R, E>, E extends EasyTransExecutor, R extends Serializable> Future<R> execute(P params);
}

使用方法如下,使用方直接调用远程方法即可,无需考虑具体的分布式事务类型及后续处理:

@Transactional
public void buySomething(int userId,long money){
/**
    * 本地业务方法,下订单,并获得订单号
    */
    JdbcTemplate jdbcTemplate = util.getJdbcTemplate();
    Integer id = saveOrderRecord(jdbcTemplate,userId,money);

    /**
     * 声明全局事务ID,其由appId,业务代码,业务代码内ID构成
     * 这个方法也可以不调用,框架用默认的业务代码及自动生成的ID号,但这样做的缺点是全局事务ID无法直接关联到具体业务
     * 不过这个可以通过自定义BusinessCodeGenerator及TrxIdGenerator来产生关联
     */
    transaction.startEasyTrans(BUSINESS_CODE, id);

    /**
     * 调用远程服务扣除所需的钱,这个远程服务实现了TCC接口,
     * 框架会根据buySomething方法的事务结果来维护远程服务的最终一致性
     */
    WalletPayTccMethodRequest deductRequest = new WalletPayTccMethodRequest();
    deductRequest.setUserId(userId);
    deductRequest.setPayAmount(money/2);
    // 业务上可多次调用同一方法,最后生效的也是实际调用的次数
    Future<WalletPayTccMethodResult> deductFuture = transaction.execute(deductRequest);
    Future<WalletPayTccMethodResult> deductFuture = transaction.execute(deductRequest);

    /**
     * 调用远程服务进行账务登记,账务服务是一个可补偿服务
     * 当buySomething方法对应的事务回滚了,框架将会自动调用补偿对应的业务方法
     * 
     */
    AccountingRequest accountingRequest = new AccountingRequest();
    accountingRequest.setAmount(money);
    accountingRequest.setUserId(userId);
    Future<AccountingResponse> accountingFuture = transaction.execute(accountingRequest);

     /**
     * 
     * 发布消息以触发相关的业务处理,例如增加积分。这是一个可靠的消息。
     * 这个消息会在buySomething()的事务提交后,保证成功发布出去
     * 但至于消息是否成功消费,这取决于Queue接口的具体实现
     */
    OrderMessage orderMessage = new OrderMessage();
    orderMessage.setUserId(userId);
    orderMessage.setAmount(money);
    Future<PublishResult> reliableMessage = transaction.execute(orderMessage);
}

服务提供者

对于服务提供者,则实现对应的接口,并将其注册到Spring的Bean容器即可

如提供TCC服务的业务,需实现TccMethod接口:

 public interface TccMethod<P extends TccTransRequest<R>, R  extends Serializable> extends RpcBusinessProvider<P> {
    R doTry(P param);
    void doConfirm(P param);
    void doCancel(P param);
}

具体实现:

@Component
public class WalletPayTccMethod implements TccMethod<WalletPayTccMethodRequest, WalletPayTccMethodResult>{

@Resource
private WalletService wlletService;

@Override
public WalletPayTccMethodResult doTry(WalletPayTccMethodRequest param) {
    return wlletService.doTryPay(param);
}

@Override
public void doConfirm(WalletPayTccMethodRequest param) {
    wlletService.doConfirmPay(param);
}

@Override
public void doCancel(WalletPayTccMethodRequest param) {
    wlletService.doCancelPay(param);
}
}

其中WalletPayTccMethodRequest是请求参数,其为继承自EasyTransRequest类的POJO,且其需要添加BusinessIdentifer注解,以便于框架确认本请求对应的业务ID

@BusinessIdentifer(appId=Constant.APPID,busCode=METHOD_NAME)
public class WalletPayTccMethodRequest implements TccTransRequest<WalletPayTccMethodResult>{
    private static final long serialVersionUID = 1L;

    private Integer userId;

    private Long payAmount;

    public Long getPayAmount() {
        return payAmount;
    }

    public void setPayAmount(Long payAmount) {
        this.payAmount = payAmount;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }
}

以上的示例是传统的调用形式,无业务代码入侵的使用形式如下,更具体的使用请参考demo(interfacecall):

@Transactional
public String buySomething(int userId,long money){

    int id = saveOrderRecord(jdbcTemplate,userId,money);

    //WalletPayRequestVO 是一个VO,无任何继承及注解,只有相关的属性及GETTER,SETTER
    WalletPayRequestVO request = new WalletPayRequestVO();
    request.setUserId(userId);
    request.setPayAmount(money);

    //payService是通过框架生成的一个PayService接口实例,调用该实例的方法即完成了分布式事务调用
    WalletPayResponseVO pay = payService.pay(request);

    return "id:" + id + " freeze:" + pay.getFreezeAmount();
    }

更多例子

请参考easytrans-demos里面的代码,这里提供了一个简明的代码。 更完整的配置及相关用例请参考easytrans-starter里的UT案例,UT中有一个MockSerivice,使用了各种场景的事务。并对事务的各种异常场景做了测试。

运行配置

每个运行业务的库都需要新增两张表,表的字段类型经过了修改,若之前建了表,需要重建

-- 用于记录业务发起方的最终业务有没有执行
-- p_开头的,代表本事务对应的父事务id
-- select for update查询时,若事务ID对应的记录不存在则事务一定失败了
-- 记录存在,但status为0表示事务成功,为1表示事务失败(包含父事务和本事务)
-- 记录存在,但status为2表示事务最终状态未知
CREATE TABLE `executed_trans` (
  `app_id` smallint(5) unsigned NOT NULL,
  `bus_code` smallint(5) unsigned NOT NULL,
  `trx_id` bigint(20) unsigned NOT NULL,
  `p_app_id` smallint(5) unsigned DEFAULT NULL,
  `p_bus_code` smallint(5) unsigned DEFAULT NULL,
  `p_trx_id` bigint(20) unsigned DEFAULT NULL,
  `status` tinyint(1) NOT NULL,
  PRIMARY KEY (`app_id`,`bus_code`,`trx_id`),
  KEY `parent` (`p_app_id`,`p_bus_code`,`p_trx_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

CREATE TABLE `idempotent` (
  `src_app_id` smallint(5) unsigned NOT NULL COMMENT '来源AppID',
  `src_bus_code` smallint(5) unsigned NOT NULL COMMENT '来源业务类型',
  `src_trx_id` bigint(20) unsigned NOT NULL COMMENT '来源交易ID',
  `app_id` smallint(5) NOT NULL COMMENT '调用APPID',
  `bus_code` smallint(5) NOT NULL COMMENT '调用的业务代码',
  `call_seq` smallint(5) NOT NULL COMMENT '同一事务同一方法内调用的次数',
  `handler` smallint(5) NOT NULL COMMENT '处理者appid',
  `called_methods` varchar(64) NOT NULL COMMENT '被调用过的方法名',
  `md5` binary(16) NOT NULL COMMENT '参数摘要',
  `sync_method_result` blob COMMENT '同步方法的返回结果',
  `create_time` datetime NOT NULL COMMENT '执行时间',
  `update_time` datetime NOT NULL,
  `lock_version` smallint(32) NOT NULL COMMENT '乐观锁版本号',
  PRIMARY KEY (`src_app_id`,`src_bus_code`,`src_trx_id`,`app_id`,`bus_code`,`call_seq`,`handler`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(基于数据库实现的事物日志,若使用REDIS记录事务日志则无需以下表)需要有一个记录事务日志的数据库,并为其创建两张表。每个业务服务都必须有对应的事务日志数据库。可多个服务共用一个,也可以一个服务单独一个事务日志。

CREATE TABLE `trans_log_detail` (
  `log_detail_id` int(11) NOT NULL AUTO_INCREMENT,
  `trans_log_id` binary(12) NOT NULL,
  `log_detail` blob,
  `create_time` datetime NOT NULL,
  PRIMARY KEY (`log_detail_id`),
  KEY `app_id` (`trans_log_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `trans_log_unfinished` (
  `trans_log_id` binary(12) NOT NULL,
  `create_time` datetime NOT NULL,
  PRIMARY KEY (`trans_log_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SELECT * FROM translog.trans_log_detail;

若需要使用自动补偿功能,则需要在业务库额外增加以下两个表

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE TABLE `fescar_lock` (
  `t_name` varchar(64) NOT NULL,
  `t_pk` varchar(128) NOT NULL,
  `xid` varchar(256) NOT NULL,
  `create_time` datetime NOT NULL,
  PRIMARY KEY (`t_name`,`t_pk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

详细的配置会后续作出手册,但在此之前各位可以参考easytrans-starter里的UT案例 或者 参考demo进行

四、扩展性

框架采用接口粘合各个模块,具有较强的扩展性,推荐 扩展、替换 的模块:

五、最佳实践

基于数据库的事务日志

参数及返回值

六、FAQ

  1. 如何在CRASH后判断一个柔性事务是否提交?
    • 在调用startEasyTrans()方法时,框架将插入一条记录到executed_trans中
    • 在调用startEasyTrans()方法后,才可以执行远程事务方法
    • 业务发起者(主控事务)将持有executed_trans记录的锁直到主控事务回滚或者提交
    • 因此CRASH恢复进程使用select for update 查询executed_trans记录时,必然能得到准确的是否已经提交的结果(若主控事务仍在进行中,select for update将会等待)
    • 使用select for update是为了避免在MVCC情况下错误查询出最终事务提交结果的情况

七、外部组件版本兼容性

八、其他

欢迎加作者个人微信及公众号

公众号

wechat public account

微信(加微信请注明来源 ET)

wechat

若觉得框架不错,希望能STAR,THX

email: skyes.xu@qq.com

我写的关于ET的一些额外文章,请点赞以帮助SEO: