akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

Elasticsearch createWithPassThrough sometimes doesn't return WriteResult [1.0-M3] #1566

Closed danelkotev closed 4 years ago

danelkotev commented 5 years ago

Hi,

I upgraded to version 1.0-M3 of the Alpakka Elasticsearch connector and it seems there is a bug when using ElasticsearchFlow.createWithPassThrough: I'm not receiving a WriteResult for every document in a bulk. Everything worked fine when I used version 1.0-M1. I know you have made some changes to the ES connector, and I think this is related.

ennru commented 5 years ago

Thank you for reporting, that is definitely possible.

ennru commented 5 years ago

Do you have retrying enabled? With retrying enabled, at the very least the ordering isn't kept.

danelkotev commented 5 years ago

Hi, I do use retrying and the problem is not with the order. It seems like some messages are missing. I have a unit test that can be used to reproduce the issue. When I don't use the retry mechanism, everything works as expected. On 1.0-M1, using the retry mechanism yields the expected result.

ennru commented 5 years ago

OK, so it seems related to retrying. Can you see (e.g. in the log) whether it actually retries in the cases where you are missing WriteResults?

ennru commented 5 years ago

Please try creating a RetryLogic similar to RetryAtFixedRate and log any exceptions it receives in shouldRetry. It would be good to restore the earlier behaviour here.
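
A minimal sketch of what such a logging retry logic could look like, assuming the RetryLogic contract exposes a shouldRetry hook that receives the retry count and the errors, plus a method returning the delay until the next attempt. The actual trait and method signatures in 1.0-M3 may differ, so treat the names below as placeholders:

import scala.concurrent.duration._

// Hypothetical shape only: adapt the method names/signatures to the real
// RetryLogic trait in your Alpakka version before plugging this into
// ElasticsearchWriteSettings.withRetryLogic(...).
class LoggingRetryAtFixedRate(maxRetries: Int, retryInterval: FiniteDuration) {

  // Log every decision so that missing retries become visible in the output.
  def shouldRetry(retries: Int, errors: Seq[String]): Boolean = {
    println(s"shouldRetry: retries=$retries, errors=$errors")
    retries < maxRetries
  }

  // Fixed delay between attempts, mirroring RetryAtFixedRate.
  def nextRetry(retries: Int): FiniteDuration = retryInterval
}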

danelkotev commented 5 years ago

I created a unit test with RetryAtFixedRate. The test reaches the timeout, and I don't see any lost messages in the logs. Please review my test below and share your opinion.

"index bulk of document with passThrough" in assertAllStagesStopped {
      val indexName = "sink10"
      val sink = ElasticsearchFlow.createWithPassThrough[Book, Book](
        indexName = indexName,
        typeName = "doc",
        ElasticsearchWriteSettings() //.withBufferSize(5)
          .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.millis))
      )

      val bookMessages = List(
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 4"),
        Book("Book 5"),
        Book("Book 6"),
        Book("Book 3"),
        Book("Book 8"),
        Book("Book 9"),
        Book("Book 10"),
        Book("Book 3"),
        Book("Book 11"),
        Book("Book 12"),
        Book("Book 13"),
        Book("Book 14"),
        Book("Book 3"),
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 1")
      )

      val bookToEs = Source(bookMessages)
        .map { book: Book =>
          val id = book.title
          println("title: " + book.title)
          // Transform message so that we can write to elastic
          WriteMessage.createCreateMessage(id, book).withPassThrough(book)
        }
        .via( // write to elastic
          sink
        )
        .map { result =>
          if (result.success) println("success")
          else println("failure")
        }
        .runFold(0)((count, msg) => count + 1)

      bookToEs.futureValue(Timeout(PatienceConfig(5 minute).timeout)) shouldBe 26
    }

danelkotev commented 5 years ago

In addition, I took a look at how RetryLogic works, and as far as I understand it does not aggregate failedMessages but simply overwrites them when new failures arrive. When I changed retryPartialFailedMessages to update failedMessages using a union, failedMessages = failedMessages union failedMsgs.map(_.message), the code no longer reached the timeout (but this is not the full solution). Maybe I don't understand how RetryLogic works, but I believe failedMessages should hold the remaining retries for every message, and in general failedMessages should be updated accordingly rather than overwritten.
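
A small, self-contained illustration of that difference (plain Scala, not the actual stage code): if two partial failures arrive before a retry fires, overwriting the buffer drops the first batch, while accumulating keeps both.

// Illustration only: simulate two partial-failure callbacks arriving before
// the scheduled retry runs.
object FailedMessagesBuffer extends App {
  val firstFailure  = Seq("msg-1", "msg-2") // failed in the first bulk response
  val secondFailure = Seq("msg-3")          // failed in the next bulk response

  // Current behaviour: the second callback overwrites the buffer.
  var overwritten: Seq[String] = Seq.empty
  overwritten = firstFailure
  overwritten = secondFailure
  println(overwritten) // List(msg-3) -- msg-1 and msg-2 are never retried

  // Proposed behaviour: accumulate instead of overwriting.
  var accumulated: Seq[String] = Seq.empty
  accumulated = accumulated ++ firstFailure
  accumulated = accumulated ++ secondFailure
  println(accumulated) // List(msg-1, msg-2, msg-3) -- nothing is lost
}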

ennru commented 5 years ago

Sorry for being so quiet, I've been traveling and on vacation. Your reasoning about failedMessages makes sense to me. I'll try to get back on this...

nivox commented 4 years ago

I've stumbled on the same issue.

In my case there are 2 document updates:

  1. analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary
  2. analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings

Doc 1 is part of a bulk update along with other documents, while doc 2 is on its own. Doc 2 fails and gets scheduled for retry. Right after, doc 1 also fails and gets scheduled for retry.

After some time the retry is carried out, but only for doc 1.

Here are the actual logs:

13:50:25.382 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - Posting data to Elasticsearch: ...
{"update":{"_id":"analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary","_index":"thinkin-jobs-prod-static","_type":"_doc"}}
{"doc": {...}},"doc_as_upsert":true}
13:50:25.384 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - Posting data to Elasticsearch: {"update":{"_id":"analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings","_index":"thinkin-jobs-prod-static","_type":"_doc"}}
{"doc":{...},"doc_as_upsert":true}

...

13:50:25.436 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - response {
  "errors": true,
  "items": [{
    "update": {
      "_id": "analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings",
      "_index": "thinkin-jobs-prod-static",
      "_type": "_doc",
      "error": {
        "index": "thinkin-jobs-prod-static",
        "index_uuid": "2DinhZHvR7mwuHGuubRWlw",
        "reason": "[_doc][analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings]: version conflict, current version [4] is different than the one provided [3]",
        "shard": "2",
        "type": "version_conflict_engine_exception"
      },
      "status": 409
    }
  }],
  "took": 48
}
13:50:25.437 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - retryPartialFailedMessages inflight=63 Vector(WriteResult(message=WriteMessage(operation=update,id=Some(analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings),source=Some({...}),passThrough=...,version=None,indexName=Some(thinkin-jobs-prod-static),customMetadata=Map()),error=Some({"index":"thinkin-jobs-prod-static","index_uuid":"2DinhZHvR7mwuHGuubRWlw","reason":"[_doc][analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings]: version conflict, current version [4] is different than the one provided [3]","shard":"2","type":"version_conflict_engine_exception"})))

...

13:50:26.025 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - response {
  "errors": true,
  "items": [...
    , {
    "update": {
      "_id": "analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary",
      "_index": "thinkin-jobs-prod-static",
      "_type": "_doc",
      "error": {
        "index": "thinkin-jobs-prod-static",
        "index_uuid": "2DinhZHvR7mwuHGuubRWlw",
        "reason": "[_doc][analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary]: version conflict, current version [4] is different than the one provided [3]",
        "shard": "2",
        "type": "version_conflict_engine_exception"
      },
      "status": 409
    }
  }],
  "took": 639
}
13:50:26.025 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - retryPartialFailedMessages inflight=62 Vector(WriteResult(message=WriteMessage(operation=update,id=Some(analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary),source=Some({...}),passThrough=...,version=None,indexName=Some(thinkin-jobs-prod-static),customMetadata=Map()),error=Some({"index":"thinkin-jobs-prod-static","index_uuid":"2DinhZHvR7mwuHGuubRWlw","reason":"[_doc][analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary]: version conflict, current version [4] is different than the one provided [3]","shard":"2","type":"version_conflict_engine_exception"})))

...

13:50:27.043 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - retrying inflight=2 Vector(WriteMessage(operation=update,id=Some(analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary),source=Some({...}),passThrough=...,version=None,indexName=Some(thinkin-jobs-prod-static),customMetadata=Map()))
13:50:27.043 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - Posting data to Elasticsearch: {"update":{"_id":"analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary","_index":"thinkin-jobs-prod-static","_type":"_doc"}}
{...},"doc_as_upsert":true}
13:50:27.148 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - response {
  "errors": false,
  "items": [{
    "update": {
      "_id": "analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary",
      "_index": "thinkin-jobs-prod-static",
      "_primary_term": 3,
      "_seq_no": 6362956,
      "_shards": {
        "failed": 0,
        "successful": 2,
        "total": 2
      },
      "_type": "_doc",
      "_version": 5,
      "result": "updated",
      "status": 200
    }
  }],
  "took": 101
}

Looking at the code for retryPartialFailedMessages it seems to me that the line

failedMessages = failedMsgs.map(_.message) // These are the messages we're going to retry

causes previously failed messages to be lost if a retry doesn't happen before the next failure arrives.

I think that the desired behaviour should be:

failedMessages = failedMessages ++ failedMsgs.map(_.message)

The same reasoning should also be applied to the handleFailure function by turning

failedMessages = messages

into

failedMessages = failedMessages ++ messages

Furthermore, to correctly handle the retry count for messages that failed at different times, the count should somehow be tied to each message instead of being a global retryCount.
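
A rough sketch of per-message retry bookkeeping under that idea (illustration only, not the actual ElasticsearchFlowStage internals; Msg stands in for the stage's WriteMessage type):

// Illustration only: keep a remaining-retry budget per message instead of a
// single global retryCount, so messages that failed at different times are
// handled independently.
final class RetryBuffer[Msg](maxRetries: Int) {
  private var budgets: Map[Msg, Int] = Map.empty

  // Record newly failed messages, accumulating with (not overwriting) the
  // budgets of messages that failed earlier.
  def recordFailures(messages: Seq[Msg]): Unit =
    messages.foreach { m =>
      val remaining = budgets.getOrElse(m, maxRetries + 1) - 1
      budgets = budgets.updated(m, remaining)
    }

  // Messages that still have retries left versus those whose budget is spent.
  def retryable: Seq[Msg] = budgets.collect { case (m, r) if r > 0 => m }.toSeq
  def exhausted: Seq[Msg] = budgets.collect { case (m, r) if r <= 0 => m }.toSeq

  // Drop a message once it finally succeeds or is given up on.
  def succeeded(message: Msg): Unit = budgets -= message
}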

I'm going to prepare a patch and test it on my code in the next few days.

I see that #2031 targets the retry logic; will that be a rewrite of it, thus solving this issue at the root (and making any patches moot)?

Teudimundo commented 4 years ago

I experienced a similar issue. The PR by @nivox seems to fix it.

seglo commented 4 years ago

@nivox Thanks for the analysis and the PR!

> I see that #2031 targets the retry logic; will that be a rewrite of it, thus solving this issue at the root (and making any patches moot)?

Using an upstream RetryFlow in the ES connector is a goal, but its design is inspired by retry use cases we've found in the ES and other Alpakka connectors. I think we can evaluate the PR at face value. I'll take a look next week when I'm back, but @ennru has the best context here.