yapp is yet another parallel pipeline. It is a zero dependency, header only library providing a multi-threaded implementation of the pipeline pattern. It enables users to define a series of stages where:
input
/output
type, provided that the resulting chain is feasible, e.g. output of stage 2 can be an input to stage 3. Correctness in this respect is checked during compilation.Using a pipeline can be as simple as:
auto pln = yap::Pipeline{} | dataReader | stage1 | stage2 | storeOutput;
pln.run(); // Non-blocking call.
yapp is provided as an alternative to pipelines in large general purpose libraries for multi-threading. If you want to:
you can try-out yapp, which sports:
<thread>
and friends.For a short introduction to the parallel pipeline pattern you can check this post.
This section outlines how to create a pipeline. To help drive our points, assume the existence of:
auto generator = [val = 0] () mutable { return val++; };
auto transform = [](int val) { return std::to_string(val); };
auto sink = [](std::string const& s) { std::cout << s << std::endl; };
The example above shows the following data-flow from generator to sink:
[void, int] -> [int, string] -> [string, void]
infeasible type transformations result in compilation errors.
To construct a pipeline with a type mandated by the input/output types of each stage, simply pipe the required stages into yap::Pipeline{}
:
auto ps = yap::Pipeline{} | generator | transform | sink;
The pipeLine
object above will be of type yap::Pipeline<void,int, int,string, string,void>
. A user does not have to specify the type that results from the specified transformations, since CTAD handles this process. A strongly typed pipeline can be moved into a another pipeline to be chained with additional stages at a later point, provided that a sink stage has not yet been attached.
There are cases where having a strong type is cumbersome or plainly of little benefit, e.g. when no special treatment is planned for pipeline objects of different types or when a pipeline member object needs to create little noise to the containing class. In such cases a user can use a pipeline through its polymorphic base class:
auto pp = yap::make_pipeline(generator, transform, sink);
The object returned from the make_pipeline
function, is a unique_ptr<yap::pipeline>
. This lower-case pipeline
base class, is the abstract definition of a pipeline and even though information on type transformations is lost, all operations are carried out in way consistent to its construction properties. A polymorphic pipeline cannot be chained further, since information on how to relay types is lost.
std::thread
treats its callable argument, callables provided as operations are move constructed in their respective stage. If an l-value is provided instead of a temporary object, the callable must be copy-constructible.This section describes the operations available to a pipeline. Bear in mind that depending on the construction method, you'd be calling an operation on a value or a pointer:
ps.operation(); // Strongly typed pipeline.
pp->operation(); // Polymorphic pipeline.
The run
method fires up the task processing threads on each stage. Note that since there's buffering between stages, unstable latency of stages is accounted for and data is pushed as forward as possible to be available for processing as soon as possible.
ps.run(); // Non blocking-call. Fires up worker threads and continues.
// A running pipeline will stop on its destructor.
No preconditions are imposed to the run
method apart from having a properly constructed object.
The stop
method only has effect on a running or paused pipeline. It ceases all processing threads, meaning after its call no invocation of the user provided operations is possible. Additionally, it clears the intermediate buffers, meaning non-processed data left in the pipeline will be discarded.
auto res = pp.stop();
if (yap::ReturnValue::NoOp == res)
{
std::cout << "The pipeline was not running" << std::endl;
}
pp.stop(); // No effect, we just stopped above.
The pause
method only has effect on running pipelines. It ceases all processing threads but unlike stop
, it does NOT clear the intermediate buffers, meaning a subsequent call to run
will resume processing.
auto res = pp.pause();
// ...
// Other task, e.g. non threadsafe modification of a stage.
// ...
pp.run(); // Non-processed data will resume processing.
A use case might be that of processing a fixed amount of data. When this need arises, the generator stage can inform the pipeline on the end of the data stream by throwing a GeneratorExit
exception. To run the pipeline until all data up to that point is processed, the consume
method exists:
auto gen = [val = 0] () mutable {
if (val > 1'000'000) throw yap::GeneratorExit{};
return val++;
};
auto pp = yap::make_pipeline(gen, stage1, stage2, stage3, sink);
// Run the pipeline until all data is processed. Blocking call.
pp->consume();
Consuming a pipeline leaves it in an idle state, with no threads running. run
can be called once again, assuming the generator can produce more data, but stop
or pause
have no effect. A pipeline whose generator throws yap::GeneratorExit
will anyways cease when all input is processed. The consume
method is a way to explicitly wait for data to be processed and make the pipeline "runable" again.
This section describes the tools to modify a pipeline's topology. Such a modification alters the linear flow of information from one stage to its subsequent, to provide properties that are attractive to specific computational patterns.
A filtering stage is one that can discard part of its input. As depicted below, S2
can control the input items to pass to subsequent stages, while being free to perform any type of transformation:
A callable returning yap::Filtered
is considered a filtering stage. The filtered object is just a wrapper around std::optional<T>
:
template <class T>
struct Filtered
{
std::optional<T> data;
Filtered() = default;
explicit Filtered(T &&data) : data(std::move(data)) {}
explicit Filtered(std::optional<T> &&data) : data(std::move(data)) {}
};
The std::optional
type was not used directly, since explicit use cases exist for nullopt
, for example a stage handling "empty" or "filler" inputs. To avoid propagating data further down the pipeline, simply place an empty optional in the Filtered<T>
return value of your stage. Conversely, filling the data
member with a value means passing the data to the next stage.
To provide explicit syntax to your pipeline declaration, a helper Filter
caller can be used. This is a "call forwarding" wrapper that can use either std::optional
or yap::Filtered
return types:
auto oddPrinter = yap::Pipeline{}
| gen
| yap::Filter(s2) // Explicitly declared filtering stage.
| intPrinter{};
A stage following a filter should accept a yap::Filtered<T>
input. It can safely assume that the data
member of the input is not nullopt
.
A hatching stage is one that can produce more than one outputs for a single input. In the diagram below, S2
is such a stage:
Note that this process is fundamentally different from producing a collection of objects. A typical example where you might want to hatch your input is when processing text files, say line by line. If the stage that produced the lines was to scan the whole text file and output a vector of text lines (strings) then you'd face the following deficiencies:
Such a situation can be greatly improved if the "text reader" stage produces its output in a piece wise fashion: Each line that is ready, gets immediately pushed to the next stage for processing.
To create a hatching stage use a callable that accepts yap::Hatchable
objects as input, a class with logic similar to yap::Filtered
that conveys how the stage does its processing:
yap::Hatchable
is convertible to bool
. true
means new input while false
(empty optional) means you're still processing the last input.bool
, e.g. an std::optional
or again yap::Hatchable
. The pipeline stops processing the same input when the output is false
, alternatively it keeps invoking the stage with an empty yap::Hatchable
to produce more output from the last input.auto exampleHatchingStage = [](yap::Hatchable<int> input)
{
std::optional<char> ret;
if (val)
{
// New Input from previous stage. Input data is non empty.
std::optional<int> &curInput = input.data;
assert(curInput);
}
else
{
// Keep processing the last input from previous stage. Input data is empty.
assert(!input.data);
}
return ret; // Returning a contextually "false" object, here empty
// optional, means the input won't be hatched any more and
// the stage can process new values produced from the
// previous stage.
};
To provide explicit syntax to your pipeline declaration, a helper OutputHatchable
caller can be used to denote the stage producing input for the hatching stage. This is a "call forwarding" wrapper that wraps the output of a stage into yap::Hatchable
return types:
auto hp = yap::Pipeline{}
| yap::OutputHatchable(generator) // The previous stage can be annotated.
| exampleHatchingStage
| sinkStage{};
Utilities that accompany the library are described here. Creating a huge suite of accompanying tools is a non-goal for this library, however there should be provision for patterns that are often encountered. In that spirit, the following tools are made.
A consumer is a generator that can use a standard iterable container. It handles:
Usage on a container c
is pretty straightforward:
// Input values are copied into the pipeline. Container is left untouched.
auto p1 = yap::Pipeline{} | yap::Consume(c.begin(), c.end()) | ...
// Input values are moved into the pipeline. Container has "moved-from" objects.
auto p2 = yap::Pipeline{} |
yap::Consume(std::make_move_iterator(c.begin()), std::make_move_iterator(c.end())) | ...
Examples can be found in the respective folder. Each example folder is accompanied by a README.md
file that documents it. In summary, the contents are:
These examples showcase simple usages of the library, and how it successfully manages:
We analyze an input file and output the k
most frequent words in the text.
Usage of the framework with stages of substantial computational effort, while passing 2d data from one stage to another. Serves as a profiling experiment to deduce next steps and drive optimization efforts.
This is a header only library. You can download and build the test suite which is using Google Test, but all you need to do to use the library is to point your build system to the include
folder.