nim-lang / nim-zmq

Nim ZMQ wrapper
https://nim-lang.github.io/nim-zmq/zmq.html
MIT License
65 stars 17 forks source link

async poller test intermittently blocks #36

Open quantimnot opened 2 years ago

quantimnot commented 2 years ago
nim r tests/tzmq.nim

Result

[OK] reqrep
[OK] pubsub
[OK] inproc
[OK] routerdealer
[OK] pairpair_sndmore
[OK] async pub_sub

blocks

Expected

[OK] reqrep
[OK] pubsub
[OK] inproc
[OK] routerdealer
[OK] pairpair_sndmore
[OK] async pub_sub
[OK] asyncZPoller

More Information

  1. I just noticed that the issue is intermittent. It sometimes passes and sometimes does not. It tends to work if I stick an echo after the var fut = poller.pollAsync(1) line in the test.

  2. It doesn't return from the drain(timeout) call.

  3. The problem still occurs when I use Status's chronos library instead of asyncdispatch. This is the shim I use to make chronos usable as the async backend:

const asyncBackend {.strdefine.} = "asyncdispatch"

when asyncBackend == "asyncdispatch":
  import std/[asyncdispatch]
  export asyncdispatch
elif asyncBackend == "chronos":
  {.define: chronosFutureTracking.}
  import pkg/chronos
  export chronos
  proc hasPendingOperations*(): bool =
    ## asyncdispatch-style query for the chronos backend: true while
    ## `pendingFuturesCount()` reports at least one pending future.
    # NOTE(review): presumably relies on the `chronosFutureTracking` define
    # set before the chronos import - confirm.
    result = pendingFuturesCount() > 0
  proc addRead*(fd: AsyncFD, cb: proc(fd: AsyncFD): bool {.closure, gcsafe.}) =
    ## asyncdispatch-compatible `addRead` built on chronos' `addReader`.
    ##
    ## Registers `fd` for read readiness and invokes `cb(fd)` when it fires.
    ## Following the asyncdispatch contract, the reader is unregistered as
    ## soon as `cb` returns `true`; while it returns `false` the registration
    ## is kept. Any exception escaping `cb` (or the unregistration) is
    ## re-raised as a `Defect`, because the chronos callback type only
    ## permits `Defect`.
    addReader(
      fd,
      (proc(p: pointer) {.gcsafe, raises: [Defect].} =
        try:
          # Honour the callback's "I am done" signal instead of discarding it.
          if cb(fd):
            removeReader(fd)
        except Exception as e:
          # Keep the original exception reachable through `parent`.
          raise newException(Defect, e.msg, e)),
      nil)
  proc addWrite*(fd: AsyncFD, cb: proc(fd: AsyncFD): bool {.closure, gcsafe.}) =
    ## asyncdispatch-compatible `addWrite` built on chronos' `addWriter`.
    ##
    ## Registers `fd` for write readiness and invokes `cb(fd)` when it fires.
    ## Following the asyncdispatch contract, the writer is unregistered as
    ## soon as `cb` returns `true`; while it returns `false` the registration
    ## is kept. Any exception escaping `cb` (or the unregistration) is
    ## re-raised as a `Defect`, because the chronos callback type only
    ## permits `Defect`.
    addWriter(
      fd,
      (proc(p: pointer) {.gcsafe, raises: [Defect].} =
        try:
          # Honour the callback's "I am done" signal instead of discarding it.
          if cb(fd):
            removeWriter(fd)
        except Exception as e:
          # Keep the original exception reachable through `parent`.
          raise newException(Defect, e.msg, e)),
      # The callback never reads its udata; pass nil rather than the address
      # of the `fd` parameter, which would dangle once this proc returns.
      nil)
  proc callSoon*(cb: proc (){.closure, gcsafe.}) =
    ## asyncdispatch-style `callSoon` for the chronos backend.
    ##
    ## Wraps `cb` in a chronos `AsyncCallback` (with nil udata) and hands it
    ## to `chronos.callSoon`. Any exception raised by `cb` is re-raised as a
    ## `Defect` carrying the same message.
    proc trampoline(p: pointer) {.gcsafe, raises: [Defect].} =
      try:
        cb()
      except Exception as e:
        raise newException(Defect, e.msg)
    let scheduled = AsyncCallback(function: trampoline, udata: nil)
    chronos.callSoon(scheduled)
  proc drain*(timeout = 500) =
    ## asyncdispatch-style `drain` for the chronos backend: runs a single
    ## `poll()` of the chronos event loop.
    ##
    ## `timeout` exists only for signature compatibility; it is not used.
    # NOTE(review): asyncdispatch's `drain` is bounded by `timeout`, whereas
    # this call is not - possibly related to the reported hang; confirm.
    chronos.poll()
  template fail*[T](future: Future[T]; error: ref Exception) =
    ## Accepts a `ref Exception` (the type asyncdispatch-style callers pass)
    ## and forwards it to `chronos.fail` as a `ref CatchableError`.
    # NOTE(review): the conversion is an unchecked `cast`; it presumably
    # assumes `error` really is a CatchableError (not a Defect) - confirm.
    # The call is module-qualified so it does not resolve to this template.
    chronos.fail(future, cast[ref CatchableError](error))
  4. Occurs with refc, orc, and arc. It happens more often with orc/arc than with refc.

Nim Version

Nim Compiler Version 1.7.1 [MacOSX: amd64]
Compiled at 2022-03-26
Copyright (c) 2006-2022 by Andreas Rumpf

git hash: afbcba909b37b8c06250b141ddb9da4bf5bb9922
active boot switches: -d:release
Clonkk commented 6 months ago

@quantimnot Have you tried with Nim 2.0 ? So far I haven't been able to reproduce the bug.

quantimnot commented 4 months ago

Hey @Clonkk, I was able to reproduce the problem with the latest Nim versions on my macOS 14 M2 computer. I just spent two days of my free time trying to debug the issue, and I must stop myself from spending any more time on this. I don't use this library, and my computing feels kinda rusty at the moment.

Here is a test harness I was working with:

#!/bin/sh
# SPDX-License-Identifier: Unlicense
#[
ZMQLIB_HOMEBREW="$(pkg-config libzmq --variable libdir)"
ZMQLIB_WITH_DEBUG_SYMBOLS="${PORTS:-}/org.zeromq.libzmq/tree/upstream/obj/lib"
ZMQLIB_DIR=${ZMQLIB_HOMEBREW}
case "$(uname)" in
  Darwin) export DYLD_LIBRARY_PATH="$ZMQLIB_DIR";;
  *) echo "WARNING: THIS ISSUE WAS OBSERVED ON A DARWIN OS.";;
esac
# "set -eu pipefail" would only assign "pipefail" to $1; the script has no
# pipelines, so plain -eu is what is actually needed under /bin/sh.
set -eu
COMMON_ARGS="--outdir:. --nimcache:nimcache --threads:on -d:threadsafe"
DEBUGEE_PATH=./issue36
nim $COMMON_ARGS c --debugger:native -d:issue36 -o:"$DEBUGEE_PATH" "$0"
nim $COMMON_ARGS r "$0" "$DEBUGEE_PATH"
exit
]#

## sh/Nim polyglot: run as a shell script it first builds this file with
## `-d:issue36` (the "debuggee", which only runs the `asyncpoll` test) and then
## runs this file again without the define, as the hang detector that
## repeatedly launches the debuggee.

when defined issue36:

  import ./tests/tzmq {.all.}

  #[
    OS: macOS
    This test intermittently hangs on both an old Intel Macbook, and an Arm M2 Macbook.
    The main thread was blocked on ```libsystem_kernel.dylib`poll```
    Whenever the process was interrupted and then continued with lldb, it would give this error:

      tzmq.nim(310) tzmq
      connections.nim(133) asyncpoll
      connections.nim(135) =destroy

      Unhandled exception: Connection from/to tcp://127.0.0.1:15571 was destroyed but not closed. [ZmqError]

    Now, after installing macOS 14.4, the test does NOT hang. It always goes straight to the above error.
  ]#

  asyncpoll()

else: # run the above "debugee" program through this intermittent hang detector

  #[
    NOTICE: (2024-03-05) This test harness is a work in progress...
  ]#

  import std/[strutils, strformat, times, asyncdispatch, os, osproc]

  const
    TIMEOUT_DURATION = 50 # Milliseconds
    pollIntervalMs = 1    # How often the child is checked for termination

  type
    ProcessId = typeof(processID(Process()))
    ExecLoopStats = tuple[
      totalDuration: typeof(epochTime()),
      failCount, successCount, totalCount: int]
    HangingProcessHandler = proc(pid: ProcessId)

  proc execCmdWithTimeout(cmd: string, handler: HangingProcessHandler): Future[int] {.async.} =
    ## Async procedure to execute the command and apply timeout.
    ##
    ## Starts `cmd` and polls it without blocking the dispatcher. If the child
    ## is still running after `TIMEOUT_DURATION` milliseconds, `handler` is
    ## called with its pid and the child is killed. Returns the exit code
    ## reported by `waitForExit`.
    let process = startProcess(cmd, options = {poEvalCommand, poParentStreams})
    let deadline = epochTime() + TIMEOUT_DURATION / 1000
    while process.running:
      if epochTime() >= deadline:
        handler(process.processID)
        process.kill()
        break
      await sleepAsync(pollIntervalMs)
    # The child has exited or was just killed, so this only reaps it.
    result = process.waitForExit()
    process.close()

  proc measureExecTime(cmd: string, handler: HangingProcessHandler): Future[tuple[duration: typeof(epochTime()), finished: bool]] {.async.} =
    ## Function to measure program execution duration.
    ##
    ## `duration` is the `epochTime()` difference around the run; `finished`
    ## is true when the exit code is below 128.
    let startTime = epochTime()
    let exitCode = await execCmdWithTimeout(cmd, handler)
    let endTime = epochTime()
    return (endTime - startTime, exitCode < 128)

  proc sequentialExecLoop(cmdStr: string, maxAttempts = 1000, handler: HangingProcessHandler): Future[ExecLoopStats] {.async.} =
    ## Loop the execution of the program and measure duration.
    ##
    ## Runs `cmdStr` `maxAttempts` times, one after another, and tallies
    ## finished and unfinished runs plus the total duration of finished runs.
    for run in 1..maxAttempts:
      let execResult = await measureExecTime(cmdStr, handler)
      inc result.totalCount
      if execResult.finished:
        result.totalDuration += execResult.duration
        inc result.successCount
      else:
        inc result.failCount

  proc main() {.async.} =
    ## Runs the debuggee given as the first command line argument 1000 times
    ## and prints the average successful duration and the hang ratio.
    proc handleHang(pid: ProcessId) =
      echo "hang"
    if paramCount() < 1:
      quit("usage: " & getAppFilename() & " <debuggee executable>")
    let debuggeeExecPath = paramStr(1)
    let stats = await sequentialExecLoop(debuggeeExecPath, 1000, handleHang)

    # Calculate average normal execution duration if any runs succeeded
    if stats.successCount > 0:
      let averageDuration = stats.totalDuration / float(stats.successCount)
      echo "Average Successful Duration: ", averageDuration
    else:
      echo "All runs timed out."
    echo &"hang ratio: {stats.failCount}/{stats.totalCount}"

  waitFor main()

Clonkk commented 4 months ago

I don't have a Mac, but I'll see if I can reproduce the issue on Linux.

Thanks for the effort

Clonkk commented 1 month ago

Ok I had time to understand what happens.

The problem here is that in the async callback I used a blocking receive() function under the assumption that the socket would always receive the message. This assumption is not necessarily true in async context if the receive proc is called before a send() function has been called.

I will fix the test to reflect that and add a docinfo string to reflect that.