seznam / euphoria

Euphoria is an open source Java API for creating unified big-data processing flows. It provides an engine independent programming model which can express both batch and stream transformations.
Apache License 2.0

euphoria-flink: data type transparency #45

Open xitep opened 7 years ago

xitep commented 7 years ago

Execution engines can optimize much better if they know the types they are working with. Efforts in both the Spark and the Flink communities demonstrate the optimization potential in this regard. As a layer above such execution engines, Euphoria must provide type-specific information to the executors in order to stay relevant (in terms of performance). In this ticket we'll focus on Flink.

Motivation

This ticket was triggered by an attempt to shave off some of the overhead mentioned in #13 and #14.

Experiment

Using opaque data types with general-purpose serialization has a considerable negative effect on the optimizations that Flink tries to apply by default. I was able to see the effect in an experiment where the "core" operation of the flow is the following (basically just a windowed word count):

      ReduceByKey
            .of(input)
            .keyBy(Pair::getSecond)
            .valueBy(e -> 1L)
            .combineBy(Sums.ofLongs())
            .windowBy(Time.of(shortInterval), Pair::getFirst)
            .output();

Changing Euphoria's Flink batch executor to use Flink's native and POJO-based serializers for the types involved in the reduce operation cut the program's execution time roughly in half. The amount of data shuffled stayed approximately the same. Unfortunately, this approach required me to explicitly provide the return type of the .keyBy function to the internals of Euphoria's FlinkExecutor. I was not able to derive the return type of a lambda automatically, i.e. without the user having to state it explicitly.
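
To illustrate the lambda problem outside of Euphoria and Flink, here is a minimal plain-Java sketch. The names LambdaTypeDemo, TypeHint and reflectReturnType are made up for this example and are not part of either API. It relies on the fact that a lambda's runtime class usually carries no generic signature, while an anonymous class does, which is why a user-supplied "type hint" (an anonymous subclass that pins the type argument) is one possible way to pass the type along explicitly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class LambdaTypeDemo {

  /**
   * Hypothetical type hint: creating an anonymous subclass such as
   * new TypeHint<Long>() {} records Long in the generic superclass.
   * (Subclassing the raw type would fail the cast below; fine for a sketch.)
   */
  public abstract static class TypeHint<T> {
    private final Type type;

    protected TypeHint() {
      ParameterizedType superType =
          (ParameterizedType) getClass().getGenericSuperclass();
      this.type = superType.getActualTypeArguments()[0];
    }

    public Type getType() {
      return type;
    }
  }

  /**
   * Tries to read R of a Function<T, R> from the generic interfaces of the
   * function's runtime class. Returns null if no generic signature is found.
   */
  public static Type reflectReturnType(Function<?, ?> fn) {
    for (Type iface : fn.getClass().getGenericInterfaces()) {
      if (iface instanceof ParameterizedType) {
        ParameterizedType p = (ParameterizedType) iface;
        if (p.getRawType() == Function.class) {
          return p.getActualTypeArguments()[1];
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Function<String, Long> lambda = s -> 1L;
    Function<String, Long> anonymous = new Function<String, Long>() {
      @Override
      public Long apply(String s) {
        return 1L;
      }
    };

    // The lambda's class only implements the raw Function interface.
    System.out.println("lambda:    " + reflectReturnType(lambda));
    // The anonymous class keeps Function<String, Long> in its signature.
    System.out.println("anonymous: " + reflectReturnType(anonymous));
    // The explicit hint carries the type regardless of how the function was written.
    System.out.println("hint:      " + new TypeHint<Long>() {}.getType());
  }
}
```

In an API like ReduceByKey this would mean either asking users for anonymous classes instead of lambdas, or accepting an optional hint next to .keyBy; both are assumptions about a possible design, not something Euphoria provides today.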

What to do next

horkyada commented 7 years ago

We shouldn't forget that, for Flink to fully operate on serialized data, it needs to determine the offset of the desired data. Flink does this by letting the user specify the position(s) of the field(s) in tuples and the name(s) of the field(s) in POJOs. E.g.:

 DataStream<MyPojo<String, Integer>> longTrends = prepared
        .keyBy("query")
        .timeWindow(longInterval, shortInterval)
        .sum("count");

This should probably be covered by Euphoria too.
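
To make the offset point concrete, here is a rough plain-Java sketch of reading and comparing one field directly on serialized bytes. The record layout and all names (SerializedKeyDemo, COUNT_OFFSET, and so on) are assumptions chosen for illustration; this is not Flink's actual binary format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerializedKeyDemo {

  // Assumed layout: [long timestamp][int count][int queryLength][query bytes]
  static final int TIMESTAMP_OFFSET = 0;
  static final int COUNT_OFFSET = TIMESTAMP_OFFSET + Long.BYTES;

  /** Writes one record in the assumed layout. */
  public static byte[] serialize(long timestamp, int count, String query) {
    byte[] q = query.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf =
        ByteBuffer.allocate(Long.BYTES + Integer.BYTES + Integer.BYTES + q.length);
    buf.putLong(timestamp).putInt(count).putInt(q.length).put(q);
    return buf.array();
  }

  /** Reads only the count field; the query string is never decoded. */
  public static int readCount(byte[] record) {
    return ByteBuffer.wrap(record).getInt(COUNT_OFFSET);
  }

  /** Compares two records by count without deserializing them into objects. */
  public static int compareByCount(byte[] a, byte[] b) {
    return Integer.compare(readCount(a), readCount(b));
  }

  public static void main(String[] args) {
    byte[] first = serialize(1L, 3, "flink");
    byte[] second = serialize(2L, 7, "euphoria");
    System.out.println(readCount(first));
    System.out.println(compareByCount(first, second));
  }
}
```

This only works because the position of the count field is known up front. With an opaque type and a general-purpose serializer there is no such offset, so the engine has to deserialize the whole record before it can look at the key.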