apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0
13.37k stars 3.67k forks source link

Extension: oak-incremental-index: Low resource (RAM and CPU) incremental-index implementation using off-heap key/value map (OakMap) #9967

Open liran-funaro opened 4 years ago

liran-funaro commented 4 years ago

Motivation

The current incremental-index implementations (on-heap and off-heap) suffer from poor memory utilization and sub-optimal performance. In some ingestion scenarios, we observed 200% memory overhead and 70% runtime overhead that both are attributed to the GC mechanism. This is mainly due to the large number of metadata objects created by Java’s ConcurrentSkipList (CSL).

Proposed changes

We implemented an alternative incremental-index (OakIncrementalIndex) that has two main attributes that are different from the current implementations:

  1. It stores both keys and values off-heap (as opposed to the off-heap implementation that stores only the values off-heap).
  2. It is based on OakMap [1] instead of Java’s ConcurrentSkipList (CSL).

These two changes significantly reduce the number of heap-objects and thus decrease dramatically the GC’s memory and performance overhead.

This implementation was proposed before (#5698 and #7676). This issue expands on these with system-level experiments results, as well as more comprehensive component-level benchmarks results (as requested by the community). In addition to improved performance compared to older versions.

[1] Oak: a Scalable Off-Heap Allocated Key-Value Map. ACM Conference on Principles and Practices of Parallel Programming (PPoPP) ‘2020.

Rationale

Our implementation (OakIncrementalIndex) instantiates a sub-linear number of objects with respect to the number of rows in the incremental-index, as opposed to a linear number of metadata objects that are instantiated by CSL. For typical Incremental-Index sizes (e.g., the current flush threshold is 1M rows), this overhead is millions of Java metadata objects just for internal CSL use. In addition, an on-heap multi-dimensional key might include many small objects that increase the memory overhead even further, as opposed to OakIncrementalIndex that needs only one buffer object for many multi-dimensional keys.

Our experiments show that when using OnHeapIncrementalIndex and OffHeapIncrementalIndex, Java GC requires roughly 200% memory compared to the raw data size to achieve reasonable ingestion speed. Furthermore, this large number of objects also incur longer GC pauses (about 40% of the runtime in our experiments) as there are many long-living objects to traverse. OakIncrementalIndex has only 2% memory overhead and negligible GC runtime overhead.

We evaluated OakIncrementalIndex with comparison to OnHeapIncrementalIndex and OffHeapIncrementalIndex via system-level experiments and component-level benchmarks. The experimental setup and the results are depicted here.

The system-level experiments show improved ingestion memory and CPU efficiency. It uses 60% less memory and 50% less CPU-time to achieve the same performance. This translates to nearly double the system's ingestion-throughput with the same memory budget, and a 75% increase in throughput with the same CPU-time budget. The component-level benchmarks show almost 33% of the memory usage and 60% of the runtime (1.7x ingestion throughput) compared to the on-heap and off-heap implementations.

Test plan

We modified all the unit-test and benchmarks to test all the available incremental-index implementations (on-heap, off-heap, and Oak). All the unit tests passed successfully.

Operational impact

This change will not affect any existing clusters. It will work seamlessly and interchangeably with existing incremental index implementations. See our wiki’s usage section for more details.

ebortnikov commented 4 years ago

Hey Druid community,

Could we start discussing this issue please?

Thanks!

liran-funaro commented 4 years ago

We noticed a lot of Druid users run their workload on Amazon EC2. We want to point out that OakIncrementalIndex will not only improve performance but will also reduce operational costs by allowing the users to choose more affordable EC2 instances without sacrificing performance.

The figure below shows the operational cost of different required ingestion throughput on Amazon EC2. image

yuanlihan commented 4 years ago

Hi @liran-funaro, thanks for your work on this!

I am so excited about this improvement that I tried to apply the patch and rebuilt modules(exclude the benchmark mudule) on on-prem cluster. And I got a OOM error related with direct memory allocation.

2020-06-17T14:51:12,290 ERROR [[index_kafka_ds_name_a4e6ce8abc9e093_ofeaphml]-threading-task-runner-executor-4] org.apache.d
ruid.indexing.overlord.ThreadingTaskRunner - Exception caught while running the task.
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:695) ~[?:1.8.0_181]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_181]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_181]
        at com.yahoo.oak.Block.<init>(Block.java:37) ~[oak-0.2.1.jar:?]
        at com.yahoo.oak.BlocksPool.prealloc(BlocksPool.java:143) ~[oak-0.2.1.jar:?]
        at com.yahoo.oak.BlocksPool.getBlock(BlocksPool.java:100) ~[oak-0.2.1.jar:?]
        at com.yahoo.oak.NativeMemoryAllocator.allocateNewCurrentBlock(NativeMemoryAllocator.java:213) ~[oak-0.2.1.jar:?]
        at com.yahoo.oak.NativeMemoryAllocator.<init>(NativeMemoryAllocator.java:64) ~[oak-0.2.1.jar:?]
        at com.yahoo.oak.NativeMemoryAllocator.<init>(NativeMemoryAllocator.java:53) ~[oak-0.2.1.jar:?]
        at com.yahoo.oak.OakMapBuilder.build(OakMapBuilder.java:84) ~[oak-0.2.1.jar:?]
        at org.apache.druid.segment.incremental.OakIncrementalIndex$OakFactsHolder.<init>(OakIncrementalIndex.java:418) ~[druid-processing-0.18.1.jar:0
.18.1]
        at org.apache.druid.segment.incremental.OakIncrementalIndex.<init>(OakIncrementalIndex.java:89) ~[druid-processing-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.incremental.IncrementalIndex$Builder.buildOak(IncrementalIndex.java:512) ~[druid-processing-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.incremental.IncrementalIndex$Builder.build(IncrementalIndex.java:458) ~[druid-processing-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.incremental.IncrementalIndex$Builder.build(IncrementalIndex.java:445) ~[druid-processing-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.realtime.plumber.Sink.makeNewCurrIndex(Sink.java:375) ~[druid-server-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.realtime.plumber.Sink.<init>(Sink.java:179) ~[druid-server-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.realtime.plumber.Sink.<init>(Sink.java:122) ~[druid-server-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.realtime.appenderator.AppenderatorImpl.getOrCreateSink(AppenderatorImpl.java:406) ~[druid-server-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.realtime.appenderator.AppenderatorImpl.add(AppenderatorImpl.java:250) ~[druid-server-0.18.1.jar:0.18.1]
        at org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.append(BaseAppenderatorDriver.java:406) ~[druid-server-0.18.1.jar:0.18
.1]
        at org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver.add(StreamAppenderatorDriver.java:187) ~[druid-server-0.18.1.jar:0.1
8.1]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:654) ~[druid-indexing-
service-0.18.1.jar:0.18.1]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:277) ~[druid-indexing-service-
0.18.1.jar:0.18.1]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:164) ~[druid-indexing-service-0.18.1.jar:0
.18.1]
        at org.apache.druid.indexing.overlord.ThreadingTaskRunner$1.call(ThreadingTaskRunner.java:209) [druid-indexing-service-0.18.1.jar:0.18.1]
        at org.apache.druid.indexing.overlord.ThreadingTaskRunner$1.call(ThreadingTaskRunner.java:149) [druid-indexing-service-0.18.1.jar:0.18.1]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

It seems that the direct memory allocator of oak incremental index exceeds the MaxDirectMemorySize of the JVM process. Found that the maxBytesInMemory is calculated from heap memory by default while the oak incremental index will take it as available size of direct memory.

I solved this OOM issue by increasing MaxDirectMemorySize. In my opinion, it will be nice to have additional docs about direct memory calculation when users choose oak type incremental index. And the maxBytesInMemory seems a little ambiguous.

liran-funaro commented 4 years ago

Hi @yuanlihan. Thanks for trying our implementation. We can't wait to hear about your experience with OakIncrementalIndex. Did you witness ingestion speedup and/or resource utilization improvement? Please share your findings.

Regarding the issues you mentioned, thank you for pointing out the mismatch in the default configuration of maxBytesInMemory. We did not notice it was configured according to the on-heap memory. We are working on a solution to this configuration problem and we will publish an update soon. We will also address the OOM issue in this update.

yuanlihan commented 4 years ago

Hi @liran-funaro, I would like to post feedback about performance improvement here later.

But currently before performing further performance testing, I suffer from NPE errors for timeseries queries which scan memory data of latest 10 minutes. Note that I applied the patch to branch 0.18.1 and built successfully except the benchmark module.

"timeseries_ds_name_[2020-06-18T09:07:46.039Z/2020-06-18T09:17:46.039Z]@12302" daemon prio=5 tid=0x152 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at com.yahoo.oak.Slice.getAddress(Slice.java:172)
      at com.yahoo.oak.KeyBuffer.getAddress(KeyBuffer.java:9)
      at com.yahoo.oak.UnscopedBuffer.getAddress(UnscopedBuffer.java:132)
      at org.apache.druid.segment.incremental.OakIncrementalIndexRow.<init>(OakIncrementalIndexRow.java:52)
      at org.apache.druid.segment.incremental.OakIncrementalIndex$OakFactsHolder.lambda$transformStreamIterator$1(OakIncrementalIndex.java:482)
      at org.apache.druid.segment.incremental.OakIncrementalIndex$OakFactsHolder$$Lambda$389.770801012.apply(Unknown Source:-1)
      at com.google.common.collect.Iterators$8.transform(Iterators.java:794)
      at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
      at org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter$IncrementalIndexCursor.reset(IncrementalIndexStorageAdapter.java:360)
      at org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter$IncrementalIndexCursor.<init>(IncrementalIndexStorageAdapter.java:264)
      at org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter.lambda$makeCursors$0(IncrementalIndexStorageAdapter.java:213)
      at org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter$$Lambda$381.1128483784.apply(Unknown Source:-1)
      at org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
      at org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:44)
      at org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
      at org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
      at org.apache.druid.java.util.common.guava.FilteredSequence.accumulate(FilteredSequence.java:45)
      at org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
      at org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
      at org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
      at org.apache.druid.java.util.common.guava.Sequence.toList(Sequence.java:85)
      at org.apache.druid.query.ChainedExecutionQueryRunner$1$1.call(ChainedExecutionQueryRunner.java:124)
      at org.apache.druid.query.ChainedExecutionQueryRunner$1$1.call(ChainedExecutionQueryRunner.java:114)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at org.apache.druid.java.util.common.concurrent.DirectExecutorService.execute(DirectExecutorService.java:81)
      at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
      at com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:58)
      at org.apache.druid.query.ChainedExecutionQueryRunner$1.lambda$make$0(ChainedExecutionQueryRunner.java:112)
      at org.apache.druid.query.ChainedExecutionQueryRunner$1$$Lambda$379.804059870.apply(Unknown Source:-1)
      at com.google.common.collect.Iterators$8.transform(Iterators.java:794)
      at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
      at com.google.common.collect.Iterators.addAll(Iterators.java:357)
      at com.google.common.collect.Lists.newArrayList(Lists.java:147)
      at com.google.common.collect.Lists.newArrayList(Lists.java:129)
      at org.apache.druid.query.ChainedExecutionQueryRunner$1.make(ChainedExecutionQueryRunner.java:104)
      at org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:39)
      at org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
      at org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
      at org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
      at org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
      at org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
      at org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
      at org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
      at org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
      at org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
      at org.apache.druid.query.CPUTimeMetricQueryRunner$1.wrap(CPUTimeMetricQueryRunner.java:74)
      at org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
      at org.apache.druid.query.spec.SpecificSegmentQueryRunner$1.accumulate(SpecificSegmentQueryRunner.java:87)
      at org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
      at org.apache.druid.query.spec.SpecificSegmentQueryRunner.doNamed(SpecificSegmentQueryRunner.java:171)
      at org.apache.druid.query.spec.SpecificSegmentQueryRunner.access$100(SpecificSegmentQueryRunner.java:44)
      at org.apache.druid.query.spec.SpecificSegmentQueryRunner$2.wrap(SpecificSegmentQueryRunner.java:153)
      at org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
      at org.apache.druid.java.util.common.guava.Sequence.toList(Sequence.java:85)
      at org.apache.druid.query.ChainedExecutionQueryRunner$1$1.call(ChainedExecutionQueryRunner.java:124)
      at org.apache.druid.query.ChainedExecutionQueryRunner$1$1.call(ChainedExecutionQueryRunner.java:114)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at org.apache.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:247)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)

and log

2020-06-18T17:32:49,884 ERROR [timeseries_shopee_sto_apm__tracking_http_live_[2020-06-18T09:23:46.039Z/2020-06-18T09:33:46.039Z]] org.apache.druid.quer
y.ChainedExecutionQueryRunner - Exception with one of the sequences! (java.lang.NullPointerException)

The NPE error occurs when the writeBuffer is null. And I would like to provide more info to address this issue.

liran-funaro commented 4 years ago

Thanks, @yuanlihan. We are looking into this NPE bug. We'll update once we will understand more.

liran-funaro commented 4 years ago

@yuanlihan I found the reason for the NPE. It is due to a bug in the Oak library. For now, until we update this PR, you can solve it by upgrading Oak version. Simply modify Oak's version in druid/processing/pom.xml file as follows, then recompile using maven. Note: If maven repository is not automatically updated for you, run the mvn command with -U flag.

diff --git a/processing/pom.xml b/processing/pom.xml
index 50ccc9eb6a..ebe850e480 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -35,7 +35,7 @@
         <dependency>
             <groupId>com.yahoo.oak</groupId>
             <artifactId>oak</artifactId>
-            <version>0.2.1</version>
+            <version>0.2.2</version>
         </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
yuanlihan commented 4 years ago

Thanks @liran-funaro, I have solved the NPE error after upgrading Oak's jar.

ebortnikov commented 4 years ago

Hi Community,

Anybody up to reviewing this PR? Hopefully, the feature motivation is pretty clear.

Thanks!

liran-funaro commented 3 years ago

@yuanlihan Did you have time to continue evaluating our optimization. Our latest updates of #10001 solved all know issues in addition to addressing your review notes. We will also appreciate if you can check out our new design proposal (#10321) that will allow embedding this optimization as a Druid extension.

Mrxiashu commented 2 years ago

hello,@liran-funaro. In our tests: In the kafka-index uptake model, the performance of using oak is no better than onheap under the same data and the same flow, and the uptake rate decreases by 20%.

liran-funaro commented 2 years ago

@Mrxiashu It is great to hear that you tried our extension. Can you elaborate? What do you mean by uptake? Did you mean resource uptake?

Mrxiashu commented 2 years ago

@Mrxiashu很高兴听到您尝试了我们的扩展。你能详细说明一下吗?你说的吸收是什么意思?你的意思是资源占用?

I mean Write Rate
Currently, on-heap ingest rates can reach 4W/s, while Oak's bottleneck is 3W/s, no matter how much direct buffer memory is adjusted.

liran-funaro commented 2 years ago

@Mrxiashu Can you share your ingestion configuration? One optimization that comes to mind is tweaking the maxRowsInMemory and maxBytesInMemory. With Oak, I'd suggest maxRowsInMemory=1M and updating maxBytesInMemory to your hardware limitations.

Mrxiashu commented 2 years ago

@Mrxiashu你能分享你的摄取配置吗?想到的一种优化是调整maxRowsInMemorymaxBytesInMemory。使用 Oak,我会建议maxRowsInMemory=1M并更新maxBytesInMemory您的硬件限制。

Of course, on-Heap Configuration in kafka-ingest json: "tuningConfig": { "type": "kafka", "maxRowsInMemory": 2000000, "maxRowsPerSegment": 8000000, "maxTotalRows": 20000000, "reportParseExceptions": false }

oak Configuration in kafka-ingest json:

"tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 2000000,
    "maxRowsPerSegment": 8000000,
    "maxTotalRows": 20000000,
    "reportParseExceptions": false,
     "appendableIndexSpec": {
                 "type": "oak",
                 "oakMaxMemoryCapacity": 34359738368,
                 "oakBlockSize": 33554432,
                 "oakChunkMaxItems": 512
}

Here, I tried to adjust the values of oakBlockSize and oakChunkMaxItems in the oak tree. After checking the source code, I still cannot understand the relationship between these parameters.

liran-funaro commented 2 years ago

@Mrxiashu Did you monitor your memory usage during the ingestion? What was your direct buffer memory allocation?

Mrxiashu commented 2 years ago

@Mrxiashu Did you monitor your memory usage during the ingestion? What was your direct buffer memory allocation?

kafka-index-task jvm info: -Xms3g -Xmx3g -XX:MaxDirectMemorySize=9g

In DirectMemory, 5.6 GB is used for mergeBuffer.

liran-funaro commented 2 years ago

@Mrxiashu Are these the settings and usage for the onheap or Oak?

Mrxiashu commented 2 years ago

@Mrxiashu Are these the settings and usage for the onheap or Oak?

They use the same configuration

Mrxiashu commented 2 years ago

@Mrxiashu Are these the settings and usage for the onheap or Oak?

In addition, I would like to ask whether the configuration experience of maxRowsInMemory and maxBytesInMemory in Oak is any reference.

and "oakMaxMemoryCapacity" "oakBlockSize" "oakChunkMaxItems" Whether the default values of these parameters need to be adjusted

liran-funaro commented 2 years ago

@Mrxiashu In Oak, maxRowsInMemory and maxBytesInMemory behaves the same as the OnheapIncrementalIndex. Are the offheap usage for Oak and Onheap the same? That doesn't seem right. Did you monitor the overall machine memory usage? The main benefit of Oak is its low memory and CPU footprint.

Mrxiashu commented 2 years ago

@Mrxiashu Are these the settings and usage for the onheap or Oak?

In addition, I would like to ask whether the configuration experience of maxRowsInMemory and maxBytesInMemory in Oak is any reference.

and "oakMaxMemoryCapacity" "oakBlockSize" "oakChunkMaxItems" Whether the default values of these parameters need to be adjusted

hello,@liran-funaro I have just configured monitoring for onheap and oak. This is a comparison of on-heap and direct-heap memory monitoring metrics within half an hour.

The comparison shows that the peak memory usage in the heap is similar and the performance outside the heap is the same.

So, I'm a little confused. I would prefer to see Oak's performance consume less memory at the same throughput. If you can help me, I would prefer to know how your parameters would be configured under the following specifications.

ECS:32U128G Write Rate:3w/s taskcount:10 task jvm : -Xms3g -Xmx3g -XX:MaxDirectMemorySize=9g

memory

liran-funaro commented 2 years ago

@Mrxiashu The plot you shared is not typical for Oak usage. I would expect a much higher direct heap size, and much smaller onheap size. I would like to help you debug this.

My working days are Sun to Thu. What are your working days? Sun to Thu or Mon to Fri?

Mrxiashu commented 2 years ago

@Mrxiashu The plot you shared is not typical for Oak usage. I would expect a much higher direct heap size, and much smaller onheap size. I would like to help you debug this.

My working days are Sun to Thu. What are your working days? Sun to Thu or Mon to Fri?

@liran-funaro I've seen that your previous performance tests were performed on parallel-index tasks, and I suggest that you use kafka-index to test oak performance. Also, in addition to the code migration, are there any performance improvements or bug fixes in the latest commit?

liran-funaro commented 2 years ago

@Mrxiashu

I've seen that your previous performance tests were performed on parallel-index tasks, and I suggest that you use kafka-index to test oak performance.

We did some tests with Kafka about a year ago and saw improved CPU and RAM consumption with similar performance, but less frequent flashes.

in addition to the code migration, are there any performance improvements or bug fixes in the latest commit?

Just migration.

Mrxiashu commented 2 years ago

frequent flashes @liran-funaro Does frequent flashes refer to the rate of persistence to disk?

liran-funaro commented 2 years ago

frequent flashes @liran-funaro Does frequent flashes refer to the rate of persistence to disk?

Yes

Mrxiashu commented 2 years ago

Hi @yuanlihan. Thanks for trying our implementation. We can't wait to hear about your experience with OakIncrementalIndex. Did you witness ingestion speedup and/or resource utilization improvement? Please share your findings.

Regarding the issues you mentioned, thank you for pointing out the mismatch in the default configuration of maxBytesInMemory. We did not notice it was configured according to the on-heap memory. We are working on a solution to this configuration problem and we will publish an update soon. We will also address the OOM issue in this update.

Recently, we tested the query performance of oak in kafka-index mode, and it performed very well. However, there is an Direct OOM error under a large amount of data query, and I don’t know if it is related to the above problem.

Mrxiashu commented 2 years ago

Hi @yuanlihan. Thanks for trying our implementation. We can't wait to hear about your experience with OakIncrementalIndex. Did you witness ingestion speedup and/or resource utilization improvement? Please share your findings. Regarding the issues you mentioned, thank you for pointing out the mismatch in the default configuration of maxBytesInMemory. We did not notice it was configured according to the on-heap memory. We are working on a solution to this configuration problem and we will publish an update soon. We will also address the OOM issue in this update.

Recently, we tested the query performance of oak in kafka-index mode, and it performed very well. However, there is an Direct OOM error under a large amount of data query, and I don’t know if it is related to the above problem.

@liran-funaro

liran-funaro commented 2 years ago

@Mrxiashu

Recently, we tested the query performance of oak in kafka-index mode, and it performed very well. However, there is an Direct OOM error under a large amount of data query, and I don’t know if it is related to the above problem.

The issue that you mentioned was fixed in previous versions. I don't think it is related. Can you please share the error you got?

takaaki7 commented 9 months ago

I'm struggling with heavy gc pressure of batch ingestion... Have there been any progress related to this issue?

liran-funaro commented 9 months ago

@takaaki7 This extension is no longer under development.

takaaki7 commented 9 months ago

I am hoping for this issue to be resolved... Looking at past PRs, the implementation and verification are mostly completed and it appears to be quite effective (though conflicts may occurs).

I encountered challenges with the performance of batch ingestion with a workload like the following: Input: 8billion rows Sharding type: hash Sharding Number: 3000 maxBytesInMemory: 600MB Servers: 144server (8core / 45GB RAM) Time to complete: 2.5hours

I have set fine shards, and with maxBytesInMemory being only 600MB, we're writing partial segments to disk at a very granular level like following log.

2023-11-06T05:45:29,443 INFO [[partial_index_generate_event_xxxx_2022_parq_3_pknobigd_2023-11-06T05:23:36.710Z]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Flushed in-memory data for segment[event_xxxx_2022_parq_3_2022-01-01T00:00:00.000Z_2023-01-01T00:00:00.000Z_2023-11-06T04:31:09.889Z_316] spill[15] to disk in [24] ms (172 rows).
2023-11-06T05:45:29,443 INFO [[partial_index_generate_event_xxxx_2022_parq_3_pknobigd_2023-11-06T05:23:36.710Z]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Segment[event_xxxx_2022_parq_3_2022-01-01T00:00:00.000Z_2023-01-01T00:00:00.000Z_2023-11-06T04:31:09.889Z_2296] hydrant[FireHydrant{queryable=null, count=15}] already swapped. Ignoring request to persist.
2023-11-06T05:45:29,472 INFO [[partial_index_generate_event_xxxx_2022_parq_3_pknobigd_2023-11-06T05:23:36.710Z]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Flushed in-memory data for segment[event_xxxx_2022_parq_3_2022-01-01T00:00:00.000Z_2023-01-01T00:00:00.000Z_2023-11-06T04:31:09.889Z_2296] spill[16] to disk in [28] ms (785 rows).
2023-11-06T05:45:29,472 INFO [[partial_index_generate_event_xxxx_2022_parq_3_pknobigd_2023-11-06T05:23:36.710Z]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Segment[event_xxxx_2022_parq_3_2022-01-01T00:00:00.000Z_2023-01-01T00:00:00.000Z_2023-11-06T04:31:09.889Z_2713] hydrant[FireHydrant{queryable=null, count=15}] already swapped. Ignoring request to persist.
2023-11-06T05:45:29,494 INFO [[partial_index_generate_event_xxxx_2022_parq_3_pknobigd_2023-11-06T05:23:36.710Z]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Flushed in-memory data for segment[event_xxxx_2022_parq_3_2022-01-01T00:00:00.000Z_2023-01-01T00:00:00.000Z_2023-11-06T04:31:09.889Z_2713] spill[16] to disk in [22] ms (54 rows).

Each time this spill down happens, serialization occurs, and dictionaries have to be rebuilt. If I raise the maxBytesInMemory any higher, the overhead for garbage collection increases, and "GC overhead limit exceeded" occurs. Moreover, even with current setting, GC threads using cpu heavily so that I can only execute tasks at about half the parallelism compared to the number of cores.

It's difficult to estimate the effects, but Intuitively I expect that merging this change could yield an improvement of more than tenfold.

If at least this refactoring PR were to be merged, it would become easier to add our own extensions. https://github.com/apache/druid/pull/12122

abhishekagarwal87 commented 9 months ago

@takaaki7 - For batch ingestion, have you tried SQL-based ingestion?