nim-lang / nim-zmq

Nim ZMQ wrapper
https://nim-lang.github.io/nim-zmq/zmq.html
MIT License

Exception thrown when using async and orc/arc vs refc/boehm #33

quantimnot closed 2 years ago

quantimnot commented 2 years ago

I'm not solid on asyncdispatch, orc, or zmq. I may be doing something wrong, but here is the situation I've run into.

# sub.nim
import
  std/asyncdispatch,
  pkg/zmq

proc subscriber {.async.} =
  var subscriber = zmq.connect("tcp://127.0.0.1:5571", SUB)
  defer: subscriber.close()
  subscriber.setsockopt(SUBSCRIBE, "")
  while true:
    var msg = await subscriber.receiveAsync()
    echo msg

waitFor subscriber()

# pub.nim
import
  std/asyncdispatch,
  pkg/zmq

proc publisher {.async.} =
  var publisher = zmq.listen("tcp://127.0.0.1:5571", PUB)
  defer: publisher.close()
  var n = 0
  while true:
    publisher.send("topic", SNDMORE)
    publisher.send("test " & $n)
    await sleepAsync(1000)
    inc n

waitFor publisher()

Run with ORC

nim r --mm:orc pub &
nim r --mm:orc sub

Run with ORC Output

Error: unhandled exception: Connection from/to tcp://127.0.0.1:5571 was destroyed but not closed.

Run with REFC

nim r --mm:refc pub &
nim r --mm:refc sub

Run with REFC Output

...
topic
test 2
topic
test 3
...

I also got ORC to work when I commented out the ZConnection destructor in connections.nim.

#[
  Destructor
]#
# when defined(gcDestructors):
#   proc close*(c: var ZConnection, linger: int = 0)
#   proc `=destroy`(x: var ZConnection) =
#     if x.alive:
#       raise newException(ZmqError, &"Connection from/to {x.sockaddr} was destroyed but not closed.")

Another observation is that commenting out the publisher.send("topic", SNDMORE) line in the publisher will also make the subscriber work (without the topic of course).

Also, I read this comment in zmq.nim, but I don't know if it applies:

## When using asynchrone procs, be careful of the internal state of the ZMQ Socket.
## Some Socket (such as REP/REQ) cannot send two message in a row without a receive (or vice-versa)

Clonkk commented 2 years ago

I checked your code and it seems correct; after further investigation, it is indeed a bug in the async receive/send procs. I'll fix it in my next PR.

Also, I read this comment in zmq.nim, but I don't know if it applies:

When using asynchrone procs, be careful of the internal state of the ZMQ Socket. Some Socket (such as REP/REQ) cannot send two message in a row without a receive (or vice-versa)

When using asynchronous procedures, you don't necessarily control the order in which your procs get executed. Some sockets (mostly REP / REQ) have an internal state machine and will enter a blocked state if you perform operations out of order.

For instance, on a REQ socket, doing send -> receive -> send works, but send -> send -> receive will block after the second send. PUB / SUB is not a synchronous pattern in ZMQ, so you shouldn't have any trouble with async.
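
To make the ordering rule above concrete, here is a minimal toy model of a REQ-style state machine in plain Nim. It is only a sketch: `ToyReq`, `OrderError`, and `runOps` are hypothetical names invented for illustration, and nothing here uses or mirrors nim-zmq's API. The model reports an out-of-order call by raising an exception so that the example terminates; it makes no claim about how libzmq or nim-zmq actually signal that case.

```nim
# Toy model of a REQ socket's strict send/receive alternation.
# All names are hypothetical; this is not nim-zmq code.
type
  ReqState = enum
    readyToSend, awaitingReply
  ToyReq = object
    state: ReqState
  OrderError = object of CatchableError

proc send(s: var ToyReq, msg: string) =
  # A request may only be sent when no reply is outstanding.
  if s.state != readyToSend:
    raise newException(OrderError, "send called while a reply is pending")
  s.state = awaitingReply

proc receive(s: var ToyReq): string =
  # A reply may only be read after a request has been sent.
  if s.state != awaitingReply:
    raise newException(OrderError, "receive called before a send")
  s.state = readyToSend
  result = "reply"

proc runOps(ops: openArray[string]): bool =
  ## Returns true if every operation in `ops` is accepted in the given order.
  var s = ToyReq(state: readyToSend)
  try:
    for op in ops:
      if op == "send": s.send("request")
      else: discard s.receive()
    result = true
  except OrderError:
    result = false

echo runOps(["send", "receive", "send"])   # true
echo runOps(["send", "send", "receive"])   # false
```

With async code, two pending futures can interleave their calls on the same socket, which is how the second sequence can arise without being written explicitly. PUB and SUB sockets have no such alternation rule, which matches the point that the pattern in this issue should not be affected.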

Clonkk commented 2 years ago

@quantimnot This should be fixed with release v1.3.0