apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] Kafka sync table reported "Not authorized to access group" error #1724

Closed ChenShuai1981 closed 1 year ago

ChenShuai1981 commented 1 year ago

Search before asking

Paimon version

0.5.0-SNAPSHOT

Compute Engine

Flink 1.15.2

Minimal reproduce step

  1. Set up Canal to sync the MySQL binlog to an Aliyun Kafka topic.
  2. Register the "test" consumer group on the Aliyun Kafka instance.
  3. Start a Flink cluster.
  4. Run the Paimon action job with flink run.
  5. The job fails with the error message below.
$ ./bin/flink run \
>     ./lib/paimon-flink-action-0.5-SNAPSHOT.jar \
>     kafka-sync-table \
>     --warehouse oss://odps-prd/rtdp/paimon \
>     --database flink \
>     --table top_trans_order \
>     --partition-keys c_year,c_month \
>     --primary-keys ord_id,c_year,c_month \
>     --computed-column 'c_year=year(create_time)' \
>     --computed-column 'c_month=month(create_time)' \
>     --kafka-conf properties.bootstrap.servers=192.168.24.63:9092,192.168.24.65:9092,192.168.24.64:9092 \
>     --kafka-conf topic=top_stag.top_trans_order \
>     --kafka-conf properties.group.id=test \
>     --kafka-conf value.format=canal-json \
>     --catalog-conf metastore=hive \
>     --catalog-conf uri=thrift://localhost:9083 \
>     --table-conf bucket=4 \
>     --table-conf changelog-producer=input \
>     --table-conf sink.parallelism=4
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/admin/dev/flink-1.15.2/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/admin/dev/hadoop-2.10.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not authorized to access group: bc9dc7ed-0dd7-4beb-a754-8099b9f42dbc
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: bc9dc7ed-0dd7-4beb-a754-8099b9f42dbc

What doesn't meet your expectations?

It seems the parameter "properties.group.id" does not actually take effect. Instead of using the configured value "test", the program generates a random UUID string as the group id. That randomly generated group id was never registered on the Aliyun Kafka instance, so the job reports "Not authorized to access group".
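To make the expected behavior concrete, here is a minimal sketch of how I would expect the group id to be resolved. This is not Paimon's actual code: the class `GroupIdResolution` and the method `toKafkaProperties` are hypothetical names, and the only assumption taken from the command above is that Kafka client settings are passed with a `properties.` prefix. A random UUID should be used only when no group id was configured.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

// Hypothetical illustration, not the Paimon implementation.
class GroupIdResolution {

    // Prefix carried by Kafka client settings in the --kafka-conf options.
    static final String PROPERTIES_PREFIX = "properties.";

    // Copies every "properties.*" entry into Kafka client properties,
    // stripping the prefix, and keeps a user-supplied group.id if present.
    public static Properties toKafkaProperties(Map<String, String> kafkaConf) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : kafkaConf.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PROPERTIES_PREFIX)) {
                props.setProperty(key.substring(PROPERTIES_PREFIX.length()), entry.getValue());
            }
        }
        // Fall back to a random group id only when the user supplied none.
        if (!props.containsKey("group.id")) {
            props.setProperty("group.id", UUID.randomUUID().toString());
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("properties.bootstrap.servers", "192.168.24.63:9092");
        conf.put("properties.group.id", "test");
        conf.put("topic", "top_stag.top_trans_order");
        // Prints the configured group id rather than a random UUID.
        System.out.println(toKafkaProperties(conf).getProperty("group.id"));
    }
}
```

With the options from my command, the resolved group id would be "test", which is the group registered on the Aliyun Kafka instance.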

Anything else?

No response

Are you willing to submit a PR?

zhuangchong commented 1 year ago

I'll work on this and make the kafka-conf configuration take effect.