mirromutth / r2dbc-mysql

R2DBC MySQL Implementation
Apache License 2.0
656 stars 98 forks source link
database java mysql non-blocking r2dbc reactive reactive-streams

⚠️ Archived ⚠️

The current repository is NO LONGER MAINTAINED, please switch to io.asyncer:r2dbc-mysql.

See also https://github.com/asyncer-io/r2dbc-mysql. It is currently being maintained by @jchrys.

Reactive Relational Database Connectivity MySQL Implementation

Maven Central Apache 2.0 Unit tests status

This project contains the MySQL implementation of the R2DBC SPI. This implementation is not intended to be used directly, but rather to be used as the backing implementation for a humane client library to delegate to. See R2DBC Homepage.

This driver provides the following features:

Version compatibility / Integration tests states

MySQL 5.5 status MySQL 5.6 status MySQL 5.7 status MySQL 8.0 status

In fact, it supports lower versions, in the theory, such as 4.1, 4.0, etc.

However, Docker-certified images do not have these versions lower than 5.5.0, so tests are not integrated on these versions.

Maven

<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2.RELEASE</version>
</dependency>

If you'd rather like the latest snapshots of the upcoming major version, use SonaType Maven snapshot repository and declare the appropriate dependency version.

<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>${r2dbc-mysql.version}.BUILD-SNAPSHOT</version>
</dependency>

<repository>
    <id>sonatype-snapshots</id>
    <name>SonaType Snapshots</name>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots>
        <enabled>true</enabled>
    </snapshots>
</repository>

Gradle

Groovy DSL

dependencies {
    implementation 'dev.miku:r2dbc-mysql:0.8.2.RELEASE'
}

Kotlin DSL

dependencies {
    // Maybe should to use `compile` instead of `implementation` on the lower version of Gradle.
    implementation("dev.miku:r2dbc-mysql:0.8.2.RELEASE")
}

Getting Started

Here is a quick teaser of how to use R2DBC MySQL in Java:

URL Connection Factory Discovery

// Notice: the query string must be URL encoded
ConnectionFactory connectionFactory = ConnectionFactories.get(
    "r2dbcs:mysql://root:database-password-in-here@127.0.0.1:3306/r2dbc?" +
    "zeroDate=use_round&" +
    "sslMode=verify_identity&" +
    "useServerPrepareStatement=true&" +
    "tlsVersion=TLSv1.3%2CTLSv1.2%2CTLSv1.1&" +
    "sslCa=%2Fpath%2Fto%2Fmysql%2Fca.pem&" +
    "sslKey=%2Fpath%2Fto%2Fmysql%2Fclient-key.pem&" +
    "sslCert=%2Fpath%2Fto%2Fmysql%2Fclient-cert.pem&" +
    "sslKeyPassword=key-pem-password-in-here"
)

// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

It is just example, see also Programmatic Connection Factory Discovery for more options.

Or use unix domain socket like following:

// Minimum configuration for unix domain socket
ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:mysql://root@unix?unixSocket=%2Fpath%2Fto%2Fmysql.sock")

Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

Programmatic Connection Factory Discovery

ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(DRIVER, "mysql")
    .option(HOST, "127.0.0.1")
    .option(USER, "root")
    .option(PORT, 3306)  // optional, default 3306
    .option(PASSWORD, "database-password-in-here") // optional, default null, null means has no password
    .option(DATABASE, "r2dbc") // optional, default null, null means not specifying the database
    .option(CONNECT_TIMEOUT, Duration.ofSeconds(3)) // optional, default null, null means no timeout
    .option(Option.valueOf("socketTimeout"), Duration.ofSeconds(4)) // optional, default null, null means no timeout
    .option(SSL, true) // optional, default sslMode is "preferred", it will be ignore if sslMode is set
    .option(Option.valueOf("sslMode"), "verify_identity") // optional, default "preferred"
    .option(Option.valueOf("sslCa"), "/path/to/mysql/ca.pem") // required when sslMode is verify_ca or verify_identity, default null, null means has no server CA cert
    .option(Option.valueOf("sslCert"), "/path/to/mysql/client-cert.pem") // optional, default null, null means has no client cert
    .option(Option.valueOf("sslKey"), "/path/to/mysql/client-key.pem") // optional, default null, null means has no client key
    .option(Option.valueOf("sslKeyPassword"), "key-pem-password-in-here") // optional, default null, null means has no password for client key (i.e. "sslKey")
    .option(Option.valueOf("tlsVersion"), "TLSv1.3,TLSv1.2,TLSv1.1") // optional, default is auto-selected by the server
    .option(Option.valueOf("sslHostnameVerifier"), "com.example.demo.MyVerifier") // optional, default is null, null means use standard verifier
    .option(Option.valueOf("sslContextBuilderCustomizer"), "com.example.demo.MyCustomizer") // optional, default is no-op customizer
    .option(Option.valueOf("zeroDate"), "use_null") // optional, default "use_null"
    .option(Option.valueOf("useServerPrepareStatement"), true) // optional, default false
    .option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
    .option(Option.valueOf("tcpNoDelay"), true) // optional, default false
    .option(Option.valueOf("autodetectExtensions"), false) // optional, default false
    .build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);

// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

Or use unix domain socket like following:

// Minimum configuration for unix domain socket
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(DRIVER, "mysql")
    .option(Option.valueOf("unixSocket"), "/path/to/mysql.sock")
    .option(USER, "root")
    .build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);

Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

Programmatic Configuration

MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
    .host("127.0.0.1")
    .user("root")
    .port(3306) // optional, default 3306
    .password("database-password-in-here") // optional, default null, null means has no password
    .database("r2dbc") // optional, default null, null means not specifying the database
    .serverZoneId(ZoneId.of("Continent/City")) // optional, default null, null means query server time zone when connection init
    .connectTimeout(Duration.ofSeconds(3)) // optional, default null, null means no timeout
    .socketTimeout(Duration.ofSeconds(4)) // optional, default null, null means no timeout
    .sslMode(SslMode.VERIFY_IDENTITY) // optional, default SslMode.PREFERRED
    .sslCa("/path/to/mysql/ca.pem") // required when sslMode is VERIFY_CA or VERIFY_IDENTITY, default null, null means has no server CA cert
    .sslCert("/path/to/mysql/client-cert.pem") // optional, default has no client SSL certificate
    .sslKey("/path/to/mysql/client-key.pem") // optional, default has no client SSL key
    .sslKeyPassword("key-pem-password-in-here") // optional, default has no client SSL key password
    .tlsVersion(TlsVersions.TLS1_3, TlsVersions.TLS1_2, TlsVersions.TLS1_1) // optional, default is auto-selected by the server
    .sslHostnameVerifier(MyVerifier.INSTANCE) // optional, default is null, null means use standard verifier
    .sslContextBuilderCustomizer(MyCustomizer.INSTANCE) // optional, default is no-op customizer
    .zeroDateOption(ZeroDateOption.USE_NULL) // optional, default ZeroDateOption.USE_NULL
    .useServerPrepareStatement() // Use server-preparing statements, default use client-preparing statements
    .tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
    .tcpNoDelay(true) // optional, controls TCP No Delay, default is false
    .autodetectExtensions(false) // optional, controls extension auto-detect, default is true
    .extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect
    .build();
ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);

// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

Or use unix domain socket like following:

// Minimum configuration for unix domain socket
MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
    .unixSocket("/path/to/mysql.sock")
    .user("root")
    .build();
ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);

Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

Configuration items

name valid values required description
driver A constant "mysql" Required in R2DBC discovery This driver needs to be discovered by name in R2DBC
host A hostname or IP Required when unixSocket does not exists The host of MySQL database server
unixSocket An absolute or relative path Required when host does not exists The .sock file of Unix Domain Socket
port A positive integer less than 65536 Optional, default 3306 The port of MySQL database server
user A valid MySQL username and not be empty Required Who wants to connect to the MySQL database
password Any printable string Optional, default no password The password of the MySQL database user
database A valid MySQL database name Optional, default does not initialize database Database used by the MySQL connection
connectTimeout A Duration which must be positive duration Optional, default has no timeout TCP connect timeout
socketTimeout A Duration which must be positive duration Optional, default has no timeout TCP socket timeout
serverZoneId An id of ZoneId Optional, default query time zone when connection init Server time zone id
tcpKeepAlive true or false Optional, default disabled Controls TCP KeepAlive
tcpNoDelay true or false Optional, default disabled Controls TCP NoDelay
sslMode A value of SslMode Optional, default PREFERRED when using hosting connection, DISABLED when using Unix Domain Socket SSL mode, see following notice
sslCa A path of local file which type is PEM Required when sslMode is VERIFY_CA or VERIFY_IDENTITY The CA cert of MySQL database server
sslCert A path of local file which type is PEM Required when sslKey exists The SSL cert of client
sslKey A path of local file which type is PEM Required when sslCert exists The SSL key of client
sslKeyPassword Any valid password for PEM file Optional, default sslKey has no password The password for client SSL key (i.e. sslKey)
tlsVersion Any value list of TlsVersions Optional, default is auto-selected by the server The TLS version for SSL, see following notice
sslHostnameVerifier A HostnameVerifier Optional, default use RFC standard Used only if SslMode is VERIFY_CA or higher
sslContextBuilderCustomizer A Function<SslContextBuilder, SslContextBuilder> Optional, default is NO-OP function Used only if SslMode is not DISABLED
zeroDateOption Any value of ZeroDateOption Optional, default USE_NULL The option indicates "zero date" handling, see following notice
autodetectExtensions true or false Optional, default is true Controls auto-detect Extensions
useServerPrepareStatement true, false or Predicate<String> Optional, default is false See following notice

Should use enum in Programmatic configuration that not like discovery configurations, except TlsVersions (All elements of TlsVersions will be always String which is case-sensitive).

Pooling

See r2dbc-pool.

Simple statement

connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')")
    .execute(); // return a Publisher include one Result

Parametrized statement

connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
    .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
    .bind("name", "Some one") // Not one-to-one binding, call twice of native index-bindings, or call once of name-bindings.
    .add()
    .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
    .bind(1, "My Nickname")
    .bind(2, "Naming show")
    .returnGeneratedValues("generated_id")
    .execute(); // return a Publisher include two Results.

Batch statement

connection.createBatch()
    .add("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')")
    .add("UPDATE `earth` SET `count` = `count` + 1 WHERE `id` = 'human'")
    .execute(); // return a Publisher include two Results.

The last ; will be removed if and only if last statement contains ';', and statement has only whitespace follow the last ;.

Transactions

connection.beginTransaction()
    .then(Mono.from(connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')").execute()))
    .flatMap(Result::getRowsUpdated)
    .thenMany(connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
        .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
        .bind("name", "Some one")
        .add()
        .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
        .bind(1, "My Nickname")
        .bind(2, "Naming show")
        .returnGeneratedValues("generated_id")
        .execute())
    .flatMap(Result::getRowsUpdated)
    .then(connection.commitTransaction());

Data Type Mapping

The default built-in Codecs reference table shows the type mapping between MySQL and Java data types:

MySQL Type Unsigned Support Data Type
INT UNSIGNED Long, BigInteger
INT SIGNED Integer, Long, BigInteger
TINYINT UNSIGNED Short, Integer, Long, BigInteger, Boolean (Size is 1)
TINYINT SIGNED Byte, Short, Integer, Long, BigInteger, Boolean (Size is 1)
SMALLINT UNSIGNED Integer, Long, BigInteger
SMALLINT SIGNED Short, Integer, Long, BigInteger
MEDIUMINT SIGNED/UNSIGNED Integer, Long, BigInteger
BIGINT UNSIGNED BigInteger, Long (Not check overflow)
BIGINT SIGNED Long, BigInteger
FLOAT SIGNED / UNSIGNED Float, BigDecimal
DOUBLE SIGNED / UNSIGNED Double, BigDecimal
DECIMAL SIGNED / UNSIGNED BigDecimal, Float (Size less than 7), Double (Size less than 16)
BIT - ByteBuffer, BitSet, Boolean (Size is 1), byte[]
DATETIME / TIMESTAMP - LocalDateTime, ZonedDateTime, OffsetDateTime, Instant
DATE - LocalDate
TIME - LocalTime, Duration, OffsetTime
YEAR - Short, Integer, Long, BigInteger, Year
VARCHAR / NVARCHAR - String
VARBINARY - ByteBuffer, Blob, byte[]
CHAR / NCHAR - String
ENUM - String, Enum<?>
SET - String[], String, Set<String> and Set<Enum<?>> (Set<T> need use ParameterizedType)
BLOBs (LONGBLOB, etc.) - ByteBuffer, Blob, byte[]
TEXTs (LONGTEXT, etc.) - String, Clob
JSON - String, Clob
GEOMETRY - byte[], Blob

Add a Codec

This is an extension of a highly customized driver behavior of encoding parameter or decoding field data.

Example for an extending Codec of JSON based-on Jackson.

First, implement a Codec, ParametrizedCodec, MassiveCodec or MassiveParametrizedCodec.

Actually, JSON can store large json data, and its byte size can be UnsignedInteger.MAX_VALUE. However, this is just an example.

public final class JacksonCodec implements Codec<Object> {

    /**
     * JUST for example, should configure it in real applications.
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final ByteBufAllocator allocator;

    /**
     * Used for encoding/decoding mode, see also registrar in second step.
     */
    private final boolean encoding;

    public JacksonCodec(ByteBufAllocator allocator, boolean encoding) {
        this.allocator = allocator;
        this.encoding = encoding;
    }

    @Override
    public Object decode(ByteBuf value, FieldInformation info, Class<?> target, boolean binary, CodecContext context) {
        // If you ensure server is using UTF-8, you can just use InputStream
        try (Reader r = new InputStreamReader(new ByteBufInputStream(value), CharCollation.fromId(info.getCollationId(), context.getServerVersion()).getCharset())) {
            return MAPPER.readValue(r, target);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Parameter encode(Object value, CodecContext context) {
        return new JacksonParameter(allocator, value, context);
    }

    @Override
    public boolean canDecode(FieldInformation info, Class<?> target) {
        return !encoding && info.getType() == DataTypes.JSON && info.getCollationId() != CharCollation.BINARY_ID;
    }

    @Override
    public boolean canEncode(Object value) {
        return encoding;
    }

    private static final class JacksonParameter implements Parameter {

        private final ByteBufAllocator allocator;

        private final Object value;

        private final CodecContext context;

        private JacksonParameter(ByteBufAllocator allocator, Object value, CodecContext context) {
            this.allocator = allocator;
            this.value = value;
            this.context = context;
        }

        @Override
        public Mono<ByteBuf> publishBinary() {
            // JSON in binary protocol should be a var-integer sized encoded string.
            // That means we should write a var-integer as a size of following content
            // bytes firstly, then write the encoded string as content.
            //
            // Binary protocol may be different for each type of encoding, so if do not
            // use binary protocol, just return a Mono.error() instead.
            return Mono.fromSupplier(() -> {
                Charset charset = context.getClientCollation().getCharset();
                ByteBuf content = allocator.buffer();

                // Encode and calculate content bytes first, we should know bytes size.
                try (Writer w = new OutputStreamWriter(new ByteBufOutputStream(content), charset)) {
                    MAPPER.writeValue(w, value);
                } catch (IOException e) {
                    content.release();
                    throw new CustomRuntimeException(e);
                } catch (Throwable e) {
                    content.release();
                    throw e;
                }

                ByteBuf buf = null;
                try {
                    buf = allocator.buffer();
                    // VarIntUtils is an unstable, internal utility.
                    VarIntUtils.writeVarInt(buf, content.readableBytes());
                    return buf.writeBytes(content);
                } catch (Throwable e) {
                    if (buf != null) {
                        buf.release();
                    }
                    throw e;
                } finally {
                    content.release();
                }
            });
        }

        @Override
        public Mono<Void> publishText(ParameterWriter writer) {
            return Mono.fromRunnable(() -> {
                try {
                    MAPPER.writeValue(writer, value);
                } catch (IOException e) {
                    throw new CustomRuntimeException(e);
                }
            });
        }

        @Override
        public short getType() {
            return DataTypes.VARCHAR;
        }
    }
}

Second, implement a CodecRegistrar.

// It is just an example of package name and does not represent any company, individual or organization.
package org.example.demo.json;

// Some imports...

public final class JacksonCodecRegistrar implements CodecRegistrar {

    @Override
    public void register(ByteBufAllocator allocator, CodecRegistry registry) {
        // Decoding JSON by highest priority, encoding anything by lowest priority.
        registry.addFirst(new JacksonCodec(allocator, false))
            .addLast(new JacksonCodec(allocator, true));
    }
}

Finally, create a file in META-INF/services, which file name is dev.miku.r2dbc.mysql.extension.Extension, it contains this line:

org.example.demo.json.JacksonCodecRegistrar

Reporting Issues

The R2DBC MySQL Implementation uses GitHub as issue tracking system to record bugs and feature requests. If you want to raise an issue, please follow the recommendations below:

Before use

License

This project is released under version 2.0 of the Apache License.

Acknowledgements

Contributors

Avatars of contributors

Thanks a lot for your support!

Supports