orkes-io / orkes-conductor-community

Orkes Conductor is a microservices orchestration engine.

Race condition in indexTask with ES #51

Open qingdaojunzuo opened 11 months ago


Describe the bug

A race condition occurs when indexTask writes to ES: the number of index requests sent by the Conductor server does not match the number received on the ES side.

Steps To Reproduce

Steps to reproduce the behavior:

  1. Run multiple tasks in parallel.
  2. Set the ES indexing log level to DEBUG.
  3. Log the index requests received by ES (we use ES7; ES6 may behave the same).
  4. After the workflow reaches COMPLETED, some tasks remain IN_PROGRESS in ES instead of COMPLETED.
  5. Meanwhile, the task status in the persistence store (Postgres) is correct.
  6. indexBatchSize is left at its default of 1, and asyncIndexingEnabled at its default of false.

Expected behavior

All tasks should be in a terminal status in ES, such as COMPLETED or FAILED, rather than IN_PROGRESS.


Additional context

  1. With debug logging enabled, the following message is printed as expected in our environment. There are 3 index requests per task, each logged by indexTask in ElasticSearchRestDAOV7.java, and the average time per request is under 30 ms:

`Time taken {} for indexing task:{} in workflow: {}`

  2. On the ES side, the number of records received for a task is randomly less than 3.

  3. There appears to be a race condition between indexObject and indexBulkRequest:

```java
private void indexObject(
        final String index, final String docType, final String docId, final Object doc) {

    byte[] docBytes;
    try {
        docBytes = objectMapper.writeValueAsBytes(doc);
    } catch (JsonProcessingException e) {
        logger.error("Failed to convert {} '{}' to byte string", docType, docId);
        return;
    }
    IndexRequest request = new IndexRequest(index);
    request.id(docId).source(docBytes, XContentType.JSON);

    // Check-then-act on the map: two threads can both see null and both put(),
    // and the second put() discards any request already added to the first batch.
    if (bulkRequests.get(docType) == null) {
        bulkRequests.put(
                docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
    }

    // indexObject is not synchronized, so this add can interleave with another
    // thread's indexBulkRequest call for the same docType.
    bulkRequests.get(docType).getBulkRequest().add(request);
    if (bulkRequests.get(docType).getBulkRequest().numberOfActions() >= this.indexBatchSize) {
        indexBulkRequest(docType);
    }
}

private synchronized void indexBulkRequest(String docType) {
    if (bulkRequests.get(docType).getBulkRequest() != null
            && bulkRequests.get(docType).getBulkRequest().numberOfActions() > 0) {
        synchronized (bulkRequests.get(docType).getBulkRequest()) {
            indexWithRetry(
                    bulkRequests.get(docType).getBulkRequest().get(),
                    "Bulk Indexing " + docType,
                    docType);
            // Replacing the entry after sending drops anything another thread added
            // to the old batch while indexWithRetry was running.
            bulkRequests.put(
                    docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
        }
    }
}
```
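One possible direction for a fix, sketched under assumptions: make "add the request, check the batch size, detach the full batch" a single atomic step per docType, and send only batches that have already been detached from the map. The class `SafeBatcher` and its methods are hypothetical and this is not Conductor's code; `String` stands in for `IndexRequest`, and the `sender` callback stands in for `indexWithRetry`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;

/**
 * Hypothetical sketch, not Conductor's code: a per-docType batcher where
 * "add, check size, detach full batch" is one atomic step per key.
 * String stands in for IndexRequest; the sender stands in for indexWithRetry.
 */
public class SafeBatcher {

    private final int batchSize;
    private final Consumer<List<String>> sender;
    private final Map<String, List<String>> batches = new ConcurrentHashMap<>();

    public SafeBatcher(int batchSize, Consumer<List<String>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
    }

    public void index(String docType, String doc) {
        // Receives the detached batch, if this call filled one.
        List<List<String>> full = new ArrayList<>(1);
        // ConcurrentHashMap.compute runs this function atomically for the key,
        // so no other thread can add to a batch while it is being detached.
        batches.compute(docType, (type, batch) -> {
            List<String> current = (batch == null) ? new ArrayList<>() : batch;
            current.add(doc);
            if (current.size() >= batchSize) {
                full.add(current);        // detach the full batch
                return new ArrayList<>(); // later adds go to a fresh batch
            }
            return current;
        });
        // Send outside compute(): the detached list is no longer reachable
        // from the map, so no other thread can modify it.
        if (!full.isEmpty()) {
            sender.accept(full.get(0));
        }
    }

    /** Sends any partially filled batches, e.g. from a periodic flush task. */
    public void flushAll() {
        for (String docType : batches.keySet()) {
            List<List<String>> pending = new ArrayList<>(1);
            batches.computeIfPresent(docType, (type, batch) -> {
                if (!batch.isEmpty()) {
                    pending.add(batch);
                }
                return new ArrayList<>();
            });
            if (!pending.isEmpty()) {
                sender.accept(pending.get(0));
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger sent = new AtomicInteger();
        SafeBatcher batcher = new SafeBatcher(5, batch -> sent.addAndGet(batch.size()));
        // Index 10,000 documents concurrently from the common fork-join pool.
        IntStream.range(0, 10_000).parallel().forEach(i -> batcher.index("task", "doc-" + i));
        batcher.flushAll();
        System.out.println("sent " + sent.get() + " of 10000"); // prints "sent 10000 of 10000"
    }
}
```

Because `compute` serializes only the cheap in-memory step for a given docType, the network call happens outside any lock, and every request ends up in exactly one batch that is sent exactly once. With a batch size of 1 (the default indexBatchSize), every call detaches and sends its own single-request batch.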

Thanks