vesoft-inc / nebula-java

Client API and data importer of Nebula Graph in Java
Apache License 2.0
164 stars 121 forks source link

[ent 3.6 zone]when upload to hdfs report error #558

Closed jinyingsunny closed 8 months ago

jinyingsunny commented 8 months ago

when upload to hdfs from ent 3.6 with zone on,then report error the same sst_application.conf when change to ent-3.5 without zone,then it is ok. errors:

23/10/11 18:27:21 INFO SparkContext: Created broadcast 2 from csv at FileBaseReader.scala:86
23/10/11 18:27:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4195699 bytes, open cost is considered as scanning 4194304 bytes.
Exception in thread "main" com.facebook.thrift.protocol.TProtocolException: Expected protocol id ffffff82 but got 15
    at com.facebook.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:485)
    at com.vesoft.nebula.meta.MetaService$Client.recv_verifyClientVersion(MetaService.java:4364)
    at com.vesoft.nebula.meta.MetaService$Client.verifyClientVersion(MetaService.java:4341)
    at com.vesoft.nebula.client.meta.MetaClient.getClient(MetaClient.java:156)
    at com.vesoft.nebula.client.meta.MetaClient.doConnect(MetaClient.java:127)
    at com.vesoft.nebula.client.meta.MetaClient.connect(MetaClient.java:116)
    at com.vesoft.exchange.common.MetaProvider.<init>(MetaProvider.scala:57)
    at com.vesoft.nebula.exchange.processor.VerticesProcessor.process(VerticesProcessor.scala:111)
    at com.vesoft.nebula.exchange.Exchange$.$anonfun$main$2(Exchange.scala:154)
    at com.vesoft.nebula.exchange.Exchange$.$anonfun$main$2$adapted(Exchange.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:126)
    at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/10/11 18:27:22 INFO SparkContext: Invoking stop() from shutdown hook
23/10/11 18:27:22 INFO SparkUI: Stopped Spark web UI at http://192.168.10.152:4040
23/10/11 18:27:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

内核版本:nebula-ent 3.6 rc 开启了zone

exchange版本:master代码:origin/release-3.3

执行命令:

/Users/jinying.liu/Documents/software/spark/bin/spark-submit  --master "local" --conf spark.sql.shuffle.partitions=200 --class com.vesoft.nebula.exchange.Exchange  /Users/jinying.liu/Documents/code/nebula-exchange/nebula-exchange_spark_3.0/target/nebula-exchange_spark_3.0-3.0.0.jar -c /Users/jinying.liu/Documents/code/nebula-exchange/sst_application.conf

配置文件sst_application.conf 详情:

{
  # Spark 相关配置
  spark: {
    app: {
      name: NebulaGraph Exchange 3.0.0
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory:1G
    }

    cores:{
      max: 16
    }
  }

   # Nebula Graph 相关配置
  nebula: {
    address:{
      # 指定 Graph 服务和所有 Meta 服务的 IP 地址和端口。
      # 如果有多台服务器,地址之间用英文逗号(,)分隔。
      # 格式:"ip1:port","ip2:port","ip3:port"
      graph:["192.168.8.202:9669","192.168.8.237:9237","192.168.8.239:9239"]
      meta:["192.168.8.202:9559","192.168.8.237:9109","192.168.8.239:9129"]
    }

    # 指定拥有 Nebula Graph 写权限的用户名和密码。
    user: root
    pswd: nebula

    # 指定图空间名称。
    space: bas

  # SST 文件相关配置
    path:{ 
        # 本地临时存放生成的 SST 文件的目录
        local:"/tmp"

        # SST 文件在 HDFS 的存储路径
        remote:"/sunny/bug_fix"

        # HDFS 的 NameNode 地址
        hdfs.namenode: "hdfs://192.168.8.131:9000"
    }

    # 客户端连接参数
    connection: {
      # socket 连接、执行的超时时间,单位:毫秒。
      timeout: 30000
    }

    error: {
      # 最大失败数,超过后会退出应用程序。
      max: 32
      # 失败的导入作业将记录在输出路径中。
      output: /tmp/errors
    }

    # 使用Google Guava RateLimiter 来限制发送到 NebulaGraph 的请求。
    rate: {
      # RateLimiter 的稳定吞吐量。
      limit: 1024

      # 从 RateLimiter 获取允许的超时时间,单位:毫秒
      timeout: 1000
    }
  }

 # 处理点
  tags: [
    # 设置 Tag player 相关信息。
    {
      # 指定 Nebula Graph 中定义的 Tag 名称。
      name: player
      type: {
        # 指定数据源,使用 CSV。
        source: csv

        # 指定如何将点数据导入 Nebula Graph:Client 或 SST。
        sink: sst
      }

      # 指定 CSV 文件的路径。
      # 如果文件存储在 HDFS 上,用双引号括起路径,以 hdfs://开头,例如"hdfs://ip:port/xx/xx"。
      # 如果文件存储在本地,用双引号括起路径,以 file://开头,例如"file:///tmp/xx.csv"。
      path: "file:///Users/jinying.liu/Downloads/dataset2/vertex_player.csv"

      # 如果 CSV 文件没有表头,使用 [_c0, _c1, _c2, ..., _cn] 表示其表头,并将列指示为属性值的源。
      # 如果 CSV 文件有表头,则使用实际的列名。
      fields: [_c1, _c2]

      # 指定 Nebula Graph 中定义的属性名称。
      # fields 与 nebula.fields 的顺序必须一一对应。
      nebula.fields: [age, name]

      # 指定一个列作为 VID 的源。
      # vertex 的值必须与上述 fields 或者 csv.fields 中的列名保持一致。
      # 目前,Nebula Graph master仅支持字符串或整数类型的 VID。
      vertex: {
        field:_c0
        # policy:hash
      }

      # 指定的分隔符。默认值为英文逗号(,)。
      separator: ","

      # 如果 CSV 文件有表头,请将 header 设置为 true。
      # 如果 CSV 文件没有表头,请将 header 设置为 false。默认值为 false。
      header: false

      # 指定单批次写入 Nebula Graph 的最大点数量。
      batch: 256

      # 指定 Spark 分片数量。
      partition: 32

      # 生成 SST 文件时是否要基于 NebulaGraph 中图空间的 partition 进行数据重分区。
      repartitionWithNebula: true
    }

    # 如果需要添加更多点,请参考前面的配置进行添加。
  ]

  # 处理边
  edges: [
    # 设置 Edge type follow 相关信息。
    {
      # 指定 Nebula Graph 中定义的 Edge type 名称。
      name: follow
      type: {
        # 指定数据源,使用 CSV。
        source: csv

        # 指定如何将点数据导入 Nebula Graph:Client 或 SST。
        sink: sst
      }

      # 指定 CSV 文件的路径。
      # 如果文件存储在 HDFS 上,用双引号括起路径,以 hdfs://开头,例如"hdfs://ip:port/xx/xx"。
      # 如果文件存储在本地,用双引号括起路径,以 file://开头,例如"file:///tmp/xx.csv"。
      path: "file:///Users/jinying.liu/Downloads/dataset2/edge_follow.csv"

      # 如果 CSV 文件没有表头,使用 [_c0, _c1, _c2, ..., _cn] 表示其表头,并将列指示为属性值的源。
      # 如果 CSV 文件有表头,则使用实际的列名。
      fields: [_c2]

      # 指定 Nebula Graph 中定义的属性名称。
      # fields 与 nebula.fields 的顺序必须一一对应。
      nebula.fields: [degree]

      # 指定一个列作为起始点和目的点的源。
      # vertex 的值必须与上述 fields 或者 csv.fields 中的列名保持一致。
      # 目前,Nebula Graph 3.1.0仅支持字符串或整数类型的 VID。
      source: {
        field: _c0
      }
      target: {
        field: _c1
      }

      # 指定的分隔符。默认值为英文逗号(,)。
      separator: ","

      # 指定一个列作为 rank 的源(可选)。

      #ranking: rank

      # 如果 CSV 文件有表头,请将 header 设置为 true。
      # 如果 CSV 文件没有表头,请将 header 设置为 false。默认值为 false。
      header: false

      # 指定单批次写入 Nebula Graph 的最大边数量。
      batch: 256

      # 指定 Spark 分片数量。
      partition: 32

      # 生成 SST 文件时是否要基于 NebulaGraph 中图空间的 partition 进行数据重分区。
      repartitionWithNebula: true
    }
  ]
    # 如果需要添加更多边,请参考前面的配置进行添加。

}
Nicole00 commented 8 months ago

when use meta client to connect the metad server,thrift incompatibility error happens image

jinyingsunny commented 8 months ago

the error is because i enabled_meta_ssl so exchange is also need to set ssl config and cert。

by the way,with enabled_meta_ssl close,and zone on,it is succeed.