What is the bug?
Creating a Flint materialized view whose query projects duplicate columns causes the Flint streaming job to fail during refresh. The duplicate columns produce duplicate fields in the serialized JSON document, which triggers a JsonParseException.
How can one reproduce the bug?
1. Define a query for a Flint materialized view with duplicate columns. For example:
SELECT
window.start AS startTime,
COUNT(*) AS count,
COUNT(*) AS count
FROM $testTable
GROUP BY TUMBLE(time, '10 Minutes')
2. Create the materialized view using the above query.
3. Refresh the Flint materialized view.
4. Observe the error in the logs.
What is the expected behavior?
The system should perform pre-validation when creating a materialized view (MV) to check for duplicate columns. If duplicate columns are detected, it should prevent creation of the MV and return a clear error message indicating the issue, similar to the validation added in https://github.com/opensearch-project/opensearch-spark/pull/297.
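As an illustration only (this is not the actual Flint code path, and the function name and error wording are assumptions), the requested pre-validation amounts to rejecting any MV query whose output schema repeats a column name before the MV is created:

```python
def validate_output_columns(column_names):
    """Reject a materialized view definition whose output schema
    contains duplicate column names (illustrative sketch only)."""
    seen = set()
    duplicates = []
    for name in column_names:
        if name in seen:
            duplicates.append(name)
        seen.add(name)
    if duplicates:
        raise ValueError(
            "Duplicate columns found in materialized view query: "
            + ", ".join(sorted(set(duplicates))))


# The repro query projects 'count' twice, so creation would fail fast:
validate_output_columns(["startTime", "count"])  # passes
try:
    validate_output_columns(["startTime", "count", "count"])
except ValueError as err:
    print(err)  # Duplicate columns found in materialized view query: count
```

Failing at creation time gives the user an actionable message instead of a streaming-job crash deep inside JSON serialization.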
Do you have any screenshots?
N/A
Do you have any additional context?
24/05/28 09:25:26 ERROR CustomLogging: {"timestamp":1716913526851,"severityText":"ERROR","severityNumber":17,
"body":{"message":"OpenSearch Operation failed with an exception."},"attributes":{"domainName":"UNKNOWN:UNKNOWN",
"clientId":"UNKNOWN", "exception.type":"com.fasterxml.jackson.core.JsonParseException",
"exception.message":"Duplicate field 'count'\n at
[Source: (byte[])\"{\"startTime\":\"2023-10-01T00:00:00.000000-0700\",\"count\":1,\"count\":1}\"; line: 1, column: 65]"}}
com.fasterxml.jackson.core.JsonParseException: Duplicate field 'count'
at [Source: (byte[])"{"startTime":"2023-10-01T00:00:00.000000-0700","count":1,"count":1}"; line: 1, column: 65]
at com.fasterxml.jackson.core.json.JsonReadContext._checkDup(JsonReadContext.java:243)
at com.fasterxml.jackson.core.json.JsonReadContext.setCurrentName(JsonReadContext.java:237)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:799)
at com.fasterxml.jackson.core.JsonGenerator._copyCurrentContents(JsonGenerator.java:2638)
at com.fasterxml.jackson.core.JsonGenerator.copyCurrentStructure(JsonGenerator.java:2619)
at org.opensearch.common.xcontent.json.JsonXContentGenerator.copyCurrentStructure(JsonXContentGenerator.java:418)
at org.opensearch.common.xcontent.XContentBuilder.copyCurrentStructure(XContentBuilder.java:1013)
at org.opensearch.client.RequestConverters.bulk(RequestConverters.java:258)
at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1907)
at org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1877)
at org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1845)
at org.opensearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:364)
at org.opensearch.flint.core.RestHighLevelClientWrapper.lambda$bulk$0(RestHighLevelClientWrapper.java:54)
at org.opensearch.flint.core.RestHighLevelClientWrapper.execute(RestHighLevelClientWrapper.java:128)
at org.opensearch.flint.core.RestHighLevelClientWrapper.bulk(RestHighLevelClientWrapper.java:54)
at org.opensearch.flint.core.storage.OpenSearchWriter.flush(OpenSearchWriter.java:59)
at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.flush(WriterBasedJsonGenerator.java:983)
at org.apache.spark.sql.flint.json.FlintJacksonGenerator.flush(FlintJacksonGenerator.scala:257)
at org.apache.spark.sql.flint.FlintPartitionWriter.commit(FlintPartitionWriter.scala:70)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:453)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.lang.IllegalStateException: Failed to close the XContentBuilder
at org.opensearch.common.xcontent.XContentBuilder.close(XContentBuilder.java:1027)
at org.opensearch.client.RequestConverters.bulk(RequestConverters.java:257)
... 23 more
Caused by: java.io.IOException: Unclosed object or array found
at org.opensearch.common.xcontent.json.JsonXContentGenerator.close(JsonXContentGenerator.java:469)
at org.opensearch.common.xcontent.XContentBuilder.close(XContentBuilder.java:1025)
... 24 more
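The failure mode in the log can be reproduced outside Spark: a JSON document with a repeated field is accepted by lenient parsers (the last value silently wins) but rejected by strict ones, which is what Jackson's duplicate-field detection does here. A minimal Python sketch mimicking that strict behavior (the hook function is an assumption for illustration, not Flint or Jackson code):

```python
import json


def reject_duplicate_keys(pairs):
    """Mimic strict duplicate-field detection (illustrative only)."""
    keys = [key for key, _ in pairs]
    duplicates = sorted({key for key in keys if keys.count(key) > 1})
    if duplicates:
        raise ValueError(f"Duplicate field '{duplicates[0]}'")
    return dict(pairs)


# The document produced by the duplicate-alias query, as seen in the log:
doc = '{"startTime": "2023-10-01T00:00:00.000000-0700", "count": 1, "count": 1}'

print(json.loads(doc))  # lenient parse: the second 'count' silently wins

try:
    json.loads(doc, object_pairs_hook=reject_duplicate_keys)
except ValueError as err:
    print(err)  # Duplicate field 'count'
```

This is why the error only surfaces during refresh: the duplicate aliases are legal in Spark SQL, but the bulk-indexing path serializes them into a JSON object that strict parsing rejects.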