将MyBatis Reactive化,将底层的JDBC替换为R2DBC,从而实现全面异步化,提升数据库访问性能。
和MyBatis的使用流程一样,如下:
// construct MyBatis Configuration
XMLConfigBuilder xmlConfigBuilder = new XMLConfigBuilder(this.getClass().getResourceAsStream("/mybatis-config.xml"));
Configuration configuration = xmlConfigBuilder.parse();
// construct Reactive SQL Session Factory
ReactiveSqlSessionFactory reactiveSqlSessionFactory = new DefaultReactiveSqlSessionFactory(configuration);
ReactiveSqlSession reactiveSqlSession = reactiveSqlSessionFactory.openSession();
// construct Reactive Mapper interface
UserReactiveMapper userMapper = reactiveSqlSession.getMapper(UserReactiveMapper.class);
R2DBC有标准的Data Types支持,可以在 https://r2dbc.io/spec/0.8.1.RELEASE/spec/html/#datatypes 进行查阅
考虑到MyBatis JDBC支持的类型,以下的类型也是被支持的:
Please use R2dbcTransactionManager from Spring Data R2DBC
从目前开发的情况来看,可能MariaDB R2DBC更具有前景一些。
MyBatis针对resultMap的result有一个typeHandler的设置,可以进行Java对象字段和数据库表的column直接进行值转换。 在MyBatis R2DBC中我们保留这一特性。 由于R2DBC和JDBC的API完全不同,所以新增了一个R2DBCTypeHandler接口,代码如下:
public interface R2DBCTypeHandler<T> {
void setParameter(Statement statement, int i, T parameter, JdbcType jdbcType) throws R2dbcException;
T getResult(Row row, String columnName, RowMetadata rowMetadata) throws R2dbcException;
T getResult(Row row, int columnIndex, RowMetadata rowMetadata) throws R2dbcException;
Class<?> getType();
}
但是考虑到实际的MyBatis的配置验证,新建的TypeHandler实现需要同时继承TypeHandler和R2DBCTypeHandler接口,同时也保证在JDBC和R2DBC同时生效。
你可以参考 org.apache.ibatis.r2dbc.type 开发包下的type handler进行编写,同时JDBC的时间类型的兼容,也是通过type handler完成的。
Java Enum type handler和MyBatis处理机制类似,只是类名调整到R2DBC下的EnumOrdinalTypeHandler,代码如下:
<typeHandlers>
<!-- Enum ordinal type handler-->
<typeHandler handler="org.apache.ibatis.r2dbc.type.EnumOrdinalTypeHandler"
javaType="java.math.RoundingMode"/>
</typeHandlers>
注意: @MappedJdbcTypes类型的TypeHandler不支持
Reactive Transaction 支持 https://spring.io/blog/2019/05/16/reactive-transactions-with-spring
Spring的R2dbcTransactionManager是基于R2DBC的ConnectionFactory,所以我们不需要进行什么配置,基本代码如下:
public class R2dbcTransactionManagerAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ReactiveTransactionManager.class)
public R2dbcTransactionManager connectionFactoryTransactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
如果是基于R2DBC Pool,R2dbcTransactionManager就是基于ConnectionPool的,当然ConnectionPool就是继承自ConnectionFactory的。
和R2DBC相关的配置参数,主要是通过MyBatis的mybatis-config.xml文件中的properties配置完成的,如下:
<configuration>
<properties>
<property name="metrics.enabled" value="true"/>
<property name="r2dbc.pool.initial-size" value="1"/>
<property name="r2dbc.pool.max-size" value="10"/>
<!-- max idle time, the unit is Minute-->
<property name="r2dbc.pool.max-idle-time" value="5"/>
</properties>
</configuration>
背后的原因其实主要是JDBC和R2DBC的区别,我们都知道JDBC同步Block的,所以我们需要有连接池DataSource,防止应为JDBC的等待造成Thread的堵塞。
MyBatis的一个核心是SqlSession,但是两者的设计还有很大的区别的:
JDBC使用连接池这个大家都明白, 通过增加连接数减少因为JDBC Connection等待的结果造成的block,不然应用整体的性能下降的非常快。 在R2DBC中也有Connection Pool的概念,其主要是兼容传统数据库同步协议,如MySQL,解释如下:
Traditionally, many MySQL drivers used a synchronous approach when executing SQL statements. This meant that operations such as opening connections and executing queries were blocked until completion, which could take a long time. To allow for parallel execution, a developer had to write a multithreaded application. Any MySQL client that supports the X Protocol can provide asynchronous execution, either using callbacks, Promises, or by explicitly waiting on a specific result at the moment in time when it is actually needed.
如果数据库通讯协议是全部异步的,那么是没有必要介入Connection Pool的。 如mariadb-connector-r2dbc的实现中就有:
allowPipelining: Permit to send queries to server without waiting for previous query to finish
如果allowPipelining为false,这个时候,出于性能的考虑,你还是需要介入R2DBC Pool来提升应用性能。
Pipelining versus Parallel Query Execution请参考: https://www.percona.com/blog/2016/07/06/pipelining-versus-parallel-query-execution-with-mysql-5-7-x-plugin/
对于Reactive来说,默认就包含了cache特性,也就是你可以调用 Mono.cache() 或 Flux.cache 就可以缓存响应的结果集,然后提供给后续的订阅者消费。
对应MyBatis来说,cache也并复杂,Cache也是基于Mapper的,我们只需要基于paramObject缓存对应的Mono或者Flux对象就可以。
Mono<String> user = Mono.defer(() -> {
// r2dbc operation
return r2dbcResult;
}).cache(Duration.ofSeconds(2));
但是,这个只是基于内存的对象缓存,如果是分布式的,上述机制不能工作,这涉及到对象序列化和反序列化机制的机制。
至于选择哪一种,这个可能要根据实际的缓存类型进行决定。
如果你项目中已经包含spring-boot-starter-data-r2dbc,那么你可以完全使用Spring Boot提供的R2DBC的ConnectionFactory bean完成数据库连接相关的任务。
@Bean
public ReactiveSqlSessionFactory reactiveSqlSessionFactory(ConnectionFactory r2dbcConnectionFactory) {
XMLConfigBuilder xmlConfigBuilder = new XMLConfigBuilder(this.getClass().getResourceAsStream("/mybatis-config.xml"));
Configuration configuration = xmlConfigBuilder.parse();
return new DefaultReactiveSqlSessionFactory(configuration, r2dbcConnectionFactory);
}
<mappers>
<mapper class="org.mybatis.builder.AuthorReactiveMapper"/>
</mappers>