vesoft-inc / nebula-spark-utils

Spark related libraries and tools

sst import data error "Keys must be added in strict ascending order" #145

Closed Codelone closed 3 years ago

Codelone commented 3 years ago

This is the Spark job log: driver.log (attached).

Space: sst_test (screenshot attached). Tag idno (schema screenshot attached). Data example:

230421197906123305,0,客户0,0,2021/1/1,女,博士,离异,1,black
230421197906123305,1,客户1,1,2021/1/1,女,博士,离异,1,black
230421197906123305,2,客户2,2,2021/1/1,女,博士,离异,1,black
533101196807196696,3,客户3,3,2021/1/1,女,博士,离异,1,black
533101196807196696,4,客户4,4,2021/1/1,女,博士,离异,1,black
220722198306264943,5,客户5,5,2021/1/1,女,博士,离异,1,black
220722198306264943,6,客户6,6,2021/1/1,女,博士,离异,1,black
220722198306264943,7,客户7,7,2021/1/1,女,博士,离异,1,black
310200197802274261,8,客户8,8,2021/1/1,女,博士,离异,1,black
310200197802274261,9,客户9,9,2021/1/1,女,博士,离异,1,black

Edge address_id (schema screenshot attached). Data example:

贵州省黔东南苗族侗族自治州榕江县镜湖花园22栋177号,230421197906123305,2019-02-20 08:02:20
贵州省黔东南苗族侗族自治州榕江县镜湖花园22栋177号,230421197906123305,2019-01-30 13:26:01
贵州省黔东南苗族侗族自治州榕江县镜湖花园22栋177号,230421197906123305,2019-03-31 03:19:30
四川省泸州市龙马潭区锋尚名居8栋734号,533101196807196696,2019-05-14 22:31:58
四川省泸州市龙马潭区锋尚名居8栋734号,533101196807196696,2020-02-20 06:09:22
河北省邢台市巨鹿县市聚福园16栋228号,220722198306264943,2020-12-11 12:23:53
河北省邢台市巨鹿县市聚福园16栋228号,220722198306264943,2020-01-08 05:19:52
河北省邢台市巨鹿县市聚福园16栋228号,220722198306264943,2020-04-29 09:23:55
浙江省丽水地区青田县尚书苑8栋271号,310200197802274261,2019-09-03 01:40:32
浙江省丽水地区青田县尚书苑8栋271号,310200197802274261,2019-04-23 18:37:49

application-sst.conf

{
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange 2.5
    }

    master:local

    driver: {
      cores: 1
      maxResultSize: 1G
      memory: 1G
    }

    executor: {
      memory: 4G
      instances: 1
      cores: 10
    }

    cores:{
      max: 48
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["xxxx:9669"]
      meta:["xxxx:9559"]
    }
    user: root
    pswd: nebula
    space: sst_test

    # parameters for SST import, not required
    path:{
      local: "/tmp"
      remote: "/user/frms/sst"
      hdfs.namenode: "hdfs://xxxx:8020"
    }

    # nebula client connection parameters
    connection {
      timeout: 3000
      retry: 3
    }

    # nebula client execution parameters
    execution {
      retry: 3
    }

    error: {
      # max number of failures, if the number of failures is bigger than max, then exit the application.
      max: 32
      # failed import job will be recorded in output path
      output: /user/frms/error
    }

    # use Google's RateLimiter to limit the requests sent to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [

    # csv
    {
      name: address
      type: {
        source: csv
        sink: sst
      }
      path: "hdfs://xxxx:8020/user/frms/nebula-data/address.csv"
      # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
      fields: []
      nebula.fields: []
      vertex: {
        field: _c0
        # policy: "hash"
      }
      separator: ","
      header: false
      batch: 2560
      partition: 32
    }

    {
      name: idno
      type: {
        source: csv
        sink: sst
      }
      path: "hdfs://xxxx:8020/user/frms/nebula-data/id.csv"
      fields: [_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9]
      nebula.fields: [a1,a2,a3,a4,a5,a6,a7,a8,a9]
      vertex: {
        field: _c0
        # policy: "hash"
      }
      separator: ","
      header: false
      batch: 2560
      partition: 32
    }

  ]

  # Processing edges
  # There are edge config examples for different dataSources.
  edges: [

    # csv
    {
      name: address_id
      type: {
        source: csv
        sink: sst
      }
      path: "hdfs://xxxx:8020/user/frms/nebula-data/addressid.csv"
      fields: [_c2]
      nebula.fields: [create_time]
      source: _c0
      target: _c1
      # ranking: rank
      separator: ","
      header: false
      batch: 2560
      partition: 32
    }

  ]
}

run command

./spark-2.4.8-bin-hadoop2.6/bin/spark-submit --master yarn-client --class com.vesoft.nebula.exchange.Exchange nebula-exchange-2.5-SNAPSHOT.jar -c application-sst.conf

Nicole00 commented 3 years ago

According to your Nebula schema and Exchange configuration, I re-ran your process with the same Nebula schema, the same data, the same Exchange configuration, and the same submit command, but I could not reproduce the error. The result looks normal and correct: INFO [nebula.exchange.Exchange$$anonfun$main$3:apply:197] SST-Import: failure.address_id: 0

Could you please re-run the process for just the edge address_id?

Codelone commented 3 years ago

This is the log for just the edge address_id: edge.log (attached). Actually, the vertex tag "idno" hits the same error; you can see it in driver.log. Do you want to use my data? It may be too big to upload here.

Nicole00 commented 3 years ago

We can try your large-scale data; please upload it here. Thank you very much.

Codelone commented 3 years ago

GitHub does not allow the upload; please download the data from DingTalk: https://space.dingtalk.com/s/gwHOAxnJcALOEVwinwPaACBjNmFjNGNmYTRkYTg0MjAyYWM5MzFhYThiNTM2ODM4Mw (password: aWVs)

Nicole00 commented 3 years ago

Thanks for the issue and the test data. We have reproduced the bug: the SST file requires keys to be written in strictly ascending order, and your data contains many duplicate keys, which causes the SST file write to fail. For now, you can deduplicate your CSV data by vid (for edges, deduplicate by src & dst). We will fix it soon. Thanks again.
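For example, a minimal Spark sketch of that workaround (a sketch only, not part of Exchange): it assumes the headerless CSV layouts from the configuration above, with the vertex vid in `_c0` and the edge src/dst in `_c0`/`_c1`, and the `*_dedup` output paths are placeholders, not paths from the original issue.

```scala
// Deduplicate the CSV data before running Exchange so that every SST key is unique.
// Column names _c0, _c1, ... follow Spark's naming for headerless CSV files.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dedup-for-sst").getOrCreate()

// Vertices: keep one row per vid (_c0).
spark.read.option("header", "false")
  .csv("hdfs://xxxx:8020/user/frms/nebula-data/id.csv")
  .dropDuplicates("_c0")
  .write.option("header", "false")
  .csv("hdfs://xxxx:8020/user/frms/nebula-data/id_dedup")

// Edges: keep one row per (src, dst) pair (_c0, _c1), since no rank field is configured.
spark.read.option("header", "false")
  .csv("hdfs://xxxx:8020/user/frms/nebula-data/addressid.csv")
  .dropDuplicates("_c0", "_c1")
  .write.option("header", "false")
  .csv("hdfs://xxxx:8020/user/frms/nebula-data/addressid_dedup")
```

You would then point the `path` entries in application-sst.conf at the deduplicated output directories.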

Codelone commented 3 years ago

So now the keys of the data must be unique?

Nicole00 commented 3 years ago

Yes. If it is not urgent, you can wait for our fix PR; we are working on it.

Nicole00 commented 3 years ago

https://github.com/vesoft-inc/nebula-spark-utils/pull/150 fixed this issue.

Codelone commented 3 years ago

> For edges, deduplicate by src & dst.

I notice that if edges are deduplicated only by src and dst, edges with the same src and dst but different ranks might be dropped by mistake. Is that possible?

Nicole00 commented 3 years ago

Your edge config in the Exchange config file does not define a rank field, so it is enough to deduplicate the edge data by src and dst.
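For example, a quick sketch (not part of Exchange) to check whether the edge CSV contains (src, dst) pairs that occur more than once, assuming the headerless CSV layout from the configuration above:

```scala
// List (src, dst) pairs that appear more than once in the edge CSV;
// any such pair would produce duplicate SST keys.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("check-edge-duplicates").getOrCreate()

val edges = spark.read.option("header", "false")
  .csv("hdfs://xxxx:8020/user/frms/nebula-data/addressid.csv")

edges.groupBy("_c0", "_c1")   // src, dst
  .count()
  .filter(col("count") > 1)
  .show(20, truncate = false)
```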

Codelone commented 3 years ago

If my data has ranks, do I need to ensure that src and dst do not duplicate, or only that the combination of src, dst, and rank does not duplicate? In the real data we will have ranks, and there may be many edges with the same src and dst but different ranks.

Thericecookers commented 3 years ago

I have already merged #150 into my branch (forked from v2.5), but the problem still exists when Exchange generates the edge SST files. However, I avoided the duplicate keys in another way, as below:

class NebulaSSTWriter(path: String) extends Writer {
...
  // Remember the key written most recently so that consecutive duplicates can be skipped.
  private var lastKey: Array[Byte] = Array[Byte](0)

  def write(key: Array[Byte], value: Array[Byte]): Unit = {
    // The SST writer requires strictly ascending keys, so only put the key
    // when it differs from the one written immediately before it.
    if (!key.sameElements(lastKey)) {
      writer.put(key, value)
    }
    lastKey = key
  }
}

By using this, my problem disappeared. It's weird. I'm using the hash policy when generating keys; could this be caused by a hash collision?