StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0

Jar conflict when S3-related configuration is set in Flink #136

Open WholeWorld-Timothy opened 2 years ago

WholeWorld-Timothy commented 2 years ago

The project contains the following jars:

connect-api-2.7.0.jar
fastjson-1.2.68.jar
flink-connector-kafka_2.12-1.13.6.jar
flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar
flink-csv-1.13.6.jar
flink-dist_2.12-1.13.6.jar
flink-json-1.13.6.jar
flink-s3-fs-hadoop-1.14.2.jar
flink-shaded-hadoop-uber-3.1.2.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-mysql-cdc-2.2.0.jar
flink-table_2.12-1.13.6.jar
flink-table-blink_2.12-1.13.6.jar
kafka-clients-2.4.1.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar
mysql-connector-java-8.0.16.jar

The configuration contains the following settings:

s3a.endpoint: http://XXX.XXX.XXX
s3a.access.key: XXX
s3a.secret.key: XXX
s3a.path.style.access: true
s3a.connection.ssl.enabled: false
high-availability.storageDir: s3a://XXX/XXX/
state.checkpoints.dir: s3a://XXX/XXX
state.savepoints.dir: s3a://XXX/XXX

With this setup, Flink fails to start with the following error:

org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
......
Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.ObjectMapper.configure(Lcom/fasterxml/jackson/core/JsonParser$Feature;Z)Lcom/fasterxml/jackson/databind/ObjectMapper;
......

I then changed the following configuration in the pom of flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar:

com.fasterxml.jackson.annotation -> com.starrocks.shade.com.fasterxml.jackson.annotation
com.fasterxml.jackson.core -> com.starrocks.shade.com.fasterxml.jackson.core
com.fasterxml.jackson.databind -> com.starrocks.shade.com.fasterxml.jackson.databind

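After repackaging, every write to StarRocks fails and no data ever reaches the database. If I remove the S3 configuration above, delete the related jars, and use the original flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar, the same Flink SQL script writes data to the same table without problems.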
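A NoSuchMethodError like the one above means that the loaded `ObjectMapper` class lacks the exact `configure(JsonParser.Feature, boolean)` signature the caller was compiled against. This usually indicates that more than one Jackson version is on the classpath. Below is a minimal diagnostic sketch, assuming you can run a small Java program with roughly the same classpath as the Flink processes. `ClassOriginCheck` is a hypothetical helper, not part of Flink or the connector. It prints the jar each class was actually loaded from, plus every classpath entry that contains a copy of the class file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical diagnostic helper: shows where a class is loaded from and how
 * many copies of its .class file are visible on the classpath.
 */
public class ClassOriginCheck {

    /** Jar or directory the class was loaded from, or a marker if unknown. */
    static String loadedFrom(String className) {
        try {
            // initialize=false: load the class without running static initializers
            Class<?> c = Class.forName(className, false, ClassOriginCheck.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Bootstrap (JDK) classes have no code source
            return (src == null || src.getLocation() == null)
                    ? "<bootstrap or unknown>"
                    : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not found>";
        }
    }

    /** Every classpath entry containing a .class file for this class name. */
    static List<String> allCopies(String className) {
        String resource = className.replace('.', '/') + ".class";
        List<String> copies = new ArrayList<>();
        try {
            for (URL url : Collections.list(
                    ClassOriginCheck.class.getClassLoader().getResources(resource))) {
                copies.add(url.toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copies;
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
                "com.fasterxml.jackson.databind.ObjectMapper",
                "com.fasterxml.jackson.core.JsonParser"
        };
        for (String name : names) {
            System.out.println(name);
            System.out.println("  loaded from: " + loadedFrom(name));
            for (String copy : allCopies(name)) {
                System.out.println("  copy on classpath: " + copy);
            }
        }
    }
}
```

For example, assuming `FLINK_HOME` points at the Flink installation, you could run `javac ClassOriginCheck.java && java -cp ".:$FLINK_HOME/lib/*" ClassOriginCheck`. If several jars are listed as holding a copy of the same Jackson class, that supports the version-conflict explanation. This only approximates Flink's runtime class loading; it does not reproduce plugin or user-code classloaders.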

banmoy commented 2 years ago

"After repackaging, every write to StarRocks fails and no data ever reaches the database." Could you post the error message?

WholeWorld-Timothy commented 2 years ago

I changed the pom configuration as follows:

<relocation>
    <pattern>com.fasterxml.jackson.annotation</pattern>
    <shadedPattern>com.starrocks.shade.com.fasterxml.jackson.annotation</shadedPattern>
</relocation>
<relocation>
    <pattern>com.fasterxml.jackson.core</pattern>
    <shadedPattern>com.starrocks.shade.com.fasterxml.jackson.core</shadedPattern>
</relocation>
<relocation>
    <pattern>com.fasterxml.jackson.databind</pattern>
    <shadedPattern>com.starrocks.shade.com.fasterxml.jackson.databind</shadedPattern>
</relocation>
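These `<relocation>` entries are maven-shade-plugin settings. Relocation moves the bundled Jackson classes under `com.starrocks.shade.` and rewrites the connector's own references to them, so the connector no longer competes with other Jackson copies on the classpath. Below is a rough sketch for checking a rebuilt jar. It lists any class files still stored under the original `com/fasterxml/jackson/` path, which would indicate that relocation missed them. `RelocationCheck` is a hypothetical name. The check assumes that plain top-level entries are what matter and ignores multi-release `META-INF/versions/` entries.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical check: find Jackson classes left at their original package path. */
public class RelocationCheck {

    static final String ORIGINAL_PREFIX = "com/fasterxml/jackson/";

    /** Pure helper over entry names, so it can be tested without a real jar. */
    static List<String> unrelocated(List<String> entryNames) {
        List<String> leftovers = new ArrayList<>();
        for (String name : entryNames) {
            if (name.startsWith(ORIGINAL_PREFIX) && name.endsWith(".class")) {
                leftovers.add(name);
            }
        }
        return leftovers;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java RelocationCheck <connector.jar>");
            System.exit(2);
        }
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(args[0])) {
            for (JarEntry e : Collections.list(jar.entries())) {
                names.add(e.getName());
            }
        }
        List<String> leftovers = unrelocated(names);
        if (leftovers.isEmpty()) {
            System.out.println("No unrelocated Jackson classes found.");
        } else {
            leftovers.forEach(n -> System.out.println("not relocated: " + n));
        }
    }
}
```

An empty result only shows that the jar's own Jackson copies were moved. It does not rule out other problems when the sink actually runs.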

After changing the configuration, it still fails with the following error:

2022-11-08 09:26:27
java.lang.Exception: Could not perform checkpoint 6 for operator Source: TableSourceScan(table=[[]) (1/4)#21.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:958)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
    "TxnId": 552405,
    "Label": "96d94eca-c40b-489c-8b62-9cafffcbee7a",
    "Status": "Fail",
    "Message": "all partitions have no load data",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 274117,
    "LoadTimeMs": 4809,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 1,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 0
}
        at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:333)
        at com.starrocks.data.load.stream.DefaultStreamLoadManager.flush(DefaultStreamLoadManager.java:262)
        at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:187)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)
        ... 4 more
    Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
    "TxnId": 552405,
    "Label": "96d94eca-c40b-489c-8b62-9cafffcbee7a",
    "Status": "Fail",
    "Message": "all partitions have no load data",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 274117,
    "LoadTimeMs": 4809,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 1,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 0
}
        at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:287)
        at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:116)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6 for operator Source: TableSourceScan(table=[[]) (1/4)#21. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1092)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1076)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:994)
    ... 13 more
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
    "TxnId": 552405,
    "Label": "96d94eca-c40b-489c-8b62-9cafffcbee7a",
    "Status": "Fail",
    "Message": "all partitions have no load data",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 274117,
    "LoadTimeMs": 4809,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 1,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 0
}
    at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:333)
    at com.starrocks.data.load.stream.DefaultStreamLoadManager.flush(DefaultStreamLoadManager.java:262)
    at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.snapshotState(StarRocksDynamicSinkFunctionV2.java:193)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
    ... 23 more
    [CIRCULAR REFERENCE:com.starrocks.data.load.stream.exception.StreamLoadFailException: {
    "TxnId": 552405,
    "Label": "96d94eca-c40b-489c-8b62-9cafffcbee7a",
    "Status": "Fail",
    "Message": "all partitions have no load data",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 274117,
    "LoadTimeMs": 4809,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 1,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 0
}]

banmoy commented 2 years ago

@WholeWorld-Timothy What's your StarRocks version? The "all partitions have no load data" problem has been fixed since StarRocks 2.3: https://github.com/StarRocks/starrocks/pull/5768

WholeWorld-Timothy commented 2 years ago

StarRocks version: 2.1.10. Can I avoid this error by upgrading?

banmoy commented 2 years ago

@WholeWorld-Timothy We built a connector test case and verified that this problem does not occur with StarRocks >= 2.3. If you have concerns, I suggest first verifying it on a test cluster running StarRocks >= 2.3 and then upgrading the production cluster.
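The reply above hinges on whether a cluster version is at least 2.3, and 2.1.10 is not. If that check is scripted, compare the components numerically rather than as strings, because string comparison would put "2.10" before "2.3". Below is a small sketch under the assumption that versions are plain dotted integers. Suffixes such as build hashes or `-rc` tags would need to be stripped first. `VersionCheck` is a hypothetical helper.

```java
/** Hypothetical helper: numeric comparison of dotted version strings. */
public class VersionCheck {

    /** Negative, zero, or positive, like Comparator.compare. */
    static int compare(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            // Missing components count as 0, so "2.3" equals "2.3.0"
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    static boolean atLeast(String version, String minimum) {
        return compare(version, minimum) >= 0;
    }

    public static void main(String[] args) {
        String clusterVersion = args.length > 0 ? args[0] : "2.1.10";
        // With the default argument this prints: 2.1.10 >= 2.3 ? false
        System.out.println(clusterVersion + " >= 2.3 ? " + atLeast(clusterVersion, "2.3"));
    }
}
```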