vesoft-inc / nebula

A distributed, fast open-source graph database featuring horizontal scalability and high availability
https://nebula-graph.io
Apache License 2.0

Encounter error "Unable to activate object" when there are multiple threads / concurrent tasks in Spark #5784

Closed sparkle-apt closed 8 months ago

sparkle-apt commented 10 months ago

Settings

Hardware and software overview

NebulaGraph Database deployment

Computation cluster

Graph data

Others

Issues

When scanning the full graph data on the EMR machine, for example with count() as in the snippet below, we encounter an Unable to activate object error. Snippet:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig.builder().withMetaAddress(s"${ec2_public_ip}:9559").withConnectionRetry(2).build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig.builder().withSpace("acct2asset_20231130").withLabel("USES").withNoColumn(false).withReturnCols(List()).withPartitionNum(10).build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()

dataset.count()

Error log:

23/12/15 07:59:57 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 103) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
    at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
    at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
    at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
    at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
    at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
    at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

23/12/15 07:59:57 ERROR TaskSetManager: Task 1 in stage 10.0 failed 4 times; aborting job
23/12/15 07:59:57 WARN TaskSetManager: Lost task 2.3 in stage 10.0 (TID 141) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 7.3 in stage 10.0 (TID 136) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 5.3 in stage 10.0 (TID 133) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 3.3 in stage 10.0 (TID 135) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 4.3 in stage 10.0 (TID 139) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 0.3 in stage 10.0 (TID 140) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 9.3 in stage 10.0 (TID 134) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 6.3 in stage 10.0 (TID 137) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 8.3 in stage 10.0 (TID 138) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost task 1.3 in stage 10.0 (TID 132) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
    at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
    at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
    at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
    at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
    at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
    at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:147)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:204)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:203)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:425)
  at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3047)
  at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3046)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3749)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:3046)
  ... 49 elided
Caused by: com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
  at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
  at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
  at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
  at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
  at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
  at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
  at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
  at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

However, we can successfully run the following and get results on the EMR machine.

dataset.show()

We also tested scripts on different volumes of the graph data. With val n_limit = 1000000, the following runs successfully (it is a modified snippet from the nebula-algorithm package):

val ORIGIN_ID_COL = "id"
val fieldNames         = dataset.schema.fieldNames
val n_limit = 1000000
val (srcName, dstName) = (fieldNames(0), fieldNames(1))
val srcIdDF: DataFrame = dataset.select(srcName).withColumnRenamed(srcName, ORIGIN_ID_COL).limit(n_limit)
val dstIdDF: DataFrame = dataset.select(dstName).withColumnRenamed(dstName, ORIGIN_ID_COL).limit(n_limit)
val idDF               = srcIdDF.union(dstIdDF).distinct()
idDF.show()

However, when we increase it to val n_limit = 10000000, it fails with the same Unable to activate object error.

What we found so far

With more tests, we found that the error does not occur when the total number of threads / concurrent tasks is 1, whereas it appears when the number of threads is greater than 1. We suspect that NebulaGraph Database imposes some constraint, and we wonder whether proper configuration tuning could help.

Could you please help with this issue? Feel free to let me know if I need to provide more information. Thanks a lot!

QingZ11 commented 10 months ago

In the previous post mentioned here: https://discuss.nebula-graph.com.cn/t/topic/9726, zhang_hytc encountered the same issue as you did. You can try the following steps:

First, execute the show hosts command in the nebula-console. This command displays the addresses of the storaged services exposed by the NebulaGraph metad service.

Next, confirm whether you can establish a connection from your local environment to the storaged addresses exposed by the metad service.
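The connectivity check described above can also be scripted. The following is a minimal sketch, not part of the connector: `StoragedReachability`, `canConnect`, and `unreachable` are hypothetical names, and the host/port pairs would have to be copied by hand from the show hosts output.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper (not part of nebula-spark-connector): plain TCP reachability check.
object StoragedReachability {

  /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts, and unresolvable host names.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  /** Returns the (host, port) pairs that could not be reached. */
  def unreachable(addresses: Seq[(String, Int)], timeoutMs: Int = 3000): Seq[(String, Int)] =
    addresses.filterNot { case (host, port) => canConnect(host, port, timeoutMs) }
}
```

For example, `StoragedReachability.unreachable(Seq(("xx.xx.xx.xx", 9779)))` returns an empty sequence when the storaged port is reachable. Run this way it only checks the machine it runs on; checking every Spark worker would require running it inside a Spark task on each of them.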

sparkle-apt commented 10 months ago

Thanks @QingZ11 for your prompt response. I confirm that the address of the storaged service exposed by the metad service is the public IP address of the storaged service as shown below (The IP of the storaged service is masked due to sensitivity).

[Screenshot: show_hosts]

And I confirm that I can establish a connection from my EMR cluster to the storaged addresses as shown below (The IP of the storaged service is masked due to sensitivity).

[Screenshot: telnet]

The issue we encounter is not that we cannot connect to the storaged service under any circumstances. Instead, the problem is that we encounter the error when total-executor-cores is greater than 4. This greatly limits our efficient usage of the graph database for our use cases. Could you please help look into the issue and share insights that help us address it? Thank you!

Nicole00 commented 10 months ago

Please make sure all the Spark workers can ping the storaged address.

sparkle-apt commented 10 months ago

Thanks @Nicole00 for the reminder. I confirm that all Spark workers and the storaged service are within the same VPC network and that their ports are reachable.

sparkle-apt commented 10 months ago

I've taken the initiative to do some preliminary checks, but so far they have not led to a resolution. To troubleshoot the issue further and more effectively, could you advise me on the following:

  1. Log Files: Which specific log files, or which information in the logs, should I review for error messages or indicators related to this issue?
  2. Configuration Files: Are there any configuration settings that I should inspect or tweak that might be relevant to this problem?
  3. Diagnostic Tools/Commands: Are there tools or commands available to gather more diagnostic information?

If you require additional information or context from my end, please let me know, and I'll be sure to provide it. Thanks!

sparkle-apt commented 10 months ago

In addition, we observed odd behavior in another test, in which we connect to the database and count the edges via spark-shell.

spark-shell --master yarn --deploy-mode client --driver-memory=2G --executor-memory=2G  --num-executors=2 --executor-cores=2 --conf spark.dynamicAllocation.enabled=false --jars nebula-spark-connector_3.0-3.0-SNAPSHOT-jar-with-dependencies.jar

We then run the following snippet again:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

sc.setLogLevel("INFO")
val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig.builder().withMetaAddress(s"${ec2_public_ip}:9559").withConnectionRetry(2).build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig.builder().withSpace("acct2asset_20231130").withLabel("USES").withNoColumn(false).withReturnCols(List()).withPartitionNum(20).build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
dataset.show()
dataset.count()

The first four tasks raised the "Unable to activate object" error, while the following ones did not.
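As a side note on the arithmetic: with --num-executors=2 and --executor-cores=2 and dynamic allocation disabled, Spark can run four tasks at once, assuming spark.task.cpus is left at its default of 1. The sketch below (hypothetical names `TaskSlots`, `slots`, `waves`) computes that figure. That the four failing tasks were exactly the first wave is only a plausible reading of the observation, not something the thread confirms.

```scala
// Hypothetical helper illustrating Spark's task-slot arithmetic.
object TaskSlots {

  /** Concurrent task slots: each executor runs (executorCores / taskCpus) tasks at a time. */
  def slots(numExecutors: Int, executorCores: Int, taskCpus: Int = 1): Int =
    numExecutors * (executorCores / taskCpus)

  /** Number of scheduling waves needed to run all partitions (ceiling division). */
  def waves(partitions: Int, slots: Int): Int =
    (partitions + slots - 1) / slots
}
```

With the settings above, `TaskSlots.slots(2, 2)` gives 4 slots, and the 20 partitions requested via withPartitionNum(20) need `TaskSlots.waves(20, 4)`, i.e. 5 waves.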

[Screenshots: 2023-12-20 at 11:10:21 and 2023-12-20 at 11:10:02]

We are concerned about this unstable and unexpected behavior and look forward to your suggestions. Thanks! @Nicole00 @QingZ11 @wey-gu

Nicole00 commented 10 months ago

So weird! Are the first four tasks located on different machines from the other tasks? Can you confirm that telnet storaged_host_ip 9779 works from all the Spark workers?

sparkle-apt commented 10 months ago

@Nicole00 Yes, these tasks all run on the same single machine where storaged service is and I confirm telnet storaged_host_ip 9779 returns Connected to storaged_host_ip.

Nicole00 commented 10 months ago

> @Nicole00 Yes, these tasks all run on the same single machine where storaged service is and I confirm telnet storaged_host_ip 9779 returns Connected to storaged_host_ip.

Really weird. If the tasks all run on the same single machine, it looks like the storaged server was not ready at 10:59:00 but was ready at 11:01:09.

Nicole00 commented 10 months ago

Could you please provide some log information for nebula storaged?

sparkle-apt commented 10 months ago

> could you please provide some log information for nebula storaged?

Sure, could you please let me know which minloglevel and v values are needed in the log settings so that I can provide logs that help?

Nicole00 commented 10 months ago

You can set minloglevel to 0 and v to 3 for more detailed info. https://docs.nebula-graph.io/3.6.0/5.configurations-and-logs/2.log-management/logs/#parameter_descriptions

sparkle-apt commented 10 months ago

The logging is now configured with minloglevel set to 0 and v set to 3. When rerunning the counting snippet, however, we did not observe any error. Instead, the running tasks seemed to be stuck, with no task finishing, while according to the log data was still being fetched slowly, which is abnormal. It seems to me that the intensive logging may slow down data fetching. As the full log is large, we truncated it to the top and most representative parts, which are attached: nebula-storaged-v3.txt

When v is reset to 0, we observe the same error again. However, no warning / error is found in the storaged log. We are getting more confused and are not sure whether these pieces of information help. Please let me know if you require additional information or context from my end.

sparkle-apt commented 10 months ago

@Nicole00 By the way, on a hunch, we found tons of TCP connections in the TIME_WAIT state when running the code below:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

sc.setLogLevel("INFO")
val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig.builder().withMetaAddress(s"${ec2_public_ip}:9559").withConnectionRetry(2).build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig.builder().withSpace("acct2asset_20231130").withLabel("USES").withNoColumn(false).withReturnCols(List()).withPartitionNum(20).build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
dataset.count()

There are around 21k TCP connections:

(base) [ec2-user@xx.xx.xx.xx packages]$ netstat -a | grep -cE ':9779.*TIME_WAIT'
21494

Is this expected?
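To see how connections on the storaged port are distributed across TCP states, rather than counting only TIME_WAIT as the grep above does, netstat output can be grouped by state. This is a minimal sketch: `NetstatStates` and `countByState` are hypothetical names, and it assumes the usual six-column Linux netstat layout for TCP lines (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, State).

```scala
// Hypothetical helper: groups netstat TCP lines that involve a given port by connection state.
object NetstatStates {

  /** Counts TCP connections whose local or foreign address ends in :port, keyed by state. */
  def countByState(netstatLines: Seq[String], port: Int): Map[String, Int] = {
    val suffix = s":$port"
    netstatLines
      .map(_.trim.split("\\s+"))
      .collect {
        // Only six-column TCP lines match; header and UDP lines are skipped.
        case Array(proto, _, _, local, foreign, state)
            if proto.startsWith("tcp") && (local.endsWith(suffix) || foreign.endsWith(suffix)) =>
          state
      }
      .groupBy(identity)
      .map { case (state, occurrences) => state -> occurrences.size }
  }
}
```

Feeding it the lines of netstat output with port 9779 yields a map such as TIME_WAIT and ESTABLISHED counts, which makes it easier to tell closed-but-lingering connections from live ones.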

Nicole00 commented 10 months ago

Sorry for the late reply. Theoretically, the number of connections to storaged will be 20 (partitionNum) * (number of storaged instances).
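For a rough sense of scale, the formula can be put next to the reported netstat count. The sketch below uses hypothetical names (`ExpectedConnections`, `expected`, `ratio`) and assumes a single storaged instance, which the thread suggests but does not state outright. The comparison is loose, because TIME_WAIT entries are connections that have already been closed, accumulated over time, not simultaneously open ones.

```scala
// Hypothetical helper comparing the theoretical connection count with an observed one.
object ExpectedConnections {

  /** Theoretical number of connections: partitionNum * number of storaged instances. */
  def expected(partitionNum: Int, storagedInstances: Int): Int =
    partitionNum * storagedInstances

  /** How many times larger the observed count is than the expected one. */
  def ratio(observed: Long, expectedCount: Int): Double =
    observed.toDouble / expectedCount
}
```

With partitionNum = 20 and one storaged instance, `expected(20, 1)` is 20, so the 21494 TIME_WAIT entries reported above are roughly a thousand times that figure.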

I checked the connector for a connection leak: the storageClient is closed after a partition finishes its task, and the connectionPool inside it is also closed when the storageClient is closed.

How much data is there under your USES label?

sparkle-apt commented 10 months ago

@Nicole00 No worries! There are 303938330 USES edges in the space.

Nicole00 commented 10 months ago

OK, I'll run a test to see if there is any connection leak. In the meantime, maybe you can update your nebula-spark-connector to the latest version.

sparkle-apt commented 10 months ago

A brief summary of what has been observed so far:

  1. We generally encounter the Unable to activate object error when loading the graph with a total number of nodes greater than 4, observed both locally and on the EMR cluster;
  2. Loading succeeds with a total number of nodes no greater than 4, observed both locally and on the EMR cluster;
  3. Once, when loading the graph, the first four tasks raised the "Unable to activate object" error while the following ones did not and finished successfully;
  4. With logging configured at minloglevel = 0 and v = 3, i.e., with a large amount of logs written to disk, loading the graph with a total number of nodes greater than 4 can succeed, though much more slowly;
  5. There are tons of TCP connections in the TIME_WAIT state when loading the graph locally.

@Nicole00 Do you have any other ideas taking these into consideration? Is there anything we could try to increase parallelism when reading the graph?

Nicole00 commented 9 months ago

I really cannot reproduce your problem. I ran the connector in a local Spark cluster with both 5 nodes and 1 node, and could read nebula's data successfully.

I still think it's a network problem.

Nicole00 commented 9 months ago

This came to my mind quite by accident: it may be about the number of available ports. Maybe you can try what this post suggests: https://blog.csdn.net/gltncx11/article/details/122068479 @sparkle-apt
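The thread does not summarize the linked post. If "port amount" refers to the ephemeral (local) port range, a rough estimate shows why a large TIME_WAIT count could matter. The sketch below uses hypothetical names (`EphemeralPorts`, `rangeSize`, `fractionUsed`) and assumes the range "32768 60999", a common Linux default for net.ipv4.ip_local_port_range; the actual value on these machines is not given in the thread.

```scala
// Hypothetical helper estimating how much of the ephemeral port range is tied up.
object EphemeralPorts {

  /** Number of ports in a range given as two whitespace-separated integers, e.g. "32768 60999". */
  def rangeSize(portRange: String): Int = {
    val Array(low, high) = portRange.trim.split("\\s+").map(_.toInt)
    high - low + 1
  }

  /** Fraction of the range that a given number of lingering sockets would occupy. */
  def fractionUsed(socketCount: Int, portRange: String): Double =
    socketCount.toDouble / rangeSize(portRange)
}
```

Under that assumption the range holds 28232 ports, so the 21494 TIME_WAIT sockets reported earlier would occupy about three quarters of it if they all sit on the client side toward the same storaged address. That condition is not confirmed anywhere in the thread.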

QingZ11 commented 8 months ago

@sparkle-apt Hi, I noticed that the issue you created hasn't been updated for nearly a month. Has it been resolved? If not, can you provide some more information? If it has, can you close this issue?

Thanks a lot for your contribution anyway 😊

sparkle-apt commented 8 months ago

We have decided not to be blocked by this issue for the moment and to move forward with other projects and test on larger clusters. We will get back to it when bandwidth allows. So we can close the issue. Thanks for the reminder.