opensearch-project / opensearch-spark

Spark accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

Fix index state stuck in refreshing when streaming job exits early #370

Closed dai-chen closed 2 weeks ago

dai-chen commented 3 weeks ago

Description

Addressed the issue where the streaming job exits before the awaitMonitor API is called, leaving the Flint index state stuck in refreshing; the state is now updated to FAILED.

Note that I'm unable to reproduce this issue in an IT, and I suspect the delay between executing the SQL query and awaiting the monitor is caused by the result index write. For a DDL statement, although the result is empty, we write it via DataFrame with the Flint data source instead of using the OpenSearch client directly. This needs to be confirmed; I will create a separate issue for it.
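The failure mode being fixed (the streaming job exits before awaitMonitor is reached) can be illustrated with a minimal, language-agnostic sketch. All names below are hypothetical and only mimic the sequencing, not the actual FlintSparkIndexMonitor implementation:

```python
import threading

# Index state as recorded in the Flint metadata log (simplified).
index_state = {"state": "refreshing"}

def streaming_job(fail_event):
    # The streaming job hits an error and exits almost immediately,
    # before the caller ever reaches await_monitor().
    fail_event.set()

def await_monitor(fail_event):
    # Before the fix: awaitMonitor assumed the job was still running, so an
    # early exit could leave the state stuck in "refreshing".
    # After the fix: an already-exited job is still detected and the state
    # is transitioned to "failed".
    if fail_event.wait(timeout=5):
        index_state["state"] = "failed"

fail = threading.Event()
job = threading.Thread(target=streaming_job, args=(fail,))
job.start()
job.join()           # job exits early, before await_monitor is called
await_monitor(fail)  # monitor must still observe the failure
print(index_state["state"])  # → failed
```

The key point is that the monitor checks for a terminal job outcome even when the job finished before the monitor started waiting, rather than assuming a live streaming query.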

Issues Resolved

https://github.com/opensearch-project/opensearch-spark/issues/368

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

dai-chen commented 1 week ago

Verified the changes with more tests:

// Generate Parquet file with INT column
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val data = Seq(1, 2, 3, 4, 5)
val df = data.toDF("protocol").withColumn("protocol", $"protocol".cast(IntegerType))
df.write.mode("overwrite").parquet("s3://parquet_mismatch_test/")

// Create Spark table with BIGINT column
sql("""
    CREATE TABLE parquet_mismatch_test (
        protocol BIGINT
    )
    USING parquet
    LOCATION 's3://parquet_mismatch_test/'
""")

-- Submit Spark job to create covering index
CREATE INDEX await_early_test
ON glue.default.parquet_mismatch_test (
  protocol
)
WITH (
  auto_refresh = true
);

# Spark job log
24/06/20 21:44:39 ERROR MicroBatchExecution: Query flint_glue_default_parquet_mismatch_test_await_early_test_index [id = 72705241-bafd-4b02-8f2c-328ec3528aa7,
runId = 8b978f47-a816-4ab0-b9b6-b7e2657b52e6] terminated with error
org.apache.spark.SparkException: Writing job aborted
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 1.0 (TID 11) ([2600:1f14:3ed2:a401:2d1f:8db2:ece6:45cc] executor 1): org.apache.spark.sql.execution.QueryExecutionException: 
Parquet column cannot be converted in file s3://parquet_mismatch_test/part-00001-cbde737c-1014-4d9a-b4fd-6aea3aa91206-c000.snappy.parquet.
Column: [protocol], Expected: bigint, Found: INT32
    at ...
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at ...

24/06/20 21:44:40 INFO FlintSparkIndexMonitor: Index monitor for [None] not found.
24/06/20 21:44:40 INFO FlintSparkIndexMonitor: Found index name in index monitor task list: flint_glue_default_parquet_mismatch_test_await_early_test_index
24/06/20 21:44:40 INFO FlintSparkIndexMonitor: Updating index state to failed for flint_glue_default_parquet_mismatch_test_await_early_test_index
24/06/20 21:44:41 INFO FlintOpenSearchMetadataLog: Log entry written as FlintMetadataLogEntry(
ZmxpbnRfZ2x1ZV9kZWZhdWx0X3BhcnF1ZXRfbWlzbWF0Y2hfdGVzdF9hd2FpdF9lYXJseV90ZXN0X2luZGV4,
14,1,1718919871609,failed,glue,)

# Double check index state is Failed
      {
        "_index": ".query_execution_request_glue",
        "_id": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X3BhcnF1ZXRfbWlzbWF0Y2hfdGVzdF9hd2FpdF9lYXJseV90ZXN0X2luZGV4",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "latestId": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X3BhcnF1ZXRfbWlzbWF0Y2hfdGVzdF9hd2FpdF9lYXJseV90ZXN0X2luZGV4",
          "type": "flintindexstate",
          "state": "failed",
          "applicationId": "00fj56e4cs0ghe0l",
          "jobId": "00fk8m0kqlj1180n",
          "dataSourceName": "glue",
          "jobStartTime": 1718919871609,
          "lastUpdateTime": 1718919880581,
          "error": ""
        }
      }
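As a quick sanity check, the `_source` document from the search response above can be asserted on programmatically; a minimal sketch (the document body is abridged from the output above):

```python
import json

# Abridged _source document copied from the search response above.
doc = json.loads("""
{
  "type": "flintindexstate",
  "state": "failed",
  "dataSourceName": "glue",
  "error": ""
}
""")

# The fix is verified if the latest log entry is in the terminal
# "failed" state rather than stuck in "refreshing".
assert doc["type"] == "flintindexstate"
assert doc["state"] == "failed", "index left stuck in refreshing"
print("index state:", doc["state"])  # → index state: failed
```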