This seems related to #16. What I don't entirely get about reduce with callbacks is that normally the reduce function takes n inputs and maps them to 1 output. As a result, you can't really apply multiple reducers in a row. For instance: max(min([1,2,3])) == min([1,2,3]). Of course you can apply a transformation to the inputs or outputs of the reducer, but for that you should be using apply. So I guess I'm also in favor of having a simpler version of reduce. As a nice side benefit, the process graphs will be simpler, as the callbacks are quite verbose if you only require one named function.
Are we talking about openEO `reduce` (which is just a map in other languages) or about general reduce, which we call `aggregate` (e.g. GEE Reducers)? Most comments in this issue seem to apply to openEO `aggregate` rather than openEO `reduce`, I think.
Related to #6
So perhaps we should rename reduce to aggregate? Or did we already have that discussion? (I'm personally also confused by this difference in interpretation.)
I believe the question is triggered by the fact that I currently interpreted OpenEO reduce in the geopyspark implementation as what you call 'general reduce', so I don't support the map_along_axis behavior (yet).
I vote for better naming, more consistent with other languages and libraries. My proposal:

- rename `reduce` to `map`, to be consistent with JavaScript and GEE
- remove `apply` and `apply_dimensions` - everything these two do can be done using `map`. I'm not sure there's much value in having separate support for map-that-keeps-cube-shape
- rename `aggregate` to `reduce`, to be consistent with JavaScript and GEE; provide specific reducers and don't try to define generic reducer callbacks (yet)

With this change I also propose to specify that the order of iteration (in `map`, `filter`, `reduce` etc.) is undetermined and therefore up to the back-end; the same goes for parallelism - optimizations might call the same callback from different threads (i.e. the callback should not rely on its internal state in any way). If we ever need ordered iteration, we should define separate dedicated processes for that (see GEE Iterate).
I don't get why openEO `reduce` is equal to JS `map`. openEO `reduce` takes a set of elements and computes a single value from it, whereas JS `map` takes a set of elements and computes an equally-sized set of elements. openEO `reduce` does basically what JS `reduce` does; openEO `apply` does basically what JS `map` does. `reduce` reduces the dimensionality, `aggregate` does not - `aggregate` just reduces the resolution. So I'm not sure whether the proposal really works out, but I am open to being convinced. ;-)
cc @edzer as the naming mostly originates from his Initial proposal. See:
In openEO `reduce`, dimensionality reduction happens in the implementation of the reducer function, which is mapped (applied) over each element in a set (the set is some subdivision of the cube, defined by the dimensions). Hence openEO `reduce(cube, {time}, min)` is equal to JavaScript `cube.map(timeseries => timeseries.reduce(min))` (if the cube is a collection of `w*h` per-pixel timeseries).
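A minimal sketch of that equivalence in plain JavaScript; the array-of-timeseries representation of the cube is an assumption for illustration, not openEO's data model:

```js
// `cube` modelled as w*h per-pixel time series (here 2 pixels, 3 time steps each)
const cube = [
  [3, 1, 2],   // time series of pixel 0
  [5, 4, 6],   // time series of pixel 1
];

const min = (a, b) => Math.min(a, b);

// "openEO reduce(cube, {time}, min)" as map-over-pixels + reduce-over-time:
const reducedOverTime = cube.map(timeseries => timeseries.reduce(min));
// -> [1, 4]: one value per pixel, the time dimension is gone
```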
I'm in favor of limiting the scope of reduce to Miha's proposal, as this would solve the issue I tried to raise in #16. As for apply vs map: in most of the frameworks I use it's called 'map', while in xarray it's called apply. The semantics seem the same, so it's a matter of choosing something that most people will find familiar. Merging apply and apply_dimensions is also an option for me, as proposed here: https://github.com/Open-EO/openeo-processes/issues/15#issuecomment-454758245
The aggregate_temporal process does seem to have semantics which are not yet covered by reduce, as it allows us to specify the time intervals that the function will aggregate over, which to me is different from reduce.
@m-mohr you’re right about aggregate - it’s a different thing.
And openEO reduce does have the right semantics in its description (in that it flattens dimensions into a scalar) - the only issue I/we have is with the definition of the reducer callback in the generic `reduce`: in openEO the reducer callback is `f(collection-of-items) returns aggregate` and is called once for every output scalar, while elsewhere the reducer is `f(item, aggregate) returns aggregate` and is called once for every element of the input collection.
@mkadunc: it is not clear to us what you refer to by `item` vs. `collection-of-items`. If with "elsewhere" you mean `aggregate`, in my understanding there is a call for every element of the output collection, not the input collection. Time for a short phone call with @jdries, @m-mohr, you and me?
We do propose to keep the callback functions generic, and not restrict them to a list of known functions as EE does. A back-end that does not support this could easily check that the callback function has only one node and that the node's name matches the available functions, and raise an error otherwise.
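A minimal sketch in plain JavaScript of the check described above; the process-graph shape (`nodes`, `process_id`) and the list of native reducers are assumptions for illustration, not the openEO API:

```js
// Hypothetical, simplified callback graph: { nodes: { n1: { process_id: 'mean', ... } } }
const NATIVE_REDUCERS = new Set(['min', 'max', 'mean', 'median', 'sum']);

function resolveNativeReducer(callbackGraph) {
  const nodes = Object.values(callbackGraph.nodes);
  // accept only single-node callbacks whose process the back-end implements natively
  if (nodes.length !== 1 || !NATIVE_REDUCERS.has(nodes[0].process_id)) {
    throw new Error('Only single-node callbacks using a natively supported reducer are allowed');
  }
  return nodes[0].process_id;
}
```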
Elsewhere: `reduce([1,2,3], rFun)` means `rFun(rFun(rFun(null, 1), 2), 3)`, hence `item`.
openEO: `reduce([1,2,3], rFun)` means `rFun([1,2,3])`, hence `collection-of-items`.

Elsewhere: for sum reduction, the function is `sum(agg, item) :: return agg + item`
openEO: for sum reduction, the function is `sum(items) :: agg = 0; foreach(itm in items) agg += itm; return agg`
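A small sketch of the two shapes side by side (plain JavaScript, not openEO syntax):

```js
// "Elsewhere" (fold-style): called once per input element, carries the aggregate along.
const sumBinary = (agg, item) => (agg ?? 0) + item;
const resultBinary = [1, 2, 3].reduce(sumBinary, null);   // -> 6

// "openEO" (per-collection): called once per output value, receives the whole array.
const sumArray = (items) => {
  let agg = 0;
  for (const itm of items) agg += itm;
  return agg;
};
const resultArray = sumArray([1, 2, 3]);                  // -> 6
```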
How do these "elsewhere" reducers reduce to the median, for example? That doesn't work with individual binary operations. In GEE it is `var median = collection.reduce(ee.Reducer.median());`. Where is "elsewhere", by the way? Any documentation I could read through?

Edit: Or is the problem just the name? I know that JS reduce does something different than openEO reduce. JS reduce works the way you mentioned, but it just doesn't allow all the things we need to do (e.g. median), right? apply and apply_dimension also do different things, so I don't think they can replace the reduce process. What's your point here, @mkadunc? I think I don't quite understand the background.
You're right - median is a bit difficult, because it's not a good reduction operator, but there are several ways to implement it (some are more reduce-like and some not so much), e.g. collecting the values into a histogram with a binary reducer (`rFun(histogram, item) :: return histogram.add(item)`) and computing the median from the histogram at the end. Even GEE doesn't always compute an exact median - see the docs (this makes me think that their reducers are of the map-reduce variety):

> For small numbers of inputs (up to maxRaw) the median will be computed directly; for larger numbers of inputs the median will be derived from a histogram.
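For illustration, a hedged sketch in plain JavaScript of such a histogram-based (approximate) median reducer; the fixed bin width is an arbitrary assumption:

```js
const BIN = 0.01;

// Binary reducer: fold each item into a histogram (a Map of bin index -> count).
const addToHistogram = (hist, item) => {
  const key = Math.floor(item / BIN);
  hist.set(key, (hist.get(key) ?? 0) + 1);
  return hist;
};

// Final step: walk the bins in order and return the bin centre at the 50% mark.
const medianFromHistogram = (hist) => {
  const total = [...hist.values()].reduce((a, b) => a + b, 0);
  let seen = 0;
  for (const [key, count] of [...hist.entries()].sort((a, b) => a[0] - b[0])) {
    seen += count;
    if (seen >= total / 2) return (key + 0.5) * BIN;  // approximate median
  }
};

const values = [0.1, 0.4, 0.35, 0.8, 0.2];
const median = medianFromHistogram(values.reduce(addToHistogram, new Map()));
```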
Regarding my point here: I associate 'reduce' with operations on a massive amount of data as part of a Map-Reduce schema - this executes the 'map' function on a part of data, reduces the part into something smaller and returns it up the call chain for aggregating into the final result. Such a schema is inherently parallelizable, streamable and can be applied recursively. Furthermore, the same partition-map-reduce-aggregate schema can be used to compute several types of aggregated results on very different kinds of input, with very little case-specific code.
This last point enables the existence of a stable generic map-reduce infrastructure that deals with splitting the workload, scheduling the workers and aggregators etc. - all of this infrastructure will be the same regardless of whether you're counting pixels, totalling purchases, finding top search results, computing a 3D histogram of colors or fitting a line to the input data. Now this infrastructure is not trivial to implement by hand, and there are many ways in which the back-end that provides it can be optimized, especially if it knows something about the input data (e.g. the partitions of the input data will probably match the way the data is stored) and if it knows that the 'reducer' functions that it needs to call have some nice properties (e.g. that they are associative and commutative).

As a user, I can free myself from caring how the whole huge dataset is traversed, how many parallel threads or machines are required by the infrastructure to do that, and which indices are used (and which typos I've made in the for loops) - I let the infrastructure handle the iteration and focus on doing the aggregation. (There are limitations, of course, as in the case of medians, but in such cases there's always the option of doing the iterations manually...)
https://en.wikipedia.org/?title=Reduce_(higher-order_function)&redirect=no https://en.wikipedia.org/wiki/MapReduce
Furthermore, the 'per-item' reduce callback that is required for map-reduce is quite easy to write in a functional way, which is compatible with openEO's graph representation of processes. I have a feeling (but haven't tried) that writing a 'per-collection' reduce callback requires a more procedural approach, which is a bit difficult to do for now in openEO process graphs.
To sum up (in this order of importance):
Links:
Caveat: most of these methods, while they do have a per-item reducer which enables streaming evaluation, don't allow parallel execution. In JavaScript, every item needs to be added into the aggregate object, and there's no way for the user to combine aggregates - having such a combining function would allow parallel, recursive and distributed execution of the whole thing.
For an efficient server-side implementation of general, parallelizable reduce operations, the kind of reducer that GEE has (median, for example) would need three functions:

```
Aggregate reduceItemToAggregate(Aggregate currentAgg, Item value)  // called on recursion leaves
Aggregate combineAggregates(Aggregate a, Aggregate b)              // called on recursion branches
ResultType computeResult(Aggregate a)                              // called on root before returning the result
```
A `mean` reducer in this case would use:

- `{total:number, count:number}` as the `Aggregate` type
- `number` as the `Item` type
- `number` as the `ResultType`

```js
reduceItemToAggregate = (agg, value) => ({total: agg.total + value, count: agg.count + 1});
combineAggregates = (a, b) => ({total: a.total + b.total, count: a.count + b.count});
computeResult = (a) => a.total / a.count;
```
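To make the role of `combineAggregates` concrete, here is a minimal sketch in plain JavaScript of how a back-end could fold chunks independently and combine the partial aggregates; the `empty` helper and `reduceChunk` are illustrative additions, not part of the proposal:

```js
const meanReducer = {
  empty: () => ({ total: 0, count: 0 }),
  reduceItemToAggregate: (agg, value) => ({ total: agg.total + value, count: agg.count + 1 }),
  combineAggregates: (a, b) => ({ total: a.total + b.total, count: a.count + b.count }),
  computeResult: (a) => a.total / a.count,
};

// Each worker folds its own chunk...
const reduceChunk = (chunk, r) => chunk.reduce(r.reduceItemToAggregate, r.empty());

// ...and the partial aggregates are combined on the way up, in any order.
const chunks = [[1, 2, 3], [4, 5], [6]];
const partials = chunks.map(c => reduceChunk(c, meanReducer));   // could run in parallel
const combined = partials.reduce(meanReducer.combineAggregates);
const mean = meanReducer.computeResult(combined);                // -> 3.5
```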
Note that in some cases, e.g. when the reducer is a simple associative binary operator (`+`, `*`), the three functions can be even simpler; for example, the `sum` reducer boils down to:

```js
// setup
plus = (a, b) => a + b;
identity = a => a;
sumReducer = {
  reduceItemToAggregate: plus,
  combineAggregates: plus,
  computeResult: identity
};
```
I apologize for being long. I feel like I needed to write everything down to explain the motives behind my misunderstanding of the proposal ;)
Thanks @mkadunc for explaining the problem with the current openEO reducers. I agree with all these points. This:

> a stable generic map-reduce infrastructure that deals with splitting the workload, scheduling the workers and aggregators etc.

is actually what I use Apache Spark for, which also describes reducers as being commutative and associative operations.
Yes, this clarifies a lot. I'm however not convinced that the architecture of a back-end should prescribe how the openEO API should define reducers. If a back-end cannot compute the median or a set of quantiles, or only approximate them, it should say so.
I was just about to write similar things as @edzer just did.
I understand all your points @mkadunc and @jdries, and it seems very reasonable to internally implement the processes using map-reduce, for example, but I'm not sure whether this is something that should be exposed to the actual user. Is it really straightforward for them to work with binary processes instead of n-ary processes? I'm wondering whether you can't simply optimize our processes on the back-end side. The back-end can detect that there is a reducer call for sum and compute it using map-reduce anyway, can't it?
GEE does `var median = collection.reduce(ee.Reducer.median());`, which is easy for users to understand. This is very similar to what we currently have specified, but internally they do other things, and I think we can handle it that way, too. We (or maybe the corresponding back-ends) can for sure state in the process description that the median in certain cases is not exact. And just because the reducer is defined on a list doesn't necessarily mean it needs to be the actual workflow at the back-end.

I'll define whatever the consortium agrees on, and if a majority says let's do binary reducers instead of n-ary reducers, that's totally fine, but I think we should discuss that in a bigger group (e.g. dev telco). I'm not the person deciding this. ;)
@edzer:
> I'm however not convinced that the architecture of a back-end should prescribe how the openEO API should define reducers. If a back-end cannot compute the median or a set of quantiles, or only approximate them, it should say so.
I fully agree that the architecture of any specific backend should not prescribe anything; I'm also not trying to influence how pre-defined operations such as median should be defined in the API or implemented on the back-end. The whole distinction between the two options that we're considering is only relevant for user-provided reducer callbacks; in all of the pre-defined reducers, we can leave the choice of how to implement it to the back-end (it will likely be either reduce-array-at-once or reduce-array-iteratively).
As for what the openEO API defines - if the openEO API defines a process syntax that can be used with custom code to efficiently do massive reductions on the back-end, I would view it more as a convenience for the user, not a prescription from the back-end to the API. In the same way, a language that supports `foreach` loops and `switch` statements doesn't do that because it wants to impose its internals on the user (though the internals are probably optimized compared to typical hand-written code), but because they're constructs that occur often, and having language support for them is convenient - it makes the user's code more readable (less boilerplate), it is less error-prone and usually allows for better compiler (or JIT) optimization. It's perfectly OK if openEO decides not to offer this convenience.
@m-mohr:
> I'm wondering whether you can't simply optimize our processes on the back-end side

That is indeed simple for pre-defined reducer operations. It gets complicated if the back-end needs to analyse the internals of the callback in order to infer that what's going on can be effectively represented as a map-reduce operation. It will be near impossible to do anything efficient in case the user-provided reducer is a UDF.
To demonstrate the difference in efficiency: imagine that the user tried to compute some reduction of e.g. NDVI for all Sentinel-2 acquisitions of Europe in 2018, and let's assume they decide to implement it as a UDF (one of the reasons we have UDFs is to allow extending openEO's predefined processes, so this is not an unreasonable assumption):
- If the UDF is written as a binary (per-item) reducer, the back-end only ever needs to hold the running aggregate `(sum, count)` and the single value `item` in memory - the memory required to evaluate the whole schema is `3 * 8 = 24 bytes` (if each number is 64 bits).
- If the UDF is a per-collection reducer, the back-end has to materialize the whole array of values and pass it to the UDF: `8 * areaOfEurope / areaOfPixel * daysPerYear / daysPerRevisit ~ 8 * 1E7 km2 / 100 m2 * 365 days / 5 days = 58.4 terabytes`. There is no way for the back-end to make this operation parallel or to evaluate it in a streaming way, because the UDF is a black box.

Again - the difference only applies to custom callbacks; for predefined reducers there's absolutely no difference. Comparing with GEE doesn't help much in this regard, as they don't have custom callbacks, nor do they publish the internals of the implementation of their predefined reducers.
Aha, that's a good point, @mkadunc. I must admit I forgot about UDFs in this discussion and was focused too much on our pre-defined functions. I have no solution yet for this, but I'll definitely work on it and/or happily take proposals.
What about asking the user to declare if the UDF is unary, n-ary or reduce? This way a back-end can implement different strategies to optimize the processing (for proper optimization, additional information is required for n-ary and reduce).
@lforesta I don't think I fully understand the suggestion - even if a UDF provides this information, I don't see how the backend could use it to execute the process differently... The execution semantics will be defined by the user in the process graph, so there won't be much room for optimization.
Maybe an analogy with sort, which I think is very similar to reduce, might help understand what our choices are regarding different kinds of reduce support we might offer in openEO:
1. Specialized preset processes - `reduce_sum(dim, cube)`, `reduce_median(dim, cube)` - are analogous to predefined sort functions along the lines of `sort_ascending(dim, cube)` and `sort_descending(dim, cube)`.
   - CON: to get anything beyond the presets, the user has to either chain operations (e.g. `sort_uppercase(sort_ascending(dim, cube))`) or pre-process the data, execute the preset operation, and then restore it back (e.g. sorting by absolute value would require pre-sort mapping `a => [abs(a), a]` and post-sort restoring of `a => a[1]`).
2. A single generic process with an enumerated list of preset operations - `reduce(dim, cube, Reducers.SUM)`, `reduce(dim, cube, Reducers.MEDIAN)` - analogous to generic sort functions with a fixed list of sort types, e.g. `sort(dim, cube, Sorters.ASC)` and `sort(dim, cube, Sorters.DESC)`.
   - PRO: a single `sort` function - easy to find for the user; extensions happen in constants, not new functions.
3. Preset operations that can be combined (e.g. `sort(dim, cube, Sorters.compose(Sorters.UPPERCASE_ASC, Sorters.DESC))` gives you case-insensitive ascending ordering with case-sensitive tie-breaking). Reduction is a bit more difficult in this regard, because you cannot really chain reducers (as @jdries commented), but you can combine them to get more than one aggregate value in a single call, e.g. `reduce(dim, cube, Reducers.combine(Reducers.MEAN, Reducers.MIN, Reducers.MAX))` (see the sketch below).
4. Custom per-collection callbacks (e.g. `reduce(dim, cube, array => sum_elements(array))`) - this would be analogous to custom sorters that take the whole array as input, e.g. `sort(dim, cube, array => array.sort(Sorters.ASC).sort(Sorters.UPPERCASE_ASC))`.
   - CON: you might as well call `sort(dim, cube, callback)` instead of `apply_dimensions(dim, cube, callback)` - there's nothing sort-specific in the signature of the `sort` function, as all sorting is done inside the callback.
5. Custom per-item (binary) callbacks (e.g. `reduce(dim, cube, (agg, item) => agg + item)`) - these are analogous to sorting with a custom comparator - `sort(dim, cube, (a, b) => a.toUpper() > b.toUpper() ? 1 : a > b)`.
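A minimal sketch in plain JavaScript of the 'combine' idea from option 3; the `init`/`step`/`result` structure and the `combine` helper are illustrative assumptions, not proposed openEO syntax:

```js
const MEAN = {
  init: () => ({ total: 0, count: 0 }),
  step: (a, v) => ({ total: a.total + v, count: a.count + 1 }),
  result: (a) => a.total / a.count,
};
const MIN = { init: () => Infinity,  step: (a, v) => Math.min(a, v), result: (a) => a };
const MAX = { init: () => -Infinity, step: (a, v) => Math.max(a, v), result: (a) => a };

// Run several preset reducers in a single pass over the data.
const combine = (...reducers) => ({
  init: () => reducers.map(r => r.init()),
  step: (aggs, v) => aggs.map((a, i) => reducers[i].step(a, v)),
  result: (aggs) => aggs.map((a, i) => reducers[i].result(a)),
});

const r = combine(MEAN, MIN, MAX);
const out = [4, 1, 7, 2].reduce(r.step, r.init());
console.log(r.result(out));   // -> [3.5, 1, 7]
```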
Thanks! I haven't read it yet (but will do!). I quickly want to ask: How would you personally rank the choices, @mkadunc?
5 > 3 > 2 > 1 > 4
(i.e. among the preset operations: 3 > 2 > 1; among the custom callbacks: 5 > 4)
> @lforesta I don't think I fully understand the suggestion - even if a UDF provides this information, I don't see how the backend could use it to execute the process differently... The execution semantics will be defined by the user in the process graph, so there won't be much room for optimization.
@mkadunc I was thinking of basically being able to know the input/output of the custom UDF itself and being able to parallelize at least according to that, but I realized that this information is provided in the process that uses the custom UDF as a callback, e.g. `reduce(data: ('some_previous_process'), dimension: ('temporal'), reducer: (call my_udf))`, so my comment is not relevant.

However, can I ask for a clarification? In your previous example of a custom NDVI used from openEO reduce, wouldn't the back-end know that each pixel (x,y,t) can be processed independently from all others? E.g. with the following sketched process graph:

```
get_collection('sentinel2')
filter_bbox('europe')
filter_temporal('01-01-2018', '01-01-2019')
filter_bands('4', '8')
reduce(data: 'filter_bands', dimension: 'spectral', reducer: 'custom-NDVI')
```

In this case the back-end knows the initial dimensions of the dataset 'sentinel2' and whether any dimension was removed/added by a process called before reduce. The reduce process here takes a cube with dimensions (x, y, t, s) and reduces along the spectral dimension, returning a cube in (x, y, t), so there would be no need to use 58 TB of memory, since the operation can be done for each pixel (x,y) and each time stamp t concurrently, no?
@mkadunc For the custom callbacks, would that be based on choosing a particular programming language in OpenEO, or would it be similar to the current UDF proposal, where a backend can choose what language to support?
@jdries As far as I understand Miha, it is language independent and the custom callbacks are exactly what we already have in the processes as of now.
@mkadunc First of all, thanks for the list of choices. I'm not quite sure whether our sort is the best analogy. Ascending/descending is a boolean flag in many other environments as well, and case-(in)sensitive sort is not required, as we have not defined sort for strings (yet). Also, sort without callbacks as we have it now can easily be optimized so that it doesn't necessarily need to work on the whole array. If we want sort with callbacks, that's a new topic to be discussed separately, I think.

I'm not against 5, but I am concerned that it limits some use cases, for example median (yes, there's a workaround), and there could be others that don't work with binary processes. The most flexible approach would be to allow working on both a binary and an n-ary basis. In the best case this is hidden from the user - they should not have to care about it - but I don't know yet how we can do that, and with UDFs they need to know it anyway. We'll probably need to figure out what users need. Maybe we support both and wait for feedback? The current processes are just a first proposal and we need to iteratively improve them over time anyway.

Deciding for 5 requires a lot of work on the processes again, as we designed them to avoid binary processes as much as possible, e.g. divide, subtract and the reducers. We would probably need to revert this. Also, how does it work with sum, product and other processes that are usually n-ary? Do we need an n-ary and a binary process for them? Or could we remove sum and replace it with plus/addition etc.? This is not really a problem, except regarding time (deliverables in March and May).
By the way: Binary UDFs would be very inefficient, right? For a million values this could potentially lead to about a million HTTP calls to the UDF API... 🤔
@lforesta
> so there would be no need to use 58 TB memory since the operation can be done for each pixel (x,y) for each time stamp t concurrently, no?

My example was a bit more involved - your example computes NDVI, while mine wanted to get the mean of all NDVIs over Europe for the whole year (the NDVI computation is not the problem - the mean reduction is).
Side note: my example included reductions over 3 dimensions (2 spatial and temporal) - I realize now that openEO currently supports only one at a time, so you'd need something like this to achieve the same result:
```js
europeCube = collection("S-2_L1C").filterBBOX(europeBBOX).filterTime("2018");

// transform the cube of reflectances into a cube of NDVI values
cubeNdviXYT = reduce(europeCube, 'spectral', sample => NDI(sample["nir"], sample["r"]));

// aggregate all NDVI values into a single number
meanNdviYT = reduce(cubeNdviXYT, 'x', arr => UDF("myAggregatorUDF", arr)); // arr is 500000 elements
meanNdviT = reduce(meanNdviYT, 'y', arr => UDF("myAggregatorUDF", arr));   // arr is 400000 elements
meanNdvi = reduce(meanNdviT, 'time', arr => UDF("myAggregatorUDF", arr));  // arr is 73 elements
```
The above looks more reasonable. Some difficulty remains in the back-end implementation, because the backend is forced to produce the whole row (or column) of pixels over the area for passing to the UDF, which means that its internal splitting of the input data cannot be optimized much...
> By the way: Binary UDFs would be very inefficient, right? For a million values this could potentially lead to a million minus one HTTP calls to the UDF API... 🤔

Sure, if the UDF is accessed over HTTP; if it's executed in the same process as the rest of the back-end, it doesn't matter. But this is not a problem (only) for `reduce`...
> @mkadunc First of all, thanks for the list of choices. I'm not quite sure whether our sort is the best analogy. Ascending/descending is a boolean flag in many other environments as well and case (in)sensitive sort is not required as we have not defined sort for strings (yet). Also, sort without callbacks as we have them now can easily be optimized so that it doesn't necessarily need to work on the whole array. If we want sort with callbacks that's a new topic to be discussed separately, I think.
>
> I'm not against 5, but I am concerned that this limits some use cases... Deciding for 5 requires a lot of work in the processes again as we designed the processes to avoid binary processes as much as possible, e.g. divide, subtract and the reducers. We would probably need to revert this. Also, how does it work with sum, product and other similar processes that are usually n-ary? Do we need an n-ary and binary process for them? Or could we remove sum and replace it with plus/addition etc.? This is not really a problem, except regarding the time (deliverables in March and May).
I'm OK with not having 5, either for sorting or reducing. Especially in light of the time constraints - option 2 is quite good enough for all our scenarios.
I also fully support having 4 (which we do already), but I am strongly against calling it `reduce` in this case — `map` would be much more appropriate.
Regarding support for binary addition - could we have a way to construct an array inline from two values? This way we kill a couple of similar birds with one stone... ;)
We discussed today that we might split this into multiple processes and add a map_reduce process, and also something similar to apply_dimension but one that can reduce the data cardinality (apply_dimension currently expects that the cardinality doesn't change). I'll make proposals for this by the telco next week.
Okay, we changed our opinion. We discussed the following:

The decision whether a process is binary (a commutative and associative process accepting two inputs, i.e. map-reduce style) or n-ary (a process working on lists) is only relevant for UDFs (and to some extent for complex callbacks with more than one process). All other cases can be optimized internally by the back-ends.

Therefore, we add a parameter to `reduce` that allows switching between reducing with two inputs and reducing arrays. This parameter defaults to "unary". I still need to find a name for the parameter and appropriate values. Depending on the parameter, the UDFs will work differently and get different inputs. For all other cases we can determine and optimize based on the process graph. The reducers will not be changed and will still officially work on lists, but back-ends can do whatever they want under the hood. If this is too complicated for a user, the alternative would be to define a separate reduce_udf process.

What we also discussed is that reducers should also be able to reduce to more than one value, e.g. extrema. So reduce needs to clarify this and remove the "reducer must return a single value" constraint. If multiple values are returned by a reducer, a new dimension is created.
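To illustrate the distinction, a hedged sketch in plain JavaScript (not the openEO process-graph encoding; `reduceOverTime` and the options object are illustrative stand-ins):

```js
// Stand-in for a back-end's reduce over one dimension of a cube.
function reduceOverTime(timeseries, reducer, { binary = false } = {}) {
  if (binary) {
    // reducer takes two inputs; it must be associative and commutative,
    // so the back-end is free to pair values in any order, possibly in parallel
    return timeseries.reduce((x, y) => reducer(x, y));
  }
  // default (n-ary): reducer receives the whole array of values along the dimension
  return reducer(timeseries);
}

const ts = [3, 1, 4, 1, 5];
reduceOverTime(ts, (data) => data.reduce((a, b) => a + b, 0));   // n-ary sum  -> 14
reduceOverTime(ts, (x, y) => x + y, { binary: true });           // binary sum -> 14
```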
Both implemented, please review the new parameters (`target_dimension` and `binary`) in the process `reduce`.
Discussed in the telco today, no objections (yet). If there are further things to discuss/change, please open a new issue.
I just realized that this issue also applies to all other processes, which accept reducers:
So each of them also needs this new parameter!
Many underlying software packages just allow a set of named reducers in the reduce operation. As far as I can see from @jdries' implementation in the GeoPyspark back-end, this is the case for them, and it seems at least partially true for GEE. The current approach within the openEO processes is that this is a flexible callback (also in operations other than reduce). This is very flexible, but not often available natively, so custom implementations are required. What do you think about this case? I like the flexibility, but I see that there are potential issues in implementation.
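For illustration, a hedged sketch in plain JavaScript of the two approaches (the names and signatures are illustrative, not any back-end's actual API):

```js
// Named-reducer style: the back-end just looks the name up in its native implementations.
const NAMED_REDUCERS = {
  mean: (arr) => arr.reduce((a, b) => a + b, 0) / arr.length,
  max:  (arr) => Math.max(...arr),
};
const reduceNamed = (arr, name) => NAMED_REDUCERS[name](arr);

// Callback style: maximally flexible, but the back-end either runs arbitrary user code
// or has to inspect the callback to map it onto a native implementation.
const reduceCallback = (arr, fn) => fn(arr);

reduceNamed([1, 2, 3, 4], 'mean');               // -> 2.5
reduceCallback([1, 2, 3, 4], a => a[1] - a[0]);  // -> 1 (something no preset offers)
```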