apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [PostgreSQL] Error executing aggregate query (min, max, distinct) #6522

Closed Bennyjoseph07 closed 4 months ago

Bennyjoseph07 commented 6 months ago

Search before asking

What happened

Issue with the PostgreSQL source when column names were created with double quotes to preserve mixed case. When reading data from PostgreSQL and loading it into MySQL, the job fails if the columns were created using double quotes.

CREATE TABLE STATEMENT

CREATE TABLE IF NOT EXISTS public.cricket_data_15 (
    index bigint,
    "Team" text COLLATE pg_catalog."default",
    "Player_ID" bigint,
    "PLAYER" text COLLATE pg_catalog."default",
    matches bigint,
    runs bigint,
    wickets bigint,
    "Hundreds" bigint,
    fifties bigint,
    speciality text COLLATE pg_catalog."default"
)
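PostgreSQL folds unquoted identifiers to lower case, while double-quoted identifiers keep their exact spelling. That matches the error in the log: the unquoted `Player_ID` in the query is looked up as `player_id`, but the column was stored as `Player_ID`. The sketch below is a simplified model of that rule (ASCII-only folding, an assumption; it is not PostgreSQL code) and checks which unquoted references in the job's query would fail against the columns created by the statement above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of PostgreSQL column-name resolution (illustrative only):
// unquoted identifiers are folded to lower case, quoted ones are kept as written.
class IdentifierFolding {

    // Fold ASCII A-Z to lower case, as PostgreSQL does for unquoted identifiers.
    static String foldUnquoted(String identifier) {
        StringBuilder sb = new StringBuilder(identifier.length());
        for (char c : identifier.toCharArray()) {
            sb.append(c >= 'A' && c <= 'Z' ? (char) (c + ('a' - 'A')) : c);
        }
        return sb.toString();
    }

    // Return the unquoted references whose folded form matches no stored column name.
    static List<String> unresolved(Set<String> storedColumns, List<String> unquotedRefs) {
        List<String> missing = new ArrayList<>();
        for (String ref : unquotedRefs) {
            if (!storedColumns.contains(foldUnquoted(ref))) {
                missing.add(ref);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Names as stored by the CREATE TABLE: unquoted ones folded, quoted ones verbatim.
        Set<String> stored = Set.of("index", "Team", "Player_ID", "PLAYER", "matches",
                "runs", "wickets", "Hundreds", "fifties", "speciality");
        // Column references as written (unquoted) in the job's query.
        List<String> refs = List.of("Team", "Player_ID", "Player", "Matches", "Runs",
                "Wickets", "Hundreds", "Fifties", "Speciality");
        System.out.println(unresolved(stored, refs)); // [Team, Player_ID, Player, Hundreds]
    }
}
```

Under this model only the columns that were created with double quotes fail, which suggests quoting those references in the query (for example `"Player_ID"`, `"PLAYER"`) rather than relying on their unquoted spelling.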

CONNECTOR CONFIG

source {
  jdbc {
    url= "jdbc:postgresql://hostname:5432/postgres"
    driver= "org.postgresql.Driver"
    user= "postgres"
    password= ""
    query= "SELECT min(CASE WHEN Player_ID IS NOT NULL THEN Player_ID END) AS Player_ID_min, min(CASE WHEN Matches IS NOT NULL THEN Matches END) AS Matches_min, min(CASE WHEN Runs IS NOT NULL THEN Runs END) AS Runs_min, min(CASE WHEN Wickets IS NOT NULL THEN Wickets END) AS Wickets_min, min(CASE WHEN Hundreds IS NOT NULL THEN Hundreds END) AS Hundreds_min, min(CASE WHEN Fifties IS NOT NULL THEN Fifties END) AS Fifties_min, max(CASE WHEN Player_ID IS NOT NULL THEN Player_ID END) AS Player_ID_max, max(CASE WHEN Matches IS NOT NULL THEN Matches END) AS Matches_max, max(CASE WHEN Runs IS NOT NULL THEN Runs END) AS Runs_max, max(CASE WHEN Wickets IS NOT NULL THEN Wickets END) AS Wickets_max, max(CASE WHEN Hundreds IS NOT NULL THEN Hundreds END) AS Hundreds_max, max(CASE WHEN Fifties IS NOT NULL THEN Fifties END) AS Fifties_max, count(CASE WHEN Team IS NULL THEN 1 ELSE 0 END) AS Team_null, count(CASE WHEN Player_ID IS NULL THEN 1 ELSE 0 END) AS Player_ID_null, count(CASE WHEN Player IS NULL THEN 1 ELSE 0 END) AS Player_null, count(CASE WHEN Matches IS NULL THEN 1 ELSE 0 END) AS Matches_null, count(CASE WHEN Runs IS NULL THEN 1 ELSE 0 END) AS Runs_null, count(CASE WHEN Wickets IS NULL THEN 1 ELSE 0 END) AS Wickets_null, count(CASE WHEN Hundreds IS NULL THEN 1 ELSE 0 END) AS Hundreds_null, count(CASE WHEN Fifties IS NULL THEN 1 ELSE 0 END) AS Fifties_null, count(CASE WHEN Speciality IS NULL THEN 1 ELSE 0 END) AS Speciality_null, count(distinct(CASE WHEN Team IS NOT NULL THEN Team END)) AS Team_distinct, count(distinct(CASE WHEN Player_ID IS NOT NULL THEN Player_ID END)) AS Player_ID_distinct, count(distinct(CASE WHEN Player IS NOT NULL THEN Player END)) AS Player_distinct, count(distinct(CASE WHEN Matches IS NOT NULL THEN Matches END)) AS Matches_distinct, count(distinct(CASE WHEN Runs IS NOT NULL THEN Runs END)) AS Runs_distinct, count(distinct(CASE WHEN Wickets IS NOT NULL THEN Wickets END)) AS Wickets_distinct, count(distinct(CASE WHEN Hundreds IS NOT NULL THEN Hundreds END)) AS Hundreds_distinct, count(distinct(CASE WHEN Fifties IS NOT NULL THEN Fifties END)) AS Fifties_distinct, count(distinct(CASE WHEN Speciality IS NOT NULL THEN Speciality END)) AS Speciality_distinct, count(*) as count_check FROM public.cricket_data;"
  }
}
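If the mixed-case column references are double-quoted in SQL, those quotes also have to survive the config layer. SeaTunnel job configs are HOCON, and inside a HOCON double-quoted string an unescaped `"` terminates the string, so the identifier quoting would not reach PostgreSQL intact. The helper below is hypothetical (not a SeaTunnel or HOCON library API); it is a minimal sketch that renders a SQL text as a HOCON quoted string using JSON-style escapes.

```java
// Hypothetical helper: render SQL as a HOCON double-quoted string value.
// Quotes, backslashes and common control characters are backslash-escaped.
class HoconQuote {

    static String quoted(String sql) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : sql.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break; // literal quote becomes \"
                case '\\': sb.append("\\\\"); break; // literal backslash becomes \\
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        String sql = "SELECT min(\"Player_ID\") AS Player_ID_min FROM public.cricket_data_16";
        System.out.println("query= " + quoted(sql));
        // query= "SELECT min(\"Player_ID\") AS Player_ID_min FROM public.cricket_data_16"
    }
}
```

HOCON also accepts triple-quoted strings (`"""..."""`), in which no escape processing happens; for a long query that may be easier to maintain than escaping every identifier quote by hand.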


SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  parallelism: 2
  job= {
    mode= "BATCH"
  }
}
source {
  jdbc {
    url= "jdbc:postgresql://hostname:5432/postgres"
    driver= "org.postgresql.Driver"
    user= "postgres"
    password= ""
    query= "SELECT min(CASE WHEN Player_ID IS NOT NULL THEN Player_ID END) AS Player_ID_min, min(CASE WHEN Matches IS NOT NULL THEN Matches END) AS Matches_min, min(CASE WHEN Runs IS NOT NULL THEN Runs END) AS Runs_min, min(CASE WHEN Wickets IS NOT NULL THEN Wickets END) AS Wickets_min, min(CASE WHEN Hundreds IS NOT NULL THEN Hundreds END) AS Hundreds_min, min(CASE WHEN Fifties IS NOT NULL THEN Fifties END) AS Fifties_min, max(CASE WHEN Player_ID IS NOT NULL THEN Player_ID END) AS Player_ID_max, max(CASE WHEN Matches IS NOT NULL THEN Matches END) AS Matches_max, max(CASE WHEN Runs IS NOT NULL THEN Runs END) AS Runs_max, max(CASE WHEN Wickets IS NOT NULL THEN Wickets END) AS Wickets_max, max(CASE WHEN Hundreds IS NOT NULL THEN Hundreds END) AS Hundreds_max, max(CASE WHEN Fifties IS NOT NULL THEN Fifties END) AS Fifties_max, count(CASE WHEN Team IS NULL THEN 1 ELSE 0 END) AS Team_null, count(CASE WHEN Player_ID IS NULL THEN 1 ELSE 0 END) AS Player_ID_null, count(CASE WHEN Player IS NULL THEN 1 ELSE 0 END) AS Player_null, count(CASE WHEN Matches IS NULL THEN 1 ELSE 0 END) AS Matches_null, count(CASE WHEN Runs IS NULL THEN 1 ELSE 0 END) AS Runs_null, count(CASE WHEN Wickets IS NULL THEN 1 ELSE 0 END) AS Wickets_null, count(CASE WHEN Hundreds IS NULL THEN 1 ELSE 0 END) AS Hundreds_null, count(CASE WHEN Fifties IS NULL THEN 1 ELSE 0 END) AS Fifties_null, count(CASE WHEN Speciality IS NULL THEN 1 ELSE 0 END) AS Speciality_null, count(distinct(CASE WHEN Team IS NOT NULL THEN Team END)) AS Team_distinct, count(distinct(CASE WHEN Player_ID IS NOT NULL THEN Player_ID END)) AS Player_ID_distinct, count(distinct(CASE WHEN Player IS NOT NULL THEN Player END)) AS Player_distinct, count(distinct(CASE WHEN Matches IS NOT NULL THEN Matches END)) AS Matches_distinct, count(distinct(CASE WHEN Runs IS NOT NULL THEN Runs END)) AS Runs_distinct, count(distinct(CASE WHEN Wickets IS NOT NULL THEN Wickets END)) AS Wickets_distinct, count(distinct(CASE WHEN Hundreds IS NOT NULL THEN Hundreds END)) AS Hundreds_distinct, count(distinct(CASE WHEN Fifties IS NOT NULL THEN Fifties END)) AS Fifties_distinct, count(distinct(CASE WHEN Speciality IS NOT NULL THEN Speciality END)) AS Speciality_distinct, count(*) as count_check FROM public.cricket_data_16;"
  }
}
sink {
  jdbc {
    url= "jdbc:mysql://hostname:3306/sea_tunnnel_datavalidation"
    driver= "com.mysql.cj.jdbc.Driver"
    user= "root"
    password= "123Welcome$"
    generate_sink_sql= "true"
    database= "sea_tunnnel_datavalidation"
    table= "1439_query_build"
  }
}
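A minimal sketch of generating the same kind of profiling query with every column reference double-quoted, so that mixed-case names such as `Player_ID` are matched exactly. The class and method names are hypothetical, and the table name is assumed to be passed in already safe to embed. It also simplifies two expressions: `min`/`max` and `count(DISTINCT ...)` already skip NULLs, so the `CASE WHEN ... IS NOT NULL` wrappers are unnecessary. For the NULL count it uses `count(*) - count(col)`, because `count(expr)` counts non-NULL values and `CASE WHEN X IS NULL THEN 1 ELSE 0 END` is never NULL, so the `count(CASE ... ELSE 0 END)` form returns the total row count rather than the number of NULLs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: build a per-column profiling query with quoted identifiers.
class ProfilingQuery {

    // Quote a PostgreSQL identifier; an embedded " is doubled.
    static String ident(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    // numericCols get min/max; allCols get NULL and distinct counts.
    // Aliases stay unquoted, as in the original query, so PostgreSQL reports them in lower case.
    static String build(String table, List<String> numericCols, List<String> allCols) {
        List<String> parts = new ArrayList<>();
        for (String c : numericCols) {
            parts.add("min(" + ident(c) + ") AS " + c + "_min");
            parts.add("max(" + ident(c) + ") AS " + c + "_max");
        }
        for (String c : allCols) {
            parts.add("count(*) - count(" + ident(c) + ") AS " + c + "_null");
            parts.add("count(DISTINCT " + ident(c) + ") AS " + c + "_distinct");
        }
        parts.add("count(*) AS count_check");
        return "SELECT " + String.join(", ", parts) + " FROM " + table;
    }

    public static void main(String[] args) {
        // Column names exactly as stored by the CREATE TABLE statement.
        List<String> numeric = List.of("Player_ID", "matches", "runs", "wickets", "Hundreds", "fifties");
        List<String> all = List.of("Team", "Player_ID", "PLAYER", "matches", "runs",
                "wickets", "Hundreds", "fifties", "speciality");
        System.out.println(build("public.cricket_data_16", numeric, all));
    }
}
```

The generated text still contains double quotes, so it would need escaping (or a triple-quoted string) before being pasted into the `query=` setting.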

Running Command

./bin/seatunnel.sh --config ./config/1439_query_build -e local

Error Exception

ERROR LOG
------------------------
===============================================================================

2024-03-15 10:06:57,519 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,

2024-03-15 10:06:57,519 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-03-15 10:06:57,519 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed

2024-03-15 10:06:57,520 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'jdbc'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:320)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:181)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:161)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:146)
        ... 2 more
Caused by: org.postgresql.util.PSQLException: ERROR: column "player_id" does not exist
  Hint: Perhaps you meant to reference the column "cricket_data_16.Player_ID".
  Position: 22
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:316)
        at org.postgresql.jdbc.PgPreparedStatement.getMetaData(PgPreparedStatement.java:1149)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils.getCatalogTable(CatalogUtils.java:280)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.getTable(PostgresCatalog.java:283)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils.getCatalogTable(JdbcCatalogUtils.java:194)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils.getTables(JdbcCatalogUtils.java:83)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource.<init>(JdbcSource.java:57)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceFactory.lambda$createSource$0(JdbcSourceFactory.java:78)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73)
        ... 7 more

2024-03-15 10:06:57,520 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -
===============================================================================


------------------------
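The `Position: 22` line in the PostgreSQL error is a 1-based character offset into the query text the server received. A small hypothetical helper (not part of SeaTunnel or the PostgreSQL JDBC driver) can show which token that offset points at; it assumes the driver sent the query text unchanged, which should hold for a query without `?` parameter placeholders. Here it lands on the first unquoted `Player_ID`, consistent with the `player_id` error.

```java
// Hypothetical helper: return the identifier-like token that starts at a
// PostgreSQL error "Position" (1-based character offset into the query).
class ErrorPosition {

    static String tokenAt(String sql, int position) {
        int start = position - 1; // convert 1-based to 0-based
        if (start < 0 || start >= sql.length()) {
            throw new IllegalArgumentException("position out of range: " + position);
        }
        int end = start;
        // Extend while the character can be part of an unquoted identifier.
        while (end < sql.length()
                && (Character.isLetterOrDigit(sql.charAt(end)) || sql.charAt(end) == '_')) {
            end++;
        }
        return sql.substring(start, end);
    }

    public static void main(String[] args) {
        String sql = "SELECT min(CASE WHEN Player_ID IS NOT NULL THEN Player_ID END) AS Player_ID_min";
        System.out.println(tokenAt(sql, 22)); // Player_ID
    }
}
```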

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit a PR?

Code of Conduct

github-actions[bot] commented 5 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 4 months ago

This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.