Hi,
I am trying to use the BulkProcessor, but I can't configure it correctly.
I think the problem is in how I configure the handler: the listener that the consumer receives has a different type from the one expected by restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, handler).
Am I using the wrong methods or parameters?
The BulkProcessor works at first: if I set builder.setConcurrentRequests(10), it executes 10 bulk requests; if I set 3, it executes 3...
After that, the process blocks, waiting for the lock.
I think semaphore.release() in BulkRequestHandler is never executed.
I'm sharing the relevant parts of the code below.
Regards.
Error after 10 bulk executions
io.vertx.core.VertxException: Thread blocked
    at java.base@11.0.2/jdk.internal.misc.Unsafe.park(Native Method)
    at java.base@11.0.2/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
    at java.base@11.0.2/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
    at java.base@11.0.2/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
    at java.base@11.0.2/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
    at java.base@11.0.2/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)
    at app//org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:59)
    at app//org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
    at app//org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:389)
    at app//org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:361)
Configuration of BulkProcessor
override fun get(): BulkProcessor {
    logger.info("Build bulk processor")

    val listener: BulkProcessor.Listener = object : BulkProcessor.Listener {
        override fun beforeBulk(executionId: Long, request: BulkRequest) {
            val numberOfActions = request.numberOfActions()
            logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions)
        }

        override fun afterBulk(executionId: Long, request: BulkRequest, response: BulkResponse) {
            if (response.hasFailures()) {
                logger.warn("Bulk [{}] executed with failures", executionId)
            } else {
                logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.took.millis)
            }
        }

        override fun afterBulk(executionId: Long, request: BulkRequest, failure: Throwable) {
            logger.error("Failed to execute bulk", failure)
        }
    }

    val builder = BulkProcessor.builder(
        { request: BulkRequest, bulkListener: ActionListener<BulkResponse> ->
            // Correct type for the handler
            val handler = Handler<AsyncResult<BulkResponse>> { it ->
                logger.info(it.result().toString())
            }
            // ERROR
            // I cannot pass bulkListener as the handler because it has a different type
            // (ActionListener<BulkResponse> instead of Handler<AsyncResult<BulkResponse>>),
            // so bulkListener is never called here.
            restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, handler)
        },
        listener
    )
    builder.setBulkActions(1000)
    builder.setConcurrentRequests(10)
    builder.setBulkSize(ByteSizeValue(5, ByteSizeUnit.MB))
    builder.setFlushInterval(TimeValue.timeValueSeconds(60L))
    builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueSeconds(2), 3))
    return builder.build()
}
Method signatures
public interface RestHighLevelClient {
    public void bulkAsync(BulkRequest bulkRequest, RequestOptions options, Handler<AsyncResult<BulkResponse>> handler);

package org.elasticsearch.action.bulk;
public class BulkProcessor implements Closeable {
    public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
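
Given the two signatures above, one possible way to bridge the types is a small adapter that forwards the Vert.x result to the ActionListener supplied by the BulkProcessor. This is only a sketch, and it rests on an assumption: that BulkRequestHandler releases its semaphore only once that ActionListener is completed, which would explain why the processor blocks after the configured number of concurrent requests. The extension function toVertxHandler is a hypothetical name, not part of Elasticsearch or Vert.x.

```kotlin
import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import org.elasticsearch.action.ActionListener

// Hypothetical helper (not provided by Elasticsearch or Vert.x): wraps an
// ActionListener in the Vert.x handler type that bulkAsync expects, so the
// listener is always completed with either the response or the failure.
fun <T> ActionListener<T>.toVertxHandler(): Handler<AsyncResult<T>> =
    Handler<AsyncResult<T>> { ar ->
        if (ar.succeeded()) {
            onResponse(ar.result())
        } else {
            // AsyncResult.cause() returns a Throwable, but onFailure takes an Exception.
            val cause = ar.cause()
            onFailure(if (cause is Exception) cause else RuntimeException(cause))
        }
    }
```

With such a helper, the consumer passed to BulkProcessor.builder could call restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener.toVertxHandler()) instead of building a handler that only logs the result.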