This library is a toolkit for describing data transformation pipelines by composing simple reusable components.
A typical `data-pipeline` use case can be found in the example below.
The `data-pipeline` data model retains intermediate results from all steps, and each result is tagged with lineage metadata.
On top of its core feature-set, complying with the `data-pipeline` model comes with rather nice benefits.
The library requires Java 17+. In order to use it, add the following to your `pom.xml`:
```xml
<dependency>
    <groupId>tech.illuin</groupId>
    <artifactId>data-pipeline</artifactId>
    <version>0.17</version>
</dependency>
```
Additionally, some optional extension libraries can be added; at the time of this writing, this includes:

* `data-pipeline-resilience4j` (for the `resilience4j` integration)

The main goals behind its design were simplicity and the reusability of components.
A simplified high-level view of a `data-pipeline` pipeline looks like the following diagram. It chains two kinds of components:

* `Step` functions: they return a `Result` and are expected to have no side effects (think of them as almost-pure functions)
* `Sink` functions: they return `void` and are expected to induce side effects (e.g. database persistence, message queue push, etc.)

```mermaid
flowchart LR
    INPUT((input))
    OUTPUT((output))
    subgraph PIPELINE[Pipeline]
        direction LR
        subgraph STEP_PHASE[Steps]
            direction LR
            STEP_1(Step 1):::step
            STEP_2(Step 2):::step
            STEP_N(...):::step
        end
        subgraph SINK_PHASE[Sinks]
            direction LR
            SINK_1(Sink 1):::sink
            SINK_N(...):::sink
        end
    end
    INPUT --> PIPELINE --> OUTPUT
    STEP_PHASE --> SINK_PHASE
    STEP_1 --> STEP_2 --> STEP_N
    classDef optional stroke-dasharray: 6 6;
    classDef step stroke:#0f0;
    classDef sink stroke:#f00;
```
We'll go through a quick example in order to demonstrate what `data-pipeline` looks like in action.
The goal of this example is to have a simple pipeline for:

* tokenizing an input sentence
* matching the tokens against a blacklist
* logging the matches
```mermaid
flowchart LR
    INPUT((sentence))
    subgraph PIPELINE[Pipeline]
        direction LR
        subgraph STEP_PHASE[Steps]
            direction LR
            TOKENIZER(Tokenizer):::step
            MATCHER(Matcher):::step
        end
        subgraph SINK_PHASE[Sinks]
            direction LR
            MATCH_LOGGER(Match Logger):::sink
        end
    end
    INPUT --> PIPELINE
    STEP_PHASE --> SINK_PHASE
    TOKENIZER --> MATCHER
    classDef optional stroke-dasharray: 6 6;
    classDef step stroke:#0f0;
    classDef sink stroke:#f00;
```
First, we'll design the `Tokenizer` step, with a basic regex split.
Three things to note here, which will remain true for the following pieces:

* the method is annotated with `@StepConfig`; this is how it will be identified at pipeline build time
* the pipeline's input is requested via the `@Input` annotation
* the method returns a `Result` subtype; here we chose to go with a dedicated `record`
```java
public class Tokenizer
{
    @StepConfig(id = "tokenizer")
    public TokenizedSentence tokenize(@Input String sentence)
    {
        return new TokenizedSentence(Stream.of(sentence.split("[^\\p{L}]+"))
            .map(String::toLowerCase)
            .toList()
        );
    }

    public record TokenizedSentence(
        List<String> tokens
    ) implements Result {}
}
```
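The split-and-lowercase logic can be checked on its own, independently of the library. The `TokenizeDemo` class and its `tokenize` helper below are hypothetical names introduced for this sketch, not part of `data-pipeline`:

```java
import java.util.List;
import java.util.stream.Stream;

// Standalone check of the tokenization logic used by the Tokenizer step:
// split on runs of non-letter characters, then lowercase each token.
public class TokenizeDemo
{
    static List<String> tokenize(String sentence)
    {
        return Stream.of(sentence.split("[^\\p{L}]+"))
            .map(String::toLowerCase)
            .toList();
    }

    public static void main(String[] args)
    {
        // Digits and punctuation act as separators and are dropped entirely,
        // which is why apostrophes produce small fragments like "s".
        System.out.println(tokenize("Planck's law, stated in 1900."));
        // prints [planck, s, law, stated, in]
    }
}
```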
Next up, the `Matcher` step, with a blacklist specified upon instantiation.
It will recover the tokenizer's output and produce a `Matches` record of its findings.
Note the `@Current` annotation, which requests the currently known value for the tokenized sentence.
There is more to be said about the semantics of this annotation, which we'll cover in detail in the documentation section.
```java
public class Matcher
{
    private final Set<String> blacklist;

    public Matcher(String... blacklist)
    {
        this.blacklist = Set.of(blacklist);
    }

    @StepConfig(id = "matcher")
    public Matches match(@Current TokenizedSentence tokenized)
    {
        long wordCount = tokenized.tokens().stream().distinct().count();
        Set<String> matches = tokenized.tokens().stream()
            .filter(this.blacklist::contains)
            .collect(Collectors.toSet())
        ;
        return new Matches(wordCount, matches);
    }

    public record Matches(
        long wordCount,
        Set<String> blacklistMatches
    ) implements Result {}
}
```
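Note that `wordCount` counts *distinct* tokens, not all of them, which explains the "unique tokens" wording in the log output further down. Stripped of the library types, the matching logic is plain collections code; `MatchDemo`, `distinctCount` and `findMatches` below are hypothetical names for this standalone sketch:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Stand-in for the Matcher logic, without the data-pipeline types:
// count distinct tokens, and intersect the token list with a blacklist.
public class MatchDemo
{
    static long distinctCount(List<String> tokens)
    {
        return tokens.stream().distinct().count();
    }

    static Set<String> findMatches(List<String> tokens, Set<String> blacklist)
    {
        return tokens.stream()
            .filter(blacklist::contains)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args)
    {
        List<String> tokens = List.of("the", "end", "of", "the", "sentence");
        // "the" appears twice, so only 4 of the 5 tokens are distinct
        System.out.println(distinctCount(tokens));                        // prints 4
        System.out.println(findMatches(tokens, Set.of("end", "mostly"))); // prints [end]
    }
}
```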
Finally, our `MatchLogger` sink works very similarly, except we need the `@SinkConfig` annotation.
```java
public class MatchLogger
{
    private static final Logger logger = LoggerFactory.getLogger(MatchLogger.class);

    @SinkConfig(id = "logger")
    public void log(@Current TokenizedSentence tokenized, @Current Matches matches)
    {
        logger.info(
            "Found {} unique tokens in {}, with {} blacklisted {}",
            matches.wordCount(), tokenized.tokens(),
            matches.blacklistMatches().size(), matches.blacklistMatches()
        );
    }
}
```
Now that we have all our building blocks, creating a `Pipeline` is simply a matter of combining them.
The `Pipeline` interface offers a builder initialization method; we'll start from there.
```java
Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(new Tokenizer())
    .registerStep(new Matcher("mostly", "relatively"))
    .registerSink(new MatchLogger())
    .build()
;
```
Now, calling the pipeline with some sentences:
```java
pipeline.run("This is a relatively short and mostly meaningless sentence.");
pipeline.run("This is a much longer sentence that should go through the blacklist unscathed.");
pipeline.run("Relatively cool objects (temperatures less than several thousand degrees) emit their radiation primarily in the infrared, as described by Planck's law.");
pipeline.run("The principles were deliberately non dogmatic, since the brotherhood wished to emphasise the personal responsibility of individual artists to determine their own ideas and methods of depiction.");
pipeline.run("The Mystical Nativity, a relatively small and very personal painting, perhaps for his own use, appears to be dated to the end of 1500.");
```
We should get the following output (given a `simplelogger` or some such, properly configured):
```
[main] INFO MatchLogger - Found 9 unique tokens in [this, is, a, relatively, short, and, mostly, meaningless, sentence], with 2 blacklisted [mostly, relatively]
[main] INFO MatchLogger - Found 13 unique tokens in [this, is, a, much, longer, sentence, that, should, go, through, the, blacklist, unscathed], with 0 blacklisted []
[main] INFO MatchLogger - Found 22 unique tokens in [relatively, cool, objects, temperatures, less, than, several, thousand, degrees, emit, their, radiation, primarily, in, the, infrared, as, described, by, planck, s, law], with 1 blacklisted [relatively]
[main] INFO MatchLogger - Found 23 unique tokens in [the, principles, were, deliberately, non, dogmatic, since, the, brotherhood, wished, to, emphasise, the, personal, responsibility, of, individual, artists, to, determine, their, own, ideas, and, methods, of, depiction], with 0 blacklisted []
[main] INFO MatchLogger - Found 21 unique tokens in [the, mystical, nativity, a, relatively, small, and, very, personal, painting, perhaps, for, his, own, use, appears, to, be, dated, to, the, end, of], with 1 blacklisted [relatively]
```
As pipelines may use resources (notably an `ExecutorService` for the optional async sink execution), it is best to close them down when you are done (or consider using a try-with-resources pattern):

```java
pipeline.close();
```
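Assuming `Pipeline` can be used as an `AutoCloseable` (it exposes a `close()` method), the try-with-resources form would look like the following sketch, which reuses the builder from above:

```java
// Sketch: the try-with-resources block calls pipeline.close() automatically,
// even if run(...) throws, assuming Pipeline implements AutoCloseable.
try (Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(new Tokenizer())
    .registerStep(new Matcher("mostly", "relatively"))
    .registerSink(new MatchLogger())
    .build())
{
    pipeline.run("This is a relatively short and mostly meaningless sentence.");
}
```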
This project will require you to have the following:

* a JDK (version 17 or higher)
* Maven