apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [KafkaStreamTable] Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. #2149

Open zhangyuge1 opened 2 years ago

zhangyuge1 commented 2 years ago

What happened

[Two screenshots attached; images not available]

SeaTunnel Version

2.1.2

SeaTunnel Config

env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  #"name":"jack";"age":12
  #"name":"Mike";"age":14
    KafkaTableStream {
        consumer.bootstrap.servers = "127.0.0.1:9092"
        consumer.group.id = "seatunnel1"
        offset.reset = earliest
        topics = "source"
        result_table_name = "test"
        format.type = csv
        schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
        format.field-delimiter = ","
    }

  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}

transform {
    sql {
      sql = "select name,age from test"
    }

  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}

sink {
    kafka {
      topics = "sink"
      producer.bootstrap.servers = "127.0.0.1:9092"
    }
  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}

Running Command

TestContainer

Error Exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

adudu137 commented 2 years ago

What is your sample data?

adudu137 commented 2 years ago

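Could the org.apache.flink.table.factories.TableSourceFactory class be missing? If so, what you need is the Flink JDBC connector jar. Download the JDBC connector jar that matches your Flink version from the official site.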

zhangyuge1 commented 2 years ago

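> Could the org.apache.flink.table.factories.TableSourceFactory class be missing? If so, what you need is the Flink JDBC connector jar. Download the JDBC connector jar that matches your Flink version from the official site.

I am using the Kafka connector, and the data format is CSV.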

Hisoka-X commented 2 years ago

It seems this problem appeared when the connector jars were split from the core jar. Since then, Flink can't find the right Kafka factory implementation class when it looks it up through SPI. We should check why Flink can't find the Kafka source factory after the connector jars were split. If you want to use Kafka, you can use 2.1.1. You are also welcome to help us fix it. 😁
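To check the SPI theory above, one option is to look inside the jars on the classpath and see which ones register a table factory at all. The sketch below is not part of SeaTunnel or Flink. It assumes that factories are registered through `META-INF/services` files named after `org.apache.flink.table.factories.TableFactory` (legacy) or `org.apache.flink.table.factories.Factory` (newer versions), which should be verified for the Flink version in use. The function name `find_table_factories` and the default `lib` directory are hypothetical.

```python
import sys
import zipfile
from pathlib import Path

# Service files assumed to be read by Flink's factory discovery.
# These names are an assumption; confirm them for your Flink version.
SERVICE_FILES = (
    "META-INF/services/org.apache.flink.table.factories.TableFactory",
    "META-INF/services/org.apache.flink.table.factories.Factory",
)


def find_table_factories(lib_dir):
    """Map each jar in lib_dir to the factory class names it registers."""
    found = {}
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        classes = []
        try:
            with zipfile.ZipFile(jar) as zf:
                names = set(zf.namelist())
                for service in SERVICE_FILES:
                    if service not in names:
                        continue
                    text = zf.read(service).decode("utf-8")
                    for line in text.splitlines():
                        # Service files allow '#' comments and blank lines.
                        entry = line.split("#", 1)[0].strip()
                        if entry:
                            classes.append(entry)
        except zipfile.BadZipFile:
            # Skip files that are not valid jar archives.
            continue
        if classes:
            found[jar.name] = classes
    return found


if __name__ == "__main__":
    # Hypothetical default: a "lib" directory next to the script.
    lib = sys.argv[1] if len(sys.argv) > 1 else "lib"
    for jar_name, classes in find_table_factories(lib).items():
        print(jar_name)
        for cls in classes:
            print("  " + cls)
```

If no jar in the output lists a Kafka factory class, that would be consistent with the explanation that the factory registration was lost when the connector jars were split out.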

ZhouYuling commented 1 year ago

Here is how I approached it:

1. "Could not find a suitable table factory": check whether the JDBC-related jars are present in the flink-1.xx.x/lib directory.
2. "No factory supports all properties": check whether the parameters are correct.

Taking Flink 1.16.2 connecting to MySQL 5.6.43 over JDBC as an example, I dropped these jars into the lib directory one after another: mysql-connector-java-8.0.29.jar, flink-connector-jdbc-1.16.2.jar, flink-table-common-1.16.2.jar. That did not help and I still got the same error. I then checked my code:

CREATE TABLE products (
  id INT,
  name string,
  description string,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector.type'='jdbc',
  'connector.url'='jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
  'connector.table'='products',
  'connector.driver'='com.mysql.cj.jdbc.Driver',
  'connector.port' = '3306',
  'connector.username'='root',
  'connector.password'='root'
);

After reading the Table API Connectors documentation on the official site, I changed the code as follows, and it now runs normally:

CREATE TABLE products (
  id INT,
  name string,
  description string,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
  'table-name' = 'products',
  'username'='root',
  'password'='root'
);

Hope this helps.
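The difference between the two DDL statements in the comment above is the option naming: the failing one uses keys prefixed with `connector.`, the working one uses the plain `connector`, `url`, `table-name` style. As a rough illustration, the sketch below flags keys of the first kind in a `WITH (...)` clause. It is only a heuristic based on the prefix seen in this thread, not a Flink API, and the function name `legacy_option_keys` is hypothetical.

```python
import re

# Matches 'key' = 'value' pairs as written in a WITH (...) clause.
OPTION_RE = re.compile(r"'([^']+)'\s*=\s*'([^']*)'")


def legacy_option_keys(ddl):
    """Return option keys that start with the 'connector.' prefix.

    Assumption: keys of this form belong to the older property style
    shown in the failing example; this is a heuristic, not a full parser.
    """
    match = re.search(r"\bWITH\s*\((.*)\)", ddl, re.IGNORECASE | re.DOTALL)
    if not match:
        return []
    keys = [key for key, _value in OPTION_RE.findall(match.group(1))]
    return [key for key in keys if key.startswith("connector.")]


if __name__ == "__main__":
    old_ddl = """
    CREATE TABLE products (id INT, PRIMARY KEY (id) NOT ENFORCED) WITH (
      'connector.type'='jdbc',
      'connector.url'='jdbc:mysql://localhost:3306/mydb?useSSL=true',
      'connector.table'='products'
    );
    """
    for key in legacy_option_keys(old_ddl):
        print("legacy-style option:", key)
```

An empty result does not prove the options are valid; it only means none of them use the prefix that caused trouble here.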