apache / incubator-hugegraph-toolchain

HugeGraph toolchain - includes a series of useful graph modules
https://hugegraph.apache.org/
Apache License 2.0

[Bug] Spark Loader insertSuccessCnt statistics incorrect #502

Open liuxiaocs7 opened 1 year ago

liuxiaocs7 commented 1 year ago

Bug Type

others (please comment below)

The Spark Loader job finishes successfully, but the reported insertSuccessCnt is wrong: it is always 0.

Before submit

Environment

Expected & Actual behavior

23/08/04 00:11:49 INFO HugeGraphSparkLoader:
 Finished  load example/spark/vertex_software.json  data
23/08/04 00:11:49 INFO DAGScheduler: Job 5 finished: foreachPartition at HugeGraphSparkLoader.java:154, took 1.023988 s
23/08/04 00:11:49 INFO DAGScheduler: ResultStage 3 (foreachPartition at HugeGraphSparkLoader.java:154) finished in 1.021 s
23/08/04 00:11:49 INFO HugeGraphSparkLoader:
 Finished  load example/spark/edge_knows.json  data
23/08/04 00:11:49 INFO DAGScheduler: Job 4 is finished. Cancelling potential speculative or zombie tasks for this job
23/08/04 00:11:49 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
23/08/04 00:11:49 INFO DAGScheduler: Job 4 finished: foreachPartition at HugeGraphSparkLoader.java:154, took 1.025300 s
23/08/04 00:11:49 INFO HugeGraphSparkLoader:
 Finished  load example/spark/vertex_person.json  data
23/08/04 00:11:49 INFO HugeGraphSparkLoader:
 ------------The data load task is complete-------------------

 insertSuccessCnt:       0
 ---------------------------------------------

23/08/04 00:11:49 INFO SparkUI: Stopped Spark web UI at http://192.168.34.164:4040
23/08/04 00:11:49 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/08/04 00:11:49 INFO MemoryStore: MemoryStore cleared
23/08/04 00:11:49 INFO BlockManager: BlockManager stopped
23/08/04 00:11:49 INFO BlockManagerMaster: BlockManagerMaster stopped
23/08/04 00:11:49 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/08/04 00:11:49 INFO SparkContext: Successfully stopped SparkContext
23/08/04 00:11:49 INFO SparkContext: SparkContext already stopped.
23/08/04 00:11:49 INFO SparkContext: SparkContext already stopped.
23/08/04 00:12:49 INFO ShutdownHookManager: Shutdown hook called
23/08/04 00:12:49 INFO ShutdownHookManager: Deleting directory /tmp/spark-41fba547-151e-4a5a-8982-fa9df80978db
23/08/04 00:12:49 INFO ShutdownHookManager: Deleting directory /tmp/spark-8c9b8b45-a9cf-4876-a719-b7446f4e46cd

Vertex/Edge example

No response

Schema [VertexLabel, EdgeLabel, IndexLabel]

// Define schema
    schema.propertyKey("name").asText().ifNotExist().create();
    schema.propertyKey("age").asInt().ifNotExist().create();
    schema.propertyKey("city").asText().ifNotExist().create();
    schema.propertyKey("weight").asDouble().ifNotExist().create();
    schema.propertyKey("lang").asText().ifNotExist().create();
    schema.propertyKey("date").asText().ifNotExist().create();
    schema.propertyKey("price").asDouble().ifNotExist().create();

    schema.vertexLabel("person")
            .properties("name", "age", "city")
            .useCustomizeStringId()
            .nullableKeys("age", "city")
            .ifNotExist()
            .create();

    schema.vertexLabel("software")
            .properties("name", "lang", "price")
            .useCustomizeStringId()
            .ifNotExist()
            .create();

    schema.edgeLabel("knows")
            .sourceLabel("person")
            .targetLabel("person")
            .properties("date", "weight")
            .ifNotExist()
            .create();

    schema.edgeLabel("created")
            .sourceLabel("person")
            .targetLabel("software")
            .properties("date", "weight")
            .ifNotExist()
            .create();

{
  "vertices": [
    {
      "label": "person",
      "input": {
        "type": "file",
        "path": "example/spark/vertex_person.json",
        "format": "JSON",
        "header": ["name", "age", "city"],
        "charset": "UTF-8",
        "skipped_line": {
          "regex": "(^#|^//).*"
        }
      },
      "id": "name",
      "null_values": ["NULL", "null", ""]
    },
    {
      "label": "software",
      "input": {
        "type": "file",
        "path": "example/spark/vertex_software.json",
        "format": "JSON",
        "header": ["id","name", "lang", "price","ISBN"],
        "charset": "GBK"
      },
      "id": "name",
      "ignored": ["ISBN"]
    }
  ],
  "edges": [
    {
      "label": "knows",
      "source": ["source_name"],
      "target": ["target_name"],
      "input": {
        "type": "file",
        "path": "example/spark/edge_knows.json",
        "format": "JSON",
        "date_format": "yyyyMMdd",
        "header": ["source_name","target_name", "date", "weight"]
      },
      "field_mapping": {
        "source_name": "name",
        "target_name": "name"
      }
    }
  ]
}

haohao0103 commented 10 months ago

@liuxiaocs7 Hi bro, what command did you use to submit the task? Was --sink-type specified? My guess is that insertSuccessCnt only works when loading data in bypass mode...
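
To make the guess above concrete, here is a minimal, self-contained sketch of how a success counter can stay at 0 even though every record is written, if only one sink path updates it. This is not the actual HugeGraphSparkLoader code: the class `SinkCounterSketch`, the `SinkMode` enum with its `SERVER` and `BYPASS` values, and the `load` method are all hypothetical names made up for illustration, and whether the real loader behaves this way is exactly what still needs to be confirmed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration only: none of these names come from the
// hugegraph-toolchain code base.
public class SinkCounterSketch {

    // Assumed sink modes; the real values accepted by --sink-type may differ.
    enum SinkMode { SERVER, BYPASS }

    // Stand-in for a metrics object that holds insertSuccessCnt.
    static final class Metrics {
        final LongAdder insertSuccessCnt = new LongAdder();
    }

    // "Loads" every record and returns the counter value reported at the end.
    static long load(List<String> records, SinkMode mode) {
        Metrics metrics = new Metrics();
        for (String record : records) {
            if (mode == SinkMode.BYPASS) {
                // Only the bypass branch reports a success to the metrics.
                metrics.insertSuccessCnt.increment();
            }
            // In SERVER mode the record would be written but never counted.
        }
        return metrics.insertSuccessCnt.sum();
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("marko", "vadas", "josh");
        // prints "SERVER insertSuccessCnt: 0"
        System.out.println("SERVER insertSuccessCnt: " + load(records, SinkMode.SERVER));
        // prints "BYPASS insertSuccessCnt: 3"
        System.out.println("BYPASS insertSuccessCnt: " + load(records, SinkMode.BYPASS));
    }
}
```

If the real loader follows a similar pattern, the summary would print 0 on the non-bypass path regardless of how many vertices and edges were actually inserted, which would match the log in the report.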