oracle / coherence

Oracle Coherence Community Edition
https://coherence.community
Universal Permissive License v1.0
427 stars 70 forks source link

Detail question regarding StreamingAggregator methods and characteristics #127

Open javafanboy opened 3 months ago

javafanboy commented 3 months ago

In order to implement digest calculation of all binary data in a partitioned cache I started looking at the streaming aggregator and I found some aspects of it very suitable for the task. When starting to develop with it I realized there where a few things I was not sure I had understood correctly.

Firstly I am not sure how some of the characteristics really work:

PARALLEL - felt fairly obvious and and an option I generally always want. In combination with BY_MEMBER one aggregator will run on each storage enabled node and with BY_PARTITION as many aggregators will run on each storage enabled node as there are partitions owned by them each. A question mark for me is however if it is the "instance itself" that have the characteristic that will be parallelized or its "sub aggregator" created by calling the supply method? Assuming the initial aggregator have options PARALLEL and PER_PARTITION what options should the aggregato(s) created by "supply" have? Just PARALLEL or?? Is characteristics only valid/obeyed on the initial top-level aggregator object? If not what is the effect when applied to member or partition level aggregator objects??

PRESENT_ONY is however a weird one - when would an aggregator EVER be passed non-existing entries? I do not get the meaning of this one...

RETAIN_ENTRIES is not clear for me either - in my case I will, while an instance of an digest aggregator is running be holding on to passed in entries to be able to to order them by binary key in a canonical order before feeding them into the digest calculation (so that I get the same result irrespective of data insert order to the cache or whatever that could result in different processing order between different caches with the same data). Am I then supposed to specify this characteristic? My guess is that this is to isolate the aggregator from other operations mutating the entries it hold on to - something that is not a cosern for me as I only intend this operation to be used when no mutation is going on....

Also I would like to know HOW the aggregators that are run on the members (storage enabled nodes) in the cluster are "created" - initially I assumed the "supply" method to be called to provide an aggregator instance that is then serialized, sent and executed as many times as was requested (BY_MEMBER or BY_PARTITION) but from my program behaves I am now guessing the original passed in aggregator object itself is serialized and once it is received by each "member" the "supply" method is called to create one or more "sub aggregators" or?

As I understand the documentation the operations described by the StreamingAgregator interface can logically be group as belonging in the top-level aggregator (supply, combine and finalizeResult) or the sub-aggregators (executed once per member or partition) i.e. accumulate, getPartialResult - is this right?

I would like to understand if the "accumulate" method passing in a "streamer" always will be the one that is called and the default implementation of the method calls the single entry "accumulate" method?! And how does the "aggregate" method relate to the "accumulate" ones?

Finally is it correct to say that accumulation will always be just two "levels" first a single top level aggregator and then a second tier running either once on each member or once per partition - or is it possible to create deeper architectures where one aggregator run per member that in turn specify the "PER_PARTITION" would be able to aggregate a third level of aggregators that are run one per partition?

javafanboy commented 3 months ago

Partially answering my own question I can now, after adding an "identity" to my aggregators so my traces made more sense describe what I think is going on - please let me know if anybody who knows better disagrees :-)

  1. The "root" aggregator instance passed in to the "aggregate" method on the target cache receives on call to "supply" on the node where the aggregate call is performed creating what I call "root-0" aggregator.
  2. The "root" aggregator is serialized and passed to each member where "supply" is called once for each partition. As expected the partition level aggregators receive calls to "accumulate" once for each entry in the respective partitions.
  3. The per partition aggregators each receive a call to "getPartialResult" and the results are passed to the per member instances of the "initial" aggregator using its "combine" method.
  4. The per member instances of the "initial" aggregator each receive a call to getPartialResult that are sent back to the initiating node and passed to the "root-0" aggregator using the "combine" method.
  5. Finally the "finalizeResult" is called on the "root-0" aggegator and returned as the result of the "aggregate" call!

The above was partly what I expected except that:

I still do not understand the PRESENT_ONLY option and have only guessed that I, as I do not expect updates in the situations I calculate the digest, have to use RETAIN_ENTRIES and would appreciate some info about those as well as some input on if the PARALLEL, PER_PARTITION are only relevant to specify on the "root" aggregator or if they have any effect if specified on the "supplied" children as well?

javafanboy commented 3 months ago

Eventually I also figured out the meaning of RETAIN_ENTRIES - it seems like the aggregator framework makes some optimization and REUSES the entry object passed in to the accumulate method calls (I had naively assumed these where the actual backing map entries but this is not the case so unless one immediately "consumes them" this "flag" is needed...

Still not sure about PRESEN_ONLY and the question about how to use these settings affects sub aggregators...

mgamanho commented 3 months ago

Hi @javafanboy, that's good sleuthing, this looks correct to me.

RETAIN_ENTRIES saves on serialization cost at the (slight) risk of processing stale data. If your cache is stable you can do this. PRESENT_ONLY is for cases where entries may have been removed. That can absolutely be due to expiry when the backing map supports it. Most OOB aggregators use this setting as they work off of values.