The platform is a generic framework for data ingestion, manipulation and retrieval. At a high level, it can be described by the following scheme:
The high-level design document can be found here.
Documentation (not yet complete) can be found here. It should grow over time to cover all aspects of the platform. PRs welcome!
First, let's introduce some terminology:
The scheme definition uses HOCON. As a short example, we will show the definition of data processing for a hypothetical e-commerce site. The site sells some goods, has some users, and generates events that describe how users interact with the goods. We will use protocol buffers for serialization.
First, let's define our data model. We will model a system that processes events coming from some source in a given format and, based on these events, builds a model of user preferences.
entities {
# user entity, let's make this really simple
user {
attributes {
# some details of user - e.g. name, email, ...
details { scheme: "proto:cz.o2.proxima.example.Example.UserDetails" }
# model of preferences based on events
preferences { scheme: "proto:cz.o2.proxima.example.Example.UserPreferences" }
# selected events are stored to user's history
"event.*" { scheme: "proto:cz.o2.proxima.example.Example.BaseEvent" }
}
}
# entity describing a single good we want to sell
product {
# note: anything we want to be able to update *independently*
# has to be modelled as a separate attribute
attributes {
# price, with some possible additional information, like VAT and other stuff
price { scheme: "proto:cz.o2.proxima.example.Example.Price" }
# some general details of the product
details { scheme: "proto:cz.o2.proxima.example.Example.ProductDetails" }
# list of associated categories
"category.*" { scheme: "proto:cz.o2.proxima.example.Example.ProductCategory" }
}
}
# the events which link users to goods
event {
attributes {
# the event is an atomic entity with just a single attribute
data { scheme: "proto:cz.o2.proxima.example.Example.BaseEvent" }
}
}
}
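The "event.*" and "category.*" attributes above are wildcard attributes: a single declaration covers a whole family of attribute instances distinguished by suffix (e.g. event.1234). The matching rule can be sketched in plain Java (illustrative only, not Proxima's actual API):

```java
// Conceptual sketch of wildcard attribute matching, not Proxima's API.
public class WildcardMatch {

  // A wildcard attribute like "event.*" matches any attribute instance
  // with the prefix "event." and a non-empty suffix; a plain attribute
  // matches only itself.
  static boolean matches(String declared, String attribute) {
    if (declared.endsWith(".*")) {
      String prefix = declared.substring(0, declared.length() - 1); // keep trailing dot
      return attribute.startsWith(prefix) && attribute.length() > prefix.length();
    }
    return declared.equals(attribute);
  }

  public static void main(String[] args) {
    assert matches("event.*", "event.1234"); // instance of a wildcard attribute
    assert !matches("event.*", "details");   // unrelated attribute
    assert matches("details", "details");    // plain attribute matches itself
  }
}
```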
Next, after defining our data model, we need to specify attribute families for our entities. This definition depends heavily on the access patterns to the data. Mostly, we have to worry about how we are going to read the data. Relevant questions are:
are we going to process the data in batch fashion to build some sort of model?
Let's describe our intentions as follows:
we want to be able to select some events to be stored in the user's history, and then list that history by time from newest to oldest
To be able to fulfill these requirements, we have chosen the following storages:
This yields the following setup for attribute families (some details are omitted for simplicity):
attributeFamilies {
# we need this to be able to read user attributes 'details' and 'preferences' by user's key
user-random-access {
entity: user
attributes: [ "details", "preferences" ]
storage: "cassandra://"${cassandra.seed}/${cassandra.user-table}"?primary=user"
type: primary
access: random-access
}
# store incoming events to user's history
user-event-history-store {
entity: event
attributes: [ "data" ]
storage: "cassandra://"${cassandra.seed}/${cassandra.user-event-table}/
# this class defines how we transform incoming event to CQL
cqlFactory: cz.o2.proxima.example.EventHistoryCqlFactory
# this is filtering condition, we want to select only some events
filter: cz.o2.proxima.example.EventHistoryFilter
type: replica
access: write-only
}
# this family defines read access to the stored event history
user-event-history-read {
entity: user
attributes: [ "event.*" ]
storage: "cassandra://"${cassandra.seed}/${cassandra.user-event-table}"?primary=user&secondary=stamp&data=event&reversed=true"
# ignore this for now
converter: cz.o2.proxima.core.storage.cassandra.DateToLongConverter
type: replica
# we will not explicitly modify this, it will be updated automatically by incoming events
access: read-only
}
# random access to products
product-random-access {
entity: product
attributes: [ "*" ]
storage: "cassandra://"${cassandra.seed}/${cassandra.product-table}
type: primary
access: [ random-access, batch-snapshot ]
}
# event stream storage
event-commit-log {
entity: event
attributes: [ "*" ]
storage: "kafka://"${kafka.brokers}/${kafka.events-topic}
# this is our commit log
type: primary
access: commit-log
}
# store events for batch analytics
event-batch-storage {
entity: event
attributes: [ "*" ]
storage: "hdfs://"${hdfs.authority}/${hdfs.event-path}
type: replica
access: batch-updates
}
}
cassandra {
seed = "cassandra:9042"
user-table = "user"
product-table = "product"
user-event-table = "user_event"
}
kafka {
brokers = "kafka1:9092,kafka2:9092,kafka3:9092"
events-topic = "events"
}
hdfs {
authority = "hdfs-master"
event-path = "/events"
}
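The ${...} references in the storage URIs are standard HOCON substitutions, resolved against the cassandra, kafka and hdfs blocks above. For example, after resolution the storage of the user-random-access family reads:

```hocon
# resolved storage URI of the user-random-access family
storage: "cassandra://cassandra:9042/user?primary=user"
```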
With this definition, we have a (somewhat simplified) working description of a Proxima platform scheme for data manipulation, which can be fed into the ingestion/retrieval service and will start working as described above.
Generally, data are modelled as an unbounded stream of updates to attributes of entities. Each update consists of the following:
UUID of the update
key of the entity being updated
name of the updated attribute (a plain attribute like details, or an instance of a wildcard attribute like event.1234)
timestamp of the update
new value of the attribute (or a delete marker)
Each stream can then be represented as a table (a.k.a. table-stream duality), which is essentially a snapshot of the stream at a certain point in time (called a batch snapshot in terms of the Proxima platform).
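The reduction of a stream to its snapshot can be sketched in plain Java: for each (key, attribute) pair, keep the update with the highest timestamp. The field names below are illustrative only, not Proxima's actual StreamElement API:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of table-stream duality: folding an (in reality
// unbounded) stream of updates into a snapshot table. Field names are
// illustrative, not Proxima's actual API.
public class SnapshotFold {

  record Update(String uuid, String key, String attribute, long stamp, byte[] value) {}

  // For each (key, attribute) pair keep the update with the highest timestamp.
  static Map<String, Update> snapshot(Iterable<Update> stream) {
    Map<String, Update> table = new HashMap<>();
    for (Update u : stream) {
      table.merge(u.key() + "#" + u.attribute(), u,
          (old, incoming) -> incoming.stamp() >= old.stamp() ? incoming : old);
    }
    return table;
  }

  public static void main(String[] args) {
    var updates = java.util.List.of(
        new Update("u1", "user1", "details", 1L, new byte[] {1}),
        new Update("u2", "user1", "details", 2L, new byte[] {2}), // newer, wins
        new Update("u3", "user2", "details", 1L, new byte[] {3}));
    Map<String, Update> table = snapshot(updates);
    assert table.size() == 2;
    assert table.get("user1#details").uuid().equals("u2");
  }
}
```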
The platform contains a Maven plugin that compiles the scheme specification into Java access classes, configured as follows:
<plugin>
<groupId>cz.o2.proxima</groupId>
<artifactId>proxima-compiler-java-maven-plugin</artifactId>
<version>0.14.0</version>
<configuration>
<outputDir>${project.build.directory}/generated-sources/model</outputDir>
<javaPackage>cz.o2.proxima.testing.model</javaPackage>
<className>Model</className>
<config>${basedir}/src/main/resources/test-readme.conf</config>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<dependencies>
<!--
Use direct data operator access, see later
-->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>proxima-direct-compiler-plugin</artifactId>
<version>0.14.0</version>
</dependency>
<!--
The following dependencies define additional
dependencies for this example
-->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>proxima-core</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>proxima-scheme-proto</artifactId>
<version>0.14.0</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>proxima-scheme-proto-testing</artifactId>
<version>0.14.0</version>
</dependency>
</dependencies>
</plugin>
This plugin then generates the class cz.o2.proxima.testing.model.Model into target/generated-sources/model.
The class can be instantiated via
Model model = Model.of(ConfigFactory.defaultApplication());
or (in case of tests, where some validations and initializations are skipped)
Model model = Model.ofTest(ConfigFactory.defaultApplication());
The platform offers various modes of access to data. As of version 0.14.0, these types are:
This operator is used when accessing data from inside a single JVM (or potentially multiple JVMs, e.g. coordinated via distributed consumption of the commit log). The operator is constructed as follows:
private DirectDataOperator createDataOperator(Model model) {
Repository repo = model.getRepo();
return repo.getOrCreateOperator(DirectDataOperator.class);
}
Next, we can use the operator to create instances of data accessors, namely:
CommitLogReader
BatchLogReader
RandomAccessReader
For instance, observing the commit log can be done by
DirectDataOperator operator = model.getRepo().getOrCreateOperator(DirectDataOperator.class);
CommitLogReader commitLog = operator.getCommitLogReader(
model.getEvent().getDataDescriptor())
.orElseThrow(() -> new IllegalArgumentException("Missing commit log for "
+ model.getEvent().getDataDescriptor()));
commitLog.observe("MyObservationProcess", new LogObserver() {
@Override
public boolean onError(Throwable error) {
throw new RuntimeException(error);
}
@Override
public boolean onNext(StreamElement elem, OnNextContext context) {
log.info("Consumed element {}", elem);
// commit processing, so that it is not redelivered
context.confirm();
// continue processing
return true;
}
});
Creating BatchLogReader or RandomAccessReader is analogous.
First, create BeamDataOperator as follows:
BeamDataOperator operator = model.getRepo().getOrCreateOperator(BeamDataOperator.class);
Next, use this operator to create a PCollection from the Model.
// most imports omitted; these are shown for clarity
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
Pipeline pipeline = Pipeline.create();
PCollection<StreamElement> input = operator.getStream(
pipeline, Position.OLDEST, false, true,
model.getEvent().getDataDescriptor());
PCollection<KV<String, Long>> counted =
input
.apply(
Window.<StreamElement>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes())
.apply(
WithKeys.of(
el ->
model
.getEvent()
.getDataDescriptor()
.valueOf(el)
.map(BaseEvent::getProductId)
.orElse("")))
.apply(Count.perKey());
// do something with the output
CI is run only against changed modules (and their dependents) in pull requests. To completely rebuild the whole project in a PR, push a commit with the commit message 'rebuild'. After the build, you can squash and remove that commit.