vesoft-inc / nebula-spark-utils

Spark related libraries and tools

Nebula Algorithm #91

Closed: herman72 closed this issue 3 years ago

herman72 commented 3 years ago

Nebula Graph 2.0.0
Deployment type: single host
Hardware info:
- Disk in use: HDD
- CPU: Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz * 10
- RAM: 64 GB

+----+------------------+------------------+----------------+---------+------------+--------------------+-------------+-----------+
| ID | Name             | Partition Number | Replica Factor | Charset | Collate    | Vid Type           | Atomic Edge | Group     |
+----+------------------+------------------+----------------+---------+------------+--------------------+-------------+-----------+
| 1  | "CallConnection" | 10               | 1              | "utf8"  | "utf8_bin" | "FIXED_STRING(80)" | "false"     | "default" |
+----+------------------+------------------+----------------+---------+------------+--------------------+-------------+-----------+

I want to use the PageRank algorithm with the configuration below:

{
  # Spark relation config
  spark: {
    app: {
        name: LPA
        # spark.app.partitionNum
        partitionNum:100
    }
    master:local
  }

  data: {
    # data source. One of: nebula, csv, json
    source: nebula
    # data sink; the algorithm result will be written into this sink. One of: nebula, csv, text
    sink: nebula
    # whether your algorithm needs weight
    hasWeight: false
  }

  # Nebula Graph relation config
  nebula: {
    # algo's data source from Nebula. If data.source is nebula, this nebula.read config takes effect.
    read: {
        # Nebula metad server address; separate multiple addresses with commas
        metaAddress: "127.0.0.1:9559"
        # Nebula space
        space: CallConnection
        # Nebula edge types; multiple labels mean that data from multiple edge types will be unioned together
        labels: ["callTO"]
        # Nebula edge property name for each edge type; this property is used as the weight column for the algorithm.
        # Make sure the weightCols correspond to labels.
        # weightCols: ["start_year"]
    }

    # algo result sink into Nebula. If data.sink is nebula, this nebula.write config takes effect.
    write:{
        # Nebula graphd server address; separate multiple addresses with commas
        graphAddress: "127.0.0.1:9669"
        # Nebula metad server address; separate multiple addresses with commas
        metaAddress: "127.0.0.1:9559,127.0.0.1:9560"
        user:user
        pswd:password
        # Nebula space name
        space:CallConnection
        # Nebula tag name; the algorithm result will be written into this tag
        tag:pagerank
    }
  }

#   local: {
#     # algo's data source from a local file. If data.source is csv or json, this local.read config takes effect.
#     read:{
#         filePath: "hdfs://127.0.0.1:9000/edge/work_for.csv"
#         # srcId column
#         srcId:"_c0"
#         # dstId column
#         dstId:"_c1"
#         # weight column
#         #weight: "col3"
#         # if csv file has header
#         header: false
#         # csv file's delimiter
#         delimiter:","
#     }

#     # algo result sink into a local file. If data.sink is csv or text, this local.write config takes effect.
#     write:{
#         resultPath:/tmp/
#     }
#   }

  algorithm: {
    # the algorithm that you are going to execute, pick one from [pagerank, louvain, connectedcomponent,
    # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
    # betweenness]
    executeAlgo: pagerank

    # PageRank parameter
    pagerank: {
        maxIter: 2
        resetProb: 0.15  # default 0.15
    }

 }
}
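
A side note on the write sink (my assumption based on the nebula-algorithm docs, not something verified in this thread): the result tag has to exist in the space before the job writes back, with a double property named after the algorithm. A minimal nGQL sketch:

USE CallConnection;
CREATE TAG IF NOT EXISTS pagerank(pagerank double);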

After running spark-submit like this:
spark-submit --master local --class com.vesoft.nebula.algorithm.Main nebula-algorithm-2.0.0.jar -p classes/Algorithm.conf

After several minutes it was using all the CPU but almost no RAM.
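
(A guess on my side, not verified: with --master local the whole job runs in a single driver JVM, and the "MemoryStore started with capacity 366.3 MB" line in the log below matches Spark's default 1g driver memory, which would explain why the rest of the 64 GB RAM is never touched. Raising it would look something like this:)

spark-submit --master local --driver-memory 32g --class com.vesoft.nebula.algorithm.Main nebula-algorithm-2.0.0.jar -p classes/Algorithm.conf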

Console output is below; here is the Spark UI:

[Spark UI screenshot]

spark-submit --master local --class com.vesoft.nebula.algorithm.Main nebula-algorithm-2.0.0.jar -p classes/Algorithm.conf 
21/05/19 06:19:53 WARN Utils: Your hostname, orientserver resolves to a loopback address: 127.0.1.1; using 10.10.1.121 instead (on interface ens33)
21/05/19 06:19:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/05/19 06:20:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
log4j:WARN No appenders could be found for logger (com.vesoft.nebula.algorithm.Main$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/05/19 06:20:00 INFO SparkContext: Running Spark version 2.4.7
21/05/19 06:20:00 INFO SparkContext: Submitted application: LPA
21/05/19 06:20:00 INFO SecurityManager: Changing view acls to: orient
21/05/19 06:20:00 INFO SecurityManager: Changing modify acls to: orient
21/05/19 06:20:00 INFO SecurityManager: Changing view acls groups to: 
21/05/19 06:20:00 INFO SecurityManager: Changing modify acls groups to: 
21/05/19 06:20:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(orient); groups with view permissions: Set(); users  with modify permissions: Set(orient); groups with modify permissions: Set()
21/05/19 06:20:00 INFO Utils: Successfully started service 'sparkDriver' on port 36273.
21/05/19 06:20:00 INFO SparkEnv: Registering MapOutputTracker
21/05/19 06:20:00 INFO SparkEnv: Registering BlockManagerMaster
21/05/19 06:20:00 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/05/19 06:20:00 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/05/19 06:20:00 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-e15b4c05-c314-463e-9f06-a1fca904f7e7
21/05/19 06:20:00 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/05/19 06:20:01 INFO SparkEnv: Registering OutputCommitCoordinator
21/05/19 06:20:01 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/05/19 06:20:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.1.121:4040
21/05/19 06:20:01 INFO SparkContext: Added JAR file:/home/orient/NebulaJavaTools/nebula-spark-utils/nebula-algorithm/target/nebula-algorithm-2.0.0.jar at spark://10.10.1.121:36273/jars/nebula-algorithm-2.0.0.jar with timestamp 1621405201288
21/05/19 06:20:01 INFO Executor: Starting executor ID driver on host localhost
21/05/19 06:20:01 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34449.
21/05/19 06:20:01 INFO NettyBlockTransferService: Server created on 10.10.1.121:34449
21/05/19 06:20:01 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/05/19 06:20:01 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO BlockManagerMasterEndpoint: Registering block manager 10.10.1.121:34449 with 366.3 MB RAM, BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.10.1.121, 34449, None)
21/05/19 06:20:01 INFO ReadNebulaConfig$: NebulaReadConfig={space=CallConnection,label=callTO,returnCols=List(),noColumn=true,partitionNum=10}
21/05/19 06:20:02 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/orient/NebulaJavaTools/nebula-spark-utils/nebula-algorithm/target/spark-warehouse').
21/05/19 06:20:02 INFO SharedState: Warehouse path is 'file:/home/orient/NebulaJavaTools/nebula-spark-utils/nebula-algorithm/target/spark-warehouse'.
21/05/19 06:20:02 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
21/05/19 06:20:02 INFO NebulaDataSource: create reader
21/05/19 06:20:02 INFO NebulaDataSource: options {spacename=CallConnection, nocolumn=true, metaaddress=127.0.0.1:9559, label=callTO, type=edge, connectionretry=2, timeout=6000, executionretry=1, paths=[], limit=1000, returncols=, partitionnumber=10}
21/05/19 06:20:02 INFO NebulaDataSourceEdgeReader: dataset's schema: StructType(StructField(_srcId,StringType,false), StructField(_dstId,StringType,false), StructField(_rank,LongType,false))
21/05/19 06:20:04 INFO NebulaDataSource: create reader
21/05/19 06:20:04 INFO NebulaDataSource: options {spacename=CallConnection, nocolumn=true, metaaddress=127.0.0.1:9559, label=callTO, type=edge, connectionretry=2, timeout=6000, executionretry=1, paths=[], limit=1000, returncols=, partitionnumber=10}
21/05/19 06:20:04 INFO DataSourceV2Strategy: 
Pushing operators to class com.vesoft.nebula.connector.NebulaDataSource
Pushed Filters: 
Post-Scan Filters: 
Output: _srcId#0, _dstId#1, _rank#2L

21/05/19 06:20:05 INFO CodeGenerator: Code generated in 205.90032 ms
21/05/19 06:20:05 INFO SparkContext: Starting job: fold at VertexRDDImpl.scala:90
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 9 (mapPartitions at VertexRDD.scala:356) as input to shuffle 3
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 27 (mapPartitions at VertexRDDImpl.scala:247) as input to shuffle 1
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 23 (mapPartitions at VertexRDDImpl.scala:247) as input to shuffle 0
21/05/19 06:20:05 INFO DAGScheduler: Registering RDD 31 (mapPartitions at GraphImpl.scala:208) as input to shuffle 2
21/05/19 06:20:05 INFO DAGScheduler: Got job 0 (fold at VertexRDDImpl.scala:90) with 10 output partitions
21/05/19 06:20:05 INFO DAGScheduler: Final stage: ResultStage 4 (fold at VertexRDDImpl.scala:90)
21/05/19 06:20:05 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0, ShuffleMapStage 3)
21/05/19 06:20:05 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0, ShuffleMapStage 3)
21/05/19 06:20:05 INFO DAGScheduler: Submitting ShuffleMapStage 0 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[9] at mapPartitions at VertexRDD.scala:356), which has no missing parents
21/05/19 06:20:05 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.4 KB, free 366.3 MB)
21/05/19 06:20:05 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.6 KB, free 366.3 MB)
21/05/19 06:20:05 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.1.121:34449 (size: 7.6 KB, free: 366.3 MB)
21/05/19 06:20:05 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1184
21/05/19 06:20:05 INFO DAGScheduler: Submitting 10 missing tasks from ShuffleMapStage 0 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[9] at mapPartitions at VertexRDD.scala:356) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
21/05/19 06:20:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
21/05/19 06:20:06 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 9761 bytes)
21/05/19 06:20:06 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/05/19 06:20:06 INFO Executor: Fetching spark://10.10.1.121:36273/jars/nebula-algorithm-2.0.0.jar with timestamp 1621405201288
21/05/19 06:20:06 INFO TransportClientFactory: Successfully created connection to /10.10.1.121:36273 after 44 ms (0 ms spent in bootstraps)
21/05/19 06:20:06 INFO Utils: Fetching spark://10.10.1.121:36273/jars/nebula-algorithm-2.0.0.jar to /tmp/spark-17e5e989-b080-4a41-aac3-f9fd56f03752/userFiles-693448ab-9c10-468b-a2a9-c926b9e62800/fetchFileTemp3358155586060883895.tmp
21/05/19 06:20:06 INFO Executor: Adding file:/tmp/spark-17e5e989-b080-4a41-aac3-f9fd56f03752/userFiles-693448ab-9c10-468b-a2a9-c926b9e62800/nebula-algorithm-2.0.0.jar to class loader
21/05/19 06:20:07 INFO NebulaEdgePartitionReader: partition index: 1, scanParts: List(1)
21/05/19 06:20:07 INFO CodeGenerator: Code generated in 23.996116 ms

After some minutes:

[Spark UI screenshot]

At last it just did nothing and all the tasks were gone!

Nicole00 commented 3 years ago

The first half of the job reads data from Nebula, and then the algorithm is executed. It's strange that mapPartitions lasts 35 minutes. How many records does the edge callTO have? Please post the result of SUBMIT JOB STATS; SHOW STATS.
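
For example, in nebula-console:

USE CallConnection;
SUBMIT JOB STATS;
# STATS runs as an async job; wait until SHOW JOBS reports it finished, then:
SHOW STATS;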

If your algorithm's result is to be written back to Nebula, please package nebula-algorithm from the master branch.

herman72 commented 3 years ago

> The first half of the job reads data from Nebula, and then the algorithm is executed. It's strange that mapPartitions lasts 35 minutes. How many records does the edge callTO have? Please post the result of SUBMIT JOB STATS; SHOW STATS.
>
> If your algorithm's result is to be written back to Nebula, please package nebula-algorithm from the master branch.

[screenshot: SHOW STATS output]

Yes, I use nebula-algorithm from the master branch.

Nicole00 commented 3 years ago

You have 682 million edges, and your job was submitted in local mode. Check the log; there may be an OOM error.

Please use yarn mode to submit your Spark job, and configure more memory for the executors and the driver.
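
For example (a sketch only; the memory sizes and executor count are illustrative and should be tuned to your cluster):

spark-submit --master yarn --deploy-mode client \
  --driver-memory 16g --executor-memory 16g --num-executors 4 \
  --class com.vesoft.nebula.algorithm.Main \
  nebula-algorithm-2.0.0.jar -p classes/Algorithm.conf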

wey-gu commented 3 years ago

Will close it as it's been inactive for days. Feel free to reopen it.