DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug] [connector-mongo] flinksql-mongo query fails with an error #1445

Open 20100507 opened 1 year ago

20100507 commented 1 year ago

Search before asking

What happened

```
2022-12-13 15:51:57,022 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using predefined options: DEFAULT.
2022-12-13 15:51:57,023 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2022-12-13 15:51:58,015 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL query.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:548) ~[flink-sql-client_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:374) ~[flink-sql-client_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:648) ~[flink-sql-client_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:323) ~[flink-sql-client_2.11-1.12.7.jar:1.12.7]
        at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_211]
        at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214) [flink-sql-client_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144) [flink-sql-client_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115) [flink-sql-client_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.11-1.12.7.jar:1.12.7]
Caused by: scala.MatchError: null
        at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:78) ~[chunjun-core.jar:?]
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:91) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:44) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:44) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.12.7.jar:1.12.7]
```

What you expected to happen

I expect the query to work: `select * from mogodb_source_映射表;` should return results normally.

How to reproduce

Any query against the table fails with this error.

Anything else

No response

Version

master

Are you willing to submit PR?

Code of Conduct

FlechazoW commented 1 year ago

Could you share your SQL?

20100507 commented 1 year ago

First of all, thanks for the reply. As for the versions I am using: I tested with both Flink 1.12.1 and Flink 1.12.7, with the latest chunjun release package, and both fail with this error. Also, the sample in the official examples is wrong: the code expects `uri`, while the sample uses `url`. My SQL is as follows:

```sql
CREATE TABLE device_manage3 (
    _id STRING,
    abc_abc STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-x',
    'uri' = 'mongodb://superadmin:admin@xxxxxx:27017',
    'database' = 'alarm',
    'collection' = 'device_manage',
    'lookup.cache-type' = 'lru'
);
```

https://github.com/DTStack/chunjun/blob/master/chunjun-examples/sql/mongo/async_join.sql
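For reference, assuming the option key really is `uri` (as both working DDLs in this thread use) rather than the `url` shown in that example file, a corrected side-table DDL might look roughly like this; the field names and the localhost address are placeholders, not the actual example content:

```sql
-- Hypothetical correction of the example's MongoDB options:
-- the example file writes 'url', but the connector appears to expect 'uri'.
CREATE TABLE mongo_side (
    id INT,
    gender INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-x',
    'uri' = 'mongodb://localhost:27017',  -- was 'url' in the example
    'database' = 'test',
    'collection' = 'mongo_lookup',
    'lookup.cache-type' = 'lru'
);
```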

Run the query:

```sql
select * from device_manage3;
```

ll076110 commented 1 year ago

I have no problem running it with the latest code; this is my SQL:

```sql
CREATE TABLE sk (
    id int,
    name varchar,
    gender int,
    idcard varchar,
    proc_time AS PROCTIME()
) WITH (
    'properties.bootstrap.servers' = 'localtest:9092',
    'connector' = 'kafka-x',
    'scan.parallelism' = '1',
    'format' = 'json',
    'topic' = 'trest',
    'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE sink (
    id int,
    name varchar,
    gender int,
    idcard varchar
) WITH (
    'connector' = 'print'
);

CREATE TABLE side (
    id int,
    gender int,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'database' = 'test',
    'connector' = 'mongodb-x',
    'lookup.cache-type' = 'None',
    'lookup.parallelism' = '1',
    'lookup.cache.ttl' = '10000',
    'lookup.cache.max-rows' = '10',
    'collection' = 'mongo_lookup',
    'uri' = 'mongodb://localtest:27017'
);

INSERT INTO sink
SELECT sk.id, name, sk.gender, sk.idcard
FROM sk
LEFT JOIN side FOR SYSTEM_TIME AS OF sk.proc_time AS s
ON sk.gender = s.gender
WHERE sk.gender = 0;
```

20100507 commented 1 year ago

Yours uses MongoDB as the sink table, reading data from Kafka and writing it into MongoDB; in my case I am reading data from MongoDB and writing it somewhere else.
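
For clarity, here is a minimal sketch of that use case (MongoDB as the scan source, results written elsewhere), reusing the connection options from the DDL earlier in the thread; the `print` sink is only illustrative, and whether the `mongodb-x` source actually supports this scan-read path without error is the open question of this issue:

```sql
-- Sketch only: read from MongoDB and write to a print sink.
-- Connection options are copied from the reporter's DDL above.
CREATE TABLE device_manage_source (
    _id STRING,
    abc_abc STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-x',
    'uri' = 'mongodb://superadmin:admin@xxxxxx:27017',
    'database' = 'alarm',
    'collection' = 'device_manage'
);

CREATE TABLE device_manage_print (
    _id STRING,
    abc_abc STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO device_manage_print
SELECT _id, abc_abc FROM device_manage_source;
```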