
remove unnecessary synchronization overhead from complex Aggregators #8031

Open himanshug opened 5 years ago

himanshug commented 5 years ago

Motivation

Many complex [Buffer]Aggregator implementations need to add synchronized access to internal data structures due to the single-writer-multiple-reader concurrent usage of those structures during the realtime indexing process, where they are concurrently queried in addition to being updated. However, that synchronization is totally unnecessary everywhere else, but we pay its price anyway, for example on historical nodes while querying and in batch indexing tasks. Most recently this came up in https://github.com/apache/incubator-datasketches-java/issues/263 .

Proposed changes

I haven't really done a prototype yet, but I think these changes should be doable.

Add the following methods (with default implementations) to AggregatorFactory.

  // Default implementations delegate to the existing single-argument
  // methods, so current AggregatorFactory implementations keep working
  // without modification.
  public Aggregator factorize(ColumnSelectorFactory metricFactory, boolean isConcurrent)
  {
    return factorize(metricFactory);
  }

  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, boolean isConcurrent)
  {
    return factorizeBuffered(metricFactory);
  }

And, replace all calls inside Druid code from AggregatorFactory.factorize[Buffered](ColumnSelectorFactory) to AggregatorFactory.factorize[Buffered](ColumnSelectorFactory, boolean isConcurrent), with the right value for isConcurrent specified. IncrementalIndex would be made aware of its concurrency context (by renaming the existing variable concurrentEventAdd to isConcurrent and making sure it is correctly specified everywhere an IncrementalIndex instance is created), so that it can pass the right value for isConcurrent when calling factorize[Buffered](..). Relevant complex aggregators such as thetaSketch can then override the newly added methods to add synchronization only in the cases where it is really needed, as sketched below.
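A minimal sketch of what such an override could look like. MySketchAggregatorFactory, makeUnsynchronizedAggregator, and SynchronizedAggregator are hypothetical names used for illustration; a real extension would wire in its own lock-free implementation and a locking decorator around it.

import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.ColumnSelectorFactory;

// Hypothetical factory for a sketch-based complex aggregator; only the
// override relevant to this proposal is shown.
public class MySketchAggregatorFactory extends AggregatorFactory
{
  @Override
  public Aggregator factorize(ColumnSelectorFactory metricFactory, boolean isConcurrent)
  {
    // Placeholder for the extension's own unsynchronized aggregator.
    Aggregator aggregator = makeUnsynchronizedAggregator(metricFactory);
    // Pay the locking cost only in the single-writer-multiple-reader
    // realtime indexing case; everywhere else return the plain aggregator.
    return isConcurrent ? new SynchronizedAggregator(aggregator) : aggregator;
  }

  // ... remaining AggregatorFactory methods omitted ...
}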

Rationale

One other option would be for aggregator implementors to get additional contextual information (e.g. the nodeType they are running on) and enable/disable synchronization based on that. However, the proposed approach is simpler for extension writers to use and takes away the guessing game. I also contemplated adding an enum like

enum ConcurrencyContext {
  NONE,
  MULTI_WRITE,
  SINGLE_WRITE_MULTI_READ,
  ...
}

and using it instead of the boolean isConcurrent in the newly introduced method arguments (see the sketch below), but I couldn't see any significant advantage in doing that for now.
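For reference, the enum-based alternative would have amounted to a signature change along these lines (illustrative only, not part of the proposal):

// Illustrative: the rejected enum-based variant of the new hook.
public Aggregator factorize(ColumnSelectorFactory metricFactory, ConcurrencyContext context)
{
  return factorize(metricFactory);
}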

Operational impact

None

Test plan (optional)

Existing unit/integration tests would cover the changes introduced.

Future work (optional)

Adjust relevant complex aggregator implementations to take advantage of the newly added methods.

gianm commented 5 years ago

It sounds to me like it would work. Btw, I wonder how meaningful the overhead of synchronization is in the case of most complex aggregators -- is it noticeable, or is it dwarfed by the complex aggregation computations? If it is meaningful, a change like this would be good for the expected majority of cases (non-concurrent access on historicals).

himanshug commented 5 years ago

I ran this benchmark

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@State(Scope.Benchmark)
public class SynchronizeOverhead
{
  private int n = 1000000;

  // Baseline: 1M iterations with no locking.
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public void unsynchronizedAdd(Blackhole blackhole)
  {
    for (int i = 0; i < n; i++) {
      blackhole.consume(1);
    }
  }

  // Same work, but each iteration acquires and releases an uncontended
  // monitor, isolating the cost of the lock itself.
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public void synchronizedAdd(Blackhole blackhole)
  {
    for (int i = 0; i < n; i++) {
      synchronized (this) {
        blackhole.consume(1);
      }
    }
  }

  public static void main(String[] args) throws RunnerException
  {
    Options opt = new OptionsBuilder()
        .include(SynchronizeOverhead.class.getSimpleName())
        .warmupIterations(5)
        .measurementIterations(10)
        .forks(1)
        .build();

    new Runner(opt).run();
  }
}

which gave me...

Benchmark                              Mode  Cnt      Score     Error  Units
SynchronizeOverhead.synchronizedAdd    avgt   10  18509.829 ± 372.391  us/op
SynchronizeOverhead.unsynchronizedAdd  avgt   10   1828.303 ±  50.494  us/op

Sidenote/caution: I ran it on my Mac laptop (which is not ideal for running benchmarks) as I didn't have a Linux or EC2 instance handy, but I did 5 warmup and 10 measurement iterations, so it should be good enough for a high-level understanding.

That says there is roughly 17 ms of overhead (18.5 ms vs 1.8 ms) for 1M lock/release pairs on uncontended object locks. I am pretty sure this is negligible compared to the time spent doing sketch operations in all those synchronized blocks, so I don't think this change would have any measurable impact on overall performance. (In fact, though I am not entirely sure, it is also possible that the server JVM can dynamically optimize the lock away once its JIT profiling discovers that the synchronized block is never accessed concurrently, which would make the overhead go away altogether.)

So, code for this proposal would mainly be useful for reducing confusion among extension writers, where the question keeps coming up of whether they have unnecessary synchronization overhead inside their aggregator implementations. It might be good enough to document these facts in this proposal so that those worries can be alleviated by simply pointing such conversations to this page.

pdeva commented 5 years ago

The above benchmark doesn't take lock contention into account. The synchronized keyword has different performance characteristics depending on how contended the lock is.

The JIT will convert thin locks to fat locks if the lock shows too much contention. If the lock has no contention, the overhead of the synchronized keyword is negligible, but not zero, which is indeed what the benchmark above shows.
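To measure the contended case that the benchmark above omits, one could add a variant along these lines, using JMH's @Threads annotation so that several threads hammer the same monitor; it is run the same way as the benchmark above. This is a hedged sketch, not code from the thread, and ContendedSynchronizeOverhead is a hypothetical class name.

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Benchmark) // a single shared instance, so all threads share one monitor
public class ContendedSynchronizeOverhead
{
  private int n = 1000000;

  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  @Threads(4) // four threads contend on the same 'this' monitor
  public void contendedSynchronizedAdd(Blackhole blackhole)
  {
    for (int i = 0; i < n; i++) {
      synchronized (this) {
        blackhole.consume(1);
      }
    }
  }
}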

himanshug commented 5 years ago

@pdeva this proposal is not about removing/modifying synchronization where there is real concurrency; aggregator implementors are free to handle that independently in whatever way they prefer.

This is about removing the synchronization overhead when an aggregator is only accessed by a single thread, but it appears that wouldn't be worth it.

Eshcar commented 5 years ago

that says there is roughly 17 ms of overhead for 1M lock/release pairs on object locks. I am pretty sure this is negligible compared to the time spent doing sketch operations

This is not always correct. For example, an update of a theta sketch takes less than 10 ns, and when the sketch is very big it takes less than 5 ns. Specifically, adding 1M uniques to a sketch takes less than 10 ms; see https://datasketches.github.io/docs/Theta/ThetaUpdateSpeed.html. In such cases the locking overhead measured above (~17 ms) exceeds the sketch work itself (<10 ms), so relative to the sketch operation it is not negligible.

pdeva commented 5 years ago

I do think it's worth removing the synchronized keyword even for non-contended locks; the overhead, while small, is still non-zero.

himanshug commented 5 years ago

@Eshcar thanks for pointing to that benchmark. That is the performance under ideal conditions, and it is specific to thetaSketch; aggregation methods typically do a little extra bookkeeping beyond the sketch operation itself. However, I stand corrected: the synchronization overhead is not necessarily always negligible compared to the sketch operations, and it would depend on the particular aggregator implementation.

@pdeva you are right that there is always a non-zero overhead, even if a small one.

Hmm, I think the optimization proposed here might provide a slight performance improvement for queries that scan/merge multiple millions of rows with multiple sketch aggregators in the query.

@gianm does that sound reasonable?

gianm commented 5 years ago

@himanshug IMO something like this is worth doing even if its main purpose is making it clearer what concurrency behavior is expected of Aggregator and BufferAggregator impls. It has caused substantial confusion in code reviews in the past.

Btw, I believe isConcurrent would always be false today for current usages of BufferAggregator (they're never used at ingestion time, which is the only context that does unsynchronized concurrent access). For Aggregator it would of course be true when used in the ingestion time context.
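To make that concrete, a hedged sketch of what call sites might pass under the proposal; the variable names are illustrative, and only the true/false values follow from the contexts described above.

// Ingestion path (IncrementalIndex): single-writer-multiple-reader access,
// so the aggregator may need to be thread-safe.
Aggregator ingestAgg = factory.factorize(columnSelectorFactory, /* isConcurrent = */ true);

// Query paths and, per the comment above, all current BufferAggregator
// usages: single-threaded access, so no locking is needed.
BufferAggregator queryAgg = factory.factorizeBuffered(columnSelectorFactory, /* isConcurrent = */ false);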

himanshug commented 5 years ago

Related to https://github.com/apache/incubator-druid/issues/8126, which removes the usage of Aggregator from indexing code as well. I will add the code changes for this proposal as a follow-up to https://github.com/apache/incubator-druid/issues/8126.