enwi / dartzmq

A simple dart zeromq implementation/wrapper around the libzmq C++ library
https://pub.dev/documentation/dartzmq/latest/
MIT License
26 stars 17 forks source link

Received messages are buffered #19

Closed canders-rf closed 1 year ago

canders-rf commented 1 year ago

Messages produced by a server at say 4 hz, will be batched such that 4 messages are received every second. If I change the server to produce at 10hz, I get 10 messages received in a batch every second. I verified this happens regardless if I use a dealer/dealer or pub/sub connection type. I also, verified this happens with a server written in Julia (instead of Python). This seems to be something unique to the dart ZMQ client as it doesn't happen when I used Julia or Python clients.

Here is the output from the Dart code (below). It shows the batching of messages:

image

Here is the important part of the Dart code:


class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;
  final ZContext _context = ZContext();
  late final ZSocket _socket;
  late StreamSubscription _subscription;
  String _receivedData = '';
  DateTime ? lastTime = null;

 @override
  void initState() {
    _socket = _context.createSocket(SocketType.dealer);
    _socket.connect("tcp://localhost:4482");

    // listen for messages
    _subscription = _socket.messages.listen((msg) {
      _receivedData = msg.toString();
      var now = DateTime.now();
      if (lastTime != null) {
        var diff = now.difference(lastTime!);
        print("msg received  timing = $diff");
      }
      lastTime = now;
      setState(() {
      });
    });
    super.initState();
  }

Here is the entire server Python code:

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.set(zmq.LINGER, 0)
address = "tcp://*:4482"
socket.bind(address)

print("Running...")
try:
    last_time = None
    while True:
        time.sleep(.25)
        try:
          socket.send_string("test", zmq.NOBLOCK)
          now = time.time()
          if last_time is not None:
            print(f"Sent message.  timing = {now - last_time}")
          last_time = now
        except Exception as e:
            pass
finally:
    socket.unbind(address)  # ALWAYS RELEASE PORT
    socket.close()  # ALWAYS RELEASE RESOURCES
    context.term()  # ALWAYS RELEASE RESOURCES

Am I doing something wrong? Would love to get this fixed! Willing to help!

enwi commented 1 year ago

Duplicate of #4