leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

group-by consumers must terminate immediately on input crash #75

Closed lowecg closed 1 year ago

lowecg commented 1 year ago

I have a bunch of nested flows in a mock setup to simulate a data retrieval process.

The implementation I currently have expands on a discussion on Slack.

In essence, what I'm trying to achieve is the following:

  1. call a sequence of pages, each of which returns a list of item keys
  2. partition the keys into batches of 100, and try to resolve each batch in a remote cache
  3. for cache hits, return the cached values
  4. for cache misses, batch the missed keys into groups of 100 and submit them to a service for resolving
  5. store the results in the cache for later use
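
To make the five steps concrete, here is a minimal, sequential sketch in plain Clojure with no missionary involved. Everything in it is an assumption made for illustration: `fetch-page`, `resolve-keys`, `lookup-batch`, the 250 keys per page, and the in-memory atom standing in for the remote cache are all hypothetical. The `{:key k :result v}` response shape mirrors the `:key` and `:result` lookups used in the workaround later in this thread.

```clojure
;; Hypothetical stand-in for the remote cache.
(def cache (atom {}))

;; Hypothetical page call: each page yields 250 item keys.
(defn fetch-page [page]
  (map #(+ (* page 250) %) (range 250)))

;; Hypothetical resolving service: one value per key.
(defn resolve-keys [ks]
  (into {} (map (fn [k] [k (str "item-" k)])) ks))

;; Step 2: look a batch up in the cache; hits carry :result, misses do not.
(defn lookup-batch [ks]
  (let [c @cache]
    (map (fn [k] (if (contains? c k) {:key k :result (c k)} {:key k})) ks)))

(defn process-pages [pages]
  (let [responses (->> pages
                       (mapcat fetch-page)      ; step 1
                       (partition-all 100)      ; step 2
                       (mapcat lookup-batch)
                       doall)
        {hits true misses false} (group-by #(contains? % :result) responses)
        resolved (->> misses                    ; step 4
                      (map :key)
                      (partition-all 100)
                      (map resolve-keys)
                      (apply merge {}))]
    (swap! cache merge resolved)                ; step 5
    ;; step 3 plus the freshly resolved values
    (merge (into {} (map (juxt :key :result)) hits) resolved)))
```

On a first call every key is a miss and gets resolved and stored; a second call over the same pages is served from the cache.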

The tasks will be calling web services, so I've wrapped them in m/via with the m/blk executor. Calls to the services are made in parallel using (m/?> par flow).

The code consistently locks up in the same way. The thread dump points to this location (full thread dump below):

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.18/AbstractQueuedSynchronizer.java:1345)
        at java.util.concurrent.CountDownLatch.await(java.base@11.0.18/CountDownLatch.java:232)
        at missionary.impl.Fiber$1$1.<init>(Fiber.java:41)
java --version
openjdk 11.0.18 2023-01-17 LTS
OpenJDK Runtime Environment Corretto-11.0.18.10.1 (build 11.0.18+10-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.18.10.1 (build 11.0.18+10-LTS, mixed mode)
Thread stack: ``` 2023-02-19 10:06:09 Full thread dump OpenJDK 64-Bit Server VM (11.0.18+10-LTS mixed mode): Threads class SMR info: _java_thread_list=0x0000600002120120, length=23, elements={ 0x000000013201a800, 0x00000001421c9800, 0x00000001421cc800, 0x00000001421e8000, 0x00000001421eb000, 0x00000001421ec000, 0x0000000133008800, 0x0000000133009000, 0x00000001320d9800, 0x0000000142788800, 0x000000014279f000, 0x0000000142797000, 0x0000000142797800, 0x00000001427a0000, 0x000000014279e000, 0x000000013251b800, 0x000000013103b800, 0x000000013300b800, 0x0000000132a49000, 0x0000000127838000, 0x000000013252a800, 0x0000000130010800, 0x0000000130830000 } "main" #1 prio=5 os_prio=31 cpu=731.99ms elapsed=35.42s tid=0x000000013201a800 nid=0x1303 waiting on condition [0x000000016bb88000] java.lang.Thread.State: WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe417208> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(java.base@11.0.18/LockSupport.java:194) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.18/AbstractQueuedSynchronizer.java:885) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.18/AbstractQueuedSynchronizer.java:1039) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.18/AbstractQueuedSynchronizer.java:1345) at java.util.concurrent.CountDownLatch.await(java.base@11.0.18/CountDownLatch.java:232) at missionary.impl.Fiber$1$1.(Fiber.java:41) at missionary.impl.Fiber$1.park(Fiber.java:18) at missionary.impl.Fiber.park(Fiber.java:85) at missionary.core$park.invokeStatic(core.cljc:173) at missionary.core$park.invoke(core.cljc:172) at missionary_mock$run.invokeStatic(missionary_mock.clj:116) at missionary_mock$run.invoke(missionary_mock.clj:109) at core$_main.invokeStatic(core.clj:6) at core$_main.doInvoke(core.clj:5) 
at clojure.lang.RestFn.invoke(RestFn.java:397) at clojure.lang.AFn.applyToHelper(AFn.java:152) at clojure.lang.RestFn.applyTo(RestFn.java:132) at core.main(Unknown Source) Locked ownable synchronizers: - None "Reference Handler" #2 daemon prio=10 os_prio=31 cpu=0.37ms elapsed=35.41s tid=0x00000001421c9800 nid=0x5403 waiting on condition [0x000000017c9de000] java.lang.Thread.State: RUNNABLE at java.lang.ref.Reference.waitForReferencePendingList(java.base@11.0.18/Native Method) at java.lang.ref.Reference.processPendingReferences(java.base@11.0.18/Reference.java:241) at java.lang.ref.Reference$ReferenceHandler.run(java.base@11.0.18/Reference.java:213) Locked ownable synchronizers: - None "Finalizer" #3 daemon prio=8 os_prio=31 cpu=0.14ms elapsed=35.41s tid=0x00000001421cc800 nid=0x4503 in Object.wait() [0x000000017cbea000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.18/Native Method) - waiting on <0x00000003c00013f0> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(java.base@11.0.18/ReferenceQueue.java:155) - waiting to re-lock in wait() <0x00000003c00013f0> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(java.base@11.0.18/ReferenceQueue.java:176) at java.lang.ref.Finalizer$FinalizerThread.run(java.base@11.0.18/Finalizer.java:170) Locked ownable synchronizers: - None "Signal Dispatcher" #4 daemon prio=9 os_prio=31 cpu=0.33ms elapsed=35.41s tid=0x00000001421e8000 nid=0x5503 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE Locked ownable synchronizers: - None "Service Thread" #5 daemon prio=9 os_prio=31 cpu=0.02ms elapsed=35.41s tid=0x00000001421eb000 nid=0x7e03 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE Locked ownable synchronizers: - None "C2 CompilerThread0" #6 daemon prio=9 os_prio=31 cpu=748.75ms elapsed=35.41s tid=0x00000001421ec000 nid=0x5803 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE No compile 
task Locked ownable synchronizers: - None "C1 CompilerThread0" #9 daemon prio=9 os_prio=31 cpu=381.94ms elapsed=35.41s tid=0x0000000133008800 nid=0x7b03 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE No compile task Locked ownable synchronizers: - None "Sweeper thread" #10 daemon prio=9 os_prio=31 cpu=0.02ms elapsed=35.41s tid=0x0000000133009000 nid=0x7903 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE Locked ownable synchronizers: - None "Common-Cleaner" #11 daemon prio=8 os_prio=31 cpu=0.33ms elapsed=35.39s tid=0x00000001320d9800 nid=0x7503 in Object.wait() [0x000000017db56000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.18/Native Method) - waiting on <0x00000003c00018c0> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(java.base@11.0.18/ReferenceQueue.java:155) - waiting to re-lock in wait() <0x00000003c00018c0> (a java.lang.ref.ReferenceQueue$Lock) at jdk.internal.ref.CleanerImpl.run(java.base@11.0.18/CleanerImpl.java:148) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) at jdk.internal.misc.InnocuousThread.run(java.base@11.0.18/InnocuousThread.java:161) Locked ownable synchronizers: - None "missionary blk-0" #12 daemon prio=5 os_prio=31 cpu=5.32ms elapsed=34.63s tid=0x0000000142788800 nid=0xa903 waiting on condition [0x000000017f3e6000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe445788> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.18/SynchronousQueue.java:462) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.18/SynchronousQueue.java:361) at 
java.util.concurrent.SynchronousQueue.poll(java.base@11.0.18/SynchronousQueue.java:937) at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "missionary blk-1" #13 daemon prio=5 os_prio=31 cpu=5.51ms elapsed=34.63s tid=0x000000014279f000 nid=0xa703 waiting on condition [0x000000017f5f2000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe445788> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.18/SynchronousQueue.java:462) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.18/SynchronousQueue.java:361) at java.util.concurrent.SynchronousQueue.poll(java.base@11.0.18/SynchronousQueue.java:937) at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "missionary blk-2" #14 daemon prio=5 os_prio=31 cpu=3.11ms elapsed=34.63s tid=0x0000000142797000 nid=0xa403 waiting on condition [0x000000017f7fe000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe445788> (a 
java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.18/SynchronousQueue.java:462) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.18/SynchronousQueue.java:361) at java.util.concurrent.SynchronousQueue.poll(java.base@11.0.18/SynchronousQueue.java:937) at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "missionary blk-3" #15 daemon prio=5 os_prio=31 cpu=4.58ms elapsed=34.63s tid=0x0000000142797800 nid=0x8303 waiting on condition [0x000000017fa0a000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe445788> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.18/SynchronousQueue.java:462) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.18/SynchronousQueue.java:361) at java.util.concurrent.SynchronousQueue.poll(java.base@11.0.18/SynchronousQueue.java:937) at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable 
synchronizers: - None "missionary blk-4" #16 daemon prio=5 os_prio=31 cpu=3.82ms elapsed=34.63s tid=0x00000001427a0000 nid=0x8403 waiting on condition [0x000000017fc16000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe445788> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.18/SynchronousQueue.java:462) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.18/SynchronousQueue.java:361) at java.util.concurrent.SynchronousQueue.poll(java.base@11.0.18/SynchronousQueue.java:937) at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "missionary blk-5" #17 daemon prio=5 os_prio=31 cpu=2.80ms elapsed=34.62s tid=0x000000014279e000 nid=0x9f03 waiting on condition [0x000000017fe22000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe445788> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.18/SynchronousQueue.java:462) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.18/SynchronousQueue.java:361) at java.util.concurrent.SynchronousQueue.poll(java.base@11.0.18/SynchronousQueue.java:937) at 
java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "missionary scheduler" #18 daemon prio=5 os_prio=31 cpu=32.33ms elapsed=34.62s tid=0x000000013251b800 nid=0x8803 waiting on condition [0x00000002a8206000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(java.base@11.0.18/Native Method) at missionary.impl.Sleep$Scheduler.run(Sleep.java:28) Locked ownable synchronizers: - None "missionary blk-6" #19 daemon prio=5 os_prio=31 cpu=16.43ms elapsed=34.09s tid=0x000000013103b800 nid=0x7307 waiting on condition [0x000000017dd62000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe445788> (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.18/SynchronousQueue.java:462) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.18/SynchronousQueue.java:361) at java.util.concurrent.SynchronousQueue.poll(java.base@11.0.18/SynchronousQueue.java:937) at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "Attach Listener" #20 daemon prio=9 os_prio=31 cpu=160.92ms elapsed=10.22s 
tid=0x000000013300b800 nid=0x5e0f waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE Locked ownable synchronizers: - None "RMI TCP Accept-0" #22 daemon prio=9 os_prio=31 cpu=0.99ms elapsed=8.69s tid=0x0000000132a49000 nid=0x8d03 runnable [0x00000002a861e000] java.lang.Thread.State: RUNNABLE at java.net.PlainSocketImpl.socketAccept(java.base@11.0.18/Native Method) at java.net.AbstractPlainSocketImpl.accept(java.base@11.0.18/AbstractPlainSocketImpl.java:474) at java.net.ServerSocket.implAccept(java.base@11.0.18/ServerSocket.java:565) at java.net.ServerSocket.accept(java.base@11.0.18/ServerSocket.java:533) at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(jdk.management.agent@11.0.18/LocalRMIServerSocketFactory.java:52) at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(java.rmi@11.0.18/TCPTransport.java:394) at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(java.rmi@11.0.18/TCPTransport.java:366) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "RMI TCP Connection(1)-127.0.0.1" #23 daemon prio=9 os_prio=31 cpu=80.07ms elapsed=8.67s tid=0x0000000127838000 nid=0x9c03 runnable [0x00000002a8829000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(java.base@11.0.18/Native Method) at java.net.SocketInputStream.socketRead(java.base@11.0.18/SocketInputStream.java:115) at java.net.SocketInputStream.read(java.base@11.0.18/SocketInputStream.java:168) at java.net.SocketInputStream.read(java.base@11.0.18/SocketInputStream.java:140) at java.io.BufferedInputStream.fill(java.base@11.0.18/BufferedInputStream.java:252) at java.io.BufferedInputStream.read(java.base@11.0.18/BufferedInputStream.java:271) - locked <0x00000003fe45fa70> (a java.io.BufferedInputStream) at java.io.FilterInputStream.read(java.base@11.0.18/FilterInputStream.java:83) at sun.rmi.transport.tcp.TCPTransport.handleMessages(java.rmi@11.0.18/TCPTransport.java:544) at 
sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(java.rmi@11.0.18/TCPTransport.java:796) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(java.rmi@11.0.18/TCPTransport.java:677) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$91/0x0000000800655040.run(java.rmi@11.0.18/Unknown Source) at java.security.AccessController.doPrivileged(java.base@11.0.18/Native Method) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(java.rmi@11.0.18/TCPTransport.java:676) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1128) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - <0x00000003fe463e18> (a java.util.concurrent.ThreadPoolExecutor$Worker) "RMI Scheduler(0)" #24 daemon prio=9 os_prio=31 cpu=0.23ms elapsed=8.65s tid=0x000000013252a800 nid=0x9b03 waiting on condition [0x00000002a8a36000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.18/Native Method) - parking to wait for <0x00000003fe42e8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.18/LockSupport.java:234) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.18/AbstractQueuedSynchronizer.java:2123) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.18/ScheduledThreadPoolExecutor.java:1182) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.18/ScheduledThreadPoolExecutor.java:899) at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.18/ThreadPoolExecutor.java:1054) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1114) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "JMX server connection timeout 25" #25 daemon prio=9 os_prio=31 cpu=3.09ms elapsed=8.65s tid=0x0000000130010800 nid=0x9203 in Object.wait() [0x00000002a8c42000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.18/Native Method) - waiting on <0x00000003fe4b9588> (a [I) at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(java.management@11.0.18/ServerCommunicatorAdmin.java:171) - waiting to re-lock in wait() <0x00000003fe4b9588> (a [I) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - None "RMI TCP Connection(2)-127.0.0.1" #26 daemon prio=9 os_prio=31 cpu=13.61ms elapsed=7.58s tid=0x0000000130830000 nid=0x8b07 runnable [0x00000002a8411000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(java.base@11.0.18/Native Method) at java.net.SocketInputStream.socketRead(java.base@11.0.18/SocketInputStream.java:115) at java.net.SocketInputStream.read(java.base@11.0.18/SocketInputStream.java:168) at java.net.SocketInputStream.read(java.base@11.0.18/SocketInputStream.java:140) at java.io.BufferedInputStream.fill(java.base@11.0.18/BufferedInputStream.java:252) at java.io.BufferedInputStream.read(java.base@11.0.18/BufferedInputStream.java:271) - locked <0x00000003ffeb6b90> (a java.io.BufferedInputStream) at java.io.FilterInputStream.read(java.base@11.0.18/FilterInputStream.java:83) at sun.rmi.transport.tcp.TCPTransport.handleMessages(java.rmi@11.0.18/TCPTransport.java:544) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(java.rmi@11.0.18/TCPTransport.java:796) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(java.rmi@11.0.18/TCPTransport.java:677) at 
sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$91/0x0000000800655040.run(java.rmi@11.0.18/Unknown Source) at java.security.AccessController.doPrivileged(java.base@11.0.18/Native Method) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(java.rmi@11.0.18/TCPTransport.java:676) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.18/ThreadPoolExecutor.java:1128) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.18/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) Locked ownable synchronizers: - <0x00000003ffe275a0> (a java.util.concurrent.ThreadPoolExecutor$Worker) "VM Thread" os_prio=31 cpu=25.38ms elapsed=35.42s tid=0x00000001421c2000 nid=0x4103 runnable "GC Thread#0" os_prio=31 cpu=17.62ms elapsed=35.42s tid=0x0000000132012000 nid=0x2c03 runnable "GC Thread#1" os_prio=31 cpu=16.68ms elapsed=35.02s tid=0x0000000142528800 nid=0x5f03 runnable "GC Thread#2" os_prio=31 cpu=16.00ms elapsed=35.02s tid=0x0000000142336000 nid=0x6f03 runnable "GC Thread#3" os_prio=31 cpu=17.25ms elapsed=35.02s tid=0x0000000132ce5000 nid=0x6e03 runnable "GC Thread#4" os_prio=31 cpu=16.34ms elapsed=35.02s tid=0x0000000132da7000 nid=0x6c03 runnable "GC Thread#5" os_prio=31 cpu=20.16ms elapsed=35.02s tid=0x0000000132da7800 nid=0x6b03 runnable "GC Thread#6" os_prio=31 cpu=16.05ms elapsed=35.02s tid=0x0000000132dbe000 nid=0x6203 runnable "GC Thread#7" os_prio=31 cpu=16.02ms elapsed=35.02s tid=0x0000000132dbf000 nid=0x6403 runnable "GC Thread#8" os_prio=31 cpu=21.04ms elapsed=35.02s tid=0x0000000132503800 nid=0x6803 runnable "G1 Main Marker" os_prio=31 cpu=0.28ms elapsed=35.42s tid=0x0000000142042800 nid=0x3903 runnable "G1 Conc#0" os_prio=31 cpu=13.05ms elapsed=35.42s tid=0x0000000142043000 nid=0x3803 runnable "G1 Conc#1" os_prio=31 cpu=11.70ms elapsed=34.87s tid=0x0000000132511800 nid=0x6603 runnable "G1 Refine#0" os_prio=31 cpu=0.06ms elapsed=35.42s tid=0x0000000142168000 nid=0x3103 runnable "G1 Young 
RemSet Sampling" os_prio=31 cpu=4.03ms elapsed=35.42s tid=0x0000000142168800 nid=0x3203 runnable "VM Periodic Task Thread" os_prio=31 cpu=18.15ms elapsed=35.39s tid=0x00000001320d2800 nid=0x7703 waiting on condition JNI global refs: 18, weak refs: 0 ```

I note that the mocking I'm doing heavily uses m/sleep, and there was this issue. The stack trace seems quite different, however.

I'll share a repro project very shortly

lowecg commented 1 year ago

Repro project

https://github.com/lowecg/missionary-deadlock

leonoel commented 1 year ago

Thank you for reporting this!

I've managed to reduce it further.

(m/? (m/reduce conj
       (m/ap (m/?> (val (m/?> 2 (->> (m/seed (range))
                                  (m/eduction (map (fn [x] (assert (< x 2)) x)))
                                  (m/group-by identity))))))))

This doesn't terminate, but it should fail immediately. This is a bug in either group-by or ap, related to input failure.
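
For readers unfamiliar with how the input crashes here, the transducer from the repro can be tried on its own with plain `clojure.core`. This is only a sketch: `checked` and `run-checked` are names introduced for illustration, and it assumes assertions are enabled (the default `*assert*` setting).

```clojure
;; The same transducer as in the repro: passes values through,
;; but throws an AssertionError as soon as it sees a value >= 2.
(def checked (map (fn [x] (assert (< x 2)) x)))

;; Helper (hypothetical) that reports a crash instead of throwing.
(defn run-checked [n]
  (try
    (into [] checked (range n))
    (catch AssertionError _ :crashed)))

(run-checked 2) ; the first two values, 0 and 1, pass the assertion
(run-checked 3) ; the third value, 2, trips the assertion
```

In the repro, the values 0 and 1 each open a group, which fills both slots of `(m/?> 2 ...)` before the value 2 makes the input fail.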

lowecg commented 1 year ago

You're very welcome, Leo. Thank you for looking into this so quickly.

Fantastic news that you're able to isolate the issue.

leonoel commented 1 year ago

OK, I understand the problem completely now.

First, here is a workaround for your use case: increase maximum parallelism to 3.

(defn fetch-missing-items [cache-response-flow]
  (m/ap (let [[hit? flow] (m/?> 3 (m/group-by #(contains? % :result) cache-response-flow))]
          (m/?> (if hit?
                  flow
                  (->> flow
                       (m/eduction (map :key) (partition-all 100))
                       fetch-item-details
                       (m/eduction cat (partition-all 25))
                       store-item-details
                       (m/eduction cat)))))))
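
As a side note on the transducers in this workaround, their batching behaviour can be checked with plain `clojure.core`, independent of missionary. This is a sketch under assumptions: the 250 sample misses are made up, and `fetch-item-details` is treated as if it passed batches through unchanged, which the real function presumably does not.

```clojure
;; 250 made-up cache misses, shaped like the responses in the workaround.
(def misses (map (fn [k] {:key k}) (range 250)))

;; (map :key) then (partition-all 100): keys grouped into batches of up to 100.
(def key-batches
  (into [] (comp (map :key) (partition-all 100)) misses))

;; cat then (partition-all 25): flatten the batches, regroup into 25s.
(def regrouped
  (into [] (comp cat (partition-all 25)) key-batches))

;; A final cat flattens everything back into individual items.
(def flattened (into [] cat regrouped))
```

With 250 misses this gives batches of 100, 100 and 50 keys, then ten batches of 25, and finally the original 250 keys in order.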

Now, the issue. Currently, when group-by's input crashes, the error is redirected to the output. When the error is consumed, the active group consumers are terminated along with the main process. This ordering of events is problematic when consumer backpressure prevents transfers (in this case, ?> when max parallelism has been reached), because then the group consumers are kept alive and never terminate, which stalls the rest of the pipeline indefinitely.

Proposed fix: when group-by's input crashes, propagate the error on output and terminate all active group consumers immediately (i.e. don't wait for the main consumer to transfer the error).
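
The reasoning above can be condensed into a toy model. To be clear, this is not missionary's implementation, only a sketch of the argument: `stalls?` and its option keys are hypothetical names, and it assumes each active group consumer occupies exactly one of the `par` slots of `?>`.

```clojure
;; Toy model (not missionary code): does the pipeline stall after
;; group-by's input crashes?
;;   :par                 - max parallelism given to ?>
;;   :active-groups       - group consumers currently occupying a slot
;;   :terminate-on-crash? - true models the proposed fix
(defn stalls?
  [{:keys [par active-groups terminate-on-crash?]}]
  (and (not terminate-on-crash?)
       ;; no free slot left, so the error cannot be transferred,
       ;; and the consumers are only terminated once it is
       (>= active-groups par)))

(stalls? {:par 2 :active-groups 2})                          ; the reported hang
(stalls? {:par 3 :active-groups 2})                          ; the workaround
(stalls? {:par 2 :active-groups 2 :terminate-on-crash? true}) ; the proposed fix
```

Under this reading, the workaround helps because grouping on hit? produces at most two groups, so a parallelism of 3 always leaves one slot free for the error.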

lowecg commented 1 year ago

Thanks for the explanation and workaround - I've hammered the loop for the last 30 minutes, and it's holding up perfectly.

leonoel commented 1 year ago

Fixed in b.28