vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph
48 stars 30 forks source link

The Flink job using nebula-flink-connector-3.5.0 cannot run after being submitted to Yarn due to class name conflicts. #97

Open adu-shzz opened 8 months ago

adu-shzz commented 8 months ago

NebulaGraph 环境

BUG 描述

QingZ11 commented 8 months ago

com.vesoft.nebula.client.graph.net.Session 这个 Java 类,在 com.vesoft:nebula-flink-connector:3.5.0.jar 没有定义,只在nebula-java 的 com.vesoft:client:3.5.0.jar 中有定义。大概是你 pom 中对 com.vesoft:client 使用的 scope 的问题。

研发的意思是让你去看看 scope,了解下 maven 依赖的 scope。

adu-shzz commented 8 months ago

maven 依赖的 scope 问题,发现是发生在官方提供的 jar 这边:

刚下载了官方 jar 包和源码,对比后发现,官方提供的 com.vesoft:nebula-flink-connector:3.5.0.jarcom.vesoft:client:3.5.0.jar 存在相同的源码。 image

我们先在使用时,直接排除掉 com.vesoft:client:3.5.0.jar 试一下吧,如果还不行,只能手动处理 com.vesoft:nebula-flink-connector:3.5.0.jar 里的 class 文件,先删掉同名的包了。

Nicole00 commented 8 months ago

我用这个依赖可以运行的,没有类加载的问题,你看下 你pom.xml中的引用方式

  <dependencies>
        <dependency>
            <groupId>com.vesoft</groupId>
            <artifactId>nebula-flink-connector</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
adu-shzz commented 8 months ago

感谢回复,你的方式在我这边还是跑不通,我最终试成功的方式是: 1、maven 引 nebula-flink-connector 时,加 provided,将它从我的 Flink uber-jar 中排除出去,然后单独把 nebula-flink-connector-3.5.0.jar 放到 ${FLINK_HOME}/lib/ 下面。 2、在报错的第 82 行那里,将所有代码格式做换行然后 IDEA 重新编译打包,对,是换行,你没看错。

// 这个位置
at com.XXX.jobs.portraits.XXXJober.initSink(XXXJober.java:82)

换行前是这样

kfkSource.addSink(SinkUtil.<MyVO>nebulaSinkOptions(context, cfgPrefix)
                    .vertexOpts(opts -> opts.setGraphSpace("my_space").setTag("test_tag")
                            .setBatchSize(2000)
                            .setBatchIntervalMs(3000)
                    )
                    .batchExecutor(new MyNebulaBatchExecutor())
                    .buildVertexSinkFunction()
        );

换行后是这样

kfkSource.addSink(
                SinkUtil.<MyVO>nebulaSinkOptions(context, cfgPrefix)
                        .vertexOpts(opts -> opts.setGraphSpace("my_space").setTag("test_tag")
                                .setBatchSize(2000)
                                .setBatchIntervalMs(3000)
                        )
                        .batchExecutor(new MyNebulaBatchExecutor())
                        .buildVertexSinkFunction()
        );

做了这两步,就能成功通过 DataSphere 提交到 Hadoop-Yarn 集群运行了。 怀疑可能和 IDEA 的编译方式或 Flink 类加载机制有关。

Nicole00 commented 8 months ago

第二点太诡异了,我用的https://github.com/vesoft-inc/nebula-flink-connector/blob/master/example/src/main/java/org/apache/flink/FlinkConnectorSinkExample.java 这里的代码本地测试跑的,全部的依赖 就4个,而且格式故意打乱,不影响运行。 image

第一点,你的pom中没有额外引用com.vesoft:client吧,这个不需要单独引入, nebula-flink-connector的包中会带着client的。还有一个 可能要看下flink环境中是否有其他版本的client jar包

adu-shzz commented 8 months ago

第一点,你的pom中没有额外引用com.vesoft:client吧,这个不需要单独引入, nebula-flink-connector的包中会带着client的。还有一个 可能要看下flink环境中是否有其他版本的client jar包

这个我有排查过,有的都去掉了。不是这方面的问题。就感觉挺诡异的。

adu-shzz commented 8 months ago

第一点,你的pom中没有额外引用com.vesoft:client吧,这个不需要单独引入, nebula-flink-connector的包中会带着client的。还有一个 可能要看下flink环境中是否有其他版本的client jar包

对了,com.vesoft:client-3.5.0.jar 中,有一个 com.facebook.thrift 包,这个没有包含在 com.vesoft:nebula-flink-connector:3.5.0.jar 里边,担心 flink 里只引 nebula-flink-connector 不引 client 会有问题,求解决。

Nicole00 commented 8 months ago

对了,com.vesoft:client-3.5.0.jar 中,有一个 com.facebook.thrift 包,这个没有包含在 com.vesoft:nebula-flink-connector:3.5.0.jar 里边,担心 flink 里只引 nebula-flink-connector 不引 client 会有问题,求解决。

这个不需要单独引用client是因为你已经引用了nebula-flink-connector,这个包里面包含了 client。