elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

Potential mutex contention for bulk indexing ? #57466

Open micoq opened 4 years ago

micoq commented 4 years ago

Currently, for a single bulk request of new documents, each document is added one by one internally into a shard (to Lucene and to the translog) by calling InternalEngine.index(Engine.Index index).

This method calls IndexWriter.addDocuments() and TranslogWriter.add(). However, both methods are protected by a mutex (a lock or a synchronized block), which seems to be costly when you run a single instance of Elasticsearch with a lot of CPUs and a lot of write threads: even with many active shards, the threads tend to compete to write the documents into the shards.

This results in a lot of futex() syscalls (and context switches) on the server and caps the indexing speed.

Actually, InternalEngine.index() seems to be able to write more than one document per call but only for nested documents since the Engine.Index object can contain only one _source (or incoming document).

I don't know about the translog, but for Lucene, adding a batch of documents with IndexWriter.addDocuments() is more efficient in a multi-threaded environment: Lucene can write multiple segments in parallel (in a single Lucene index) by allocating one DocumentsWriterPerThread object per incoming thread, but access to IndexWriter.addDocuments() still needs to be synchronized.
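To make the batching argument concrete, here is a minimal sketch in plain Java. `LockedSink` is a hypothetical stand-in for a writer guarded by a single mutex; it is not Lucene's `IndexWriter` and it only models "one lock acquisition per call". The sketch counts how often the mutex is taken for a given batch size and says nothing about real Lucene timings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class BatchLockDemo {
    // Hypothetical writer protected by one mutex (an assumption, not a Lucene class).
    static final class LockedSink {
        private final Object mutex = new Object();
        private final AtomicLong lockAcquisitions = new AtomicLong();
        private long docCount = 0; // guarded by mutex

        // One lock acquisition per call, however many documents the batch holds.
        void addDocuments(List<String> batch) {
            synchronized (mutex) {
                lockAcquisitions.incrementAndGet();
                docCount += batch.size();
            }
        }

        long lockAcquisitions() { return lockAcquisitions.get(); }

        long docCount() { synchronized (mutex) { return docCount; } }
    }

    // Writes totalDocs documents from `threads` threads in batches of batchSize.
    // Assumes totalDocs is divisible by threads.
    static LockedSink run(int totalDocs, int threads, int batchSize) {
        LockedSink sink = new LockedSink();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int perThread = totalDocs / threads;
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                List<String> batch = new ArrayList<>(batchSize);
                for (int i = 0; i < perThread; i++) {
                    batch.add("doc-" + i);
                    if (batch.size() == batchSize) {
                        sink.addDocuments(batch);
                        batch.clear();
                    }
                }
                if (!batch.isEmpty()) sink.addDocuments(batch); // flush the remainder
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("indexing timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink;
    }

    public static void main(String[] args) {
        for (int batchSize : new int[] {1, 10, 100}) {
            LockedSink sink = run(80_000, 8, batchSize);
            System.out.println("batch=" + batchSize + " docs=" + sink.docCount()
                + " lockAcquisitions=" + sink.lockAcquisitions());
        }
    }
}
```

For 80,000 documents the counter drops from 80,000 acquisitions at batch size 1 to 800 at batch size 100. Fewer acquisitions means fewer chances for threads to collide on the mutex, which is the effect the batch approach relies on.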

Another approach would be to pin a single write thread to each shard to avoid the lock. For the moment, the only solution seems to be running more than one node on the server, with fewer write threads per node, and spreading the active indices/shards across them to reduce the contention.
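The single-writer idea could look roughly like the sketch below. Everything in it is hypothetical: the `Shard` class, the `hashCode`-based routing (not Elasticsearch's real routing), and the use of a single-threaded executor as the per-shard writer. Shard state is confined to one thread, so it needs no lock of its own, although the executor's queue still synchronizes internally, so this moves the coordination to the handoff rather than removing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleWriterPerShard {
    // Hypothetical shard whose state is touched only by its own writer thread.
    static final class Shard {
        private final ExecutorService writer = Executors.newSingleThreadExecutor();
        private final List<String> docs = new ArrayList<>(); // confined to the writer thread

        // Hand the document to the shard's writer thread instead of locking.
        void index(String doc) {
            writer.execute(() -> docs.add(doc));
        }

        // Drains pending writes, then reports how many documents were stored.
        int closeAndCount() {
            awaitShutdown(writer);
            return docs.size(); // safe to read: the writer thread has terminated
        }
    }

    static void awaitShutdown(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Several client threads route documents to shards; returns the total stored.
    static int indexAll(int shardCount, int producers, int docsPerProducer) {
        Shard[] shards = new Shard[shardCount];
        for (int i = 0; i < shardCount; i++) shards[i] = new Shard();

        ExecutorService clients = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            final int producer = p;
            clients.execute(() -> {
                for (int d = 0; d < docsPerProducer; d++) {
                    String id = producer + "-" + d;
                    // Illustrative routing only.
                    shards[Math.floorMod(id.hashCode(), shardCount)].index(id);
                }
            });
        }
        awaitShutdown(clients);

        int total = 0;
        for (Shard shard : shards) total += shard.closeAndCount();
        return total;
    }

    public static void main(String[] args) {
        System.out.println("stored=" + indexAll(4, 8, 1000));
    }
}
```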

I made a quick test with Lucene (8.5.1) to illustrate this behavior on an 8-core CPU with 10M docs:

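| Batch size (docs) | Threads | Time (ms) |
|---|---|---|
| 1 | 1 | 20438 |
| 1 | 2 | 12296 |
| 1 | 4 | 8157 |
| 1 | 6 | 7561 (best) |
| 1 | 8 | 8570 |
| 10 | 1 | 17568 |
| 10 | 2 | 9868 |
| 10 | 4 | 5631 |
| 10 | 6 | 5530 (best) |
| 10 | 8 | 5667 |
| 100 | 1 | 17407 |
| 100 | 2 | 9331 |
| 100 | 4 | 5417 |
| 100 | 6 | 5380 (best) |
| 100 | 8 | 5504 |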

Each document contains only one field (a 1kB string). The time includes a Lucene commit() after the indexing loop to ensure all the documents are written to the storage (SSD). The batch is just a LinkedList<Document> passed to IndexWriter.addDocuments(). The indexing buffer is set to 256MB and merges are disabled (NoMergePolicy).

Elasticsearch version: 7.7.0

dnhatn commented 4 years ago

It will be hard for Elasticsearch to use the bulk method of IndexWriter. Each indexing operation must be performed under its _id lock to ensure that there is at most one revision per _id.
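As a rough illustration of that constraint, here is a sketch of per-_id locking with striped locks. The class, the stripe count and the in-memory version map are all assumptions made for illustration; this is not Elasticsearch's actual implementation. Each operation resolves the current version and writes the next one while holding the lock for its _id, so two concurrent writes to the same _id cannot produce the same revision. A batch spanning several _ids would presumably need to hold all of the corresponding locks at once, which is one way to read the difficulty described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class PerIdLockDemo {
    private final ReentrantLock[] stripes;
    private final Map<String, Long> versions = new ConcurrentHashMap<>();

    PerIdLockDemo(int stripeCount) {
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) stripes[i] = new ReentrantLock();
    }

    // Read-modify-write under the lock that covers this _id.
    long index(String id) {
        ReentrantLock lock = stripes[Math.floorMod(id.hashCode(), stripes.length)];
        lock.lock();
        try {
            long next = versions.getOrDefault(id, 0L) + 1;
            versions.put(id, next);
            return next;
        } finally {
            lock.unlock();
        }
    }

    long version(String id) {
        return versions.getOrDefault(id, 0L);
    }

    // All threads update the same _id; returns the final version.
    static long concurrentUpdates(int threads, int updatesPerThread) {
        PerIdLockDemo engine = new PerIdLockDemo(16);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < updatesPerThread; i++) engine.index("doc-1");
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return engine.version("doc-1");
    }

    public static void main(String[] args) {
        // No update is lost, so the final version equals the number of writes.
        System.out.println("final version=" + concurrentUpdates(8, 1000));
    }
}
```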

elasticmachine commented 4 years ago

Pinging @elastic/es-distributed (:Distributed/Engine)

jpountz commented 1 year ago

For reference, I noticed that with the TSDB track, indexing threads spend 3-3.5% of their time waiting on a lock in Translog#add. Here is a flame chart I generated with async profiler -e wall so that it shows not only time spent running but also time spent waiting on locks. The red ellipse is the wait on the translog lock.

[image: tsdb_contention]

jpountz commented 1 year ago

Here's what I'm observing: from time to time, indexing completely stops as I'm seeing indexing threads blocked on the read lock of the translog:

"elasticsearch[rally-node-0][write][T#1]" #82 daemon prio=5 os_prio=0 cpu=130999.67ms elapsed=258.21s tid=0x00007f38701447b0 nid=0x381f9 waiting on condition  [0x00007f38f52fe000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@17/Native Method)
        - parking to wait for  <0x000000070e3fd280> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(java.base@17/LockSupport.java:211)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17/AbstractQueuedSynchronizer.java:715)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(java.base@17/AbstractQueuedSynchronizer.java:1027)
        at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(java.base@17/ReentrantReadWriteLock.java:738)
        at org.elasticsearch.common.util.concurrent.ReleasableLock.acquire(org.elasticsearch.server@8.8.0-SNAPSHOT/ReleasableLock.java:43)
        at org.elasticsearch.index.translog.Translog.add(org.elasticsearch.server@8.8.0-SNAPSHOT/Translog.java:588)
        at org.elasticsearch.index.engine.InternalEngine.index(org.elasticsearch.server@8.8.0-SNAPSHOT/InternalEngine.java:1055)
        at org.elasticsearch.index.shard.IndexShard.index(org.elasticsearch.server@8.8.0-SNAPSHOT/IndexShard.java:1063)
        at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(org.elasticsearch.server@8.8.0-SNAPSHOT/IndexShard.java:996)
        at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(org.elasticsearch.server@8.8.0-SNAPSHOT/IndexShard.java:913)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(org.elasticsearch.server@8.8.0-SNAPSHOT/TransportShardBulkAction.java:354)
        at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(org.elasticsearch.server@8.8.0-SNAPSHOT/TransportShardBulkAction.java:219)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(org.elasticsearch.server@8.8.0-SNAPSHOT/AbstractRunnable.java:26)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(org.elasticsearch.server@8.8.0-SNAPSHOT/TransportShardBulkAction.java:286)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(org.elasticsearch.server@8.8.0-SNAPSHOT/TransportShardBulkAction.java:137)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(org.elasticsearch.server@8.8.0-SNAPSHOT/TransportShardBulkAction.java:74)
        at org.elasticsearch.action.support.replication.TransportWriteAction$1.doRun(org.elasticsearch.server@8.8.0-SNAPSHOT/TransportWriteAction.java:214)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(org.elasticsearch.server@8.8.0-SNAPSHOT/ThreadContext.java:958)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(org.elasticsearch.server@8.8.0-SNAPSHOT/AbstractRunnable.java:26)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17/Thread.java:833)

because the write lock is taken by a thread on the flush threadpool, busy waiting on a fsync to finish:

"elasticsearch[rally-node-0][flush][T#2]" #132 daemon prio=5 os_prio=0 cpu=7748.31ms elapsed=202.34s tid=0x00007f378c0172d0 nid=0x38347 runnable  [0x00007f3815264000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.FileDispatcherImpl.force0(java.base@17/Native Method)
        at sun.nio.ch.FileDispatcherImpl.force(java.base@17/FileDispatcherImpl.java:82)
        at sun.nio.ch.FileChannelImpl.force(java.base@17/FileChannelImpl.java:465)
        at org.elasticsearch.index.translog.Checkpoint.write(org.elasticsearch.server@8.8.0-SNAPSHOT/Checkpoint.java:202)
        at org.elasticsearch.index.translog.TranslogWriter.writeCheckpoint(org.elasticsearch.server@8.8.0-SNAPSHOT/TranslogWriter.java:604)
        at org.elasticsearch.index.translog.TranslogWriter.create(org.elasticsearch.server@8.8.0-SNAPSHOT/TranslogWriter.java:165)
        at org.elasticsearch.index.translog.Translog.createWriter(org.elasticsearch.server@8.8.0-SNAPSHOT/Translog.java:545)
        at org.elasticsearch.index.translog.Translog.createWriter(org.elasticsearch.server@8.8.0-SNAPSHOT/Translog.java:512)
        at org.elasticsearch.index.translog.Translog.rollGeneration(org.elasticsearch.server@8.8.0-SNAPSHOT/Translog.java:1668)
        at org.elasticsearch.index.engine.InternalEngine.rollTranslogGeneration(org.elasticsearch.server@8.8.0-SNAPSHOT/InternalEngine.java:2083)
        at org.elasticsearch.index.shard.IndexShard.rollTranslogGeneration(org.elasticsearch.server@8.8.0-SNAPSHOT/IndexShard.java:1408)
        at org.elasticsearch.index.shard.IndexShard$8.doRun(org.elasticsearch.server@8.8.0-SNAPSHOT/IndexShard.java:3745)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(org.elasticsearch.server@8.8.0-SNAPSHOT/ThreadContext.java:958)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(org.elasticsearch.server@8.8.0-SNAPSHOT/AbstractRunnable.java:26)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17/Thread.java:833)
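
The two thread dumps above boil down to a read-write lock whose write side is held across slow I/O. The sketch below reproduces that shape with a plain `ReentrantReadWriteLock`. The class and method names are hypothetical, and a latch stands in for the fsync so that the outcome does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class TranslogLockStallDemo {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Models an indexing thread: needs the read lock, gives up after timeoutMillis.
    boolean tryAdd(long timeoutMillis) throws InterruptedException {
        if (!lock.readLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // stalled behind the writer
        }
        try {
            return true; // the operation would be appended here
        } finally {
            lock.readLock().unlock();
        }
    }

    // Models a generation roll: the write lock is held until the "fsync" completes.
    void rollGeneration(CountDownLatch lockHeld, CountDownLatch fsyncDone) {
        lock.writeLock().lock();
        try {
            lockHeld.countDown();
            fsyncDone.await(); // stand-in for a slow fsync
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns {acquired during the roll, acquired after the roll}.
    static boolean[] demo() {
        TranslogLockStallDemo translog = new TranslogLockStallDemo();
        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch fsyncDone = new CountDownLatch(1);
        Thread flush = new Thread(() -> translog.rollGeneration(lockHeld, fsyncDone));
        flush.start();
        try {
            lockHeld.await();                     // the write lock is now held
            boolean during = translog.tryAdd(50); // cannot get the read lock
            fsyncDone.countDown();                // let the "fsync" finish
            flush.join();
            boolean after = translog.tryAdd(50);  // the lock is free again
            return new boolean[] {during, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("during roll=" + result[0] + ", after roll=" + result[1]);
        // prints "during roll=false, after roll=true"
    }
}
```

In this model, every reader that arrives while the write lock is held waits for the full duration of the I/O, which matches the indexing threads parked in Translog.add.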

jpountz commented 1 year ago

For reference, here is another stack trace that takes the write lock and busy waits on fsync to return before releasing the lock, hence stalling indexing.

"elasticsearch[rally-node-0][flush][T#2]" #132 daemon prio=5 os_prio=0 cpu=17532.07ms elapsed=1043.43s tid=0x00007ff6e8019530 nid=0x3af10 runnable  [0x00007ff8a83fe000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.FileDispatcherImpl.force0(java.base@17/Native Method)
        at sun.nio.ch.FileDispatcherImpl.force(java.base@17/FileDispatcherImpl.java:82)
        at sun.nio.ch.FileChannelImpl.force(java.base@17/FileChannelImpl.java:465)
        at org.elasticsearch.core.IOUtils.fsync(org.elasticsearch.base@8.8.0-SNAPSHOT/IOUtils.java:299)
        at org.elasticsearch.core.IOUtils.fsync(org.elasticsearch.base@8.8.0-SNAPSHOT/IOUtils.java:274)
        at org.elasticsearch.index.translog.Translog.copyCheckpointTo(org.elasticsearch.server@8.8.0-SNAPSHOT/Translog.java:317)
        at org.elasticsearch.index.translog.Translog.rollGeneration(org.elasticsearch.server@8.8.0-SNAPSHOT/Translog.java:1666)
        at org.elasticsearch.index.engine.InternalEngine.rollTranslogGeneration(org.elasticsearch.server@8.8.0-SNAPSHOT/InternalEngine.java:2083)
        at org.elasticsearch.index.shard.IndexShard.rollTranslogGeneration(org.elasticsearch.server@8.8.0-SNAPSHOT/IndexShard.java:1408)
        at org.elasticsearch.index.shard.IndexShard$8.doRun(org.elasticsearch.server@8.8.0-SNAPSHOT/IndexShard.java:3745)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(org.elasticsearch.server@8.8.0-SNAPSHOT/ThreadContext.java:958)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(org.elasticsearch.server@8.8.0-SNAPSHOT/AbstractRunnable.java:26)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17/Thread.java:833)

StephanErb commented 1 year ago

@jpountz for the contention you've identified here, would index.translog.durability: async make a difference? Or is that not an fsync causing issues here?

elasticsearchmachine commented 1 year ago

Pinging @elastic/es-distributed (Team:Distributed)