Onyx-Java lets developers who write Java code use the Onyx Platform.
The package provides the following features:
[org.onyxplatform/onyx-java "0.1.0"]
Automatically generated documentation for the Java API can be found here.
Onyx-Java mirrors the Onyx Platform core API by providing a Java equivalent for each component of an Onyx workflow: catalogs, lifecycles, jobs, tasks, and so on each have a corresponding Java class.
These classes provide methods for adding entries to components, for example tasks to catalogs, edges to workflows, and catalogs to jobs.
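The sketch below is not the Onyx-Java API. It uses a hypothetical SketchJob class to show the plain data such classes assemble: an Onyx workflow is a vector of [from to] edges, and a catalog is a vector of task maps with keys such as :onyx/name, :onyx/type and :onyx/batch-size. Clojure keywords are modeled here as plain strings, which is an assumption made only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the Job/Catalog classes. It only
// collects the plain data an Onyx job is built from; keyword keys such as
// :onyx/name are represented as strings like "onyx/name".
class SketchJob {
    // A workflow is a list of [from, to] edges.
    private final List<List<String>> workflow = new ArrayList<>();
    // A catalog is a list of maps, one per task.
    private final List<Map<String, Object>> catalog = new ArrayList<>();

    SketchJob addWorkflowEdge(String from, String to) {
        workflow.add(List.of(from, to));
        return this;
    }

    SketchJob addCatalogEntry(String name, String type, int batchSize) {
        Map<String, Object> entry = new LinkedHashMap<>();
        entry.put("onyx/name", name);
        entry.put("onyx/type", type);
        entry.put("onyx/batch-size", batchSize);
        catalog.add(entry);
        return this;
    }

    List<List<String>> workflow() { return workflow; }

    List<Map<String, Object>> catalog() { return catalog; }
}
```

Chaining the calls, as in `new SketchJob().addWorkflowEdge("in", "pass").addWorkflowEdge("pass", "out")`, yields the edge list `[[in, pass], [pass, out]]`.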
Please note:
Onyx-Java does not validate the semantic correctness of entries; parameter and type checking is left to the Onyx Platform itself and happens at runtime.
It is therefore up to the user to consult the Onyx Platform information model documentation and make sure they pass the parameters and types that the Onyx Platform expects.
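Because errors only surface at runtime, a caller may choose to run a cheap pre-flight check of its own before submitting a job. The following is a minimal sketch of such a check; CatalogPreflight is a hypothetical helper, not part of Onyx-Java, and the keys it checks are an illustrative subset of the Onyx information model rather than a complete restatement of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical user-side helper: Onyx-Java does not check catalog entries,
// so this reports obvious problems before a job is submitted. The required
// keys are an illustrative subset, written as strings standing in for
// Clojure keywords.
final class CatalogPreflight {
    private static final List<String> REQUIRED =
        List.of("onyx/name", "onyx/type", "onyx/batch-size");

    private CatalogPreflight() {}

    /** Returns a list of problems; an empty list means the entry passed. */
    static List<String> problems(Map<String, Object> entry) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!entry.containsKey(key)) {
                problems.add("missing " + key);
            }
        }
        // Type check: a batch size must be a positive integer.
        Object batch = entry.get("onyx/batch-size");
        if (batch != null && !(batch instanceof Integer && (Integer) batch > 0)) {
            problems.add("onyx/batch-size must be a positive integer");
        }
        return problems;
    }
}
```

A check like this catches only the mistakes it was written for; the Onyx Platform remains the authority at runtime.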
Onyx-Java also provides the following high-level utilities:
The Onyx Platform is powerful, but using it can be complex: the platform offers many configuration options, requires several calls during startup and shutdown, and requires that persistent peer and environment metadata be available to core API calls such as job execution.
Onyx-Java provides a utility class called OnyxEnv that simplifies environment management and provides proxy methods that encapsulate this complexity. It offers convenience functions for setting up, controlling, and using the Onyx environment and the jobs it contains.
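To illustrate the idea behind such a facade, here is a minimal sketch using a hypothetical SketchEnv class; it is not how OnyxEnv is implemented. Callers only ask to start and stop, and the facade performs the ordered sub-steps, shutting down in reverse order. The step names echo the Clojure onyx.api functions (start-env, start-peer-group, start-peers and their shutdown counterparts), but nothing here talks to a real Onyx runtime: the steps are merely recorded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical facade: callers see start/close, the facade performs the
// ordered sub-steps. Steps are only recorded, not executed.
final class SketchEnv implements AutoCloseable {
    private final List<String> log = new ArrayList<>();
    private boolean running;

    SketchEnv start() {
        if (!running) { // starting twice is a no-op
            log.add("start-env");
            log.add("start-peer-group");
            log.add("start-peers");
            running = true;
        }
        return this;
    }

    @Override
    public void close() {
        if (running) {
            // Shut down in the reverse order of startup.
            log.add("shutdown-peers");
            log.add("shutdown-peer-group");
            log.add("shutdown-env");
            running = false;
        }
    }

    List<String> log() { return List.copyOf(log); }
}
```

Implementing AutoCloseable lets try-with-resources guarantee the shutdown sequence runs even if job code throws.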
The native language of the Onyx Platform is Clojure, and the data structure it uses for all communication is an implementation of IPersistentMap. Java can manipulate these maps directly, but it is often easier to use tools that mirror the way Onyx itself manipulates arguments.
Onyx-Java provides this ability in the utility class MapFns, which offers pure Java versions of useful Clojure map manipulation functions, such as get, getIn, assoc, dissoc, and others, making it easier to directly manipulate the arguments used by Onyx.
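The semantics of these functions follow Clojure: get and getIn read values (getIn walks a path of keys through nested maps), while assoc and dissoc return a new map and leave the original untouched. The sketch below reproduces those semantics over java.util.Map with a hypothetical SketchMapFns class; the real MapFns operates on IPersistentMap, and its exact signatures may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical copy-on-write helpers in the spirit of MapFns. Inputs are
// never mutated; immutability is imitated by copying, whereas Clojure's
// persistent maps share structure instead.
final class SketchMapFns {
    private SketchMapFns() {}

    static Object get(Map<String, Object> m, String key) {
        return m.get(key);
    }

    /** Follows a path of keys through nested maps; null if any step is missing. */
    @SuppressWarnings("unchecked")
    static Object getIn(Map<String, Object> m, List<String> path) {
        Object current = m;
        for (String key : path) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<String, Object>) current).get(key);
        }
        return current;
    }

    /** Returns a new map with key bound to value; the input is untouched. */
    static Map<String, Object> assoc(Map<String, Object> m, String key, Object value) {
        Map<String, Object> copy = new HashMap<>(m);
        copy.put(key, value);
        return Collections.unmodifiableMap(copy);
    }

    /** Returns a new map without key; the input is untouched. */
    static Map<String, Object> dissoc(Map<String, Object> m, String key) {
        Map<String, Object> copy = new HashMap<>(m);
        copy.remove(key);
        return Collections.unmodifiableMap(copy);
    }
}
```

Returning new maps rather than mutating arguments matches how Onyx treats configuration and segments as values.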
MapFns also provides support for loading edn files that contain map specifications, allowing the environment, peer configuration, and task specifications to be configured rather than coded. This promotes a highly patterned design and a high degree of reuse, and can make setup more efficient.
The Onyx Platform provides excellent support for asynchronous task execution via a core.async plugin.
core.async jobs in Onyx are common and follow a standard pattern, so setting them up usually means rewriting the same boilerplate code.
Onyx-Java can set up these common core.async jobs automatically via the utility classes AsyncCatalog and AsyncLifecycles. These two classes generate the correct catalog and lifecycle entries for setting up and using async channels and for collecting the resulting output from them, so users can avoid writing boilerplate code in this common scenario.
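The batchSize and batchTimeout values passed to AsyncCatalog control how a task reads its input: a peer collects up to batchSize segments, but stops waiting for more once batchTimeout milliseconds have elapsed. The following is a rough analogue of that behavior, assuming a BlockingQueue as a stand-in for a core.async channel; SketchBatcher is hypothetical and is not the Onyx core.async plugin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of batched reads from an input channel: take up to
// batchSize segments, but give up waiting once batchTimeoutMs has elapsed.
final class SketchBatcher {
    private SketchBatcher() {}

    static <T> List<T> readBatch(BlockingQueue<T> in, int batchSize, long batchTimeoutMs) {
        List<T> batch = new ArrayList<>(batchSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeoutMs);
        while (batch.size() < batchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // batch timeout reached
            }
            T segment;
            try {
                segment = in.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
            if (segment == null) {
                break; // no more input arrived before the deadline
            }
            batch.add(segment);
        }
        return batch;
    }
}
```

With seven queued segments and a batch size of 5, a first call returns the first five immediately, and a second call returns the remaining two after waiting out the timeout.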
Onyx-Java allows pure Java objects to be added to workflows and jobs as tasks by providing a simple abstract base class for user classes to extend and a utility class to add objects created by those user classes to a catalog.
The abstract base class OnyxFn takes a Clojure map as its single constructor argument and declares an abstract method, consumeSegment, that consumes each segment map. The user class overrides this method.
Objects of the user class can then be added to the job catalog using BindUtils, which creates a new instance of the class at runtime. BindUtils also provides methods for releasing the instances once they are no longer needed.
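The binding mechanism can be pictured as "instantiate by class name, cache, release explicitly". The sketch below shows that pattern in plain Java under stated assumptions: SketchFn, PassThrough and SketchBinder are hypothetical stand-ins for OnyxFn, a user class and BindUtils, and the caching-by-task-name policy is an illustrative choice rather than a description of how BindUtils works internally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for OnyxFn: constructed with a map of arguments,
// subclasses implement consumeSegment.
abstract class SketchFn {
    protected final Map<String, Object> args;

    protected SketchFn(Map<String, Object> args) {
        this.args = args;
    }

    public abstract Object consumeSegment(Map<String, Object> segment);
}

// Example user class: returns each segment unchanged.
class PassThrough extends SketchFn {
    public PassThrough(Map<String, Object> args) {
        super(args);
    }

    @Override
    public Object consumeSegment(Map<String, Object> segment) {
        return segment;
    }
}

// Hypothetical registry: creates instances lazily from a fully qualified
// class name via reflection, caches them by task name, and drops them only
// when released.
final class SketchBinder {
    private final Map<String, SketchFn> cache = new ConcurrentHashMap<>();

    SketchFn instanceFor(String taskName, String className, Map<String, Object> ctorArgs) {
        return cache.computeIfAbsent(taskName, t -> {
            try {
                Class<? extends SketchFn> cls =
                    Class.forName(className).asSubclass(SketchFn.class);
                return cls.getConstructor(Map.class).newInstance(ctorArgs);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot instantiate " + className, e);
            }
        });
    }

    int cachedCount() { return cache.size(); }

    void releaseAll() { cache.clear(); }
}
```

Because cached instances stay reachable until released, forgetting to release them keeps them in memory, which is why explicit release methods matter.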
Onyx-Java is designed to mirror the traditional use of the Onyx Platform as closely as possible while following Java conventions in its implementation. This makes it easy for Java users to become comfortable with the package quickly, without obscuring the power of the Onyx Platform itself.
Just as in the Onyx Platform, users are responsible for setting up jobs and job components, initializing environment and peer configurations, and controlling job execution.
Unlike the Onyx Platform, however, Onyx-Java lets users do all of this through an API written entirely in Java, in a conventional Java style.
This section outlines a general approach to using Onyx-Java through a working, bare-bones implementation.
More detailed examples can be found in the test documentation here.
import clojure.lang.IPersistentMap;
import clojure.lang.PersistentVector;
import org.onyxplatform.api.java.OnyxEnv;
import org.onyxplatform.api.java.OnyxMap;
import org.onyxplatform.api.java.OnyxVector;
import org.onyxplatform.api.java.Job;
import org.onyxplatform.api.java.Catalog;
import org.onyxplatform.api.java.Lifecycles;
import org.onyxplatform.api.java.utils.AsyncCatalog;
import org.onyxplatform.api.java.utils.AsyncLifecycles;
import org.onyxplatform.api.java.instance.BindUtils;
import org.onyxplatform.api.java.utils.MapFns;
// Configure and start the Onyx Platform runtime.
//
OnyxEnv onyxEnv = new OnyxEnv("onyx-env.edn", true);
// Construct a simple single-function job that uses
// a pure Java object for segment processing, backed
// by core.async.
//
Job job = new Job(onyxEnv.taskScheduler());
job.addWorkflowEdge("in", "pass");
job.addWorkflowEdge("pass", "out");
Catalog c = job.getCatalog();
int batchSize = 5;
int batchTimeout = 50;
// Add core.async plugin catalog entries
AsyncCatalog.addInput(c, "in", batchSize, batchTimeout);
AsyncCatalog.addOutput(c, "out", batchSize, batchTimeout);
// Bind the "pass" task to the fully qualified onyxplatform.test.PassFn class
//
BindUtils.addFn(c, "pass", batchSize, batchTimeout,
"onyxplatform.test.PassFn", MapFns.emptyMap());
// Add accompanying core.async plugin lifecycles
Lifecycles lc = job.getLifecycles();
AsyncLifecycles.addInput(lc, "in");
AsyncLifecycles.addOutput(lc, "out");
OnyxMap m = new OnyxMap().addKeywordParameter("test-input", "TEST");
OnyxVector inputs = new OnyxVector().addElement(m);
IPersistentMap jobMeta = onyxEnv.submitAsyncJob(job, inputs);
PersistentVector output = AsyncLifecycles.collectOutputs(job, "out");
// Release the cached instance
BindUtils.releaseInstances(job);
To include a Java task object in a workflow, provide a concrete subclass of OnyxFn:
package onyxplatform.test;
import clojure.lang.IPersistentMap;
import org.onyxplatform.api.java.instance.OnyxFn;
public class PassFn extends OnyxFn {
public PassFn(IPersistentMap m) {
super(m);
}
public Object consumeSegment(IPersistentMap m) {
return m;
}
}
Then use BindUtils with the task name, the fully qualified name of your class, and any constructor arguments to generate a matching catalog entry:
import clojure.lang.IPersistentMap;
import org.onyxplatform.api.java.Catalog;
import org.onyxplatform.api.java.instance.BindUtils;
import org.onyxplatform.api.java.utils.MapFns;
Catalog catalog = new Catalog();
String taskName = "pass";
String fullyQualifiedName = "onyxplatform.test.PassFn";
IPersistentMap ctrArgs = MapFns.emptyMap();
int batchSize = 5;
int batchTimeout = 50;
BindUtils.addFn(catalog, taskName, batchSize, batchTimeout, fullyQualifiedName, ctrArgs);
This adds a catalog entry that binds task processing to a specific instance of your class, which is loaded and called at job runtime.
Because your instances are cached in memory, you are responsible for managing them and releasing them when they are no longer needed, using the static release methods provided by BindUtils:
BindUtils.releaseInstances(job);
Parallel Clojure namespaces and functions provide affordances for Clojure-based workflows,
so Onyx-Java does not prevent users from writing the system in Java while running tasks that are written in Clojure.
Copyright © 2016 Distributed Masonry
Distributed under the Eclipse Public License either version 1.0 or (at
your option) any later version.