Connect NATS to Flink with Java
Current Release: 2.0.0-beta1 Current Snapshot: 2.0.0-beta2-SNAPSHOT
The connector requires Java version 11 or later to be compatible with Flink libraries. The JNATS library is built with Java 8 and is compatible being run by a later version of Java.
This library adheres to semver.
In order to allow for completely different implementations for JetStream support,
the implementations have been put into distinct Java packages, for instance
io.synadia.flink.v0
This has allowed us to release this while provide the future ability
to change the api without requiring a semver major bump. To accomplish this, the
new implementation will be put in the io.synadia.flink.v1
package hierarchy.
The reason the current release is also beta is that we are hoping for use and feedback.
In order to construct a source, you must use the builder.
The NatsSourceBuilder is generic. It's generic type, <OutputT>
is the type of object that will be created from a
message's subject, headers and payload data byte[]
You must set or include properties to construct a connection unless you are connecting to 'nats://localhost:4222' with no security.
The builder has these methods:
subjects(String... subjects)
subjects(List<String> subjects)
connectionProperties(Properties connectionProperties)
connectionPropertiesFile(String connectionPropertiesFile)
minConnectionJitter(long minConnectionJitter)
maxConnectionJitter(long maxConnectionJitter)
payloadDeserializer(PayloadDeserializer<InputT> payloadDeserializer)
payloadDeserializerClass(String payloadDeserializerClass)
sourceProperties(Properties properties)
When using the builder, the last call value is used, they are not additive.
subjects
connectionProperties
or connectionPropertiesFile
payloadSerializer
or payloadSerializerClass
sourceProperties
You can supply source settings with code
NatsSource<String> source = new NatsSourceBuilder<String>()
.subjects("subject1", "subject1")
.connectionPropertiesFile("/path/to/jnats_client_connection.properties")
.minConnectionJitter(1000)
.maxConnectionJitter(5000)
.payloadDeserializer("io.synadia.payload.StringPayloadDeserializer")
.build();
You can also supply source properties from a file.
Properties props = Utils.loadPropertiesFromFile("/path/to/source.properties");
NatsSource<String> source = new NatsSourceBuilder<String>()
.sourceProperties(props)
.connectionPropertiesFile("/path/to/jnats_client_connection.properties")
.build();
The source supports these property keys
source.subjects
source.payload.deserializer
source.startup.jitter.min
source.startup.jitter.max
It's okay to use the combine the source properties and the connection properties
Properties props = Utils.loadPropertiesFromFile("/path/to/sourceAndConnection.properties");
NatsSource<String> source = new NatsSourceBuilder<String>()
.sourceProperties(props)
.connectionProperties(props)
.build();
-or-
Properties props = Utils.loadPropertiesFromFile("/path/to/sourceAndConnection.properties");
NatsSource<String> source = new NatsSourceBuilder<String>()
.sourceProperties(props)
.connectionPropertiesFile("/path/to/sourceAndConnection.properties")
.build();
In order to construct a sink, you must use the builder.
The NatsSinkBuilder is generic. It's generic type, <InputT>
is the type of object you expect from a source that will become the byte[] payload of a message.
You must set or include properties to construct a connection unless you are connecting to 'nats://localhost:4222' with no security.
The builder has these methods:
subjects(String... subjects)
subjects(List<String> subjects)
connectionProperties(Properties connectionProperties)
connectionPropertiesFile(String connectionPropertiesFile)
minConnectionJitter(long minConnectionJitter)
maxConnectionJitter(long maxConnectionJitter)
payloadSerializer(PayloadSerializer<InputT> payloadSerializer)
payloadSerializerClass(String payloadSerializerClass)
sinkProperties(Properties properties)
When using the builder, the last call value is used, they are not additive.
subjects
connectionProperties
or connectionPropertiesFile
payloadSerializer
or payloadSerializerClass
sinkProperties
You can supply sink settings with code
NatsSink<String> sink = new NatsSinkBuilder<String>()
.subjects("subject1", "subject2")
.connectionPropertiesFile("/path/to/jnats_client_connection.properties")
.minConnectionJitter(1000)
.maxConnectionJitter(5000)
.payloadSerializerClass("io.synadia.payload.StringPayloadSerializer")
.build();
You can also supply sink properties from a file.
Properties props = Utils.loadPropertiesFromFile("/path/to/sink.properties");
NatsSink<String> sink = new NatsSinkBuilder<String>()
.sinkProperties(props)
.connectionPropertiesFile("/path/to/jnats_client_connection.properties")
.build();
The sink supports these property keys
sink.subjects
sink.payload.serializer
sink.startup.jitter.min
sink.startup.jitter.max
It's okay to use the combine the sink properties and the connection properties
Properties props = Utils.loadPropertiesFromFile("/path/to/sinkAndConnection.properties");
NatsSink<String> sink = new NatsSinkBuilder<String>()
.sinkProperties(props)
.connectionProperties(props)
.build();
-or-
Properties props = Utils.loadPropertiesFromFile("/path/to/sinkAndConnection.properties");
NatsSink<String> sink = new NatsSinkBuilder<String>()
.sinkProperties(props)
.connectionPropertiesFile("/path/to/sinkAndConnection.properties")
.build();
If it was not clear from the source and sink sections, there are two ways to get the connection properties into the sink or source:
1. by passing a file location via .connectionPropertiesFile(String)
When the sink or source are given a properties file location, this must be an existing path on every instance of Flink in the cluster environment, otherwise it will not be found and the sink or source won't be able to connect.
NatsSink<String> sink = new NatsSinkBuilder<String>()
...
.connectionPropertiesFile("/path/to/jnats_client_connection.properties")
...
.build();
2. by passing a Properties object prepared in code .connectionProperties(Properties)
When the properties are prepared in code and given as an object, it is serialized during construction so it can be sent over the wire to nodes that will be running instances of sink or source. This will reduce the requirement for having a properties file on each instance but at the tradeoff of having this information passed over the wire in normal java object serialization form, which is not necessarily secure.
Properties connectionProperties = ...
NatsSink<String> sink = new NatsSinkBuilder<String>
...
.connectionProperties(connectionProperties)
...
.build();
For full connection properties see the NATS - Java Client, README Options Properties
io.nats.client.keyStorePassword=kspassword
io.nats.client.keyStore=/path/to/keystore.jks
io.nats.client.trustStorePassword=tspassword
io.nats.client.trustStore=/path/to/truststore.jks
io.nats.client.url=tls://myhost:4222
There are 3 ways to get a de/serializer into your source/sink.
1. By giving the fully qualified class name.
NatsSink<String> sink = new NatsSinkBuilder<String>
...
.payloadSerializerClass("io.synadia.flink.payload.StringPayloadSerializer")
...
.build();
NatsSource<String> source = new NatsSourceBuilder<String>
...
.payloadDeserializerClass("io.synadia.flink.payload.StringPayloadDeserializer")
...
.build();
2. By supplying an instance of the serializer
StringPayloadSerializer serializer = new StringPayloadSerializer();
NatsSink<String> sink = new NatsSinkBuilder<String>
...
.payloadSerializer(serializer)
...
.build();
StringPayloadDeserializer serializer = new StringPayloadDeserializer();
NatsSource<String> source = new NatsSourceBuilder<String>
...
.payloadDeserializer(serializer)
...
.build();
3. By supplying the fully qualified name as a property
sink.payload.serializer=com.mycompany.MySerializer
Properties props = Utils.loadPropertiesFromFile("/path/to/sink.properties");
NatsSink<MySerializerType> sink = new NatsSinkBuilder<MySerializerType>
...
.sinkProperties(props)
...
.build();
source.payload.deserializer=com.mycompany.MyDeserializer
Properties props = Utils.loadPropertiesFromFile("/path/to/source.properties");
NatsSource<MyDeserializerType> source = new NatsSourceBuilder<MyDeserializerType>
...
.sourceProperties(props)
...
.build();
There is nothing significantly different, it is simply a developer's preference. In either case the class
implements Serializable
, which is usually trivial since there is should not be any state.
If any of these conditions are not met, a terminal exception will be thrown.The NATS client is available in the Maven central repository, and can be imported as a standard dependency in your build.gradle
file:
dependencies {
implementation 'io.synadia:flink-connector-nats:{major.minor.patch}'
}
If you need the latest and greatest before Maven central updates, you can use:
repositories {
mavenCentral()
maven {
url "https://s01.oss.sonatype.org/content/repositories/releases"
}
}
If you need a snapshot version, you must add the url for the snapshots and change your dependency.
repositories {
mavenCentral()
maven {
url "https://s01.oss.sonatype.org/content/repositories/snapshots"
}
}
dependencies {
implementation 'io.synadia:flink-connector-nats:{major.minor.patch}-SNAPSHOT'
}
The NATS client is available on the Maven central repository, and can be imported as a normal dependency in your pom.xml file:
<dependency>
<groupId>io.synadia</groupId>
<artifactId>flink-connector-nats</artifactId>
<version>{major.minor.patch}</version>
</dependency>
If you need the absolute latest, before it propagates to maven central, you can use the repository:
<repositories>
<repository>
<id>sonatype releases</id>
<url>https://s01.oss.sonatype.org/content/repositories/releases</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
If you need a snapshot version, you must enable snapshots and change your dependency.
<repositories>
<repository>
<id>sonatype snapshots</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependency>
<groupId>io.synadia</groupId>
<artifactId>flink-connector-nats</artifactId>
<version>{major.minor.patch}-SNAPSHOT</version>
</dependency>
Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved. See LICENSE and NOTICE file for details.