apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

Table schemas created in Flink SQL are not visible in Spark SQL #1064

Closed. whitecloud6688 closed this issue 2 years ago.

whitecloud6688 commented 2 years ago

Table schemas created in Flink SQL are not visible in Spark SQL

Table schemas created in Flink SQL are not visible in Spark SQL, and neither are the column comments. As a result, data in tables defined through Flink SQL cannot be queried from Spark SQL.

In Flink SQL I created a Hudi table and a MySQL table, and their schemas are visible from Flink SQL:

[flink]$ bin/sql-client.sh

Flink SQL> CREATE TABLE hudi_orders_mor_b (
    order_id INT COMMENT 'order ID',
    order_date TIMESTAMP(0) COMMENT 'order time',
    customer_name STRING COMMENT 'customer name',
    price DECIMAL(10, 5) COMMENT 'price',
    product_id INT COMMENT 'product ID',
    order_status BOOLEAN COMMENT 'order status',
    PRIMARY KEY (order_id) NOT ENFORCED
) COMMENT 'orders table' WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = '/hive/warehouse/hzga.db/hudi_orders_mor_s',
    'read.streaming.enabled' = 'false',
    'read.streaming.check-interval' = '3',
    'hive_sync.enable' = 'true',
    'changelog.enable' = 'true'
);
[INFO] Execute statement succeed.

Flink SQL> desc hudi_orders_mor_b;
+---------------+----------------+-------+---------------+--------+-----------+
|          name |           type |  null |           key | extras | watermark |
+---------------+----------------+-------+---------------+--------+-----------+
|      order_id |            INT | false | PRI(order_id) |        |           |
|    order_date |   TIMESTAMP(0) |  true |               |        |           |
| customer_name |         STRING |  true |               |        |           |
|         price | DECIMAL(10, 5) |  true |               |        |           |
|    product_id |            INT |  true |               |        |           |
|  order_status |        BOOLEAN |  true |               |        |           |
+---------------+----------------+-------+---------------+--------+-----------+
6 rows in set

Flink SQL> CREATE TABLE mysql_orders (
    order_id INT COMMENT 'order ID',
    order_date TIMESTAMP(0) COMMENT 'order time',
    customer_name STRING COMMENT 'customer name',
    price DECIMAL(10, 5) COMMENT 'price',
    product_id INT COMMENT 'product ID',
    order_status BOOLEAN COMMENT 'order status',
    PRIMARY KEY (order_id) NOT ENFORCED
) COMMENT 'orders table' WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'vm01',
    'port' = '3306',
    'username' = 'test',
    'password' = 'passwd',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);
[INFO] Execute statement succeed.

Flink SQL> desc mysql_orders;
+---------------+----------------+-------+---------------+--------+-----------+
|          name |           type |  null |           key | extras | watermark |
+---------------+----------------+-------+---------------+--------+-----------+
|      order_id |            INT | false | PRI(order_id) |        |           |
|    order_date |   TIMESTAMP(0) |  true |               |        |           |
| customer_name |         STRING |  true |               |        |           |
|         price | DECIMAL(10, 5) |  true |               |        |           |
|    product_id |            INT |  true |               |        |           |
|  order_status |        BOOLEAN |  true |               |        |           |
+---------------+----------------+-------+---------------+--------+-----------+
6 rows in set

In Spark SQL, the corresponding table schemas are not visible, and the tables' data cannot be read:

$ bin/spark-sql \
    --master spark://vm01:7077,vm02:7077,vm03:7077 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.executor.cores=1' \
    --conf 'spark.cores.max=2' \
    --conf 'spark.executor.memory=2g' \
    --conf 'spark.driver.memory=2g'
22/04/10 10:59:50 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
22/04/10 10:59:57 WARN metastore: Failed to connect to the MetaStore Server...
Spark master: spark://vm01:7077,vm02:7077,vm03:7077, Application Id: app-20220410105953-0011

spark-sql (default)> use mydb1;
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
Response code
Time taken: 4.222 seconds

spark-sql (default)> desc hudi_orders_mor_b;
col_name    data_type    comment
Time taken: 0.325 seconds

spark-sql (default)> desc mysql_orders;
col_name    data_type    comment
Time taken: 0.09 seconds
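One way to check what actually landed in the metastore is to dump the table properties from Spark. As a sketch (the exact property keys depend on the Flink version), a table registered through Flink's HiveCatalog as a "generic" table carries its schema serialized into flink.* table properties rather than real Hive columns:

spark-sql (default)> SHOW TBLPROPERTIES hudi_orders_mor_b;
-- If the table was stored as a Flink "generic" table, the output should list
-- keys such as flink.schema.0.name and flink.schema.0.data-type instead of
-- Hive-visible columns, which would explain why desc returns nothing.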

Versions: spark-3.1.2-bin-hadoop3.2 flink-1.13.6 hudi-0.10.1 flink-cdc-2.2.0 apache-hive-3.1.2-bin zookeeper-3.4.14 hadoop-3.2.2 scala-2.12.10 java-1.8.0_321

Relevant jars:

$ ls /opt/app/spark/jars/
spark-avro_2.12-3.1.2.jar
hudi-spark3.1.2-bundle_2.12-0.10.1.jar

$ ls /opt/app/flink/lib/
hive-exec-3.1.2.jar
flink-connector-files-1.13.6.jar
flink-clients_2.12-1.13.6.jar
commons-cli-1.5.0.jar
flink-runtime_2.12-1.13.6.jar
flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar
flink-sql-parser-hive-1.13.6.jar
flink-table-common-1.13.6.jar
flink-table-runtime-blink_2.12-1.13.6.jar
flink-yarn_2.12-1.13.6.jar
flink-table-uber-blink_2.12-1.13.6.jar
hudi-flink-bundle_2.12-0.10.1.jar
flink-connector-hive_2.12-1.13.6.jar
flink-connector-jdbc_2.12-1.13.6.jar
flink-sql-connector-mysql-cdc-2.2.0.jar
flink-sql-connector-postgres-cdc-2.2.0.jar
flink-streaming-scala_2.12-1.13.6.jar
flink-streaming-java_2.12-1.13.6.jar
flink-sql-connector-elasticsearch7_2.12-1.13.6.jar
postgresql-42.2.12.jar
hudi-hive-sync-bundle-0.10.1.jar
flink-hcatalog_2.12-1.13.6.jar

$ ls /opt/app/hive/lib/
mysql-connector-java-8.0.16.jar
mysql-connector-java-5.1.49.jar
hudi-hadoop-mr-bundle-0.10.1.jar

$ ls /opt/app/hive/auxlib/
hudi-hadoop-mr-bundle-0.10.1.jar

whitecloud6688 commented 2 years ago

Maybe I misunderstood this. A table created in Flink SQL cannot have its data read from Spark SQL directly; to read it, you have to create an external table that points at the Flink SQL table's path. That is also why desc on the Flink table shows no columns, while the flink.schema description is still visible under Table Properties.
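For the Hudi table, a sketch of that workaround in Spark SQL could look like the following. The table name hudi_orders_mor_b_ext is made up here; the LOCATION must be the 'path' from the Flink DDL, and this assumes the Hudi Spark bundle and HoodieSparkSessionExtension are configured as in the spark-sql invocation above:

-- Register an external Spark table over the Hudi table's storage path
-- (hudi_orders_mor_b_ext is a hypothetical name).
CREATE TABLE hudi_orders_mor_b_ext
USING hudi
LOCATION '/hive/warehouse/hzga.db/hudi_orders_mor_s';

-- Spark infers the columns from the Hudi metadata under that path.
DESC hudi_orders_mor_b_ext;
SELECT * FROM hudi_orders_mor_b_ext LIMIT 10;

Note that this only helps for the Hudi table, which has data files at a filesystem path; the mysql-cdc table is pure connector configuration with no files behind it, so its data can only be read through Flink.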