apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [transfor-v2] SQL transform plugin reports error when parsing fields starting with numbers #7505

Closed zhangxiang-ola closed 1 month ago

zhangxiang-ola commented 2 months ago

### Search before asking

### What happened

I use the SQL transform plugin to do field mapping and to assign default values to null values in the source table fields. But when the source table has a field whose name starts with a number, the transform SQL fails either way. If it is written as

SELECT COALESCE(1col,0) as 1col

CCJSqlParserUtil cannot parse it. If it is written as

SELECT COALESCE(`1col`,0) as `1col`

the task fails with

 java.lang.IllegalArgumentException: can't find field [`1col`]
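The second failure can be reproduced without SeaTunnel at all: the parser hands back the alias with its backticks intact, while the table schema stores the bare column name, so the field lookup fails. A minimal sketch (the field names and the `contains` lookup are illustrative, not SeaTunnel's actual code):

```java
import java.util.Arrays;
import java.util.List;

public class AliasLookupDemo {
    public static void main(String[] args) {
        // Schema fields as stored for the source table: bare names, no backticks.
        List<String> schemaFields = Arrays.asList("1col", "2col", "dt");

        // Alias as the SQL parser returns it: the backticks are kept.
        String alias = "`1col`";

        // The lookup compares "`1col`" against "1col" and fails,
        // which surfaces as: can't find field [`1col`]
        System.out.println(schemaFields.contains(alias)); // prints "false"

        // Stripping the surrounding backticks makes the lookup succeed.
        String bare = alias.substring(1, alias.length() - 1);
        System.out.println(schemaFields.contains(bare));  // prints "true"
    }
}
```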

### SeaTunnel Version

2.3.5

### SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "my_hive_jdbc_url"
    driver = "org.apache.hive.jdbc.HiveDriver"
    connection_check_timeout_sec = 100
    query = "SELECT `1col`,`2col`,`dt` FROM dws_test.zx_col_number_test WHERE dt = '2024-08-26'"
    result_table_name = "zx_col_number_test"
  }
}

transform {
  SQL {
      source_table_name = "zx_col_number_test"
      result_table_name = "my_ck_table"
      query = """
        SELECT
          COALESCE(`1col`,0) as `1col`,
          COALESCE(`2col`,'') AS `2col`,
          COALESCE(dt,'') AS dt
        FROM zx_col_number_test
      """
    }
}

sink {
  Clickhouse {
    source_table_name = "my_ck_table"
    host = "my_host"
    database = "my_ck_db"
    table = "my_ck_table"
    username = "my_username"
    password = "my_password"
    bulk_size = 100000
  }
}
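A possible workaround on 2.3.5 (a sketch; the alias names `col1`/`col2` are my own invention, and it assumes the Hive source accepts backtick-quoted columns, which it does by default) is to rename the number-leading columns inside the Jdbc query itself, so the SQL transform only ever sees valid identifiers:

```
source {
  Jdbc {
    url = "my_hive_jdbc_url"
    driver = "org.apache.hive.jdbc.HiveDriver"
    connection_check_timeout_sec = 100
    # Rename the number-leading columns at the source, so the transform
    # SQL can reference them without backticks.
    query = "SELECT `1col` AS col1, `2col` AS col2, `dt` FROM dws_test.zx_col_number_test WHERE dt = '2024-08-26'"
    result_table_name = "zx_col_number_test"
  }
}
```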

### Running Command

apache-seatunnel-2.3.5/bin/start-seatunnel-spark-2-connector-v2.sh --name my_job_name --master yarn --deploy-mode client --config my_config

### Error Exception

2024-08-27 11:36:43.038 - 24/08/27 11:36:42 ERROR [main] SeaTunnel: Fatal Error, 
2024-08-27 11:36:43.159 - 
2024-08-27 11:36:43.217 - 24/08/27 11:36:42 ERROR [main] SeaTunnel: Please submit bug report in https://github.com/apache/seatunnel/issues
2024-08-27 11:36:43.268 - 
2024-08-27 11:36:43.315 - 24/08/27 11:36:42 ERROR [main] SeaTunnel: Reason:Run SeaTunnel on spark failed 
2024-08-27 11:36:43.358 - 
2024-08-27 11:36:43.404 - 24/08/27 11:36:42 ERROR [main] SeaTunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
2024-08-27 11:36:43.471 -   at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
2024-08-27 11:36:43.532 -   at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
2024-08-27 11:36:43.593 -   at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
2024-08-27 11:36:43.663 -   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2024-08-27 11:36:43.741 -   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2024-08-27 11:36:43.834 -   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2024-08-27 11:36:43.923 -   at java.lang.reflect.Method.invoke(Method.java:498)
2024-08-27 11:36:43.971 -   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
2024-08-27 11:36:44.029 -   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:855)
2024-08-27 11:36:44.084 -   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
2024-08-27 11:36:44.138 -   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
2024-08-27 11:36:44.232 -   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
2024-08-27 11:36:44.365 -   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:930)
2024-08-27 11:36:44.410 -   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:939)
2024-08-27 11:36:44.498 -   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2024-08-27 11:36:44.542 - Caused by: org.apache.seatunnel.core.starter.exception.TaskExecuteException: SeaTunnel transform task: Sql execute error
2024-08-27 11:36:44.591 -   at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:121)
2024-08-27 11:36:44.641 -   at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:70)
2024-08-27 11:36:44.698 -   at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
2024-08-27 11:36:44.762 -   ... 14 more
2024-08-27 11:36:44.806 - Caused by: java.lang.IllegalArgumentException: can't find field [`11`]
2024-08-27 11:36:44.852 -   at org.apache.seatunnel.transform.sql.zeta.ZetaSQLType.getExpressionType(ZetaSQLType.java:125)
2024-08-27 11:36:44.897 -   at org.apache.seatunnel.transform.sql.zeta.ZetaSQLType.getFunctionType(ZetaSQLType.java:435)
2024-08-27 11:36:45.018 -   at org.apache.seatunnel.transform.sql.zeta.ZetaSQLType.getExpressionType(ZetaSQLType.java:143)
2024-08-27 11:36:45.092 -   at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.typeMapping(ZetaSQLEngine.java:206)
2024-08-27 11:36:45.133 -   at org.apache.seatunnel.transform.sql.SQLTransform.transformTableSchema(SQLTransform.java:122)
2024-08-27 11:36:45.227 -   at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.transformCatalogTable(AbstractCatalogSupportTransform.java:50)
2024-08-27 11:36:45.358 -   at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.getProducedCatalogTable(AbstractCatalogSupportTransform.java:40)
2024-08-27 11:36:45.403 -   at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.sparkTransform(TransformExecuteProcessor.java:134)
2024-08-27 11:36:45.490 -   at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:110)
2024-08-27 11:47:18.769 - 24/08/27 11:47:18 ERROR [main] SeaTunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
2024-08-27 11:47:18.817 -   at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
2024-08-27 11:47:18.891 -   at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
2024-08-27 11:47:18.958 -   at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
2024-08-27 11:47:19.031 -   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2024-08-27 11:47:19.088 -   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2024-08-27 11:47:19.123 -   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2024-08-27 11:47:19.178 -   at java.lang.reflect.Method.invoke(Method.java:498)
2024-08-27 11:47:19.217 -   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
2024-08-27 11:47:19.261 -   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:855)
2024-08-27 11:47:19.310 -   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
2024-08-27 11:47:19.359 -   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
2024-08-27 11:47:19.420 -   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
2024-08-27 11:47:19.470 -   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:930)
2024-08-27 11:47:19.576 -   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:939)
2024-08-27 11:47:19.618 -   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2024-08-27 11:47:19.670 - Caused by: org.apache.seatunnel.core.starter.exception.TaskExecuteException: SeaTunnel transform task: Sql execute error
2024-08-27 11:47:19.714 -   at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:121)
2024-08-27 11:47:19.780 -   at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:70)
2024-08-27 11:47:19.824 -   at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
2024-08-27 11:47:19.888 -   ... 14 more
2024-08-27 11:47:19.937 - Caused by: org.apache.seatunnel.transform.exception.TransformException: ErrorCode:[COMMON-05], ErrorDescription:[Unsupported operation] - SQL parse failed: 
2024-08-27 11:47:19.982 -         SELECT
2024-08-27 11:47:20.032 -           COALESCE(1col,0) as 1col,
2024-08-27 11:47:20.076 -           COALESCE(2col,'') AS 2col,
2024-08-27 11:47:20.139 -           COALESCE(dt,'') AS dt
2024-08-27 11:47:20.232 -         FROM zx_col_number_test
2024-08-27 11:47:20.294 -       , cause: net.sf.jsqlparser.parser.ParseException: Encountered unexpected token: "as" "AS"
2024-08-27 11:47:20.387 -     at line 3, column 25.
2024-08-27 11:47:20.449 - 
2024-08-27 11:47:20.499 - Was expecting one of:
2024-08-27 11:47:20.551 - 
2024-08-27 11:47:20.619 -     "&"
2024-08-27 11:47:20.680 -     "."
2024-08-27 11:47:20.812 -     "::"
2024-08-27 11:47:20.904 -     ";"
2024-08-27 11:47:20.961 -     "<<"
2024-08-27 11:47:21.012 -     ">>"
2024-08-27 11:47:21.062 -     "COLLATE"
2024-08-27 11:47:21.120 -     "CONNECT"
2024-08-27 11:47:21.164 -     "EMIT"
2024-08-27 11:47:21.224 -     "GROUP"
2024-08-27 11:47:21.285 -     "HAVING"
2024-08-27 11:47:21.347 -     "INTO"
2024-08-27 11:47:21.388 -     "START"
2024-08-27 11:47:21.440 -     "WINDOW"
2024-08-27 11:47:21.478 -     "["
2024-08-27 11:47:21.595 -     "^"
2024-08-27 11:47:21.759 -     "|"
2024-08-27 11:47:21.801 -     <EOF>
2024-08-27 11:47:21.840 - 
2024-08-27 11:47:21.887 -   at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.parseSQL(ZetaSQLEngine.java:95)
2024-08-27 11:47:21.927 -   at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.init(ZetaSQLEngine.java:83)
2024-08-27 11:47:21.976 -   at org.apache.seatunnel.transform.sql.SQLTransform.open(SQLTransform.java:99)
2024-08-27 11:47:22.028 -   at org.apache.seatunnel.transform.sql.SQLTransform.tryOpen(SQLTransform.java:108)
2024-08-27 11:47:22.097 -   at org.apache.seatunnel.transform.sql.SQLTransform.transformTableSchema(SQLTransform.java:120)
2024-08-27 11:47:22.144 -   at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.transformCatalogTable(AbstractCatalogSupportTransform.java:50)
2024-08-27 11:47:22.191 -   at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.getProducedCatalogTable(AbstractCatalogSupportTransform.java:40)
2024-08-27 11:47:22.254 -   at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.sparkTransform(TransformExecuteProcessor.java:134)
2024-08-27 11:47:22.309 -   at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:110)


### Zeta or Flink or Spark Version

Spark 3.3.1

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
zhilinli123 commented 2 months ago

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # This is an example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    result_table_name = "fake"
    parallelism = 1
    schema = {
      fields {
        name = "string"
        age = "int"
        id = "int"
      }
    }
  }
}

transform {
  Sql {
    source_table_name = "fake"
    result_table_name = "fake1"
    query = "select id, COALESCE(name, '1112') as name, age+1 as age from fake where id>0"
  }
}

sink {
  console {
    source_table_name = "fake1"
  }
}

Could you run this case and see whether it works?

zhangxiang-ola commented 2 months ago

I believe this is the test configuration from the official documentation. Yes, it runs successfully.

zhangxiang-ola commented 1 month ago

Version 2.3.7 solves my problem. In version 2.3.5, the Zeta engine parses a SelectExpressionItem like this:

SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();

if (expressionItem.getAlias() != null) {
    fieldNames[idx] = expressionItem.getAlias().getName();
} else {
    if (expression instanceof Column) {
        fieldNames[idx] = ((Column) expression).getColumnName();
    } else {
        fieldNames[idx] = expression.toString();
    }
}

In version 2.3.7, the Zeta engine parses it like this:

SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();
if (expressionItem.getAlias() != null) {
    String aliasName = expressionItem.getAlias().getName();
    if (aliasName.startsWith(ESCAPE_IDENTIFIER)
            && aliasName.endsWith(ESCAPE_IDENTIFIER)) {
        aliasName = aliasName.substring(1, aliasName.length() - 1);
    }
    fieldNames[idx] = aliasName;
} else {
    if (expression instanceof Column) {
        fieldNames[idx] = ((Column) expression).getColumnName();
    } else {
        fieldNames[idx] = expression.toString();
    }
}
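The difference between the two versions boils down to stripping the escape identifier from the alias before it is used as a field name. As a standalone sketch (assuming `ESCAPE_IDENTIFIER` is the backtick character, matching how the queries above quote identifiers):

```java
public class AliasUnescape {
    // Assumption: the escape identifier is the backtick character.
    private static final String ESCAPE_IDENTIFIER = "`";

    // Mirrors the 2.3.7 behavior: drop the surrounding backticks so the
    // alias matches the bare field name stored in the table schema.
    static String unescape(String aliasName) {
        if (aliasName.length() >= 2
                && aliasName.startsWith(ESCAPE_IDENTIFIER)
                && aliasName.endsWith(ESCAPE_IDENTIFIER)) {
            return aliasName.substring(1, aliasName.length() - 1);
        }
        return aliasName;
    }

    public static void main(String[] args) {
        System.out.println(unescape("`1col`")); // prints "1col"
        System.out.println(unescape("dt"));     // prints "dt"
    }
}
```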