streamnative / pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Apache License 2.0
278 stars 119 forks source link

[BUG]pulsar-flink catalog options #539

Closed bingfeng2004 closed 2 years ago

bingfeng2004 commented 2 years ago

Describe the bug 1.I used catalog to configure SQL-client and started SQL-client with a message indicating that the format parameter is not supported. `catalogs:

[root@node07 bin]# ./sql-client.sh embedded SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/flink-1.13.6/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] No default environment specified. Searching for '/opt/flink-1.13.6/conf/sql-client-defaults.yaml'...found. Reading default environment from: file:/opt/flink-1.13.6/conf/sql-client-defaults.yaml

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) Caused by: org.apache.flink.table.api.ValidationException: Unable to create catalog 'pulsarCatalog'.

Catalog options are: 'catalog-admin-url'='http://192.168.1.113:8080' 'catalog-service-url'='pulsar://192.168.1.113:6650' 'default-database'='default' 'format'='json' 'type'='pulsar-catalog' at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270) at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217) at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120) at java.util.HashMap.forEach(HashMap.java:1288) at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117) at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105) at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233) at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88) at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) ... 1 more Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'pulsar-catalog'.

**Unsupported options:

format**

Supported options:

catalog-admin-url catalog-auth-params catalog-auth-plugin catalog-service-url catalog-tenant default-database property-version pulsar-version table-default-partitions at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:398) at org.apache.flink.table.factories.FactoryUtil$CatalogFactoryHelper.validate(FactoryUtil.java:563) at org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactory.createCatalog(PulsarCatalogFactory.java:52) at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:267) ... 11 more

Expected behavior A clear and concise description of what you expected to happen.

Screenshots If applicable, add screenshots to help explain your problem.

Additional context The Readme is described in the catalog configuration is not correct, do not agree with the parameter name in the "PulsarCatalogFactoryOptions“ class requirement

catalogs:

nlu90 commented 2 years ago

@bingfeng2004

We reimplemented the catalog for 1.13, 1.14 and master branch. You will use the following option keys to reconfigure your catalog.

catalog-admin-url
catalog-auth-params
catalog-auth-plugin
catalog-service-url
catalog-tenant
default-database
property-version
pulsar-version
table-default-partitions