vesoft-inc / nebula-algorithm

Nebula-Algorithm is a Spark application based on GraphX that runs state-of-the-art graph algorithms on top of NebulaGraph and writes the results back to NebulaGraph.

Louvain produces no results and no errors #4

Closed: riskgod closed this issue 2 years ago

riskgod commented 2 years ago

hey, I'm running the built-in Louvain, but there are no results (there is no `louvain` tag in Nebula). My settings are below; asking the community for help. Thanks~

space

CREATE SPACE louvain (partition_num=15, replica_factor=1, vid_type=FIXED_STRING(256)) comment="louvain test";

tag

create tag user() comment = 'user';

edge

create edge relation(weight int)  COMMENT = 'user relation'; 

Sample edge data in Nebula

(root@nebula) [louvain]> FETCH PROP ON relation "1000961" -> "0";
+-------------------------------------------+
| edges_                                    |
+-------------------------------------------+
| [:relation "1000961"->"0" @0 {weight: 1}] |
+-------------------------------------------+

Louvain configuration

```
{
  # Spark related config
  spark: {
    app: {
      name: LPA
      # Spark partition number
      partitionNum: 100
    }
    master: local
  }

  data: {
    # data source, optional values: nebula, csv, json
    source: nebula
    # data sink, i.e. the target the algorithm result is written to, optional values: nebula, csv, json
    sink: nebula
    # whether the algorithm needs a weight
    hasWeight: true
  }

  # Nebula Graph related config
  nebula: {
    # data source. The nebula.read config only takes effect when Nebula Graph is the data source of the computation.
    read: {
      # IP addresses and ports of all Meta services, multiple addresses separated by commas. Format: "ip1:port1,ip2:port2".
      # When deployed with docker-compose, fill in the ports docker-compose maps to the host,
      # which can be checked with `docker-compose ps`.
      metaAddress: "127.0.0.1:9559"
      # Nebula Graph space name
      space: louvain
      # Nebula Graph edge types; with multiple labels, the data of the edges will be merged.
      labels: ["relation"]
      # Property name of each Nebula Graph edge type; this property is used as the weight column of the algorithm.
      # Make sure it corresponds to the edge type.
      weightCols: ["weight"]
    }

    # data sink. The nebula.write config only takes effect when the result is written into Nebula Graph.
    write: {
      graphAddress: "127.0.0.1:9669"
      metaAddress: "127.0.0.1:9559"
      user: root
      pswd: nebula
      space: louvain
      tag: louvain
    }
  }

  algorithm: {
    # the algorithm to execute, optional values: pagerank, louvain, connectedcomponent,
    # labelpropagation, shortestpaths, degreestatic, kcore,
    # stronglyconnectedcomponent, trianglecount, betweenness
    executeAlgo: louvain

    # PageRank parameter
    pagerank: {
      maxIter: 10
      resetProb: 0.15  # default is 0.15
    }

    # Louvain parameter
    louvain: {
      maxIter: 20
      internalIter: 10
      tol: 0.5
    }

    # ConnectedComponent/StronglyConnectedComponent parameter
    connectedcomponent: {
      maxIter: 20
    }

    # LabelPropagation parameter
    labelpropagation: {
      maxIter: 20
    }

    # ShortestPath parameter
    shortestpaths: {
      # several vertices to compute the shortest path to all vertices.
      landmarks: "1"
    }

    # DegreeStatic parameter
    degreestatic: {}

    # KCore parameter
    kcore: {
      maxIter: 10
      degree: 1
    }

    # TriangleCount parameter
    trianglecount: {}

    # BetweennessCentrality parameter
    betweenness: {
      maxIter: 5
    }
  }
}
```
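(As a sanity check on the read side, the edge data in the source space can be verified with a stats job; this is standard nGQL, shown here as a hint:)

```
USE louvain;
SUBMIT JOB STATS;
SHOW STATS;
```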

spark-submit command

```bash
nohup /apphome/test/spark-2.4.8-bin-hadoop2.7/bin/spark-submit --master local[32] --driver-memory 64g --class com.vesoft.nebula.algorithm.Main /root/nebula-spark-utils/nebula-algorithm/target/nebula-algorithm-2.5.1.jar -p /apphome/test/nebula-spark-utils/nebula-algorithm/src/main/resources/application.conf &
```
wey-gu commented 2 years ago

I see you nohup'ed spark-submit; does the submit output (log) show anything?

riskgod commented 2 years ago

> I see you nohup'ed spark-submit; does the submit output (log) show anything?

No, it's just the normal Spark log, with no errors at all.

wey-gu commented 2 years ago

@Nicole00 could you help take a look? The sink here is nebula and there are no results. Is there anything that needs to be done? And if the sink didn't succeed, why was there no error? Thanks!

Nicole00 commented 2 years ago

> I see you nohup'ed spark-submit; does the submit output (log) show anything?
>
> No, it's just the normal Spark log, with no errors at all.

Hi, how did you install your Nebula service, rpm or docker? With the sink configured as nebula but no `louvain` tag existing, there will definitely be an error. Please paste the log so we can take a look.
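For example, you can confirm whether the tag exists in the sink space with:

```
USE louvain;
SHOW TAGS;
```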

riskgod commented 2 years ago

> With the sink configured as nebula but no `louvain` tag existing, there will definitely be an error. Please paste the log so we can take a look.

hey, I installed via rpm. Let me try creating the Nebula tag. Could you share a CREATE statement for that tag?

wey-gu commented 2 years ago

@riskgod as discussed offline, send me some more input on the sample data and graph schema (DDL), and I will reproduce the issue or provide a workable guideline from my side.

Thanks
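For reference, a result tag along these lines should work. This is only a sketch; it assumes the Louvain result is written into a single property named after the algorithm (which matches the `_id,louvain` columns in the CSV output further down):

```
USE louvain;
# assumed schema: one property holding the community id as a string
CREATE TAG IF NOT EXISTS louvain(louvain string);
```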

wey-gu commented 2 years ago

Here is an example where I ran the Louvain algorithm; it's basically what I did in this post: https://siwei.io/nebula-livejournal/

Nebula Console:

```
CREATE SPACE louvain (partition_num=15, replica_factor=1, vid_type=FIXED_STRING(256)) comment="louvain test";

create tag user() comment = 'user';

create edge relation(weight int) COMMENT = 'user relation';

INSERT VERTEX user() VALUE "110":(), "211":(), "312":(), "413":(), "514":(), "615":();
INSERT VERTEX user() VALUE "710":(), "811":(), "912":(), "1013":(), "1114":(), "1215":();
INSERT VERTEX user() VALUE "1310":(), "1411":(), "1512":(), "1613":(), "1714":(), "1815":();
INSERT VERTEX user() VALUE "1910":(), "2011":(), "2112":(), "2213":(), "2314":(), "2415":();
INSERT VERTEX user() VALUE "2510":(), "2611":(), "2712":(), "2813":(), "2914":(), "3015":();
INSERT VERTEX user() VALUE "3110":(), "3211":(), "3312":(), "3413":(), "3514":(), "3615":();
INSERT VERTEX user() VALUE "3710":(), "3811":(), "3912":(), "4013":(), "4114":(), "4215":();
INSERT VERTEX user() VALUE "4310":(), "4411":(), "4512":(), "4613":(), "4714":(), "4815":();
INSERT VERTEX user() VALUE "4910":(), "5011":(), "5112":(), "5213":(), "5314":(), "5415":();
INSERT VERTEX user() VALUE "5510":(), "5611":(), "5712":(), "5813":(), "5914":(), "6015":();

INSERT EDGE relation(weight) VALUES "110"->"211":(1);
INSERT EDGE relation(weight) VALUES "312"->"710":(2);
INSERT EDGE relation(weight) VALUES "811"->"912":(3);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(4);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(5);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(6);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(7);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(8);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(9);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(10);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(11);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(12);
INSERT EDGE relation(weight) VALUES "2813"->"6015":(13);
INSERT EDGE relation(weight) VALUES "2914"->"3015":(14);
INSERT EDGE relation(weight) VALUES "3110"->"3211":(15);
INSERT EDGE relation(weight) VALUES "3312"->"3413":(16);
INSERT EDGE relation(weight) VALUES "3514"->"3615":(17);
INSERT EDGE relation(weight) VALUES "3710"->"3811":(18);
INSERT EDGE relation(weight) VALUES "3912"->"4013":(19);
INSERT EDGE relation(weight) VALUES "4114"->"4215":(20);
INSERT EDGE relation(weight) VALUES "413"->"514":(21);
INSERT EDGE relation(weight) VALUES "615"->"710":(22);
INSERT EDGE relation(weight) VALUES "811"->"912":(23);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(24);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(25);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(26);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(27);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(28);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(29);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(30);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(31);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(32);
INSERT EDGE relation(weight) VALUES "2813"->"2914":(33);
INSERT EDGE relation(weight) VALUES "3015"->"3110":(34);
INSERT EDGE relation(weight) VALUES "3211"->"3312":(35);
INSERT EDGE relation(weight) VALUES "3413"->"3514":(36);
INSERT EDGE relation(weight) VALUES "3615"->"3710":(37);
INSERT EDGE relation(weight) VALUES "3811"->"3912":(38);
INSERT EDGE relation(weight) VALUES "4013"->"4114":(39);
INSERT EDGE relation(weight) VALUES "4215"->"4310":(40);
INSERT EDGE relation(weight) VALUES "4411"->"4512":(41);
INSERT EDGE relation(weight) VALUES "4613"->"4714":(42);
INSERT EDGE relation(weight) VALUES "4815"->"4910":(43);
INSERT EDGE relation(weight) VALUES "5011"->"5112":(44);
INSERT EDGE relation(weight) VALUES "5213"->"5314":(45);
INSERT EDGE relation(weight) VALUES "5415"->"710":(46);
INSERT EDGE relation(weight) VALUES "811"->"912":(47);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(48);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(49);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(50);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(51);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(52);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(53);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(54);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(55);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(56);
INSERT EDGE relation(weight) VALUES "2813"->"2914":(57);
INSERT EDGE relation(weight) VALUES "3015"->"3110":(58);
INSERT EDGE relation(weight) VALUES "3211"->"3312":(59);
INSERT EDGE relation(weight) VALUES "3413"->"3514":(60);
INSERT EDGE relation(weight) VALUES "3615"->"3710":(61);
INSERT EDGE relation(weight) VALUES "3811"->"3912":(62);
INSERT EDGE relation(weight) VALUES "4013"->"4114":(63);
INSERT EDGE relation(weight) VALUES "4215"->"5510":(64);
INSERT EDGE relation(weight) VALUES "5611"->"5712":(65);
INSERT EDGE relation(weight) VALUES "5813"->"5914":(66);
```


Create the algorithm environment in Docker:

```bash
cd ~
mkdir -p test/nebula-algorithm
cd test/nebula-algorithm

docker run --name spark-master --network nebula-docker-compose_nebula-net \
    -h spark-master -e ENABLE_INIT_DAEMON=false -d \
    -v ${HOME}/test/nebula-algorithm/:/root \
    bde2020/spark-master:2.4.5-hadoop2.7

wget https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/2.6.2/nebula-algorithm-2.6.2.jar
vim algo-louvain.conf

docker exec -it spark-master bash
cd /root

/spark/bin/spark-submit --master "local" --conf spark.rpc.askTimeout=6000s \
    --class com.vesoft.nebula.algorithm.Main \
    --driver-memory 16g nebula-algorithm-2.6.2.jar \
    -p algo-louvain.conf
...

22/01/10 10:34:05 INFO TaskSetManager: Starting task 0.0 in stage 796.0 (TID 1000, localhost, executor driver, partition 0, ANY, 7767 bytes)
22/01/10 10:34:05 INFO Executor: Running task 0.0 in stage 796.0 (TID 1000)
22/01/10 10:34:05 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 2 local blocks and 0 remote blocks
22/01/10 10:34:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
22/01/10 10:34:05 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
22/01/10 10:34:05 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
22/01/10 10:34:05 INFO FileOutputCommitter: Saved output of task 'attempt_20220110103405_0796_m_000000_1000' to file:/output/_temporary/0/task_20220110103405_0796_m_000000
22/01/10 10:34:05 INFO SparkHadoopMapRedUtil: attempt_20220110103405_0796_m_000000_1000: Committed
22/01/10 10:34:05 INFO Executor: Finished task 0.0 in stage 796.0 (TID 1000). 1654 bytes result sent to driver
22/01/10 10:34:05 INFO TaskSetManager: Finished task 0.0 in stage 796.0 (TID 1000) in 120 ms on localhost (executor driver) (1/1)
22/01/10 10:34:05 INFO TaskSchedulerImpl: Removed TaskSet 796.0, whose tasks have all completed, from pool
22/01/10 10:34:05 INFO DAGScheduler: ResultStage 796 (csv at AlgoWriter.scala:53) finished in 0.147 s
22/01/10 10:34:05 INFO DAGScheduler: Job 22 finished: csv at AlgoWriter.scala:53, took 0.309399 s
22/01/10 10:34:05 INFO FileFormatWriter: Write Job 658734e4-ce53-4ca8-92cd-6d7f9421fc54 committed.
22/01/10 10:34:05 INFO FileFormatWriter: Finished processing stats for write job 658734e4-ce53-4ca8-92cd-6d7f9421fc54.
22/01/10 10:34:05 INFO SparkContext: Invoking stop() from shutdown hook
22/01/10 10:34:05 INFO SparkUI: Stopped Spark web UI at http://spark-master:4040
22/01/10 10:34:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/01/10 10:34:06 INFO MemoryStore: MemoryStore cleared
22/01/10 10:34:06 INFO BlockManager: BlockManager stopped
22/01/10 10:34:06 INFO BlockManagerMaster: BlockManagerMaster stopped
22/01/10 10:34:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/01/10 10:34:06 INFO SparkContext: Successfully stopped SparkContext
22/01/10 10:34:06 INFO ShutdownHookManager: Shutdown hook called
22/01/10 10:34:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-0bdbb853-cbc8-4876-827c-937e1748f6a9
22/01/10 10:34:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-0ab58035-9c32-4b2e-a649-4a34d67a4d06

bash-5.0# ls -l /output/
total 4
-rw-r--r--    1 root     root             0 Jan 10 10:34 _SUCCESS
-rw-r--r--    1 root     root           192 Jan 10 10:34 part-00000-01c6be1f-b8be-4e68-b708-156bab8ec2b6-c000.csv
bash-5.0# head /output/part-00000-01c6be1f-b8be-4e68-b708-156bab8ec2b6-c000.csv
_id,louvain
6015,2813
2914,2813
2813,2813
3514,3015
3413,3015
3312,3015
4114,3015
3211,3015
```
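The `_id,louvain` columns map each vertex id to the id of its community. With `sink: nebula` instead of `csv`, writing these rows back would conceptually amount to inserts like the following (illustrative only; it assumes a `louvain` tag with a single string property `louvain`):

```
# hypothetical write-back of the first two result rows above
INSERT VERTEX louvain(louvain) VALUES "6015":("2813");
INSERT VERTEX louvain(louvain) VALUES "2914":("2813");
```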

ref:

algo-louvain.conf


```
{
  # Spark relation config
  spark: {
    app: {
      name: louvain
      # spark.app.partitionNum
      partitionNum: 10
    }
    master: local
  }

  data: {
    # data source. optional of nebula, csv, json
    source: nebula
    # data sink, means the algorithm result will be written into this sink. optional of nebula, csv, text
    sink: csv
    # if your algorithm needs weight
    hasWeight: true
  }

  # Nebula Graph relation config
  nebula: {
    # algo's data source from Nebula. If data.source is nebula, then this nebula.read config takes effect.
    read: {
      # Nebula metad server address, multiple addresses are split by English comma
      metaAddress: "172.20.0.3:9559"
      # Nebula space
      space: louvain
      # Nebula edge types, multiple labels means that data from multiple edges will union together
      labels: ["relation"]
      # Nebula edge property name for each edge type, this property will be used as the weight col for the algorithm.
      # Make sure the weightCols correspond to the labels.
      weightCols: ["weight"]
    }

    # algo result sink into Nebula. If data.sink is nebula, then this nebula.write config takes effect.
    write: {
      # Nebula graphd server address, multiple addresses are split by English comma
      graphAddress: "graphd:9669"
      # Nebula metad server address, multiple addresses are split by English comma
      metaAddress: "172.20.0.3:9559,172.20.0.4:9559,172.20.0.2:9559"
      user: root
      pswd: nebula
      # Nebula space name
      space: algo
      # Nebula tag name, the algorithm result will be written into this tag
      tag: louvain
      type: insert
    }
  }

  local: {
    # algo's data source from a local file. If data.source is csv or json, then this local.read config takes effect.
    read: {
      filePath: "hdfs://10.1.1.168:9000/edge/work_for.csv"
      # srcId column
      srcId: "_c0"
      # dstId column
      dstId: "_c1"
      # weight column
      #weight: "col3"
      # if the csv file has a header
      header: false
      # csv file's delimiter
      delimiter: ","
    }

    # algo result sink into a local file. If data.sink is csv or text, then this local.write config takes effect.
    write: {
      resultPath: /output/
    }
  }

  algorithm: {
    # the algorithm that you are going to execute, pick one from [pagerank, louvain, connectedcomponent,
    # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
    # betweenness]
    executeAlgo: louvain

    # Louvain parameter
    louvain: {
      maxIter: 20
      internalIter: 10
      tol: 0.5
    }

    # ConnectedComponent parameter
    connectedcomponent: {
      maxIter: 20
    }

    # LabelPropagation parameter
    labelpropagation: {
      maxIter: 20
    }

    # ShortestPaths parameter
    shortestpaths: {
      # several vertices to compute the shortest path to all vertices.
      landmarks: "1"
    }

    # Vertex degree statistics parameter
    degreestatic: {}

    # KCore parameter
    kcore: {
      maxIter: 10
      degree: 1
    }

    # TriangleCount parameter
    trianglecount: {}

    # Betweenness centrality parameter
    betweenness: {
      maxIter: 5
    }
  }
}
```
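Note that with `sink: csv` the results only land in `resultPath` (/output/ above); nothing is written back to NebulaGraph. If you later switch `sink` to `nebula`, the target space and tag in the `write` section must already exist. A sketch of that preparation, with the space and tag names taken from the conf above (the single `louvain` string property is an assumption based on the `_id,louvain` result columns):

```
# assumption: the result tag holds one property named after the algorithm
CREATE SPACE IF NOT EXISTS algo (partition_num=15, replica_factor=1, vid_type=FIXED_STRING(256));
USE algo;
CREATE TAG IF NOT EXISTS louvain(louvain string);
```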

wey-gu commented 2 years ago

You could first try sinking to a file, using my conf and insert data, to quickly verify that everything else is fine; then switch the sink to nebula; and finally run with the large volume of data in the final go (after the previous attempts succeed).
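Once a run with the nebula sink finishes cleanly, you can spot-check a vertex id from the CSV output above against the result tag, e.g. (assuming the `algo` space and `louvain` tag from my conf):

```
USE algo;
FETCH PROP ON louvain "2813";
```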

wey-gu commented 2 years ago

@riskgod how is it going now?