apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] When using log.system=kafka, java.lang.NoSuchFieldError: callback is reported #1888

Closed. bugmenoway closed this issue 1 year ago.

bugmenoway commented 1 year ago

Search before asking

- [x] I searched in the issues and found nothing similar.

Paimon version

0.4.0-incubating

Compute Engine

Flink 1.17.1

Minimal reproduce step

  1. Add the dependency jars:

paimon-flink-1.17-0.4.0-incubating.jar
flink-connector-base-1.17.1.jar
flink-connector-files-1.17.1.jar
flink-connector-kafka-1.17.1.jar
flink-sql-connector-kafka-1.17.0.jar
flink-shaded-hadoop-2-uber-2.7.3.jar
plus the jars that ship in the lib directory of the official flink-1.17.1-bin-scala_2.12.tgz distribution

  2. Start the cluster and sql-client (details omitted), then set up the Paimon warehouse catalog:
    
```sql
CREATE CATALOG my_catalog WITH (
    'type'='paimon',
    'warehouse'='hdfs:///tmp/paimon'
);

USE CATALOG my_catalog;
```

  3. Create the Paimon tables and the Kafka log table:

```sql
CREATE TABLE word_10 (
    word STRING
);

CREATE TABLE word_11 (
    word STRING
);

CREATE TABLE word_12 (
    word STRING
) WITH (
    'log.system'='kafka',
    'kafka.bootstrap.servers'='host1:9092,host2:9092,host3:9092',
    'kafka.topic'='test12',
    'scan.startup.mode'='earliest-offset',
    'write-mode'='append-only'
);
```
  4. Run the data operations:

- opt1

```sql
SET 'execution.checkpointing.interval' = '10 s';
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'streaming';

INSERT INTO TABLE word_12 VALUES('ca12412');
```


- opt2

```sql
SET 'execution.checkpointing.interval' = '10 s';
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'streaming';

SELECT * FROM word_12;
```

- opt3

```sql
INSERT INTO TABLE word_11 SELECT * FROM word_12;
```

- opt4

```sql
SET 'execution.checkpointing.interval' = '10 s';
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'streaming';

INSERT INTO TABLE word_10 VALUES('ca12492');
INSERT INTO TABLE word_11 SELECT * FROM word_10;
SELECT * FROM word_10;
SELECT * FROM word_11;
```

What doesn't meet your expectations?

Of the operations above, only opt4 succeeded. For the table with log.system=kafka, only the DDL works; every other read or write against it fails with the following error:

```
Caused by: java.lang.NoSuchFieldError: callback
    at org.apache.paimon.flink.kafka.KafkaSinkFunction.open(KafkaSinkFunction.java:71)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.open(RowDataStoreWriteOperator.java:126)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOperators(RegularOperatorChain.java:107)
    ...
```

Anything else?

  1. How should this exception be resolved? Can it be worked around by changing the configuration files or the dependencies?
  2. Where can I find more detailed Paimon documentation, especially on basic usage and detailed configuration?

Thanks in advance for the help! ^_^

Are you willing to submit a PR?

liming30 commented 1 year ago

@bugmenoway Thanks for the report.

How should this exception be resolved? Can it be worked around by changing the configuration files or the dependencies?

We use the class org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer to write to Kafka. From the exception stack, I suspect there are class conflicts among the jars on your classpath, and the callback field does not exist in the wrong FlinkKafkaProducer class that actually gets loaded. You can check whether there are multiple org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer classes on the classpath, and if so, keep only the correct one.
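One way to check is to ask the classloader for every copy of the class it can see. The snippet below is a minimal sketch of that idea; the resource path comes from the class named above, while the class name FindDuplicateClass is just illustrative:

```java
import java.net.URL;
import java.util.Enumeration;

// Sketch: print every jar on the classpath that provides FlinkKafkaProducer.
// More than one line of output means there is a class conflict.
public class FindDuplicateClass {
    public static void main(String[] args) throws Exception {
        String resource =
            "org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.class";
        Enumeration<URL> copies =
            FindDuplicateClass.class.getClassLoader().getResources(resource);
        while (copies.hasMoreElements()) {
            // e.g. jar:file:/.../flink-connector-kafka-1.17.1.jar!/org/...
            System.out.println(copies.nextElement());
        }
    }
}
```

Run it with the same jars on the classpath as your Flink client, for example with java -cp 'lib/*:.' FindDuplicateClass after compiling.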

Where can I find more detailed Paimon documentation, especially on basic usage and detailed configuration?

All information about Paimon can be found here. You can also reach us by email or through the DingTalk group (10880001919).

Alibaba-HZY commented 1 year ago

remove flink-connector-kafka-1.17.1.jar

bugmenoway commented 1 year ago

remove flink-connector-kafka-1.17.1.jar

I found the same fix. The logs show that FlinkKafkaProducer was calling methods from org.apache.kafka, which means flink-connector-kafka-1.17.1.jar took precedence on the classpath. org.apache.paimon.flink.kafka.KafkaSinkFunction needs org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Callback, which is a different class from org.apache.kafka.clients.producer.Callback, hence the error. Removing flink-connector-kafka-1.17.1.jar solved it. Thanks!
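To confirm which of the two Callback classes is visible, and which jar each one is loaded from, a small probe like the sketch below can help; the two fully qualified names are taken from the comment above, and the class name ProbeCallback is illustrative:

```java
// Sketch: check which Callback variants resolve and where each comes from.
public class ProbeCallback {
    public static void main(String[] args) {
        String[] names = {
            "org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Callback",
            "org.apache.kafka.clients.producer.Callback"
        };
        for (String name : names) {
            try {
                Class<?> clazz = Class.forName(name);
                // getCodeSource() may be null for classes from the bootstrap loader.
                Object source = clazz.getProtectionDomain().getCodeSource();
                System.out.println("found   " + name + " in "
                        + (source == null ? "<bootstrap>" : source));
            } catch (ClassNotFoundException e) {
                System.out.println("missing " + name);
            }
        }
    }
}
```

After flink-connector-kafka-1.17.1.jar is removed, only the shaded Callback that KafkaSinkFunction expects should remain visible.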