opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices for remote data stores.
Apache License 2.0

[BUG] Flint not supporting Complex schema #112

Closed: YANG-DB closed this issue 11 months ago.

YANG-DB commented 1 year ago

What is the bug? When attempting to create a materialized view (MV) on top of a complex JSON schema, Flint responds with the following error:

{
    "data": {
        "ok": true,
        "resp": {
            "status": "FAILED",
            "error": "Syntax error: \nDataType array is not supported.(line 1, pos 16)\n\n== SQL ==\n`resourceSpans` array not null\n----------------^^^\n"
        }
    }
}

How can one reproduce the bug? 1) Create an S3 bucket containing JSON data, in this case OpenTelemetry (OTEL) trace logs such as the following record:

{
  "resourceSpans": [
    {
      "resource": {
        "attributes": [
          {
            "key": "telemetry.sdk.version",
            "value": {
              "stringValue": "1.3.0"
            }
          },
          {
            "key": "telemetry.sdk.name",
            "value": {
              "stringValue": "opentelemetry"
            }
          },
          {
            "key": "telemetry.sdk.language",
            "value": {
              "stringValue": "erlang"
            }
          },
          {
            "key": "service.name",
            "value": {
              "stringValue": "featureflagservice"
            }
          },
          {
            "key": "service.instance.id",
            "value": {
              "stringValue": "featureflagservice@e083872efcb9"
            }
          },
          {
            "key": "process.runtime.version",
            "value": {
              "stringValue": "11.2.2.13"
            }
          },
          {
            "key": "process.runtime.name",
            "value": {
              "stringValue": "BEAM"
            }
          },
          {
            "key": "process.runtime.description",
            "value": {
              "stringValue": "Erlang/OTP 23 erts-11.2.2.13"
            }
          },
          {
            "key": "process.executable.name",
            "value": {
              "stringValue": "featureflagservice"
            }
          }
        ]
      },
      "scopeSpans": [
        {
          "scope": {
            "name": "opentelemetry_ecto",
            "version": "1.1.1"
          },
          "spans": [
            {
              "traceId": "bc342fb3fbfa54c2188595b89b0b1cd8",
              "spanId": "87acd6659b425f80",
              "parentSpanId": "9b355ca40dd98f5e",
              "name": "featureflagservice.repo.query:featureflags",
              "kind": 3,
              "startTimeUnixNano": "1698098982170068232",
              "endTimeUnixNano": "1698098982202276205",
              "attributes": [
                {
                  "key": "total_time_microseconds",
                  "value": {
                    "intValue": "31286"
                  }
                },
                {
                  "key": "source",
                  "value": {
                    "stringValue": "featureflags"
                  }
                },
                {
                  "key": "queue_time_microseconds",
                  "value": {
                    "intValue": "13579"
                  }
                },
                {
                  "key": "query_time_microseconds",
                  "value": {
                    "intValue": "17698"
                  }
                },
                {
                  "key": "idle_time_microseconds",
                  "value": {
                    "intValue": "307054"
                  }
                },
                {
                  "key": "decode_time_microseconds",
                  "value": {
                    "intValue": "8"
                  }
                },
                {
                  "key": "db.url",
                  "value": {
                    "stringValue": "ecto://ffs_postgres"
                  }
                },
                {
                  "key": "db.type",
                  "value": {
                    "stringValue": "sql"
                  }
                },
                {
                  "key": "db.statement",
                  "value": {
                    "stringValue": " AS f0"
                  }
                },
                {
                  "key": "db.name",
                  "value": {
                    "stringValue": "ffs"
                  }
                },
                {
                  "key": "db.instance",
                  "value": {
                    "stringValue": "ffs"
                  }
                }
              ],
              "status": {}
            }
          ]
        }
      ]
    }
  ]
}
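In this OTLP-style JSON, attributes are lists of key/value pairs, and each value is a single-field object tagged with its type (stringValue or intValue in this sample). Integer values and the *UnixNano timestamps are themselves JSON strings, which matches the STRING types the table definition below gives them. As a minimal sketch, assuming the record has already been parsed into plain Python dicts, the hypothetical helpers attribute_value and attributes_to_dict collapse such an attribute list into a flat mapping:

```python
import json
from typing import Any, Dict, List, Optional


def attribute_value(value: Dict[str, Any]) -> Optional[str]:
    """Return the single typed field of an attribute value, e.g. {"stringValue": "ffs"} -> "ffs"."""
    # Only the two variants that appear in the sample are handled; anything else yields None.
    for field in ("stringValue", "intValue"):
        if field in value:
            return value[field]
    return None


def attributes_to_dict(attributes: List[Dict[str, Any]]) -> Dict[str, Optional[str]]:
    """Collapse an attribute list into a flat {key: value} dict (hypothetical helper)."""
    return {attr["key"]: attribute_value(attr.get("value", {})) for attr in attributes}


sample = json.loads("""
[{"key": "db.name", "value": {"stringValue": "ffs"}},
 {"key": "decode_time_microseconds", "value": {"intValue": "8"}}]
""")
print(attributes_to_dict(sample))
# {'db.name': 'ffs', 'decode_time_microseconds': '8'}
```

Note that intValue stays a string here, exactly as it appears in the JSON; converting it to an integer would be a separate, deliberate step.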

Next, define a Spark (Hive) table on top of that S3 bucket:

CREATE TABLE default.otel_traces (
  resourceSpans ARRAY<STRUCT<
    resource: STRUCT<
      attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<stringValue: STRING>>>>,
    scopeSpans: ARRAY<STRUCT<
      scope: STRUCT<name: STRING, version: STRING>,
      spans: ARRAY<STRUCT<
        attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<intValue: STRING, stringValue: STRING>>>,
        endTimeUnixNano: STRING,
        kind: BIGINT,
        name: STRING,
        parentSpanId: STRING,
        spanId: STRING,
        startTimeUnixNano: STRING,
        traceId: STRING>>>>>>)
USING json
OPTIONS ('path' 's3://my-path/otel-traces')
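The nested column type above mirrors the JSON record level by level: JSON arrays become ARRAY, objects become STRUCT, and the stringValue/intValue variants seen across different attributes are merged into a single value struct. The sketch below makes that mapping explicit. It is loosely modeled on Spark's JSON schema inference (integers as BIGINT, struct fields sorted by name), but it is a simplified, hypothetical illustration: infer, merge and to_ddl are not Spark APIs, and conflicting types are simply widened to STRING.

```python
from typing import Any, Dict, Tuple, Union

# Type tree: a primitive name ("string", "bigint", ...), ("array", elem) or ("struct", {name: tree}).
# None stands for "unknown" (a JSON null or an empty array).
TypeTree = Union[None, str, Tuple[str, Any]]


def infer(value: Any) -> TypeTree:
    """Infer a type tree from one parsed JSON value (simplified, hypothetical rules)."""
    if value is None:
        return None
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        element: TypeTree = None
        for item in value:  # merge element types across all array items
            element = merge(element, infer(item))
        return ("array", element)
    if isinstance(value, dict):
        return ("struct", {k: infer(v) for k, v in value.items()})
    raise TypeError(f"unexpected JSON value: {value!r}")


def merge(a: TypeTree, b: TypeTree) -> TypeTree:
    """Combine two inferred types: struct fields are unioned, other conflicts widen to string."""
    if a is None:
        return b
    if b is None:
        return a
    if isinstance(a, tuple) and isinstance(b, tuple) and a[0] == b[0] == "struct":
        fields: Dict[str, TypeTree] = dict(a[1])
        for name, t in b[1].items():
            fields[name] = merge(fields.get(name), t)
        return ("struct", fields)
    if isinstance(a, tuple) and isinstance(b, tuple) and a[0] == b[0] == "array":
        return ("array", merge(a[1], b[1]))
    return a if a == b else "string"


def to_ddl(t: TypeTree) -> str:
    """Render a type tree as Spark-style DDL, sorting struct fields by name."""
    if t is None:
        return "STRING"  # assumption: unknown types fall back to STRING
    if isinstance(t, str):
        return t.upper()
    kind, inner = t
    if kind == "array":
        return f"ARRAY<{to_ddl(inner)}>"
    fields = ", ".join(f"{name}: {to_ddl(ft)}" for name, ft in sorted(inner.items()))
    return f"STRUCT<{fields}>"


span_attributes = [
    {"key": "db.name", "value": {"stringValue": "ffs"}},
    {"key": "decode_time_microseconds", "value": {"intValue": "8"}},
]
print(to_ddl(infer(span_attributes)))
# ARRAY<STRUCT<key: STRING, value: STRUCT<intValue: STRING, stringValue: STRING>>>
print(to_ddl(infer({"kind": 3})))
# STRUCT<kind: BIGINT>
```

Merging across all array items is what produces the combined STRUCT<intValue: STRING, stringValue: STRING> on the spans' attributes line of the table definition; looking only at the first attribute would miss one of the two variants.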

Then define the following MV statement:

CREATE MATERIALIZED VIEW oteldemo_mview AS
SELECT
  rs.resource.attributes.key AS resource_key,
  rs.resource.attributes.value.stringValue AS resource_value,
  ss.scope.name AS scope_name,
  ss.scope.version AS scope_version,
  span.attributes.key AS span_key,
  span.attributes.value.intValue AS span_int_value,
  span.attributes.value.stringValue AS span_string_value,
  span.endTimeUnixNano,
  span.kind,
  span.name AS span_name,
  span.parentSpanId,
  span.spanId,
  span.startTimeUnixNano,
  span.traceId
FROM
  glue_1.default.otel_traces
  LATERAL VIEW EXPLODE(resourceSpans) AS rs
  LATERAL VIEW EXPLODE(rs.scopeSpans) AS ss
  LATERAL VIEW EXPLODE(ss.spans) AS span
  LATERAL VIEW EXPLODE(span.attributes) AS span_attr
WITH (
  auto_refresh = true,
  checkpoint_location = 's3://my-path/oteldemo/checkpoint',
  refresh_interval = '90 Seconds'
)

The SELECT statement inside the CREATE MV, which uses LATERAL VIEW to apply the EXPLODE function to each element of an array or map, works correctly on its own: run directly (with LIMIT 10, as shown in queryText below), it returns the resulting rows:

{
          "result": [
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}",
            "{'resource_key':['service.name'],'resource_value':['frontend-proxy'],'span_key':['node_id','zone','guid:x-request-id','http.url','http.method','downstream_cluster','user_agent','http.protocol','peer.address','request_size','response_size','component','upstream_cluster','upstream_cluster.name','http.status_code','response_flags','error'],'span_int_value':[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null],'span_string_value':['','','7f1af550-55ce-9bfb-b220-b395be8abc18','http://frontend-proxy:8080/api/cart','POST','-','python-requests/2.31.0','HTTP/1.1','172.18.0.24','102','95','proxy','frontend','frontend','503','UC','true'],'endTimeUnixNano':'1698225891911800000','kind':2,'span_name':'ingress','parentSpanId':'96b27852ca8b1d9b','spanId':'ee4bcdd367bb89c5','startTimeUnixNano':'1698225891003501000','traceId':'7e46660b19156d93726d9cdf139b1608'}"
          ],
          "schema": [
            "{'column_name':'resource_key','data_type':'array'}",
            "{'column_name':'resource_value','data_type':'array'}",
            "{'column_name':'scope_name','data_type':'string'}",
            "{'column_name':'scope_version','data_type':'string'}",
            "{'column_name':'span_key','data_type':'array'}",
            "{'column_name':'span_int_value','data_type':'array'}",
            "{'column_name':'span_string_value','data_type':'array'}",
            "{'column_name':'endTimeUnixNano','data_type':'string'}",
            "{'column_name':'kind','data_type':'long'}",
            "{'column_name':'span_name','data_type':'string'}",
            "{'column_name':'parentSpanId','data_type':'string'}",
            "{'column_name':'spanId','data_type':'string'}",
            "{'column_name':'startTimeUnixNano','data_type':'string'}",
            "{'column_name':'traceId','data_type':'string'}"
          ],
          "jobRunId": "00ff2895qb32000m",
          "applicationId": "00ff1rci77hf5g0l",
          "dataSourceName": "glue_1",
          "status": "SUCCESS",
          "error": "",
          "queryId": "QXhYNnlMWXU1VmdsdWVfMQ==",
          "queryText": "SELECT   rs.resource.attributes.key as resource_key,   rs.resource.attributes.value.stringValue as resource_value,   ss.scope.name as scope_name,   ss.scope.version as scope_version,   span.attributes.key as span_key,   span.attributes.value.intValue as span_int_value,   span.attributes.value.stringValue as span_string_value,   span.endTimeUnixNano,   span.kind,   span.name as span_name,   span.parentSpanId,   span.spanId,   span.startTimeUnixNano,   span.traceId FROM   glue_1.default.otel_traces LATERAL VIEW   EXPLODE(resourceSpans) as rs LATERAL VIEW   EXPLODE(rs.scopeSpans) as ss LATERAL VIEW   EXPLODE(ss.spans) as span LATERAL VIEW   EXPLODE(span.attributes) as span_attr LIMIT 10",
          "sessionId": "MzdSUGgwSUl5bWdsdWVfMQ==",
          "updateTime": 1701049777813,
          "queryRunTime": 48864
        }
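The rows above are identical, and each carries a 17-element span_key array. A small Python emulation of the MV's explode chain suggests why; it runs over plain dicts and is only an illustration, not how Spark executes the query. First, span.attributes.key projects the key field of every struct in the array, so the column is itself an array, which is what the schema reports ('data_type':'array'). Second, span_attr is exploded but never selected, so each span is repeated once per attribute (17 times for this span, cut off by LIMIT 10). The function name explode_rows and the trimmed-down document are hypothetical.

```python
from typing import Any, Dict, Iterator


def explode_rows(doc: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Emulate the MV's LATERAL VIEW EXPLODE chain for one parsed OTLP JSON document.

    Like a non-OUTER EXPLODE, an empty or missing array produces no rows.
    """
    for rs in doc.get("resourceSpans") or []:            # EXPLODE(resourceSpans) AS rs
        for ss in rs.get("scopeSpans") or []:            # EXPLODE(rs.scopeSpans) AS ss
            for span in ss.get("spans") or []:           # EXPLODE(ss.spans) AS span
                attributes = span.get("attributes") or []
                for span_attr in attributes:             # EXPLODE(span.attributes) AS span_attr
                    yield {
                        # span.attributes.key: field access on an array of structs -> list of all keys
                        "span_key": [a["key"] for a in attributes],
                        # span_attr.key: field of the exploded element -> a single key
                        "span_attr_key": span_attr["key"],
                        "spanId": span.get("spanId"),
                    }


doc = {"resourceSpans": [{"scopeSpans": [{"spans": [{
    "spanId": "87acd6659b425f80",
    "attributes": [{"key": "db.name"}, {"key": "db.type"}, {"key": "db.url"}],
}]}]}]}

rows = list(explode_rows(doc))
print(len(rows))                              # 3: one row per attribute of the single span
print({tuple(r["span_key"]) for r in rows})   # a single distinct array value, repeated on every row
print([r["span_attr_key"] for r in rows])     # ['db.name', 'db.type', 'db.url']
```

Selecting span_attr.key instead of span.attributes.key would give one scalar key per row rather than the whole array repeated.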

What is the expected behavior? Creating the MV should succeed and generate a corresponding OpenSearch index.

Do you have any screenshots?

[Screenshot 2023-10-27 at 2:54:33 PM]

Do you have any additional context? Spark Error Log:

23/10/27 21:59:04 ERROR FlintJob: Syntax error: 
DataType array is not supported.(line 1, pos 16)

== SQL ==
`resourceSpans` array not null
----------------^^^

org.apache.spark.sql.catalyst.parser.ParseException: 
DataType array is not supported.(line 1, pos 16)

== SQL ==
`resourceSpans` array not null
----------------^^^

    at org.apache.spark.sql.errors.QueryParsingErrors$.dataTypeUnsupportedError(QueryParsingErrors.scala:239) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.parser.AstBuilder.$anonfun$visitPrimitiveDataType$1(AstBuilder.scala:2684) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:157) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.parser.AstBuilder.visitPrimitiveDataType(AstBuilder.scala:2654) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.parser.AstBuilder.visitPrimitiveDataType(AstBuilder.scala:58) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
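The rejected fragment "`resourceSpans` array not null" gives the column's type only as array, whereas Spark's DDL parser also needs the element type (for example array<string>). The result schema of the plain SELECT above likewise reports 'data_type':'array'. One plausible reading, which is an assumption and not confirmed in this thread, is that the MV's column definitions are generated from each type's bare name (in Spark terms, roughly DataType.typeName) rather than its full form (DataType.catalogString or DataType.sql). The hypothetical sketch below is not Flint code. It contrasts the two renderings on a simplified type model.

```python
from dataclasses import dataclass
from typing import Callable, Tuple, Union


@dataclass(frozen=True)
class ArrayT:
    element: "DType"


@dataclass(frozen=True)
class StructT:
    fields: Tuple[Tuple[str, "DType"], ...]


# Primitive types are plain strings such as "string" or "bigint".
DType = Union[str, ArrayT, StructT]


def type_name(t: DType) -> str:
    """Bare type name only ('array', 'struct', ...): element and field types are lost."""
    if isinstance(t, ArrayT):
        return "array"
    if isinstance(t, StructT):
        return "struct"
    return t


def ddl_type(t: DType) -> str:
    """Full DDL type string, recursing into element and field types."""
    if isinstance(t, ArrayT):
        return f"array<{ddl_type(t.element)}>"
    if isinstance(t, StructT):
        inner = ", ".join(f"`{name}`: {ddl_type(ft)}" for name, ft in t.fields)
        return f"struct<{inner}>"
    return t


def column_ddl(name: str, t: DType, nullable: bool,
               render: Callable[[DType], str] = ddl_type) -> str:
    """Render one column definition, e.g. "`col` array<string> not null"."""
    return f"`{name}` {render(t)}" + ("" if nullable else " not null")


# Hypothetical simplification: the real resourceSpans element struct is much deeper.
resource_spans = ArrayT(StructT((("resource", "string"),)))

print(column_ddl("resourceSpans", resource_spans, nullable=False, render=type_name))
# `resourceSpans` array not null  (same shape as the fragment Spark rejects)
print(column_ddl("resourceSpans", resource_spans, nullable=False))
# `resourceSpans` array<struct<`resource`: string>> not null
```

Primitive columns render identically either way, which would be consistent with the error only surfacing once the schema contains arrays or structs.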
dai-chen commented 1 year ago

Just wondering: would it be easier to support this with the COPY command proposed in https://github.com/opensearch-project/opensearch-spark/issues/129?