I have experimented a little with the possibility of adding counters/aggregators to the client API. From my point of view the following API should be sufficient. Basically, all client functions will have the option to access the runtime context with aggregators. Check the following example:
// MapElements without counters, uses UnaryFunction lambda
Dataset<String> words = MapElements.of(dataset)
    .using(s -> s.toLowerCase())
    .output();

// MapElements with access to context, uses UnaryFunctor lambda
Dataset<String> mapped = MapElements.of(dataset)
    .using((String input, Context<String> context) -> {
      // use simple counter
      context.getCounter("my-counter").increment();
      // use custom aggregator type
      Histogram aggr = context.getAggregator("custom", Histogram.class);
      aggr.add(10L);
      context.collect(input.toLowerCase());
    })
    .output();
The client API of the Aggregator would look like this:
/**
 * Aggregators accumulate values through an associative and commutative
 * operation from user functions.
 * Aggregators allow the user to calculate statistics during the flow execution.
 * <p>
 * Aggregators are inspired by the Hadoop/MapReduce counters.
 *
 * @param <IN> Type of values added to the aggregator.
 * @param <OUT> Type of the accumulated result.
 */
public interface Aggregator<IN, OUT> {

  /**
   * @param value The value that is added to the aggregator.
   */
  void add(IN value);

  /**
   * @return The value of a local instance of the aggregator.
   */
  OUT getLocalValue();

  /**
   * Resets the current instance of the aggregator to the default value.
   */
  void resetLocal();

  /**
   * Used internally by an executor to merge collected aggregators from
   * different tasks.
   *
   * @param other Instance of the merged aggregator.
   */
  void merge(Aggregator<IN, OUT> other);
}
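For illustration, a trivial implementation of the interface could look as follows (a sketch of my own, not part of the proposal itself):

/** Sketch: a simple sum aggregator satisfying the proposed interface. */
public class LongSumAggregator implements Aggregator<Long, Long> {

  private long sum = 0L;

  @Override
  public void add(Long value) {
    sum += value;
  }

  @Override
  public Long getLocalValue() {
    return sum;
  }

  @Override
  public void resetLocal() {
    sum = 0L;
  }

  @Override
  public void merge(Aggregator<Long, Long> other) {
    // executor-side: fold another task's local value into this instance
    sum += other.getLocalValue();
  }
}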
I think this should be sufficient for the client. I will add this feature to all operators and then open a PR for the client API changes.
If we really want to make the operation associative and commutative, we should reflect this in the client API. Such an operation by definition has to have IN == OUT; the types have to be the same. What is the purpose of resetLocal? And how do we plan to implement this on the executor side? I see aggregators as just another dataset, onto which an RSBK is applied in a global window with early emitting. We might want to think about letting the user specify some other kind of windowing on the aggregator.
Not sure if associative and commutative is the right term to describe it, but when I have something like

class SetAggregator<T> implements Aggregator<T, Set<T>> {
  private final Set<T> values = new HashSet<>();
  @Override public void add(T value) { values.add(value); }
  @Override public Set<T> getLocalValue() { return values; }
  // merge and resetLocal omitted for brevity
}

the aggregator is actually associative and commutative despite IN != OUT.
The resetLocal method is clearly an inspiration from Flink. And it's true I can't come up with a usage example, so I might omit this method from the interface for now.
It's true aggregators may be seen as just another kind of dataset. Although we're planning first to connect it with the native Flink and Spark accumulators, so it's basically going to be a side output from each operator. And I am now not at all sure we want to support aggregator windowing in the first version; it seems to be quite a troublesome feature.
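To illustrate why that connection should be cheap (a sketch under my own assumptions, not a committed design): Flink's native Accumulator interface has an almost identical shape, so an adapter could be thin. Note the wrapped aggregator would have to be serializable for Flink, and clone() is simplified here - a real implementation would have to deep-copy the wrapped state.

import org.apache.flink.api.common.accumulators.Accumulator;

// Sketch: expose the proposed Aggregator as a native Flink accumulator.
class FlinkAggregatorAdapter<IN, OUT extends java.io.Serializable>
    implements Accumulator<IN, OUT> {

  private final Aggregator<IN, OUT> wrapped;

  FlinkAggregatorAdapter(Aggregator<IN, OUT> wrapped) {
    this.wrapped = wrapped;
  }

  @Override
  public void add(IN value) {
    wrapped.add(value);
  }

  @Override
  public OUT getLocalValue() {
    return wrapped.getLocalValue();
  }

  @Override
  public void resetLocal() {
    wrapped.resetLocal();
  }

  @Override
  public void merge(Accumulator<IN, OUT> other) {
    // assumes the other side wraps a compatible Euphoria aggregator
    wrapped.merge(((FlinkAggregatorAdapter<IN, OUT>) other).wrapped);
  }

  @Override
  public Accumulator<IN, OUT> clone() {
    return new FlinkAggregatorAdapter<>(wrapped); // simplified: shares state
  }
}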
@vanekjar very nice! thank you for the proposal. also many thanks for the example which made me think of the following:

MapElements is now semantically equal to FlatMap. I don't necessarily like it at this point in time, but I can live with the redundancy if this makes the availability of the aggregators consistent. (Maybe we can even drop FlatMap as such and replace it with the new MapElements operator.) The getAccumulator(name, AccumulatorType.class) method is motivating to allow introducing custom accumulators. That's quite cool!

i'm less sure about resetLocal and getLocalValue in the client api; i guess on flink these can be used creatively since there operators are long-running instances with a proper life-cycle. well, maybe i just fail to come up with a good use-case to justify these methods. did you have anything specific in mind?

Anyway, many thanks. I hope my feedback can trigger some more directions to look at.
@je-ik regarding "I see aggregators as just another dataset, onto which an RSBK is applied in a global window with early emitting. We might want to think about letting the user specify some other kind of windowing on the aggregator." ... that's an interesting idea. can you elaborate in more detail? i think it doesn't have to be on the impl level, just conceptually.
It is imaginable that we create another dataset for each aggregator, which could then be accessed by calling e.g.
Flow flow = Flow.create();
/* Now create the flow and register some named aggregator. */
Dataset<AggregatedValue> aggregator = flow.getAggregator("aggregator-name", AggregatedValue.class);
aggregator.persist(myAggregatorSink);
At the implementation level, this would imply that we are able to push the emitted values to be aggregated somewhere (that would be executor-dependent), so that we can create an appropriate DataSource when reading the data back from the aggregator.

I'm not sure if I explained it well and of course I see it from the perspective of the kafka executor, where this would be kind of straightforward - just emit the metrics to another kafka topic and then process it with RBK appropriately. The result is again a Dataset that can be manipulated just like any other Dataset.
@xitep Thanks for your feedback!
I agree it is a bit troubling that certain operators that are supposed to return a single value (besides MapElements it is also ReduceByKey) now have access to the context. In the case of MapElements vs FlatMap it could really be solved by merging them into one, but that's not possible with RBK. It could also be solved by using some light-weight context allowing access just to the counters and not to the write method. But I don't like this idea very much.
Regarding custom (user-defined) aggregators: what exactly makes the auto-completion harder? It works for me pretty well. Anyway, I think there shouldn't be any problem supporting arbitrary aggregators on any executor. The basic attribute of each aggregator is that it's serializable (although this is for now not enforced by implementing the Serializable interface, because Kryo serialization is less limiting) and that, after collection, it can be merged with other aggregated values.
I agree with removing the resetLocal method; I couldn't come up with any usage example either. Anyway, something like getLocalValue still needs to be there to allow the aggregator to provide the output value, at least for the executor's internal purposes. And it can be useful for the client too.
Last but not least: I couldn't decide whether to name the counters aggregators or accumulators. It seems accumulators is maybe the more natural and standard way to name them?
+1 for accumulators
I think that we should disable access to the collect method of the context from operators that should not have it - what speaks against creating a context without collect, then extending it with the collect method and passing the extended interface to the appropriate operators only?
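Concretely, the split could look something like this (interface names are hypothetical; getCounter/getAggregator mirror the examples above):

// base context: available to all operators, no output capability
public interface Environment {
  Counter getCounter(String name);
  <T extends Aggregator<?, ?>> T getAggregator(String name, Class<T> type);
}

// extended context: passed only to operators that may emit elements
public interface Context<T> extends Environment {
  void collect(T element);
}

Operators like ReduceByKey would then receive only the base interface, while FlatMap would get the full Context<T>.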
Hi guys, I have thought a little about this and I must say that I think we can (and should) do it without counter support at all. Let me explain:
So, what we really mean when we say "we need accumulators" is, in my opinion, "we need more than just a single output of an operator". There was a strict design decision in the past that all our operators would have just a single output. I see it as the right decision, because it simplified a lot of things along the way. But - could we do anything to relax this requirement? And I think again - yes, we can :).
We could design parallel outputs next to the main output by naming them (and therefore construct what looks like a counter, but is essentially nothing more than another Dataset). The code would then look like this:
Flow flow = Flow.create();
Dataset<Integer> input = ...;

Dataset<Integer> output = FlatMap.of(input)
    .using((in, ctx) -> {
      ctx.collect( /* do whatever transformation of `in` */ );
      ctx.collect("input-elements", 1L);
    })
    .output();

Dataset<Long> inputElements = flow.getNamedStream("input-elements");
// now I can do whatever I want with this stream, I can window it as I wish, aggregate by a function
// of my choice and so on, and finally, persist the dataset where I wish
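For example, the named stream could be summed into a single total and persisted (a sketch; the ReduceByKey builder and Sums.ofLongs follow Euphoria's usual style, and myMetricsSink is a hypothetical sink):

// sum all emitted 1L values under one global key and persist the result
Dataset<Pair<String, Long>> total = ReduceByKey.of(inputElements)
    .keyBy(v -> "input-elements")   // single key -> one global counter value
    .valueBy(v -> v)
    .combineBy(Sums.ofLongs())
    .output();
total.persist(myMetricsSink);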
I think this approach has several benefits. For instance, an executor could be instructed to expose such a named stream through its native counters:

Executor executor = new SparkExecutor().useCountersFor("input-elements");
I'm aware there are still a few things that need to be clarified, but I see this approach as much better than forcing users to write their business logic in two completely separate APIs.
Looking forward to your comments on this!
@vanekjar many thanks for your response.
Regarding getLocalValue (but probably also a "reset" method): given a fixed set of "accumulators" known to the executors, i.e. ctx.counter("foo").increment(), the api could be split between the client side and the "supporting" runtime side.

@je-ik i think i do understand where you're heading. the idea is general and - as you implied - unifies what we've so far considered as two separate concepts. exactly, it turns into the direction of supporting multiple outputs per operator. the beam api does support this. would you like to evolve the idea in even more depth? (your initial comment is already very illuminating. 👍) at this moment i see at least a few open questions.
many thanks again to both of you so far.
Hi @xitep, yes, I totally agree with the concept of Beam, with a few considerations that come from the way the Euphoria API works. So:

- operators would expose their parallel outputs as additional Datasets - this goes in the direction of tags in the sense of Beam (accessed from the Flow, e.g. via getTaggedStream)
- what to do with such a Dataset is left to the user code, so you can use it for business logic (joining it with some "main" dataset) or for monitoring and debugging (storing it into an appropriate sink - e.g. elastic search)

A little modified example, which covers the above topics:
Flow flow = Flow.create();
Dataset<Integer> input = ...;

NamedTag<Long> elementsTag = NamedTag.named("input-elements").typed(Long.class);

Dataset<Integer> output = FlatMap.of(input)
    .using((in, ctx) -> {
      ctx.collect( /* do whatever transformation of `in` */ );
      ctx.collect(elementsTag, 1L);
    })
    .withNamedTags(elementsTag)
    .output();

Dataset<Long> inputElements = flow.getTaggedStream(elementsTag);
// now I can do whatever I want with this stream, I can window it as I wish, aggregate by a function
// of my choice and so on, and finally, persist the dataset where I wish
@je-ik I totally agree that counters are just another output from an operator. Your idea makes great sense, and it seems quite similar to the additional outputs and side inputs from the Apache Beam model. It sounds like counters could really be replaced by a more general and more powerful concept of parallel/additional outputs.
But there are still a few questions.
@vanekjar I agree with all of your points, but on the other hand I must mention one other caveat - by incorporating the accumulator API into euphoria core, we force all executors to implement this API, either explicitly or at least by ignoring it. If the possibility of ignoring it were left to the executor, then we would have to mark the feature as optional (and therefore somehow prevent users from relying on accumulators in business logic). If we decided to force executors to implement the counters, we would go directly against one of Euphoria's design goals - to simplify the adoption of new executors by requiring as few features as possible.
I'm also not sure if these "side" outputs are really as work-intensive as you suggest; maybe we could open another issue and try to elaborate on this a little more deeply?
from my perspective:

- the client-side semantics of getLocalValue is actually undefined, since the life-cycle of a UDF is kept intentionally undefined.
- i don't think we can approach "hang your business logic on these counters"; this would require reliable additional outputs, i believe.
- and yes, i think it can be executor-optional. if i understood @vanekjar correctly, the accumulation of the values would be executor-neutral; the executor merely has to poll for values if it desires to report them elsewhere (the executor understands a UDF's life-cycle since it manages it).

just my 2 cents on the state of the discussion so far. don't take it too seriously.
Client programs often have the need to gather and report metrics, e.g. counters, timers, gauges. The most obvious use-cases are debugging and monitoring. Currently, euphoria itself lacks any client side support in this regard. Actually, it's a question whether it makes sense to have native support through euphoria for metrics/aggregators or whether to leave it to 3rd-party APIs to take over here.
A typical example is measuring the execution time of a lengthy operation:
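Such a measurement could look roughly like this (a sketch only; getTimer, Record, and expensiveParse are hypothetical stand-ins):

Dataset<Record> parsed = MapElements.of(rawLines)
    .using((String line, Context<Record> context) -> {
      long start = System.nanoTime();
      Record record = expensiveParse(line);  // the lengthy operation
      context.getTimer("parse-time").add(System.nanoTime() - start);
      context.collect(record);
    })
    .output();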
This example suggests that euphoria's Context object provides a metrics API. However, to make this globally practical, it presupposes that such a context object is available to most user-defined functions, which is not the case at the moment.

If we end up supporting a client-side API for metrics, we'll need to think about an SPI at the same time, such that different metric reporting backends/implementations can be hooked in independently of the actual flow execution engine.
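As a strawman (entirely hypothetical names), such an SPI might be as small as a reporter interface that the execution engine feeds with periodic snapshots of the accumulated values, with concrete backends (JMX, StatsD, plain logging, ...) plugged in via configuration:

import java.util.Map;

// Hypothetical SPI sketch: the engine polls accumulators and hands
// name -> value snapshots to whatever reporters are registered.
public interface MetricsReporter {
  void report(String flowName, Map<String, Object> snapshot);
}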