neo4j / graph-data-science

Source code for the Neo4j Graph Data Science library of graph algorithms.
https://neo4j.com/docs/graph-data-science/current/

Unable to run RandomWalk more than once without restarting server #261

Closed: hindog closed this issue 11 months ago

hindog commented 1 year ago

Describe the bug

If we call gds.randomWalk.stream for the first time after the server has started:

CALL gds.randomWalk.stream(
  'myGraph',
  {
    walkLength: 3,
    walksPerNode: 1,
    randomSeed: 42,
    concurrency: 1
  }
)
YIELD nodeIds, path
RETURN nodeIds, [node IN nodes(path) | node.label ] AS labels
LIMIT 1000;

...it returns results quickly. However, if we run the exact same call again, it runs for 100 seconds and then returns empty results. Also, if we run node2vec after gds.randomWalk.stream, it fails with:

Neo.ClientError.Procedure.ProcedureCallFailed: Failed to invoke procedure gds.beta.node2vec.stream: Caused by: java.lang.IllegalArgumentException: Unknown subtask: create walks

I took a stack dump of the server during the 2nd call to gds.randomWalk.stream and found these thread stacks relating to RandomWalk:

"neo4j.BoltWorker-7 [bolt] [/10.10.187.10:27306] " #315 prio=5 os_prio=0 cpu=68181.29ms elapsed=687.72s tid=0x00007f45280735a0 nid=0x3aa waiting on condition  [0x00007f44f52e9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.16.1/Native Method)
    - parking to wait for  <0x00000007f2f387a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.16.1/LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.16.1/AbstractQueuedSynchronizer.java:2123)
    at java.util.concurrent.ArrayBlockingQueue.poll(java.base@11.0.16.1/ArrayBlockingQueue.java:432)
    at org.neo4j.gds.core.utils.queue.QueueBasedSpliterator.poll(QueueBasedSpliterator.java:63)
    at org.neo4j.gds.core.utils.queue.QueueBasedSpliterator.<init>(QueueBasedSpliterator.java:43)
    at org.neo4j.gds.traversal.RandomWalk.walksQueueConsumer(RandomWalk.java:193)
    at org.neo4j.gds.traversal.RandomWalk.compute(RandomWalk.java:105)
    at org.neo4j.gds.traversal.RandomWalk.compute(RandomWalk.java:48)
    at org.neo4j.gds.executor.ProcedureExecutor.lambda$executeAlgorithm$0(ProcedureExecutor.java:133)
    at org.neo4j.gds.executor.ProcedureExecutor$$Lambda$6910/0x0000000844bbf440.get(Unknown Source)
    at org.neo4j.gds.executor.ProcedureExecutor.runWithExceptionLogging(ProcedureExecutor.java:197)
    at org.neo4j.gds.executor.ProcedureExecutor.executeAlgorithm(ProcedureExecutor.java:129)
    at org.neo4j.gds.executor.ProcedureExecutor.compute(ProcedureExecutor.java:109)
    at org.neo4j.gds.paths.randomwalk.RandomWalkStreamProc.stream(RandomWalkStreamProc.java:51)
    at org.neo4j.kernel.impl.proc.GeneratedProcedure_stream15274570006230372.apply(Unknown Source)
    at org.neo4j.procedure.impl.ProcedureRegistry.callProcedure(ProcedureRegistry.java:235)
    at org.neo4j.procedure.impl.GlobalProceduresRegistry.callProcedure(GlobalProceduresRegistry.java:352)
    at org.neo4j.kernel.impl.newapi.AllStoreHolder.callProcedure(AllStoreHolder.java:1092)
    at org.neo4j.kernel.impl.newapi.AllStoreHolder.procedureCallRead(AllStoreHolder.java:1004)
    at org.neo4j.cypher.internal.runtime.interpreted.CallSupport$.$anonfun$callReadOnlyProcedure$1(CallSupport.scala:47)
    at org.neo4j.cypher.internal.runtime.interpreted.CallSupport$$$Lambda$6088/0x0000000844862040.apply(Unknown Source)
    at org.neo4j.cypher.internal.runtime.interpreted.CallSupport$.callProcedure(CallSupport.scala:70)
    at org.neo4j.cypher.internal.runtime.interpreted.CallSupport$.callReadOnlyProcedure(CallSupport.scala:47)
    at org.neo4j.cypher.internal.runtime.interpreted.TransactionBoundReadQueryContext.callReadOnlyProcedure(TransactionBoundQueryContext.scala:1135)
    at org.neo4j.cypher.internal.planning.ExceptionTranslatingReadQueryContext.callReadOnlyProcedure(ExceptionTranslatingQueryContext.scala:225)
    at org.neo4j.cypher.internal.runtime.LazyReadOnlyCallMode$.callProcedure(ProcedureCallMode.scala:50)
    at org.neo4j.codegen.OperatorTaskPipeline0_407.compiledOperate(Unknown Source)
    at org.neo4j.cypher.internal.runtime.pipelined.operators.CompiledTask.operateWithProfile(OperatorCodeGenBaseTemplates.scala:452)
    at org.neo4j.cypher.internal.runtime.pipelined.PipelineTask.executeOperators(PipelineTask.scala:62)
    at org.neo4j.cypher.internal.runtime.pipelined.PipelineTask.executeWorkUnit(PipelineTask.scala:51)
    at org.neo4j.cypher.internal.runtime.pipelined.Worker.executeTask(Worker.scala:144)
    at org.neo4j.cypher.internal.runtime.pipelined.Worker.workOnQuery(Worker.scala:97)
    at org.neo4j.cypher.internal.runtime.pipelined.execution.CallingThreadExecutingQuery.request(CallingThreadExecutingQuery.scala:40)
    at org.neo4j.cypher.internal.PipelinedRuntimeResult.request(PipelinedRuntime.scala:502)
    at org.neo4j.cypher.internal.result.StandardInternalExecutionResult.request(StandardInternalExecutionResult.scala:90)
    at org.neo4j.cypher.internal.result.ClosingExecutionResult.request(ClosingExecutionResult.scala:144)
    at org.neo4j.fabric.stream.QuerySubject$BasicQuerySubject$1.doRequest(QuerySubject.java:184)
    at org.neo4j.fabric.stream.QuerySubject$BasicQuerySubject$1.request(QuerySubject.java:167)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
    at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138)
    at org.neo4j.fabric.stream.Rx2SyncStream$RecordSubscriber.request(Rx2SyncStream.java:129)
    at org.neo4j.fabric.stream.Rx2SyncStream.maybeRequest(Rx2SyncStream.java:91)
    at org.neo4j.fabric.stream.Rx2SyncStream.readRecord(Rx2SyncStream.java:50)
    at org.neo4j.fabric.bolt.BoltQueryExecutionImpl$QueryExecutionImpl.request(BoltQueryExecutionImpl.java:179)
    at org.neo4j.bolt.runtime.AbstractCypherAdapterStream.handleRecords(AbstractCypherAdapterStream.java:105)
    at org.neo4j.bolt.v3.messaging.ResultHandler.onPullRecords(ResultHandler.java:41)
    at org.neo4j.bolt.v4.messaging.PullResultConsumer.consume(PullResultConsumer.java:42)
    at org.neo4j.bolt.runtime.statemachine.impl.TransactionStateMachine$State.consumeResult(TransactionStateMachine.java:507)
    at org.neo4j.bolt.runtime.statemachine.impl.TransactionStateMachine$State$2.streamResult(TransactionStateMachine.java:351)
    at org.neo4j.bolt.runtime.statemachine.impl.TransactionStateMachine.streamResult(TransactionStateMachine.java:99)
    at org.neo4j.bolt.transaction.StatementProcessorTxManager.streamResults(StatementProcessorTxManager.java:251)
    at org.neo4j.bolt.transaction.StatementProcessorTxManager.pullData(StatementProcessorTxManager.java:111)
    at org.neo4j.bolt.v4.runtime.InTransactionState.processStreamPullResultMessage(InTransactionState.java:78)
    at org.neo4j.bolt.v4.runtime.AbstractStreamingState.processUnsafe(AbstractStreamingState.java:51)
    at org.neo4j.bolt.v4.runtime.InTransactionState.processUnsafe(InTransactionState.java:64)
    at org.neo4j.bolt.v3.runtime.FailSafeBoltStateMachineState.process(FailSafeBoltStateMachineState.java:48)
    at org.neo4j.bolt.runtime.statemachine.impl.AbstractBoltStateMachine.nextState(AbstractBoltStateMachine.java:154)
    at org.neo4j.bolt.runtime.statemachine.impl.AbstractBoltStateMachine.process(AbstractBoltStateMachine.java:102)
    at org.neo4j.bolt.messaging.BoltRequestMessageReader.lambda$doRead$1(BoltRequestMessageReader.java:93)
    at org.neo4j.bolt.messaging.BoltRequestMessageReader$$Lambda$3490/0x0000000840c78440.perform(Unknown Source)
    at org.neo4j.bolt.runtime.DefaultBoltConnection.lambda$enqueue$0(DefaultBoltConnection.java:156)
    at org.neo4j.bolt.runtime.DefaultBoltConnection$$Lambda$3491/0x0000000840c3d440.perform(Unknown Source)
    at org.neo4j.bolt.runtime.DefaultBoltConnection.processNextBatchInternal(DefaultBoltConnection.java:252)
    at org.neo4j.bolt.runtime.DefaultBoltConnection.processNextBatch(DefaultBoltConnection.java:187)
    at org.neo4j.bolt.runtime.DefaultBoltConnection.processNextBatch(DefaultBoltConnection.java:177)
    at org.neo4j.bolt.runtime.scheduling.ExecutorBoltScheduler.executeBatch(ExecutorBoltScheduler.java:257)
    at org.neo4j.bolt.runtime.scheduling.ExecutorBoltScheduler.lambda$scheduleBatchOrHandleError$3(ExecutorBoltScheduler.java:240)
    at org.neo4j.bolt.runtime.scheduling.ExecutorBoltScheduler$$Lambda$3494/0x0000000840c21040.get(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(java.base@11.0.16.1/CompletableFuture.java:1700)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.16.1/ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.16.1/ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@11.0.16.1/Thread.java:829)

   Locked ownable synchronizers:
    - <0x00000005d110b860> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"algo-1" #325 daemon prio=5 os_prio=0 cpu=106.68ms elapsed=571.89s tid=0x0000560b67ec6220 nid=0x410 waiting on condition  [0x00007f44f46e1000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.16.1/Native Method)
    - parking to wait for  <0x00000005d90a4578> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(java.base@11.0.16.1/LockSupport.java:194)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.16.1/AbstractQueuedSynchronizer.java:2081)
    at java.util.concurrent.ArrayBlockingQueue.put(java.base@11.0.16.1/ArrayBlockingQueue.java:367)
    at org.neo4j.gds.traversal.RandomWalk$RandomWalkTask.flushBuffer(RandomWalk.java:331)
    at org.neo4j.gds.traversal.RandomWalk$RandomWalkTask.run(RandomWalk.java:311)
    at org.neo4j.gds.core.concurrency.ParallelUtil.runWithConcurrency(ParallelUtil.java:408)
    at org.neo4j.gds.core.concurrency.ParallelUtil.runWithConcurrency(ParallelUtil.java:380)
    at org.neo4j.gds.core.concurrency.RunWithConcurrency.run(RunWithConcurrency.java:200)
    at org.neo4j.gds.core.concurrency.RunWithConcurrency$Builder.run(RunWithConcurrency.java:240)
    at org.neo4j.gds.traversal.RandomWalk.tasksRunner(RandomWalk.java:176)
    at org.neo4j.gds.traversal.RandomWalk.lambda$startWalkers$3(RandomWalk.java:149)
    at org.neo4j.gds.traversal.RandomWalk$$Lambda$6915/0x0000000844bc1440.run(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(java.base@11.0.16.1/CompletableFuture.java:1736)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.16.1/ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.16.1/ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(java.base@11.0.16.1/Thread.java:829)
    at org.neo4j.internal.helpers.NamedThreadFactory$2.run(NamedThreadFactory.java:110)

  Locked ownable synchronizers:
    - <0x00000005d90a47f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

NOTE: the algo-1 thread shown above is still present after the first call to gds.randomWalk.stream completes, and it never appears to terminate.
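
The two dumps are consistent with a bounded producer/consumer leak: the consumer (the Bolt thread) stops reading once LIMIT is satisfied, while the producer (algo-1, in RandomWalkTask.flushBuffer) stays parked in ArrayBlockingQueue.put because the queue is full and nothing will ever drain it. The sketch below reproduces that pattern with the JDK alone. It is an illustration under assumptions, not the GDS code: the class name, queue capacity, and item counts are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class StuckProducerDemo {

    // Starts a producer that tries to put `items` values into a bounded queue of size `capacity`,
    // reads only `limit` of them (like a Cypher LIMIT), and returns the producer thread.
    public static Thread runAndAbandon(int capacity, int items, int limit) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // parks forever once the queue is full and nobody drains it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "algo-1");
        producer.setDaemon(true); // lets the JVM exit even though the producer never finishes
        producer.start();

        for (int i = 0; i < limit; i++) {
            queue.take(); // the consumer reads a few results...
        }
        return producer; // ...and then simply stops reading
    }

    // Polls the thread's state until it matches `wanted` or the deadline passes.
    public static Thread.State waitForState(Thread t, Thread.State wanted, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (t.getState() != wanted && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
        return t.getState();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread producer = runAndAbandon(1, 1_000, 1);
        Thread.State state = waitForState(producer, Thread.State.WAITING, 2_000);
        // The producer is parked inside put() and stays alive, like algo-1 in the dump.
        System.out.println(state + ", alive=" + producer.isAlive());
    }
}
```

As in the dump, the leaked thread shows up as WAITING (parking) inside ArrayBlockingQueue.put, not as a busy thread, so it costs no CPU but holds its pool slot indefinitely.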

To Reproduce

Call gds.randomWalk.stream twice.

GDS version: 2.3.1
Neo4j version: 4.4.9
Operating system: 5.4.209-116.367.amzn2.x86_64 #1 SMP Wed Aug 31 00:09:52 UTC 2022 x86_64 GNU/Linux

Steps to reproduce the behavior:

// Node count: 125044
// Relationship count: 4193106
CALL gds.graph.project(
    'myGraph',
    ['Node1', 'Node2'],
    { MY_RELATIONSHIP: { orientation: 'UNDIRECTED' } }
);

CALL gds.randomWalk.stream(
  'myGraph',
  {
    walkLength: 3,
    walksPerNode: 1,
    randomSeed: 42,
    concurrency: 1
  }
)
YIELD nodeIds, path
RETURN nodeIds, [node IN nodes(path) | node.label ] AS labels
LIMIT 1;

CALL gds.randomWalk.stream(
  'myGraph',
  {
    walkLength: 3,
    walksPerNode: 1,
    randomSeed: 42,
    concurrency: 1
  }
)
YIELD nodeIds, path
RETURN nodeIds, [node IN nodes(path) | node.label ] AS labels
LIMIT 1;

Alternatively, see https://github.com/neo4j/graph-data-science/pull/262 for a unit test that reproduces the issue.

Expected behavior

It should be possible to run gds.randomWalk.stream multiple times without restarting the server.
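
For illustration only, one general way to make repeated calls safe is to let the producer give up once the consumer goes away: replace the unbounded put with a timed offer and re-check a cancellation flag that is set when the result stream is closed. This is a hypothetical sketch, not the change that resolved this issue; CancellableProducer and its cancellation flag are invented for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellableProducer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final AtomicBoolean cancelled; // hypothetical flag, set when the consumer closes the stream
    private final int items;

    public CancellableProducer(BlockingQueue<Integer> queue, AtomicBoolean cancelled, int items) {
        this.queue = queue;
        this.cancelled = cancelled;
        this.items = items;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < items; i++) {
                // Instead of an unbounded put(), retry a short timed offer and stop as soon
                // as the consumer has signalled that it will not read any more results.
                while (!queue.offer(i, 50, TimeUnit.MILLISECONDS)) {
                    if (cancelled.get()) {
                        return;
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs one "call": read a single result (as with LIMIT 1), cancel, and report whether
    // the producer thread is still alive after waiting up to `joinMillis`.
    public static boolean producerAliveAfterCancel(long joinMillis) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Thread producer = new Thread(new CancellableProducer(queue, cancelled, 1_000), "algo-1");
        producer.start();

        queue.take();        // consume one result
        cancelled.set(true); // e.g. when the result stream is closed
        producer.join(joinMillis);
        return producer.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("producer alive after cancel: " + producerAliveAfterCancel(2_000));
    }
}
```

The timed offer bounds how long the producer can stay parked (here about 50 ms per retry), so an abandoned stream releases its thread instead of holding it until the server restarts.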

Additional context

The "empty results after 100 seconds" behavior on the second call appears to be controlled by this value.
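One possible explanation for both symptoms together, offered as a hypothesis rather than a confirmed diagnosis: if the walker tasks of a later call have to wait for a pool thread that is still held by the parked algo-1 producer, that call's queue never receives anything, and the consumer's timed poll (ArrayBlockingQueue.poll in QueueBasedSpliterator.poll) eventually returns null, which presumably ends the stream with no rows. The sketch models this with a single-thread executor and a short timeout in place of the 100 seconds; SharedPoolTimeoutDemo and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedPoolTimeoutDemo {

    // A single daemon worker thread, standing in for a bounded pool shared by all calls.
    static ExecutorService newSharedPool() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "algo-1");
            t.setDaemon(true);
            return t;
        });
    }

    // Simulates one streaming call: submits a producer to the pool, then reads up to `limit`
    // items. A timed poll that returns null is treated as end of stream.
    static List<Integer> streamCall(ExecutorService pool, int limit, long pollTimeoutMillis)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        pool.submit(() -> {
            try {
                for (int i = 0; i < 1_000; i++) {
                    queue.put(i); // parks forever once the caller stops reading
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        List<Integer> results = new ArrayList<>();
        while (results.size() < limit) {
            Integer next = queue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            if (next == null) {
                break; // timeout: nothing arrived, so the stream ends with what we have
            }
            results.add(next);
        }
        return results;
    }

    // Runs the same call twice against one shared pool and returns both result lists.
    public static List<List<Integer>> twoCalls(long pollTimeoutMillis) throws InterruptedException {
        ExecutorService pool = newSharedPool();
        List<Integer> first = streamCall(pool, 1, pollTimeoutMillis);  // its producer then parks in put()
        List<Integer> second = streamCall(pool, 1, pollTimeoutMillis); // its producer never gets the thread
        return List.of(first, second);
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Integer>> results = twoCalls(1_000);
        System.out.println("first call:  " + results.get(0)); // [0]
        System.out.println("second call: " + results.get(1)); // [] after about one second
    }
}
```

Under this model the second call's duration equals the poll timeout, which would match the observation that the wait length tracks the configured value rather than the graph size.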