enwi / dartzmq

A simple dart zeromq implementation/wrapper around the libzmq C++ library
https://pub.dev/documentation/dartzmq/latest/
MIT License

How to use Req-Rep mode #28

Closed (JSYRD closed this issue 6 months ago)

JSYRD commented 1 year ago

I'm a noob in ZeroMQ. The guide says:

The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that’s all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call. Similarly, the service issues zmq_recv() and then zmq_send() in that order, as often as it needs to.

But dartzmq has no method like zmq_recv, so I tried to use socket.messages.listen() to receive. It DID receive messages, but it always throws an exception:

[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: ZeroMQException(156384763): Operation cannot be accomplished in current state
#0      _checkErrorCode
package:dartzmq/src/exception.dart:56
#1      _checkReturnCode
package:dartzmq/src/exception.dart:49
#2      ZContext._poll
package:dartzmq/src/zeromq.dart:109
#3      ZContext._startPolling.<anonymous closure>
package:dartzmq/src/zeromq.dart:68
#4      _Timer._runTimers (dart:isolate-patch/timer_impl.dart:398:19)
#5      _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:429:5)
#6      _RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:189:12)

I have no idea how to fix this.

The source code is below:

  @override
  void initState() {
    // omitted codes
    enterQueueSocket = ZMQHelper.getNewSocket(Config.replyUrl, SocketType.req);
    // omitted codes
    super.initState();
  }
  void enterQueue() {
    enterQueueSocket.send(utf8.encode(json.encode({
      "enterQueue": true,
      "name": _nameController.text,
      "clientId": "${ZMQHelper.context.hashCode}"
    })));
    enterQueueSocket.messages.listen((event) {
      for (var element in event) {
        print(utf8.decode(element.payload));
      }
    });
  }

I'd appreciate it if anyone would help me.

enwi commented 1 year ago

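First of all, you should listen to the socket's messages only once. In your code, every call to enterQueue() attaches another listener. Move the listen() call into initState():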

  @override
  void initState() {
    // omitted codes
    enterQueueSocket = ZMQHelper.getNewSocket(Config.replyUrl, SocketType.req);
    enterQueueSocket.messages.listen((event) {
      for (var element in event) {
        print(utf8.decode(element.payload));
      }
    });
    // omitted codes
    super.initState();
  }
  void enterQueue() {
    enterQueueSocket.send(utf8.encode(json.encode({
      "enterQueue": true,
      "name": _nameController.text,
      "clientId": "${ZMQHelper.context.hashCode}"
    })));
  }

I would also recommend using a dealer socket. With a req socket, sending a message before the reply to the previous one has been received also throws an exception. A dealer socket is asynchronous, so you don't run into this issue.
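For anyone who wants to stay on a req socket, the lockstep rule can also be enforced on the Dart side. The following is only a minimal sketch: LockstepGate is a hypothetical helper, not part of dartzmq, and it assumes that every request gets exactly one reply. It hands a new request to the send callback only after the previous reply has arrived and queues everything else.

```dart
import 'dart:collection';

/// Hypothetical helper (not part of dartzmq): serializes requests so that a
/// new one is only handed to [sendFn] after the previous reply has arrived.
class LockstepGate {
  LockstepGate(this.sendFn);

  /// Callback that performs the real send, e.g. (bytes) => socket.send(bytes).
  final void Function(List<int> payload) sendFn;

  final Queue<List<int>> _pending = Queue<List<int>>();
  bool _awaitingReply = false;

  /// Sends immediately if no request is in flight, otherwise queues it.
  void request(List<int> payload) {
    if (_awaitingReply) {
      _pending.add(payload);
      return;
    }
    _awaitingReply = true;
    sendFn(payload);
  }

  /// Call this from the single messages listener whenever a reply arrives.
  void onReply() {
    if (_pending.isEmpty) {
      _awaitingReply = false;
    } else {
      // Still awaiting a reply: the next queued request is now in flight.
      sendFn(_pending.removeFirst());
    }
  }
}

void main() {
  final sent = <List<int>>[];
  final gate = LockstepGate(sent.add);

  gate.request([1]); // sent right away
  gate.request([2]); // queued, because the first reply is still outstanding
  print(sent.length); // 1

  gate.onReply(); // reply to the first request releases the second
  print(sent.length); // 2
}
```

In the widget above, the gate would wrap enterQueueSocket.send, and onReply() would be called inside the listener registered in initState(). This does not change how the library polls the socket, so it only guards against the "send twice in a row" case.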

JSYRD commented 1 year ago

Thank you for your help! Actually, I tried this after I reported this issue, but the result was the same. So I searched for an asynchronous way to solve this problem, using a dealer. The way you use the dealer doesn't work for me at all, but I've found a different way to use dealer-rep. I think I may push a PR later.

enwi commented 1 year ago

Would the solution provided in #21 work for you? Maybe we need to have different sockets for synchronous socket types that are not handled by the internal loop.

JSYRD commented 1 year ago

Thanks! It seems that it should work for me. But given how the plugin works, using a dealer instead is the better choice. I've added notes about how to use the dealer/rep pattern correctly in PR #29.
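This thread does not spell out what those notes contain. One general ZeroMQ requirement that often trips people up with dealer/rep is framing: a req socket adds an empty delimiter frame in front of the payload by itself, while a dealer does not, so a dealer talking to a rep has to send that empty frame first and strip it from the reply. The sketch below only builds and unpacks the frame lists in plain Dart. frameForRep and stripDelimiter are hypothetical names, and how the frames are then sent as one multipart message depends on the dartzmq version, which is not shown here.

```dart
import 'dart:convert';
import 'dart:typed_data';

/// Hypothetical helper: builds the frames a dealer has to send so that a rep
/// socket accepts the message (empty delimiter first, then the payload).
List<Uint8List> frameForRep(Map<String, Object?> body) {
  return [
    Uint8List(0), // empty delimiter frame that a req socket would add itself
    Uint8List.fromList(utf8.encode(json.encode(body))),
  ];
}

/// Hypothetical helper: drops the leading empty delimiter from a reply, if
/// there is one, and returns the remaining payload frames.
List<Uint8List> stripDelimiter(List<Uint8List> frames) {
  if (frames.isNotEmpty && frames.first.isEmpty) {
    return frames.sublist(1);
  }
  return frames;
}

void main() {
  final frames = frameForRep({'enterQueue': true, 'name': 'alice'});
  print(frames.length); // 2
  print(frames.first.isEmpty); // true

  // A reply from the rep side arrives in the same shape.
  final payload = stripDelimiter(frames).single;
  print(utf8.decode(payload)); // {"enterQueue":true,"name":"alice"}
}
```

The two frames must go out as parts of one multipart message, not as two separate messages.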

jens-hj commented 9 months ago

Would the solution provided in #21 work for you? Maybe we need to have different sockets for synchronous socket types that are not handled by the internal loop.

This would be perfect for my use case, and it is also what I had expected to be available.