Closed: akashdw closed this issue 7 years ago.
@akashdw Is this happening on a historical, a realtime node, or a broker?
If it's on realtime, maybe it's due to non-thread-safe use of aggregators by incremental index (see #3956, #3578)?
@gianm historical and realtime
@gianm Note that SketchBufferAggregator.aggregate() is not thread-safe, while SketchAggregator.aggregate() is thread-safe in order to handle concurrency from IncrementalIndex.
With v2, is it possible that BufferAggregator.aggregate() gets called concurrently?
@himanshug, groupBy v2 still won't call aggregator methods concurrently.
I think this is because this map in SketchBufferAggregator assumes that sketches will never be relocated:
private final Map<Integer, Union> unions = new HashMap<>(); //position in BB -> Union Object
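A minimal, self-contained Java sketch of why a position-keyed cache goes wrong once slots move. It does not use Druid or DataSketches: PositionCachingAggregator is a hypothetical stand-in that caches a ByteBuffer view per position, roughly the way the map above caches a Union per position. After the slot is copied into a larger buffer at a position that is already in the cache, the next update goes through the stale view into the old buffer and is lost.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class StalePositionCacheDemo {

  // Hypothetical stand-in for a position-caching buffer aggregator: it remembers a
  // heap-side "view" of each slot, keyed only by the slot's position in the buffer.
  static final class PositionCachingAggregator {
    private final Map<Integer, ByteBuffer> views = new HashMap<>(); // position -> view

    void init(ByteBuffer buf, int position) {
      buf.putLong(position, 0L);
    }

    void aggregate(ByteBuffer buf, int position) {
      // The cache key ignores which buffer the cached view was created from.
      ByteBuffer view = views.computeIfAbsent(position, p -> sliceAt(buf, p));
      view.putLong(0, view.getLong(0) + 1);
    }

    long get(ByteBuffer buf, int position) {
      return buf.getLong(position);
    }

    private static ByteBuffer sliceAt(ByteBuffer buf, int position) {
      ByteBuffer dup = buf.duplicate();
      dup.position(position);
      return dup.slice(); // shares memory with buf, starting at position
    }
  }

  // Returns {count read from the new buffer, count left in the old buffer}.
  public static long[] run() {
    PositionCachingAggregator agg = new PositionCachingAggregator();
    ByteBuffer oldBuf = ByteBuffer.allocate(16);
    agg.init(oldBuf, 0);
    agg.aggregate(oldBuf, 0);
    agg.aggregate(oldBuf, 0);

    // Simulate hash table growth: copy the slot's bytes into a larger buffer.
    ByteBuffer newBuf = ByteBuffer.allocate(32);
    for (int i = 0; i < Long.BYTES; i++) {
      newBuf.put(i, oldBuf.get(i));
    }

    // This update goes through the cached view into oldBuf, so newBuf never sees it.
    agg.aggregate(newBuf, 0);
    return new long[] {agg.get(newBuf, 0), oldBuf.getLong(0)};
  }

  public static void main(String[] args) {
    long[] counts = run();
    System.out.println("new buffer: " + counts[0] + ", old buffer: " + counts[1]);
  }
}
```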
But groupBy v2 does relocate aggregation buffers when it expands its hash table. Can you try working around this by adding "bufferGrouperInitialBuckets" : 2147483647 to your query context and confirm whether that fixes the problem? That will start the hash table at its maximum size, so it won't expand. Note that this will slow down queries that only need a small hash table (allocating a big one takes time), so you don't want to do this blindly for every query.
If that's what's going on, then for a real fix, groupBy could close and re-open BufferAggregators when it resizes its hash table. We could do this for 0.10.0.
@gianm Yeah, in case of relocation that might not work. The Unions are cached for performance, because creating Union objects from buffers is expensive.
@akashdw is going to try the max size and verify whether this is indeed the issue.
@gianm Yes, the groupBy v2 query worked with "bufferGrouperInitialBuckets" : 2147483647.
close/re-open aggregators on hashtable growth sounds good to me.
great, let's do that patch then. @akashdw @himanshug will one of you work on that or shall I?
@gianm Yes, working on it.
thanks 🌟
I disagree with that fix. The aggregator is breaking the contract by caching by position. The aggregator should be changed to stop caching objects, and we can try to make the Union object cheap to create on a buffer.
Also, I don't believe open/close will work, because the current implementation assumes that the object in its local cache is the "good" object, which is broken.
That sounds like a better fix if feasible.
@akashdw and @himanshug what do you think?
@cheddar by close/open I mean close, throw away, and re-create with factorizeBuffered. That should work around this, I think, although it'd be better for the aggregator not to store state inside itself.
@cheddar @gianm @akashdw I'm good with removing the caching of Unions, and that would be ideal; however, it would require at least some performance testing. Also, we can check with @leerho whether anything has changed in Union creation since that code was written.
Yeah, I'll chat with @leerho about whether a faster wrap method could be provided.
@gianm Looking at the code again, it does lazily reinstantiate the objects, so I think what you described would actually work (I had thought it didn't). That said, the current contract on the interface is that init is called first and then the space is used for storing. With close/reopen, we would end up breaking that contract as well.
I think fixing SketchBufferAggregator so that it doesn't cache by position is a long-term fix, because it requires some performance testing. Could close/reopen be a short-term fix? @gianm @cheddar @himanshug what do you think about going forward with the close/reopen approach for now?
If fixing SketchBufferAggregator isn't feasible in the 0.10.0 timeframe then I'm ok with closing and re-creating the buffer aggs as a workaround. So I guess it depends on how much work fixing the aggregator would be.
@gianm I'm not a fan of close/reopen because it breaks the contract of the interface. Also, we realized there is another thing we are doing that close/reopen will break. Quantile sketches don't fit entirely in a fixed memory size, so we decided that when they need to grow beyond the available memory, we would pull them on heap and use the position to find them in an on-heap Map. Close/reopen will break that.
So, I'm thinking that what we might need to do to support this type of activity is add an "ordinal" to the interface, which is an id that doesn't change even when the position changes. We should be able to introduce that using default method implementations on the interface so that it would be backwards compatible. Is it difficult to provide such an ordinal value with groupBy v2?
@cheddar What's the contract that is being broken? To me it seems fine to close one aggregator and then use a different one to keep aggregating on the same buffer, since I imagined that "close" is supposed to release resources (but not modify the buffer), and so it should be fine for some other aggregator instance to do further processing on the same buffer state, which I imagined would store all the state for the aggregator. groupBy v2 is written under this assumption. Reading over the BufferAggregator interface again it doesn't seem like this is spelled out one way or the other, so probably both of our interpretations could be read into what's there…
It's not easy for groupBy to pass in an ordinal like that, but it could pass along the key instead. The aggregator state is a value in the hash table, and so it does have a consistent key, which is a byte array. But tracking that still seems to me like something that BufferAggregators shouldn't do.
Are you suggesting that it's OK for buffer aggregators to store aggregation state inside themselves that is not actually part of the buffer? That seems really sketchy (sorry about the pun) for memory management -- in my mind the purpose of using the buffer is to manage memory better and avoid OOMs, which is defeated if the aggregators are keeping a lot of state outside of the buffer.
@gianm The contract that I feel is getting broken is that a particular aggregator instance will see the space with its init method before it sees it with any subsequent method. In the close/reopen case, the reopened aggregator will start seeing things in get() calls that it never saw in init() calls.
So, the problem in the quantile sketch case is that the algorithm isn't actually amenable to a fixed size. So, we are taking a strategy of setting a fixed size that we expect to work in a vast majority of cases and having a fallback of pulling the thing on heap for the small percentage of the time that it needs it. The alternative to this strategy would be Druid actually exposing something that allows for variable sizes for the aggregators. So far, I've thought that variable sizes for the Aggregators is really too much and so opted for pulling it on heap instead. Does that make sense?
I think we can make variable sizes for buffer aggregators work. At least for groupBy, we can let aggregators grow into some of the space currently reserved for hashtable expansion. If the space fills up for either reason then we spill to disk. And after the spill we might reserve some extra space for aggregator growth if they needed it before the spill. That seems friendlier to me than letting them grow into the Java heap and hoping for the best.
@cheddar I guess we could do the following, which does the "close/reopen" and still follows the whole aggregator contract, as long as we make the init() call but then discard the updates it made to the BB, as described below:
oldBuffAgg.close();
//say above old buffer aggregator was working on oldBB at oldPos
BufferAggregator newBuffAgg = factory.factorizeBuffered(..);
newBuffAgg.init(newBB, newPos);
//copy contents at oldPos, oldBB to newBB, newPos
//afterwards regular newBuffAgg.aggregate(newBB, newPos) calls
The above assumes that BufferAggregator.close() does not destroy the BB contents. To be safer, we can do the following instead:
//copy oldBB at oldPos contents to a temporary location
oldBuffAgg.close();
//now do the rest and copy temporary location's bytes to newBB at newPos
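The "safer" ordering above can be sketched in plain Java as follows. SimpleBufferAggregator, CountAggregator and reopen() are hypothetical names, not Druid's API, and a Supplier stands in for factorizeBuffered(..). The point it illustrates is that the new instance still sees the slot through init() first, and the saved bytes then overwrite whatever init() wrote.

```java
import java.nio.ByteBuffer;
import java.util.function.Supplier;

public class ReopenAggregatorDemo {

  // Hypothetical, trimmed-down aggregator interface (not Druid's BufferAggregator).
  interface SimpleBufferAggregator {
    void init(ByteBuffer buf, int position);

    void aggregate(ByteBuffer buf, int position);

    long get(ByteBuffer buf, int position);

    void close();
  }

  // A counter that keeps all of its state in the buffer.
  static final class CountAggregator implements SimpleBufferAggregator {
    @Override public void init(ByteBuffer buf, int position) { buf.putLong(position, 0L); }
    @Override public void aggregate(ByteBuffer buf, int position) { buf.putLong(position, buf.getLong(position) + 1); }
    @Override public long get(ByteBuffer buf, int position) { return buf.getLong(position); }
    @Override public void close() { }
  }

  // Moves one slot using the "safer" ordering: save bytes, close, create, init, restore.
  static SimpleBufferAggregator reopen(
      SimpleBufferAggregator oldAgg, ByteBuffer oldBuf, int oldPos,
      ByteBuffer newBuf, int newPos, int size,
      Supplier<SimpleBufferAggregator> factory) {
    byte[] saved = new byte[size];
    for (int i = 0; i < size; i++) {
      saved[i] = oldBuf.get(oldPos + i); // copy old state to a temporary location
    }
    oldAgg.close();
    SimpleBufferAggregator newAgg = factory.get(); // stands in for factorizeBuffered(..)
    newAgg.init(newBuf, newPos); // the new instance sees the slot via init() first
    for (int i = 0; i < size; i++) {
      newBuf.put(newPos + i, saved[i]); // discard init()'s writes, restore old state
    }
    return newAgg;
  }

  public static long run() {
    ByteBuffer oldBuf = ByteBuffer.allocate(16);
    SimpleBufferAggregator agg = new CountAggregator();
    agg.init(oldBuf, 8);
    agg.aggregate(oldBuf, 8);
    agg.aggregate(oldBuf, 8);

    ByteBuffer newBuf = ByteBuffer.allocate(32);
    agg = reopen(agg, oldBuf, 8, newBuf, 24, Long.BYTES, CountAggregator::new);
    agg.aggregate(newBuf, 24);
    return agg.get(newBuf, 24);
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints 3
  }
}
```

This only works for aggregators that keep all of their state in the buffer; an aggregator with a heap-side cache would still need to rebuild it from the restored bytes.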
Or we could add a "relocate" method that tells the BufferAggregator that the sketch at a certain location has been relocated to a different location.
Growable space would be useful for theta sketches too: even though they don't strictly need it, it could still save a lot of space.
Hmm, trying to keep things simple for 0.10.0 but hopefully nicer in the future, how do you all feel about the following:
For 0.10.0, add a relocate() method to BufferAggregator that, when called, tells the BufferAggregator that its state has been relocated and that it should throw away any position-based caches it has.
Post 0.10.0, deprecate that relocate() method, improve Union creation for theta sketches, and implement growable space for BufferAggregators, which together should get rid of the need for BufferAggregators to be storing state outside the buffer.
If we enable resizing, then it has to be done for TopN as well as any other place that chooses to use BufferAggregators. Also, enabling resizing will introduce questions of how to reuse space most effectively (as things resize, what happens with the old space, etc.), which basically means that we are more or less reimplementing malloc. That's the reason I've been shying away from wanting to do that. If the relocate method includes the start and end position, then I think we can make it work for the quantile sketch case as well. I guess the point there is that, in terms of contract, we are promoting the position to be both the "ordinal" and the "memory position" by providing visibility into when it changes. We can do it in a compatible manner by using a default method that just does nothing, so yeah, I think we can work with that.
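A sketch of the relocate() idea, assuming a hypothetical RelocatableAggregator interface rather than Druid's actual BufferAggregator. relocate() has a default no-op body, so implementations that keep all of their state in the buffer compile unchanged. OnHeapSumAggregator keeps its state on heap keyed by position (similar to the quantile-sketch fallback described earlier) and overrides relocate() to move its entry. The explicit value argument to aggregate() replaces Druid's column selectors for brevity, and the buffer arguments are unused by this toy aggregator.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class RelocateDemo {

  // Hypothetical interface, not Druid's BufferAggregator. relocate() has a default
  // no-op body, so existing implementations keep compiling.
  interface RelocatableAggregator {
    void init(ByteBuffer buf, int position);

    void aggregate(ByteBuffer buf, int position, long value);

    long get(ByteBuffer buf, int position);

    default void relocate(int oldPosition, int newPosition) {
      // Aggregators whose state lives entirely in the buffer have nothing to do.
    }
  }

  // Keeps its state on heap, keyed by position, so it must follow relocations.
  static final class OnHeapSumAggregator implements RelocatableAggregator {
    private final Map<Integer, long[]> onHeap = new HashMap<>(); // position -> running sum

    @Override
    public void init(ByteBuffer buf, int position) {
      onHeap.put(position, new long[1]);
    }

    @Override
    public void aggregate(ByteBuffer buf, int position, long value) {
      onHeap.get(position)[0] += value;
    }

    @Override
    public long get(ByteBuffer buf, int position) {
      return onHeap.get(position)[0];
    }

    @Override
    public void relocate(int oldPosition, int newPosition) {
      long[] state = onHeap.remove(oldPosition);
      if (state != null) {
        onHeap.put(newPosition, state); // same state, new key
      }
    }
  }

  public static long run() {
    ByteBuffer buf = ByteBuffer.allocate(64);
    RelocatableAggregator agg = new OnHeapSumAggregator();
    agg.init(buf, 8);
    agg.aggregate(buf, 8, 5);
    agg.relocate(8, 40); // the engine reports that the slot moved from 8 to 40
    agg.aggregate(buf, 40, 7);
    return agg.get(buf, 40);
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints 12
  }
}
```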
My only worry would be position collisions. For example, suppose I have an object at position 0x8 and another one at position 0x48, both cached in my Map. Then I get a call relocate(0x8, 0x48). Assuming there will be another call relocate(0x48, 0x96) right after it, I won't be able to do anything useful with the initial relocate(0x8, 0x48) call, because it will collide with the old 0x48 and I won't know the new location of 0x48 yet.
We would have to resolve that by making the relocate() call take a batch of relocations all at once, which also might be weird.
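The collision can be reproduced with a plain HashMap. In this sketch (all names are hypothetical), applying relocate(0x8, 0x48) and then relocate(0x48, 0x96) one call at a time overwrites the entry that was still sitting at 0x48, while applying the same moves as one batch, looking up every source position in the original map, keeps both entries.

```java
import java.util.HashMap;
import java.util.Map;

public class RelocationCollisionDemo {

  // Two cached entries, at positions 0x8 and 0x48, as in the example above.
  public static Map<Integer, String> exampleCache() {
    Map<Integer, String> cache = new HashMap<>();
    cache.put(0x8, "A");
    cache.put(0x48, "B");
    return cache;
  }

  // relocate(0x8, 0x48) followed by relocate(0x48, 0x96).
  public static int[][] exampleMoves() {
    return new int[][] {{0x8, 0x48}, {0x48, 0x96}};
  }

  // One relocate(old, new) call at a time.
  public static Map<Integer, String> applySequentially(Map<Integer, String> cache, int[][] moves) {
    Map<Integer, String> result = new HashMap<>(cache);
    for (int[] move : moves) {
      String state = result.remove(move[0]);
      if (state != null) {
        result.put(move[1], state); // may overwrite an entry that has not moved yet
      }
    }
    return result;
  }

  // All moves at once: every source position is looked up in the original map.
  public static Map<Integer, String> applyAsBatch(Map<Integer, String> cache, int[][] moves) {
    Map<Integer, String> result = new HashMap<>(cache);
    for (int[] move : moves) {
      result.remove(move[0]);
    }
    for (int[] move : moves) {
      String state = cache.get(move[0]);
      if (state != null) {
        result.put(move[1], state);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // B is lost and A ends up at 0x96.
    System.out.println(applySequentially(exampleCache(), exampleMoves()));
    // A ends up at 0x48 and B at 0x96.
    System.out.println(applyAsBatch(exampleCache(), exampleMoves()));
  }
}
```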
For the initial thought of the ordinal, how disruptive would it be for groupBy to add an extra 32-bit int counter column that it stores alongside all payloads as the ordinal? Just to add a bit of extra value, could we also add a new groupBy metric, emitted at the end of a run, that reports the number of rows in the result set returned from the segment?
Also, enabling resizing will introduce questions of how to reuse space most effectively (as things resize, what happens with the old space, etc.), which basically means that we are more or less reimplementing malloc.
I think there's nothing wrong with a custom allocator, and there are good reasons for one: performance ("beat the gc") and avoidance of OOMEs. groupBy v2 can optionally start using the disk when it reaches its memory limit, and I think that's a good option to have. It also expresses its memory limit in bytes instead of rows like v1 did, so it's OOME resistant. Having state kept in the aggregator instances, outside of the buffer, defeats all that.
If the relocate method includes the start and end position, then I think we can make it work for the quantile sketch case as well.
I don't really like this as a long term solution (for reasons above) but I think it's ok for now, given that we don't yet have a good enough off-heap allocator to enable resizing.
My only worry would be position collisions.
At least right now, there's no need to worry about this, since groupBy is the only engine that relocates and it doesn't do "swaps". It moves things around in such a way that collisions wouldn't happen if the aggregator tracks its relocates in order. I guess that means if we want to implement an engine that does do swapping then we would need a new method on BufferAggregator, but we can deal with that later :)
For the initial thought of the ordinal, how disruptive would it be for groupBy to add an extra 32-bit int counter column that it stores alongside all payloads as the ordinal?
4 bytes per bucket is a lot: consider that if you have zero or one aggregators, each bucket is going to be just 8–16 bytes, so that's a 25–50% increase in footprint.
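The 25–50% range is just the extra 4-byte ordinal divided by the bucket sizes quoted above:

$$
\frac{4\ \text{bytes}}{16\ \text{bytes}} = 25\%, \qquad \frac{4\ \text{bytes}}{8\ \text{bytes}} = 50\%
$$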
Ok. If you think the collisions I described won't happen, then I'm OK with adding relocate() as an interim solution.
@cheddar @gianm If relocate() just throws away any position-based caches, I don't think it's going to solve the problem of growing quantile sketches: https://github.com/druid-io/druid/issues/4026#issuecomment-285202370
@akashdw, I think @cheddar means the void relocate(oldPosition, newPosition) option, which should give quantile sketches what they need.
I see. Going into implementation details: won't adding relocate() to BufferAggregator break existing custom extensions? Any suggestions for avoiding that?
@akashdw that's OK, 0.10.0 is a major release so backwards incompatible changes are ok. We just need to document it in the release notes.
Okay, then I can start making the relocate() changes. Is there anything I need to be careful about? @himanshug @cheddar
One thing that I originally thought of doing when I added the Union caching was the following:
Keep a Map<Integer, Union> cache whose key is not the position in the BB but a monotonically increasing ordinal. The implementation would write the ordinal as the first 4 bytes in the BB at the given position, so even if the data is relocated, SketchBufferAggregator would still find the right Union to use. But this means storing an additional 4 bytes. I did not do it at the time because relocation wasn't possible.
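A sketch of that ordinal idea, under simplifying assumptions: OrdinalCachingAggregator is hypothetical, each slot holds only the 4-byte ordinal, and the per-slot state lives in a heap array rather than a Union, so it illustrates only that the lookup survives relocation. The engine copies the slot bytes to a new buffer and position without notifying the aggregator, and the next update still finds the right state.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class OrdinalKeyedCacheDemo {

  // Hypothetical aggregator: the first 4 bytes of each slot hold an ordinal, and
  // the heap cache is keyed by that ordinal instead of by position.
  static final class OrdinalCachingAggregator {
    private final Map<Integer, long[]> cache = new HashMap<>(); // ordinal -> heap state
    private int nextOrdinal = 0;

    void init(ByteBuffer buf, int position) {
      int ordinal = nextOrdinal++;
      buf.putInt(position, ordinal); // the extra 4 bytes per slot
      cache.put(ordinal, new long[1]);
    }

    void aggregate(ByteBuffer buf, int position) {
      cache.get(buf.getInt(position))[0]++;
    }

    long get(ByteBuffer buf, int position) {
      return cache.get(buf.getInt(position))[0];
    }
  }

  public static long run() {
    OrdinalCachingAggregator agg = new OrdinalCachingAggregator();
    ByteBuffer oldBuf = ByteBuffer.allocate(16);
    agg.init(oldBuf, 0);
    agg.init(oldBuf, 8);
    agg.aggregate(oldBuf, 8);
    agg.aggregate(oldBuf, 8);

    // The engine copies the slot at 8 to position 20 of a new buffer, silently.
    ByteBuffer newBuf = ByteBuffer.allocate(32);
    for (int i = 0; i < Integer.BYTES; i++) {
      newBuf.put(20 + i, oldBuf.get(8 + i));
    }

    agg.aggregate(newBuf, 20); // the ordinal travels with the bytes
    return agg.get(newBuf, 20);
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints 3
  }
}
```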
@akashdw @himanshug @cheddar have we decided on doing void relocate(oldPosition, newPosition) for 0.10.0? Is one of you planning to work on the patch?
@gianm Yup, I'm working on the patch.
Okay cool thanks!
@akashdw, have you had a chance to work on or test this patch?
@gianm The code is done and testing is in progress. If everything goes well, I will create a PR tomorrow.
Great, thank you.
@cheddar @gianm @himanshug I created a PR, https://github.com/druid-io/druid/pull/4071, to address this issue. Please take a look.
We are on Druid 0.10 and are seeing groupBy v2 queries fail intermittently with complex columns; we don't see these failures with groupByStrategy=v1.
Stacktrace:
com.yahoo.sketches.SketchesArgumentException: Possible Corruption: Illegal Family ID: 0
at com.yahoo.sketches.Family.idToFamily(Family.java:184) ~[sketches-core-0.8.4.jar:?]
at com.yahoo.sketches.theta.SetOperation.wrap(SetOperation.java:105) ~[sketches-core-0.8.4.jar:?]
at com.yahoo.sketches.theta.SetOperation.wrap(SetOperation.java:91) ~[sketches-core-0.8.4.jar:?]
at io.druid.query.aggregation.datasketches.theta.SketchBufferAggregator.getUnion(SketchBufferAggregator.java:92) ~[druid-datasketches-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at io.druid.query.aggregation.datasketches.theta.SketchBufferAggregator.aggregate(SketchBufferAggregator.java:71) ~[druid-datasketches-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.BufferGrouper.aggregate(BufferGrouper.java:203) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.SpillingGrouper.aggregate(SpillingGrouper.java:111) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.ConcurrentGrouper.aggregate(ConcurrentGrouper.java:163) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.ConcurrentGrouper.aggregate(ConcurrentGrouper.java:184) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper$1.accumulate(RowBasedGrouperHelper.java:173) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper$1.accumulate(RowBasedGrouperHelper.java:148) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:46) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.CPUTimeMetricQueryRunner$1.wrap(CPUTimeMetricQueryRunner.java:78) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.spec.SpecificSegmentQueryRunner$2.accumulate(SpecificSegmentQueryRunner.java:83) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.spec.SpecificSegmentQueryRunner.doNamed(SpecificSegmentQueryRunner.java:169) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.spec.SpecificSegmentQueryRunner.access$200(SpecificSegmentQueryRunner.java:43) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.spec.SpecificSegmentQueryRunner$3.wrap(SpecificSegmentQueryRunner.java:149) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45) ~[java-util-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:228) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:219) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_112]
at io.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:271) ~[druid-processing-0.10.0-0a471fb.jar:0.10.0-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_112]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_112]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
@himanshug @gianm