opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices to remote data stores.
Apache License 2.0

Transition Flint index state to Failed upon refresh job termination #362

Closed dai-chen closed 4 weeks ago

dai-chen commented 4 weeks ago

Description

This PR addresses the issue where the index state incorrectly remains "refreshing" after a streaming job has failed. The fix makes a best-effort attempt to transition the index state to "failed" before the Spark application exits. Ref: Flint index state transition diagram

Before the Changes

  1. FlintJob waits on a global lock in StreamingQueryManager through the awaitAnyTermination API. When a stream terminates, Spark's StreamExecution first notifies all threads waiting on that lock and only then triggers the listener and cleanup logic.
  2. Consequently, the main thread (FlintJob) may complete first without waiting for the index monitor or for the listener and cleanup logic in StreamExecution, because both run on daemon threads.
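
The ordering described in the two points above can be modeled without Spark. The following is a minimal sketch in Python, not the project's Scala code: `run_stream`, `terminated`, and `release_cleanup` are hypothetical names, and the extra `release_cleanup` event exists only to make the interleaving deterministic for the demonstration.

```python
import threading


def run_stream(terminated, release_cleanup, log):
    """Hypothetical stand-in for a stream execution that fails."""
    log.append("stream failed")
    # Step 1: wake every thread blocked in the global wait.
    terminated.set()
    # Held back here only so the demonstration is deterministic.
    release_cleanup.wait()
    # Step 2: listener and cleanup logic run after the waiters were woken.
    log.append("listener and cleanup done")


log = []
terminated = threading.Event()
release_cleanup = threading.Event()
stream = threading.Thread(
    target=run_stream, args=(terminated, release_cleanup, log), daemon=True
)
stream.start()

# Analogous to the global wait: resumes as soon as any stream terminates.
terminated.wait()
log.append("main resumed")
main_saw_cleanup = "listener and cleanup done" in log

# Let the daemon thread finish so the full ordering can be printed.
release_cleanup.set()
stream.join()
print(log, main_saw_cleanup)
```

In this model the main thread resumes before the cleanup step has run. If it exited at that point, the daemon thread would be abandoned, which mirrors how the index state could be left in "refreshing".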

After the Changes

  1. A new awaitMonitor API in FlintSparkIndexMonitor suspends the caller thread (the main thread in FlintJob) and updates the index state immediately upon resumption.
  2. As a result, FlintJob now waits for a specific stream execution and is notified only after StreamExecution completes all listener and cleanup logic.
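
The revised flow can be sketched in the same spirit. This is an illustrative Python model under stated assumptions, not the actual awaitMonitor implementation: `StreamJobSketch` and `await_monitor` are hypothetical. It also assumes the state is changed only when the job ends with an exception, which the "terminated with exception" log line in the Testing section suggests but does not prove.

```python
import threading


class StreamJobSketch:
    """Hypothetical stand-in for one streaming query; not a Spark class."""

    def __init__(self, name):
        self.name = name
        self.events = []
        self._error = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # Simulate a write failure, then the listener and cleanup work.
        self._error = RuntimeError("index write blocked")
        self.events.append("listener and cleanup done")

    def await_termination(self):
        # Returns (or raises) only after the job thread has fully finished.
        self._thread.join()
        if self._error is not None:
            raise self._error


def await_monitor(job, index_state):
    """Block on one specific job, then record the failure if there was one."""
    try:
        job.await_termination()
    except Exception:
        index_state["state"] = "failed"


index_state = {"state": "refreshing"}
job = StreamJobSketch("flint_glue_default_http_logs_await_test_3_index")
job.start()
await_monitor(job, index_state)
print(index_state["state"], job.events)
```

Because the caller blocks on the specific job rather than on a shared notification, the cleanup step is guaranteed to have finished by the time the state is updated.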

Sources that May Trigger the Termination of Stream Execution

TODO

  1. Support DROP index in the FAILED state in the SQL plugin. Tracked in https://github.com/opensearch-project/sql/issues/2705
  2. Persist the root-cause error message in the metadata log. Tracked in https://github.com/opensearch-project/opensearch-spark/issues/281
  3. Differentiate retryable from non-retryable exceptions and persist a state code. Tracked in https://github.com/opensearch-project/opensearch-spark/issues/149

Testing

# EMR-S log:
24/05/31 22:14:10 INFO AppendDataExec: Data source write support FlintWrite(query_execution_result_glue...) committed.
24/05/31 22:14:10 INFO FlintSparkIndexMonitor: Awaiting index monitor for None
24/05/31 22:14:10 INFO FlintSparkIndexMonitor: Awaiting streaming job flint_glue_default_http_logs_await_test_3_index until terminated

# Simulate streaming job exception
PUT flint_glue_default_http_logs_await_test_3_index/_block/write

# EMR-S log:
24/06/01 22:24:31 ERROR FlintSparkIndexMonitor: Streaming job flint_glue_default_http_logs_await_test_3_index terminated with exception
24/06/01 22:25:03 INFO FlintOpenSearchMetadataLog: Log entry written as
FlintMetadataLogEntry(ZmxpbnRfZ2x1ZV9kZWZhdWx0X2h0dHBfbG9nc19hd2FpdF90ZXN0XzNfaW5kZXg=,
71,1,1717203462456,failed,glue,)

# Verify index state transitioned to FAILED as expected
GET .query_execution_request_glue/_doc/ZmxpbnRfZ2x1ZV9kZWZhdWx0X2h0dHBfbG9nc19hd2FpdF90ZXN0XzNfaW5kZXg=
{
  "_index": ".query_execution_request_glue",
  "_id": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X2h0dHBfbG9nc19hd2FpdF90ZXN0XzNfaW5kZXg=",
  "_version": 11,
  "_seq_no": 35,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "version": "1.0",
    "latestId": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X2h0dHBfbG9nc19hd2FpdF90ZXN0XzNfaW5kZXg=",
    "type": "flintindexstate",
    "state": "failed",
    "applicationId": "00fj56e4cs0ghe0l",
    "jobId": "00fjojhr9r8dho0n",
    "dataSourceName": "glue",
    "jobStartTime": 1717193633832,
    "lastUpdateTime": 1717193954278,
    "error": ""
  }
}
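
When reading the output above, it can help to know that the document ID used in the GET request decodes, as standard Base64, to the index name that appears in the EMR-S log. The snippet below only checks this one ID. Whether Flint derives every metadata log ID this way is an assumption and is not stated in this PR.

```python
import base64

# Document ID copied from the GET request in the test log.
doc_id = "ZmxpbnRfZ2x1ZV9kZWZhdWx0X2h0dHBfbG9nc19hd2FpdF90ZXN0XzNfaW5kZXg="

# Decode it as standard Base64 and interpret the bytes as UTF-8 text.
index_name = base64.b64decode(doc_id).decode("utf-8")
print(index_name)
```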

Issues Resolved

https://github.com/opensearch-project/opensearch-spark/issues/361

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

dai-chen commented 4 weeks ago

Thanks for the change! Do we need to update the state diagram as well?

I double-checked, and the diagram seems up to date (specifically the transition from REFRESHING to FAILED). Let me know if I missed anything. Thanks!