Nakadi-java is a client driver for the Nakadi Event Broker. It was created for the following reasons:
Completeness. Provide a full reference implementation of the Nakadi API for producers and consumers.
Minimise dependencies. The client doesn't force a dependency on frameworks or libraries. The sole dependency is on the SLF4J API.
Robust HTTP handling. Request/response behaviour and consumer stream handling are given the same importance as functionality.
Operational visibility. Error handling, stream retries, logging and instrumentation are given the same importance as functionality.
Be easy to use. The client should be straightforward to use as is, or as an engine for higher level abstractions.
A number of JVM clients already exist and are in use; nakadi-java is not meant to compete with or replace them. In certain respects they pursue different goals. Taken as a whole, the existing JVM clients provide partial implementations with larger dependencies that are idiomatic to certain frameworks, whereas nakadi-java aims to provide a full client with a reduced dependency footprint to allow portability.
Nakadi-java is designed for application development. If you're just looking
for a quick way to browse and examine streams, take a look at the excellent
Peek library.
See the installation section on how to add the client library to your project as a jar dependency. The client requires Java 1.8 or later.
The client is pre 1.0.0, with the aim of getting to 1.0.0 quickly.
The client API is relatively stable and unlikely to see massive sweeping changes, though some changes should be expected. The entire Nakadi API is implemented.
The client's had some basic testing to verify it can handle things like consumer stream connection/network failures and retries. It should not be deemed robust yet, but it is a goal to produce a well-behaved production level client especially for producing and consuming events for 1.0.0. See also:
The open issues section has a list of bugs and things to get done.
The help-wanted label has a list of things that would be pretty cool to have.
As a client that aims to provide a full implementation, it will continue to track the development of the Nakadi Event Broker's API after 1.0.0.
This section summarizes what you can do with the client. The nakadi-java-examples project provides runnable examples for most of what you see here.
The API resources this client supports are:
A new client can be created via a builder:
NakadiClient client = NakadiClient.newBuilder()
  .baseURI("http://localhost:9080")
  .build();
You can create multiple clients if you wish. Every client must have a base URI set and can optionally have other values set (notably for token providers and metrics collection).
Here's a fuller configuration:
NakadiClient client = NakadiClient.newBuilder()
  .baseURI("http://localhost:9080")
  .metricCollector(myMetricsCollector)
  .tokenProvider(myResourceTokenProvider)
  .readTimeout(60, TimeUnit.SECONDS)
  .connectTimeout(30, TimeUnit.SECONDS)
  .build();
By default the client does not send an authorization header with each request. This is useful for working with a development Nakadi server, which will try to resolve bearer tokens if they are sent but will accept requests with no bearer token present.
You can define a token provider by implementing the TokenProvider interface, which will supply the client with a string that will be sent to the server as the value of an Authorization header. The TokenProvider is called on each request and thus can be implemented as a dynamic provider to handle token refreshes and recycling.
NakadiClient client = NakadiClient.newBuilder()
  .baseURI("http://localhost:9080")
  .tokenProvider(new MyTokenProvider())
  .build();
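Because the provider is called on each request, a dynamic implementation usually wants to cache the token and refresh it only when it is stale. The following is a minimal sketch of that idea, not the client's own code. It assumes the interface has a single authHeaderValue(String scope) method returning an Optional<String>; a local stand-in interface (TokenProviderSketch) is declared so the example compiles on its own, and the tokenSource supplier and the time-to-live are hypothetical.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Local stand-in for the client's TokenProvider interface (its exact shape is an assumption).
interface TokenProviderSketch {
  Optional<String> authHeaderValue(String scope);
}

// Caches a token and fetches a new one once the cached value is older than the time-to-live.
final class CachingTokenProvider implements TokenProviderSketch {
  private final Supplier<String> tokenSource; // hypothetical: fetches a fresh raw token
  private final long ttlNanos;
  private String cached;
  private long fetchedAtNanos;

  CachingTokenProvider(Supplier<String> tokenSource, Duration ttl) {
    this.tokenSource = tokenSource;
    this.ttlNanos = ttl.toNanos();
  }

  // Called on every request, so the common path only compares two timestamps.
  // This sketch ignores the scope and hands out one token for all scopes.
  @Override
  public synchronized Optional<String> authHeaderValue(String scope) {
    long now = System.nanoTime();
    if (cached == null || now - fetchedAtNanos >= ttlNanos) {
      cached = tokenSource.get();
      fetchedAtNanos = now;
    }
    return Optional.of("Bearer " + cached);
  }
}
```

A provider that serves several scopes could keep one cached entry per scope instead of a single field.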
There's a ZignTokenProvider in the nakadi-java-zign sub-project that can connect to the zign process and run in the background.
Some resources support the use of OAuth scopes (where the API documents them; that documentation is incomplete as of 2016-11-15):
StreamProcessor: can be set via StreamProcessor.Builder.scope() before calling start().
EventTypeResource: can be set via EventTypeResource.scope() before making an API call.
EventResource: can be set via EventResource.scope() before making an API call.
SubscriptionResource: can be set via SubscriptionResource.scope() before making an API call.
On each request the client will resolve the scope to a token by asking the TokenProvider to supply a token via authHeaderValue. If a custom scope has been applied on the request it will be used, otherwise the default scope documented by the API will be used.
The scope set on resource instances is stateful, not one-shot, and will be re-used across requests. To change the scope, call scope() again with a new scope value, or if you wish to clear the custom scope and revert to defaults, call scope() with null. However, the StreamProcessor scope is fixed once streaming begins after start() is called and can't be changed.
The client checks certificates. If your target server is using a self-signed certificate and for some reason you can't install that cert into the system trust store using something like keytool, you can supply the cert via the builder's certificatePath method:
NakadiClient client = NakadiClient.newBuilder()
  .baseURI("http://localhost:9080")
  .certificatePath("file:///var/certs")
  .build();
This will cause the client to install any certificates it finds. There are three loading options:
A path beginning with "file:///" will load from the supplied directory any files with *.crt and *.pem extensions.
A path beginning with "classpath:" and ending with *.crt or *.pem will load that resource item from the classpath.
A path beginning with "classpath:" will load from the supplied classpath directory any files with *.crt and *.pem extensions.
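The three options above can be read as a small decision rule on the path string. The sketch below only illustrates that rule; the class and enum names are hypothetical and this is not how the client is known to implement it. In particular, throwing on any other prefix and matching extensions case-sensitively are choices made for the sketch.

```java
final class CertificatePathSketch {
  // The three loading options described above.
  enum Source { FILE_DIRECTORY, CLASSPATH_RESOURCE, CLASSPATH_DIRECTORY }

  static Source classify(String path) {
    if (path.startsWith("file:///")) {
      return Source.FILE_DIRECTORY;
    }
    if (path.startsWith("classpath:")) {
      // A classpath path naming a single .crt or .pem item is a resource, anything else a directory.
      boolean singleCert = path.endsWith(".crt") || path.endsWith(".pem");
      return singleCert ? Source.CLASSPATH_RESOURCE : Source.CLASSPATH_DIRECTORY;
    }
    // Sketch-only behaviour: the document does not say what happens for other prefixes.
    throw new IllegalArgumentException("unsupported certificate path: " + path);
  }
}
```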
The classpath option targeting a directory is for local development and not meant for production/deployed situations. If you must use the classpath for deployed apps, use the cert resource option as that will allow the classpath resolver to work more generally.
If no certificatePath is supplied, the system defaults are used. This is the strongly recommended option for deployments.
The client emits well known metrics as meters and timers (see MetricCollector for the available metrics).
By default the client ignores metrics, but you can supply your own collector. For example, this sets the client to use MetricsCollectorDropwizard, from the support library that integrates with Dropwizard Metrics:
MetricRegistry metricRegistry = new MetricRegistry();
MetricsCollectorDropwizard metrics =
  new MetricsCollectorDropwizard("mynamespace", metricRegistry);

NakadiClient client = NakadiClient.newBuilder()
  .baseURI("http://localhost:9080")
  .metricCollector(metrics)
  .build();
To provide your own collector, implement the MetricCollector interface. Each emitted metric is based on an enum. Implementations can look at the enum and record as they wish. They can also work with them generally and ask any enum for its path, which will be a dotted string.
Please note that calls to the collector are currently blocking. This may be changed to asynchronous for 1.0.0, but in the meantime if your collector is making network calls or hitting disk, you might want to hand them off as Callables or send them to a queue.
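One way to keep a slow collector off the calling thread is to wrap it and push each call onto a single-threaded executor. This is a minimal sketch of that hand-off, assuming a simplified, hypothetical MeterSink interface rather than the client's real MetricCollector methods; a real wrapper would delegate each MetricCollector method the same way.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a blocking collector; not the client's MetricCollector interface.
interface MeterSink {
  void mark(String path, long count);
}

// Queues each call on a background thread so the caller returns immediately.
final class AsyncMeterSink implements MeterSink, AutoCloseable {
  private final MeterSink delegate;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  AsyncMeterSink(MeterSink delegate) {
    this.delegate = delegate;
  }

  @Override
  public void mark(String path, long count) {
    // A single worker thread preserves the order in which metrics were recorded.
    executor.execute(() -> delegate.mark(path, count));
  }

  // Stops accepting work and waits briefly for queued calls to drain.
  @Override
  public void close() {
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The executor's queue is unbounded here; if the delegate can stall for long periods, a bounded queue with a drop policy may be a better fit.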
Some calls return Response objects that contain raw JSON. You can deserialize these using the JsonSupport helper, available from the client. JsonSupport accepts classes, and for generic bindings you can supply it with a TypeLiteral.
When using a TypeLiteral, please note the following:
TypeLiterals must be an actual subclass. This means declaring the TypeLiteral with a pair of braces, new TypeLiteral<Map<String, Object>>() {}; and not just new TypeLiteral<Map<String, Object>>();. The latter won't work and can cause hard-to-debug errors.
TypeLiterals for the 3 category classes can't be declared with a String. For example, DataChangeEvent<String> will cause marshalling errors, because the underlying JSON processing treats String as a JSON String type and not escaped JSON. The parser then fails when it sees structured JSON instead of a JSON String. Typically you want to declare something like DataChangeEvent<Map<String, Object>> to destructure the data properly. The client might add a stringified option for 1.0.0.
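The reason the braces matter is that Java erases generic arguments at runtime, except where they are recorded in a class's declared supertype. An anonymous subclass records them; a plain instance does not. The sketch below shows the general "super type token" technique with a hypothetical TypeTokenSketch class. Whether the client's TypeLiteral is implemented exactly this way is an assumption.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Captures its own type argument by inspecting the generic superclass of the (anonymous) subclass.
class TypeTokenSketch<T> {
  private final Type type;

  public TypeTokenSketch() {
    Type superclass = getClass().getGenericSuperclass();
    // Without the trailing {} the runtime class is TypeTokenSketch itself, whose
    // superclass is plain Object, so no type argument is available to read.
    if (!(superclass instanceof ParameterizedType)) {
      throw new IllegalStateException("declare as an anonymous subclass: new TypeTokenSketch<X>() {}");
    }
    this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
  }

  Type type() {
    return type;
  }
}
```

With this sketch, new TypeTokenSketch<Map<String, Object>>() {}.type() yields a ParameterizedType whose raw type is Map, while the brace-less form fails fast in the constructor instead of producing a confusing error later.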
Once you have a client, you can access server resources via the resources() method. Here's an example that gets an events resource:
EventResource resource = client.resources().events();
All calls you make to the server will be done via these resource classes to make network calls distinct from local requests.
A number of the non streaming resource classes support a backoff policy:
EventTypeResource
SubscriptionResource
EventResource
RegistryResource
MetricsResource
HealthCheckResource
They each take a RetryPolicy via a retryPolicy() method; there is an inbuilt ExponentialRetry that can be used to define a maximum number of requests or a maximum total time elapsed. Note that the retry policy object is stateful and must be reset between results. To disable retries (the default behavior), set retryPolicy to null; to start a new retry, supply a fresh RetryPolicy instance.
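To see why a retry policy is stateful, it helps to look at what an exponential backoff has to remember: how many attempts have been used and what the next interval is. The class below is an illustrative sketch, not the client's ExponentialRetry. The doubling factor, the absence of jitter and the -1 "give up" signal are all assumptions made for the example.

```java
// Tracks attempts and the growing interval; a used-up instance must be replaced, not reused.
final class BackoffSketch {
  private final long maxIntervalMillis;
  private final int maxAttempts;
  private long nextIntervalMillis;
  private int attempts;

  BackoffSketch(long initialIntervalMillis, long maxIntervalMillis, int maxAttempts) {
    this.nextIntervalMillis = initialIntervalMillis;
    this.maxIntervalMillis = maxIntervalMillis;
    this.maxAttempts = maxAttempts;
  }

  // Returns the delay to wait before the next attempt, or -1 once all attempts are used.
  long nextDelayMillis() {
    if (attempts >= maxAttempts) {
      return -1;
    }
    attempts++;
    long delay = Math.min(nextIntervalMillis, maxIntervalMillis);
    // Double the interval, capping at the maximum without risking overflow.
    nextIntervalMillis = delay > maxIntervalMillis / 2 ? maxIntervalMillis : delay * 2;
    return delay;
  }
}
```

With an initial interval of 1000 ms, a maximum interval of 3000 ms and 5 attempts, successive calls return 1000, 2000, 3000, 3000, 3000 and then -1, which is why a finished instance cannot simply be handed to the next request.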
Please be careful with EventResource: the ordering and general behaviour of event delivery is undefined under retries. That is, a delivery retry may result in out-of-order batches being sent to the server. Also, retrying a partially delivered (207) batch may result in one or more events being delivered multiple times.
You can create, edit and delete event types as well as list them:
// grab an event type resource
EventTypeResource eventTypes = client.resources().eventTypes();

// create a new event type, using an escaped string for the schema
EventType requisitions = new EventType()
  .category(EventType.Category.data)
  .name("priority-requisitions")
  .owningApplication("weyland")
  .partitionStrategy(EventType.PARTITION_HASH)
  .enrichmentStrategy(EventType.ENRICHMENT_METADATA)
  .partitionKeyFields("id")
  .cleanupPolicy("delete")
  .schema(new EventTypeSchema().schema(
    "{ \"properties\": { \"id\": { \"type\": \"string\" } } }"));
Response response = eventTypes.create(requisitions);

// read the partitions for an event type
PartitionCollection partitions = eventTypes.partitions("priority-requisitions");
partitions.iterable().forEach(System.out::println);

// read a particular partition
Partition partition = eventTypes.partition("priority-requisitions", "0");
System.out.println(partition);

// list event types
EventTypeCollection list = client.resources().eventTypes().list();
list.iterable().forEach(System.out::println);

// find by name
EventType byName = eventTypes.findByName("priority-requisitions");

// update
Response update = eventTypes.update(byName);

// remove
Response delete = eventTypes.delete("priority-requisitions");
You can send one or more events to the server:
EventResource resource = client.resources().events();

// nb: EventMetadata.newPreparedEventMetadata sets defaults for eid, occurred at and flow id fields
EventMetadata em = EventMetadata.newPreparedEventMetadata();

// you can send flowids as strings and tracing spans as Map<String, String>
// (tracingSpan is a Map<String, String> defined elsewhere)
EventMetadata em1 = new EventMetadata()
  .eid(UUID.randomUUID().toString())
  .occurredAt(OffsetDateTime.now())
  .spanCtx(tracingSpan)
  .flowId("decafbad");

// create our domain event inside a typesafe DataChangeEvent
PriorityRequisition pr = new PriorityRequisition("22");
DataChangeEvent<PriorityRequisition> dce = new DataChangeEvent<PriorityRequisition>()
  .metadata(em)
  .op(DataChangeEvent.Op.C)
  .dataType("priority-requisitions")
  .data(pr);

Response response = resource.send("priority-requisitions", dce);

// send a batch of two events
DataChangeEvent<PriorityRequisition> dce1 = new DataChangeEvent<PriorityRequisition>()
  .metadata(EventMetadata.newPreparedEventMetadata())
  .op(DataChangeEvent.Op.C)
  .dataType("priority-requisitions")
  .data(new PriorityRequisition("23"));

DataChangeEvent<PriorityRequisition> dce2 = new DataChangeEvent<PriorityRequisition>()
  .metadata(EventMetadata.newPreparedEventMetadata())
  .op(DataChangeEvent.Op.C)
  .dataType("priority-requisitions")
  .data(new PriorityRequisition("24"));

// use a typed list rather than a raw ArrayList
List<DataChangeEvent<PriorityRequisition>> list = new ArrayList<>();
list.add(dce1);
list.add(dce2);
Response batch = resource.send("priority-requisitions", list);
Event posting can be compressed by configuring the client with .enablePublishingCompression():
NakadiClient client = NakadiClient.newBuilder()
  .baseURI("http://localhost:9080")
  .enablePublishingCompression()
  .build();
Events can be sent with compaction information by setting their metadata. This is required when the cleanup_policy of the event type is set to compact.
// create metadata with compaction information for an event
EventMetadata compacted = EventMetadata.newPreparedEventMetadata()
  .partitionCompactionKey("329ed3d2-8366-11e8-adc0-fa7ae01bbebc");

PriorityRequisition pr = new PriorityRequisition("23");
DataChangeEvent<PriorityRequisition> dce = new DataChangeEvent<PriorityRequisition>()
  .metadata(compacted)
  .op(DataChangeEvent.Op.C)
  .dataType("priority-requisitions")
  .data(pr);

Response response = resource.send("priority-requisitions", dce);
You can create, edit and delete subscriptions as well as list them:
// grab a subscription resource
SubscriptionResource resource = client.resources().subscriptions();

// create a new subscription
Subscription subscription = new Subscription()
  .consumerGroup("mccaffrey-cg")
  .eventType("priority-requisitions")
  .owningApplication("shaper");

Response response = resource.create(subscription);

// create a subscription from a given offset
// (Lists here is Guava's com.google.common.collect.Lists)
Cursor c0 = new Cursor("0", "000000000000002009", "priority-requisitions");
Cursor c1 = new Cursor("1", "000000000000002008", "priority-requisitions");

Subscription offsetSubscription = new Subscription()
  .consumerGroup("roja-cg")
  .eventType("priority-requisitions")
  .owningApplication("anarch")
  .readFrom("cursors")
  .initialCursors(Lists.newArrayList(c0, c1));

// find a subscription
Subscription found = resource.find("a2ab0b7c-ee58-48e5-b96a-d13bce73d857");

// get the cursors and iterate them
SubscriptionCursorCollection cursors = resource.cursors(found.id());
cursors.iterable().forEach(System.out::println);

// get the stats and iterate them
SubscriptionEventTypeStatsCollection stats = resource.stats(found.id());
stats.iterable().forEach(System.out::println);

// list subscriptions
SubscriptionCollection list = resource.list();
list.iterable().forEach(System.out::println);

// list for an owner
list = resource.list(new QueryParams().param("owning_application", "shaper"));
list.iterable().forEach(System.out::println);

// delete a subscription
Response delete = resource.delete(found.id());
You can consume events via stream. Both the named event type and newer subscription stream APIs are available via the StreamProcessor class.
A StreamProcessor accepts a StreamObserverProvider, which is a factory for creating the StreamObserver class the events will be sent to. The StreamObserver accepts one or more StreamBatchRecord objects, where each item in the batch has been marshalled to an instance of T as defined by the observer and its StreamObserverProvider.
A StreamObserver implements a number of callback methods that are invoked by the underlying stream processor:
onStart(): Called before stream connection begins and before a retry is attempted.
onStop(): Called after the stream is completed and when a retry is needed.
onCompleted(): Called when the client is finished sending batches.
onError(Throwable t): Called when there's been an error.
onNext(StreamBatchRecord<T> record): Called for each batch of events. The record also contains the current offset observer and the batch cursor.
requestBackPressure(): Request a maximum number of emitted items from the stream.
requestBuffer(): Ask to have batches buffered before emitting them from the stream.
The interface is influenced by RxJava and the general style of onX callback APIs. You can see an example in the source called LoggingStreamObserverProvider, which maps the events in a batch to plain strings.
The API also supports a StreamOffsetObserver. The offset observer is given to the StreamObserver object with each onNext call. Typically the offset observer is used to provide checkpointing of a consumer's partition in the stream.
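As an illustration of what checkpointing means here, the sketch below remembers the latest offset seen per partition so a consumer knows where to resume. The OffsetListener interface and its onNext(partition, offset) signature are hypothetical stand-ins; the client's StreamOffsetObserver has its own signature, and a real checkpointer would write to durable storage rather than memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an offset callback; not the client's StreamOffsetObserver.
interface OffsetListener {
  void onNext(String partition, String offset);
}

// Keeps the most recent offset for each partition in memory.
final class InMemoryCheckpointer implements OffsetListener {
  private final Map<String, String> latest = new ConcurrentHashMap<>();

  @Override
  public void onNext(String partition, String offset) {
    // Later batches overwrite earlier ones, so the map always holds the newest offset.
    latest.put(partition, offset);
  }

  // Where to resume a partition, or the supplied fallback if nothing was recorded for it.
  String resumeFrom(String partition, String fallback) {
    return latest.getOrDefault(partition, fallback);
  }
}
```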
To consume a named event type stream, configure a StreamProcessor and run it:
// configure a stream for an event type from a given cursor;
// all api settings are available
StreamConfiguration sc = new StreamConfiguration()
  .eventTypeName("priority-requisitions")
  .cursors(new Cursor("0", "450"));

// set up a processor with an event observer provider
StreamProcessor processor = client.resources().streamBuilder()
  .streamConfiguration(sc)
  .streamObserverFactory(new LoggingStreamObserverProvider())
  .build();

// consume in the background until the app exits or stop() is called
processor.start();

// configure a stream with a bounded number of events, retries and keepalives, plus custom timeouts
StreamConfiguration sc1 = new StreamConfiguration()
  .eventTypeName("priority-requisitions")
  .cursors(new Cursor("0", "450"))
  .batchLimit(15)
  .batchFlushTimeout(2, TimeUnit.SECONDS)
  .maxRetryAttempts(256)
  .maxRetryDelay(30, TimeUnit.SECONDS)
  .streamLimit(1024)
  .connectTimeout(8, TimeUnit.SECONDS)
  .readTimeout(3, TimeUnit.MINUTES)
  .streamKeepAliveLimit(2048)
  .streamTimeout(1, TimeUnit.DAYS);

// create a processor with an observer and an offset observer
StreamProcessor boundedProcessor = client.resources().streamBuilder()
  .streamConfiguration(sc1)
  .streamObserverFactory(new LoggingStreamObserverProvider())
  .streamOffsetObserver(new LoggingStreamOffsetObserver())
  .build();

/*
 start in the background, stopping when the criteria are reached,
 the app exits, or stop() is called
*/
boundedProcessor.start();
If no offset observer is given, the default observer used is LoggingStreamOffsetObserver, which simply logs when it is invoked.
Subscription stream consumers allow consumers to store offsets with the server and work much like named event type streams:
// configure a stream from a subscription id;
// all api settings are available
StreamConfiguration sc = new StreamConfiguration()
  .subscriptionId("27302800-bc68-4026-a9ff-8d89372f8473")
  .maxUncommittedEvents(20L);

// create a processor with an observer
StreamProcessor processor = client.resources().streamBuilder(sc)
  .streamObserverFactory(new LoggingStreamObserverProvider())
  .build();

// consume in the background until the app exits or stop() is called
processor.start();
There are some notable differences:
The StreamConfiguration is configured with a subscriptionId instead of an eventTypeName.
The inbuilt offset observer for a subscription stream will call Nakadi's checkpointing API to update the offset. You can replace this with your own implementation if you wish.
A subscription stream also allows setting the maxUncommittedEvents as defined by the Nakadi API.
The default behaviour for all streaming consumers is to request a gzipped stream. This can be changed to a plain stream by setting the Accept-Encoding header to identity on StreamConfiguration as follows:
StreamConfiguration sc = new StreamConfiguration()
  // ask the server for unencoded data
  .requestHeader("Accept-Encoding", "identity")
  ...;
A StreamObserver can signal for backpressure via the requestBackPressure method. This is applied with each onNext call to the StreamObserver and so can be used to adjust backpressure dynamically. The client's underlying stream processor will make a best-effort attempt to honor backpressure.
If the user wants events buffered into contiguous batches, they can set a buffer size using requestBuffer. This is independent of the underlying HTTP stream: the stream will be consumed off the wire based on the API request settings, while the batches are buffered in memory by the underlying processor. This is applied during setup and is fixed for the processor's lifecycle.
Users that don't care about backpressure controls can subclass the StreamObserverBackPressure class.
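To make the buffering behaviour concrete, the sketch below groups incoming items into fixed-size lists before passing them downstream, with the size fixed at construction in the same way the buffer size is fixed at setup. It is a generic illustration with hypothetical names, not the client's stream processor, and it says nothing about how the client flushes a partially filled buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Collects items in memory and emits them downstream in contiguous groups of a fixed size.
final class BatchBufferSketch<T> {
  private final int size;
  private final Consumer<List<T>> downstream;
  private List<T> buffer = new ArrayList<>();

  BatchBufferSketch(int size, Consumer<List<T>> downstream) {
    if (size < 1) {
      throw new IllegalArgumentException("size must be at least 1");
    }
    this.size = size;
    this.downstream = downstream;
  }

  void accept(T item) {
    buffer.add(item);
    if (buffer.size() == size) {
      // Swap in a fresh list before emitting so the emitted group is never mutated afterwards.
      List<T> full = buffer;
      buffer = new ArrayList<>();
      downstream.accept(full);
    }
  }
}
```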
You can make healthcheck requests to the server:
HealthCheckResource health = client.resources().health();

// check returning a response object, regardless of status
Response healthcheck = health.healthcheck();

// ask to throw if the check failed (non 2xx code)
Response throwable = health.healthcheckThrowing();

// check with an exponential backoff retry
RetryPolicy retry = ExponentialRetry.newBuilder()
  .initialInterval(1000, TimeUnit.MILLISECONDS)
  .maxAttempts(5)
  .maxInterval(3000, TimeUnit.MILLISECONDS)
  .build();
health.retryPolicy(retry).healthcheckThrowing();
You can view the service registry:
RegistryResource resource = client.resources().registry();
// get and iterate available enrichments
EnrichmentStrategyCollection enrichments = resource.listEnrichmentStrategies();
enrichments.iterable().forEach(System.out::println);
// get and iterate available validations
ValidationStrategyCollection validations = resource.listValidationStrategies();
validations.iterable().forEach(System.out::println);
You can view service metrics:
// print service metrics
MetricsResource metricsResource = client.resources().metrics();
Metrics metrics = metricsResource.get();
Map<String, Object> items = metrics.items();
System.out.println(items);
Note that the structure of metrics is not defined by the server, hence it's returned as a map within the Metrics object.
Add sonatype to the repositories element in pom.xml or settings.xml to access snapshots:
<repositories>
  <repository>
    <id>sonatype-nexus-snapshots</id>
    <name>sonatype-nexus-snapshots</name>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
and add the project declaration to pom.xml:
<dependency>
  <groupId>net.dehora.nakadi</groupId>
  <artifactId>nakadi-java-client</artifactId>
  <version>0.19.0</version>
</dependency>
Add sonatype to the repositories block for snapshots:
// Groovy DSL (build.gradle)
repositories {
  maven {
    url = 'https://oss.sonatype.org/content/repositories/snapshots/'
  }
}

// Kotlin DSL (build.gradle.kts)
repositories {
  maven {
    url = uri("https://oss.sonatype.org/content/repositories/snapshots")
  }
}
and add the project to the dependencies block in build.gradle:
// Groovy DSL (build.gradle)
dependencies {
  implementation 'net.dehora.nakadi:nakadi-java-client:0.19.0'
}

// Kotlin DSL (build.gradle.kts)
dependencies {
  implementation("net.dehora.nakadi:nakadi-java-client:0.19.0")
}
Add sonatype to resolvers in build.sbt to access snapshots:
resolvers += Opts.resolver.sonatypeSnapshots
and add the project to libraryDependencies in build.sbt:
libraryDependencies += "net.dehora.nakadi" % "nakadi-java-client" % "0.19.0"
The client prefers a fluent style; setters return this to allow chaining. Complex constructors use a builder pattern where needed. The JavaBeans get/set prefixing idiom is not used by the API, as is increasingly typical with modern Java code.
Any API call that returns a collection, including ones that could be paginated, exposes an Iterable contract, allowing forEach or iterator access:
EventTypeCollection list = client.resources().eventTypes().list();
list.iterable().forEach(System.out::println);

Iterator<EventType> iterator = list.iterable().iterator();
while (iterator.hasNext()) {
  EventType next = iterator.next();
  System.out.println(next);
}
Pagination, if it happens, is done automatically by the collection's backing iterable by following the next relation sent back by the server.
You can, if you wish, work with pages and hypertext links directly via the methods on ResourceCollection, which each collection implements.
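The idea of an iterable that pages behind the scenes can be sketched as follows. This is an illustration of the general technique only: PageSketch, PagingIterable and the fetch function are hypothetical names standing in for whatever the client does internally, and the link strings are made up.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical page shape: the items on the page plus an optional link to the next page.
final class PageSketch<T> {
  final List<T> items;
  final String next; // null when there are no more pages

  PageSketch(List<T> items, String next) {
    this.items = items;
    this.next = next;
  }
}

// Iterates across pages, fetching the next one only when the current one is exhausted.
final class PagingIterable<T> implements Iterable<T> {
  private final String firstLink;
  private final Function<String, PageSketch<T>> fetch; // hypothetical: performs the HTTP GET

  PagingIterable(String firstLink, Function<String, PageSketch<T>> fetch) {
    this.firstLink = firstLink;
    this.fetch = fetch;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private String nextLink = firstLink;
      private Iterator<T> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        // Follow next links lazily, skipping empty pages, until an item or the end is found.
        while (!current.hasNext() && nextLink != null) {
          PageSketch<T> page = fetch.apply(nextLink);
          current = page.items.iterator();
          nextLink = page.next;
        }
        return current.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

Because fetching happens inside hasNext(), a caller that stops iterating early never requests the remaining pages.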
Calls that result in HTTP requests are performed using resource classes. The results can be accessed as HTTP level responses or mapped to API objects.
You don't have to deal with HTTP responses from the API directly. If there is a failure then a NakadiException or a subclass will be thrown. The exception will have Problem information that can be examined.
Client exceptions are runtime exceptions by default. They extend from NakadiException, which allows you to catch all errors under one type. The NakadiException embeds a Problem object which can be examined. Nakadi's API uses Problem JSON (RFC7807) to describe errors. Local errors also contain Problem descriptions.
The client will also throw an IllegalArgumentException in a number of places where null fields are not accepted or sensible as values, such as required parameters for builder classes. However, the client performs no real data validation for API requests, leaving that to the server. Invalid server requests resulting in 422s will cause an InvalidException to be thrown instead.
In a handful of circumstances the API exposes a checked exception where it's necessary that the user handles the error; for example, some exceptions from StreamOffsetObserver are checked.
The project is built with Gradle and uses the
Netflix Nebula plugins. The ./gradlew
wrapper script will bootstrap the right Gradle version if it's not already
installed.
The main client jar file is built using the shadow plugin.
The main tasks are:
./gradlew build: run a build and test
./gradlew clean: clean down the build
./gradlew clean shadow: builds the client jar
The wiki page Internals has details on how the client works under the hood.
Please see the issue tracker for things to work on. The help-wanted label has a list of things that would be pretty cool to have.
Before making a contribution, please let us know by posting a comment to the relevant issue. If you would like to propose a new feature, create a new issue first explaining the feature you’d like to contribute or bug you want to fix.
The codebase follows Square's code style for Java and Android projects.
MIT License
Copyright (c) 2016 Zalando SE, https://tech.zalando.com
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.