apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

Postgres join synchronization error #23

Closed · ddusen closed this issue 4 years ago

ddusen commented 4 years ago

### Flink SQL

```sql
'connector' = 'postgres-cdc',
'hostname' = '127.0.0.1',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'seesaw_boh_test_report',
'schema-name' = 'public',
'table-name' = 'store_caches',
'debezium.slot.name' = 'store_caches'
);
```

```sql
create table sales_ticket_amounts (
  id bigint primary key,
  partner_id bigint,
  scope_id bigint,
  bus_date date,
  bus_date_week date,
  bus_date_month date,
  bus_date_year date,
  store_id bigint,
  eticket_id bigint,
  channel_id bigint,
  channel_name varchar(50),
  order_type varchar(15),
  order_type_name varchar(15),
  order_time timestamp,
  refunded boolean,
  gross_amount numeric(38, 16),
  net_amount numeric(38, 16),
  discount_amount numeric(38, 16),
  tip numeric(38, 16),
  package_fee numeric(38, 16),
  delivery_fee numeric(38, 16),
  service_fee numeric(38, 16),
  tax_fee numeric(38, 16),
  other_fee numeric(38, 16),
  pay_amount numeric(38, 16),
  rounding numeric(38, 16),
  overflow_amount numeric(38, 16),
  change_amount numeric(38, 16),
  product_count bigint,
  accessory_count bigint,
  eticket_count bigint,
  created TIMESTAMP
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '127.0.0.1',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'seesaw_boh_test_report',
  'schema-name' = 'public',
  'table-name' = 'sales_ticket_amounts',
  'debezium.slot.name' = 'sales_ticket_amounts'
);
```

-- Target table

```sql
create table store_sales (
  bus_date date,
  store_id bigint,
  gross_amount numeric(38, 16),
  net_amount numeric(38, 16),
  discount_amount numeric(38, 16),
  tip numeric(38, 16),
  package_fee numeric(38, 16),
  delivery_fee numeric(38, 16),
  service_fee numeric(38, 16),
  tax_fee numeric(38, 16),
  other_fee numeric(38, 16),
  pay_amount numeric(38, 16),
  rounding numeric(38, 16),
  overflow_amount numeric(38, 16),
  change_amount numeric(38, 16),
  order_count bigint,
  product_count bigint,
  accessory_count bigint,
  gross_amount_returned numeric(38, 16),
  net_amount_returned numeric(38, 16),
  discount_amount_returned numeric(38, 16),
  tip_returned numeric(38, 16),
  package_fee_returned numeric(38, 16),
  delivery_fee_returned numeric(38, 16),
  service_fee_returned numeric(38, 16),
  tax_fee_returned numeric(38, 16),
  other_fee_returned numeric(38, 16),
  pay_amount_returned numeric(38, 16),
  rounding_returned numeric(38, 16),
  overflow_amount_returned numeric(38, 16),
  change_amount_returned numeric(38, 16),
  order_count_returned bigint,
  PRIMARY KEY(bus_date, store_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://127.0.0.1:5432/seesaw_boh_test_report',
  'username' = 'postgres',
  'password' = 'postgres',
  'table-name' = 'store_sales'
);
```

```sql
-- Flink job
INSERT INTO store_sales (
  bus_date, store_id, gross_amount, net_amount, discount_amount, tip,
  package_fee, delivery_fee, service_fee, tax_fee, other_fee, pay_amount,
  rounding, overflow_amount, change_amount, order_count, product_count,
  accessory_count, gross_amount_returned, net_amount_returned,
  discount_amount_returned, tip_returned, package_fee_returned,
  delivery_fee_returned, service_fee_returned, tax_fee_returned,
  other_fee_returned, pay_amount_returned, rounding_returned,
  overflow_amount_returned, change_amount_returned, order_count_returned
)
SELECT
  sales_ticket_amounts.bus_date AS bus_date,
  store_caches.id AS store_id,
  SUM(CASE WHEN refunded THEN 0 ELSE gross_amount END) AS gross_amount,
  SUM(net_amount) AS net_amount,
  SUM(CASE WHEN refunded THEN 0 ELSE discount_amount END) AS discount_amount,
  SUM(CASE WHEN refunded THEN 0 ELSE tip END) AS tip,
  SUM(CASE WHEN refunded THEN 0 ELSE package_fee END) AS package_fee,
  SUM(CASE WHEN refunded THEN 0 ELSE delivery_fee END) AS delivery_fee,
  SUM(CASE WHEN refunded THEN 0 ELSE service_fee END) AS service_fee,
  SUM(CASE WHEN refunded THEN 0 ELSE tax_fee END) AS tax_fee,
  SUM(CASE WHEN refunded THEN 0 ELSE other_fee END) AS other_fee,
  SUM(CASE WHEN refunded THEN 0 ELSE pay_amount END) AS pay_amount,
  SUM(CASE WHEN refunded THEN 0 ELSE rounding END) AS rounding,
  SUM(CASE WHEN refunded THEN 0 ELSE overflow_amount END) AS overflow_amount,
  SUM(CASE WHEN refunded THEN 0 ELSE change_amount END) AS change_amount,
  SUM(CASE WHEN refunded THEN 0 ELSE eticket_count END) AS order_count,
  SUM(CASE WHEN refunded THEN 0 ELSE product_count END) AS product_count,
  SUM(CASE WHEN refunded THEN 0 ELSE accessory_count END) AS accessory_count,
  SUM(CASE WHEN refunded THEN gross_amount ELSE 0 END) AS gross_amount_returned,
  SUM(CASE WHEN refunded THEN net_amount ELSE 0 END) AS net_amount_returned,
  SUM(CASE WHEN refunded THEN discount_amount ELSE 0 END) AS discount_amount_returned,
  SUM(CASE WHEN refunded THEN tip ELSE 0 END) AS tip_returned,
  SUM(CASE WHEN refunded THEN package_fee ELSE 0 END) AS package_fee_returned,
  SUM(CASE WHEN refunded THEN delivery_fee ELSE 0 END) AS delivery_fee_returned,
  SUM(CASE WHEN refunded THEN service_fee ELSE 0 END) AS service_fee_returned,
  SUM(CASE WHEN refunded THEN tax_fee ELSE 0 END) AS tax_fee_returned,
  SUM(CASE WHEN refunded THEN other_fee ELSE 0 END) AS other_fee_returned,
  SUM(CASE WHEN refunded THEN pay_amount ELSE 0 END) AS pay_amount_returned,
  SUM(CASE WHEN refunded THEN rounding ELSE 0 END) AS rounding_returned,
  SUM(CASE WHEN refunded THEN overflow_amount ELSE 0 END) AS overflow_amount_returned,
  SUM(CASE WHEN refunded THEN change_amount ELSE 0 END) AS change_amount_returned,
  SUM(CASE WHEN refunded THEN eticket_count ELSE 0 END) AS order_count_returned
FROM sales_ticket_amounts
LEFT JOIN store_caches ON sales_ticket_amounts.store_id = store_caches.id
GROUP BY sales_ticket_amounts.bus_date, store_caches.id
```


### Versions
- Java: 1.8.0_181
- System: macOS 10.15.5
- Flink: 1.11.1
- Flink-CDC:
  - flink-sql-connector-postgres-cdc-1.1.0-SNAPSHOT.jar
  - flink-format-changelog-json-1.0.0.jar


### Error message
![image](https://user-images.githubusercontent.com/23182508/91250693-393da380-e78c-11ea-9ac4-de1c324936f4.png)

```log
⋊> ~/D/f/log cat flink-dusen-sql-client-dusendeMacBook-Pro.local.log                                                                 11:09:13
2020-08-26 11:08:39,060 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-08-26 11:08:39,064 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-08-26 11:08:39,064 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-08-26 11:08:39,064 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2020-08-26 11:08:39,064 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 3
2020-08-26 11:08:39,064 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 3
2020-08-26 11:08:39,065 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-08-26 11:08:39,100 INFO  org.apache.flink.core.fs.FileSystem                          [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-08-26 11:08:44,160 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Loading FallbackYarnSessionCli
2020-08-26 11:08:44,161 INFO  org.apache.flink.table.client.gateway.local.LocalExecutor    [] - Using default environment file: file:/Users/dusen/Downloads/flink-1.11.1/conf/sql-client-defaults.yaml
2020-08-26 11:08:44,397 INFO  org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property 'execution.restart-strategy.type' not specified. Using default value: fallback
2020-08-26 11:08:45,246 INFO  org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor config: {taskmanager.memory.process.size=1728m, jobmanager.execution.failover-strategy=region, jobmanager.rpc.address=localhost, execution.target=remote, jobmanager.memory.process.size=1600m, jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/Users/dusen/Downloads/flink-1.11.1/opt/flink-sql-client_2.11-1.11.1.jar], parallelism.default=3, taskmanager.numberOfTaskSlots=3, pipeline.classpaths=[]}
2020-08-26 11:08:45,249 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-08-26 11:08:45,469 INFO  org.apache.flink.table.client.cli.CliClient                  [] - Command history file path: /Users/dusen/.flink-sql-history
2020-08-26 11:09:10,518 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_181]
    at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) [flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) [flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NoSuchMethodError: com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.<init>(Lorg/apache/flink/table/types/logical/RowType;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lcom/alibaba/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema$ValueValidator;Ljava/time/ZoneId;)V
    at com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableSource.getScanRuntimeProvider(PostgreSQLTableSource.java:101) ~[flink-sql-connector-postgres-cdc-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
    at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:69) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:91) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:44) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:44) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:76) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.scala:121) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.scala:49) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecJoin.translateToPlan(StreamExecJoin.scala:49) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:76) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:119) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:52) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:52) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:327) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:284) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:281) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11.1.jar:1.11.1]
    ... 8 more
wuchong commented 4 years ago

It seems there is a class conflict in your classpath. Please check whether there are multiple flink-cdc-connectors jars with different versions.
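In practice this kind of mismatch can be spotted from the jar file names alone. A minimal, self-contained sketch of such a check (the temp directory stands in for Flink's `lib/` folder; the jar names are the ones appearing later in this thread):

```shell
# Hedged sketch: detect two CDC connector jars carrying different versions.
# A temp dir simulates the Flink lib/ directory so the check is self-contained.
libdir=$(mktemp -d)
touch "$libdir/flink-sql-connector-mysql-cdc-1.0.0.jar"
touch "$libdir/flink-sql-connector-postgres-cdc-1.1.0-SNAPSHOT.jar"

# Extract the version suffix of every CDC connector jar and count the
# distinct values; more than one distinct version means a mismatch.
distinct=$(ls "$libdir"/flink-sql-connector-*-cdc-*.jar \
  | sed 's/.*-cdc-\(.*\)\.jar/\1/' | sort -u | wc -l)
if [ "$distinct" -gt 1 ]; then
  echo "CDC connector version mismatch detected"
fi
```

Against a real installation, point `libdir` at the actual Flink `lib/` directory instead of the simulated one.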

ddusen commented 4 years ago
wuchong commented 4 years ago

flink-sql-connector-mysql-cdc-1.0.0.jar and flink-sql-connector-postgres-cdc-1.1.0-SNAPSHOT.jar conflict with each other.

ddusen commented 4 years ago

So the conflict was between mysql-cdc and postgres-cdc-1.1.0. Will a future version fix this?

I may need to use both mysql and postgres at the same time later.

wuchong commented 4 years ago

@Sdu0 What I meant is that the versions of your two jars are inconsistent. With matching versions there is no problem.
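The fix, then, is simply to keep both connector jars on the same version line. A sketch of the corrected state, again with a temp dir standing in for `lib/` (the 1.1.0 version number is illustrative, not prescribed by this thread):

```shell
# Simulate a corrected lib/ directory where both CDC connector jars
# share one version (1.1.0 here is illustrative).
libdir=$(mktemp -d)
touch "$libdir/flink-sql-connector-mysql-cdc-1.1.0.jar"
touch "$libdir/flink-sql-connector-postgres-cdc-1.1.0.jar"

# With matching versions the distinct-version count drops to one.
distinct=$(ls "$libdir"/flink-sql-connector-*-cdc-*.jar \
  | sed 's/.*-cdc-\(.*\)\.jar/\1/' | sort -u | wc -l)
[ "$distinct" -eq 1 ] && echo "CDC connector versions are consistent"
```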

ddusen commented 4 years ago

Got it, thanks 👍