opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Improve validation for SQL statement #65

Closed. dai-chen closed this issue 5 months ago.

dai-chen commented 1 year ago

Is your feature request related to a problem?

Improve validation for SQL create statement:

  1. For DDL statements (create):
     a. Validate WITH options and report an error if an invalid option is given (a sketch of this check follows the list).
     b. Check whether a given column is unsupported by the skipping/covering index and report the error early, instead of when the DataFrame job is submitted in the background.
  2. For DML statements (show/desc/refresh), report an error if the given table name is invalid.
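
For illustration, here is a minimal sketch of the option-name check in 1a, assuming a hypothetical allow-list (the real option names and validation live in FlintSparkIndexOptions):

// Minimal sketch of early WITH-option validation; the allow-list below is
// illustrative only and should come from FlintSparkIndexOptions.
object IndexOptionValidator {
  private val allowedOptions =
    Set("auto_refresh", "refresh_interval", "incremental_refresh",
        "checkpoint_location", "index_settings")

  def validateOptionNames(options: Map[String, String]): Unit = {
    val unknown = options.keySet -- allowedOptions
    require(
      unknown.isEmpty,
      s"Invalid index option(s): ${unknown.mkString(", ")}. " +
        s"Allowed options: ${allowedOptions.mkString(", ")}")
  }
}

// Fails fast at DDL time instead of when the DataFrame job runs in the background:
// IndexOptionValidator.validateOptionNames(Map("auto_refrsh" -> "true"))  // throws IllegalArgumentException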

What solution would you like?

dai-chen commented 1 year ago

Other validation example:

scala> (flint
     |   .materializedView()
     |   .name("myglue.default.lineitem_metrics")
     |   .query("SELECT window.start, COUNT(*) FROM stream.lineitem_tiny GROUP BY TUMBLE(l_shipdate, '1 Month')")
     |   .options(FlintSparkIndexOptions(Map(
     |     "auto_refresh" -> "true",
     |     "checkpoint_location" -> "s3://test/checkpoints/"
     |   )))
     |   .create())
java.lang.IllegalArgumentException: Intervals greater than a month is not supported (1 Month).

# A select alias is required; otherwise StructType.fromDDL() may fail if a column name contains parentheses
scala> (flint
     |   .materializedView()
     |   .name("myglue.default.lineitem_metrics")
     |   .query("SELECT window.start, COUNT(*) FROM stream.lineitem_tiny GROUP BY TUMBLE(l_shipdate, '1 Week')")
     |   .options(FlintSparkIndexOptions(Map(
     |     "auto_refresh" -> "true",
     |     "checkpoint_location" -> "s3://test/checkpoints/"
     |   )))
     |   .create())
org.apache.spark.sql.catalyst.parser.ParseException:
Syntax error at or near '('(line 1, pos 30)

== SQL ==
start timestamp not null,count(1) long not null
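
As a workaround for the failure above, aliasing every projected expression keeps parentheses such as count(1) out of the generated schema DDL. A sketch with illustrative alias names:

flint
  .materializedView()
  .name("myglue.default.lineitem_metrics")
  .query(
    "SELECT window.start AS start_time, COUNT(*) AS order_count " +
      "FROM stream.lineitem_tiny GROUP BY TUMBLE(l_shipdate, '1 Week')")
  .options(FlintSparkIndexOptions(Map(
    "auto_refresh" -> "true",
    "checkpoint_location" -> "s3://test/checkpoints/")))
  .create()
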
dai-chen commented 1 year ago

Another validation is required: because the whereClause and mvQuery grammar rules can match anything (non-greedily), a statement with a malformed WITH clause after them is still accepted, and we rely on Spark to validate it.

spark-sql> CREATE INDEX test ON ds_tables.http_logs
         > (clientip, status)
         > WHERE status != 200
         > WITH (
         >   auto_refresh = true
         > ;
Time taken: 2.511 seconds

The expression WHERE status != 200 WITH (auto_refresh = true above is passed to Spark as the filtering expression, and Spark doesn't throw any exception.
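
One possible early check (an assumption, not current Flint behavior) is to parse the captured WHERE text with Spark's expression parser at statement time, so trailing garbage like the unclosed WITH clause is rejected immediately:

def validateFilterCondition(spark: org.apache.spark.sql.SparkSession,
                            whereText: String): Unit = {
  try {
    // parseExpression requires the whole string to be a single valid expression
    spark.sessionState.sqlParser.parseExpression(whereText)
  } catch {
    case e: org.apache.spark.sql.catalyst.parser.ParseException =>
      throw new IllegalArgumentException(s"Invalid filtering condition: $whereText", e)
  }
}

// validateFilterCondition(spark, "status != 200 WITH (auto_refresh = true")
//   -> IllegalArgumentException instead of silently accepting the statement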

dai-chen commented 11 months ago

Spark structured streaming doesn't support Hive tables. Here is a test that identifies whether a table is Hive or not:

$ spark-shell  ... --conf spark.flint.datasource.name=myglue

scala> import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}

scala> def getTableProperties(qualifiedTableName: String): java.util.Map[String, String] = {
     |   val (catalog, ident) = parseTableName(spark, qualifiedTableName)
     |   val table = loadTable(catalog, ident)
     |   table.get.properties
     | }

scala> getTableProperties("myglue.stream.lineitem_tiny")
res11: java.util.Map[String,String] = {location=s3://.../tpch-lineitem-tiny,
 provider=JSON, external=true, option.compression=gzip, owner=hadoop}

scala> getTableProperties("myglue.ds_tables.http_logs")
res12: java.util.Map[String,String] = {location=s3://.../http_logs_partitioned_json_bz2,
 provider=json, external=true, option.compression=bzip2, owner=hadoop}

scala> getTableProperties("myglue.mydatabase.noaa_ghcn_pds")
res14: java.util.Map[String,String] = {location=s3://noaa-ghcn-pds/csv,
 provider=hive, transient_lastDdlTime=1675459327, option.serialization.format=1,
 external=true, classification=csv, owner=hadoop, option.separatorChar=,}
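
Building on getTableProperties above, a minimal sketch of the Hive check itself, keyed on the provider property shown in the output:

// Sketch: reject Hive tables early, since Spark structured streaming cannot refresh them.
def isHiveTable(qualifiedTableName: String): Boolean = {
  val provider = getTableProperties(qualifiedTableName).get("provider")
  provider != null && provider.equalsIgnoreCase("hive")
}

// isHiveTable("myglue.mydatabase.noaa_ghcn_pds")  // true  (provider=hive)
// isHiveTable("myglue.stream.lineitem_tiny")      // false (provider=JSON)
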
penghuo commented 9 months ago

Reproduce Issue

-- Insert data into the table
INSERT INTO user_data VALUES ('Alice', 30), ('Bob', 25);

CREATE SKIPPING INDEX ON user_data (age VALUE_SET) WITH (auto_refresh = true)


* Error log

org.apache.spark.SparkException: Execution of the stream flint_spark_catalog_default_user_data_skipping_index failed. Please, fill a bug report in, and provide the full stack trace.
  at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:324)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.hive.HiveShim$.wrapperToFileSinkDesc(HiveShim.scala:228)
  at org.apache.spark.sql.hive.execution.HiveFileFormat.supportFieldName(HiveFileFormat.scala:112)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$checkFieldNames$1(DataSourceUtils.scala:75)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$checkFieldNames$1$adapted(DataSourceUtils.scala:74)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.checkFieldNames(DataSourceUtils.scala:74)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:95)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:437)
  at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:248)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:549)
  at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
  at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
  at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
  at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:545)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:545)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
  at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
  ... 1 more

Time taken: 6.146 seconds

penghuo commented 8 months ago

Proposed Solutions

Use SHOW TABLE EXTENDED to filter out Hive tables. The procedure is:

  1. SHOW TABLE EXTENDED IN database LIKE '*'
  2. Filter on the information column: if it contains Provider: hive, it is a Hive table (a sketch follows the examples below). For instance:

Hive table info

SHOW TABLE EXTENDED IN `test-db` LIKE 'my_hive_table'
Database: test-db 
Table: my_hive_table 
Owner: owner 
Created Time: Mon Jan 08 17:28:48 UTC 2024 
Last Access: Mon Jan 08 17:28:48 UTC 2024 
Created By: Spark 2.2 or prior 
Type: EXTERNAL 
Provider: hive 

Spark datasource table info

Database: default 
Table: alb_logs 
Owner: hadoop 
Created Time: Mon Jan 08 18:54:37 UTC 2024 
Last Access: UNKNOWN 
Created By: Spark 3.3.2-amzn-0 
Type: EXTERNAL 
Provider: csv
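
A minimal sketch of that filter, assuming the command is issued through spark.sql and the result exposes the information column shown above:

import org.apache.spark.sql.SparkSession

// Sketch: detect a Hive table by scanning the information column of SHOW TABLE EXTENDED.
def isHiveTableByShowExtended(spark: SparkSession, database: String, table: String): Boolean = {
  spark
    .sql(s"SHOW TABLE EXTENDED IN `$database` LIKE '$table'")
    .select("information")
    .collect()
    .exists(_.getString(0).contains("Provider: hive"))
}
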
dai-chen commented 7 months ago

If auto_refresh is true, the user should not specify incremental_refresh, or should only specify it as false.
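
A sketch of that rule as an option check (the helper name and shape are illustrative, not the actual FlintSparkIndexOptions code):

// Sketch: incremental_refresh must be false or omitted when auto_refresh is true.
// Option values are assumed to be the strings "true" / "false".
def validateRefreshOptions(options: Map[String, String]): Unit = {
  val autoRefresh = options.get("auto_refresh").exists(_.toBoolean)
  val incrementalRefresh = options.get("incremental_refresh").exists(_.toBoolean)
  require(
    !(autoRefresh && incrementalRefresh),
    "incremental_refresh must be false (or omitted) when auto_refresh is true")
}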

vamsi-amazon commented 7 months ago

Another required validation is restricting the length of the index name.
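
For reference, a sketch of such a check; the 255-byte limit is an assumption based on the usual OpenSearch index-name restriction and should be confirmed against the target cluster:

import java.nio.charset.StandardCharsets

// Sketch: reject over-long Flint index names before creating the OpenSearch index.
def validateIndexNameLength(flintIndexName: String, maxBytes: Int = 255): Unit = {
  val byteLength = flintIndexName.getBytes(StandardCharsets.UTF_8).length
  require(
    byteLength <= maxBytes,
    s"Flint index name is $byteLength bytes, exceeding the $maxBytes-byte limit")
}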

dai-chen commented 7 months ago

Summary

Here is a summary of all the issues listed above, especially for the CREATE Flint index DDL statement.

Out of Scope

  1. Note that validation for table and column existence is handled within the Flint index builder and is therefore not detailed here.
  2. Due to the streaming job logic being executed inside Spark during job initiation, it is not feasible to validate all aspects beforehand. In such cases, we aim to capture and record the internal error messages in https://github.com/opensearch-project/opensearch-spark/issues/281.

Index Option Validations

Other Validations

dai-chen commented 7 months ago

Tested the checkpoint location validation approach. CheckpointFileManager is the same abstraction used by the Spark streaming job.

scala> val checkpointMgr = CheckpointFileManager.create(new Path("s3://test/123"), spark.sessionState.newHadoopConf)
checkpointMgr: org.apache.spark.sql.execution.streaming.CheckpointFileManager
 = org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager@7b976364

scala> checkpointMgr.exists(new Path("s3://test/123"))
java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
All access to this object has been disabled (Service: Amazon S3; Status Code: 403; Error Code: AllAccessDisabled;
  ...

scala> checkpointMgr.exists(new Path("s3://benchmark/httplogs"))
res4: Boolean = true
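
A sketch of turning that experiment into a reusable validation step (the error-handling shape is an assumption):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Sketch: treat an IOException (e.g. the S3 403 above) as an invalid or
// unreachable checkpoint location and fail the DDL statement early.
def validateCheckpointLocation(spark: SparkSession, location: String): Unit = {
  val path = new Path(location)
  val mgr = CheckpointFileManager.create(path, spark.sessionState.newHadoopConf())
  try {
    mgr.exists(path) // forces an access check against the underlying storage
  } catch {
    case e: java.io.IOException =>
      throw new IllegalArgumentException(
        s"Checkpoint location $location is not accessible: ${e.getMessage}", e)
  }
}
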
dai-chen commented 5 months ago

Finished the high priority items in the Index Option Validations section. Will track the other minor items separately as needed in the future.