zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

pyzmq: HWM not respected in PUSH/PULL #4529

Open matanfih opened 1 year ago

matanfih commented 1 year ago

Issue description

Maybe I'm missing something here, but it looks like the high water mark (HWM) does not block PUSH/PULL as expected.

Environment

Minimal test code / Steps to reproduce the issue

push:

import time
import zmq

context = zmq.Context()

# Set up a PUSH socket with a high water mark of 10 messages for sending.
# set_hwm(10) sets both SNDHWM and RCVHWM, so it overrides the SNDHWM of 1500 set just before it.
socket = context.socket(zmq.PUSH)
socket.setsockopt(zmq.SNDHWM, 1500)
socket.set_hwm(10)
socket.bind("tcp://127.0.0.1:5566")

for i in range(200000):
    # Build the message once so the printed and sent text are identical
    message = f"Message [{i}::{time.time()}]"
    try:
        print(f"{len(message)}:::{message}")
        socket.send_string(message, zmq.DONTWAIT)
    except zmq.error.Again:
        # Handle the situation where the socket is blocked due to the high water mark being reached
        print("Socket is blocked! Waiting for the worker to consume messages...")
        while True:
            # Check whether the socket is ready for sending
            # We use poll with a timeout of 5000ms to avoid busy-waiting
            if socket.poll(timeout=5000, flags=zmq.POLLOUT):
                # If the socket is ready for sending, break out of the loop and try again
                break
            else:
                # If the socket is not ready for sending, wait for the worker to consume messages
                print("Worker is slow! Waiting...")
                time.sleep(1)
        # Resend the rejected message (blocking) so it is not silently dropped
        socket.send_string(message)

# Clean up
socket.close()
context.term()
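
A quick way to check which send HWM the push script above actually ends up with is to read the option back. This is a minimal sketch, not part of the original report; the variable name `effective_snd_hwm` is illustrative, and no bind or connect is needed just to inspect the option.

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)

# Same two calls as in the push script, in the same order
sock.setsockopt(zmq.SNDHWM, 1500)
sock.set_hwm(10)  # pyzmq helper that sets SNDHWM and RCVHWM together

# Read the option back to see which value won
effective_snd_hwm = sock.getsockopt(zmq.SNDHWM)
print(f"effective SNDHWM: {effective_snd_hwm}")

sock.close()
ctx.term()
```

Whichever call runs last determines the value, so only one of the two is needed.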

pull:

import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.setsockopt(zmq.RCVHWM, 1)
receiver.connect("tcp://127.0.0.1:5566")

def worker():
    i = 0
    while True:
        try:
            message = receiver.recv_string(zmq.NOBLOCK)
            print(f"Received message: [{i}:{time.time()}]{message}")
            time.sleep(500e-3)
            i+=1
        except zmq.Again:
            print("no messages")
            time.sleep(100e-3)

worker()

What's the actual result? (include assertion message & call stack if applicable)

The pusher keeps bombarding the socket until it reaches what seems to be maximum memory (70K+ messages).

What's the expected result?

The pusher stops from time to time to let the puller clear some of the queued messages.

snippet from log:

...
Message [72747::1678881796.1216621]
Message [72748::1678881796.1216683]
Message [72749::1678881796.1216772]
Message [72750::1678881796.121687]
Message [72751::1678881796.1216931]
Socket is blocked! Waiting for the worker to consume messages...
Worker is slow! Waiting...
Worker is slow! Waiting...
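
For a sense of scale, the log excerpt allows a rough upper-bound estimate of how many bytes had been accepted before the first `zmq.Again`. This is a back-of-the-envelope sketch: it assumes every message is as long as the last one shown, ignores the few messages the puller had already consumed, and counts 2 bytes of ZMTP framing per message (one flags byte plus one size byte for bodies up to 255 bytes).

```python
# Last message printed before "Socket is blocked!" in the log excerpt
sample = "Message [72751::1678881796.1216931]"
last_index = 72751

payload_bytes = len(sample.encode("utf-8"))
frame_overhead = 2                # ZMTP short frame: flags byte + size byte
messages = last_index + 1         # indices start at 0

total_bytes = messages * (payload_bytes + frame_overhead)
print(f"{messages} messages x {payload_bytes + frame_overhead} bytes "
      f"= {total_bytes / 1e6:.2f} MB")
```

A few megabytes is far more than an HWM of 10 messages would hold, which suggests most of the data sat in buffers outside the ZeroMQ queues, such as the kernel TCP buffers.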

matanfih commented 1 year ago

I think I got it, but would like you to confirm before I close this. What I was missing was configuring BUF and HWM on both sides; once configured, it works great. Once confirmed, I assume the next step is to update (the) with how to apply back-pressure properly.

e.g.

PUSH

import zmq
import time

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 2)  # set high water mark to 2
sender.setsockopt(zmq.SNDBUF, 20)
sender.connect("tcp://localhost:5559")

for i in range(1000):
    message = f"Message {i}"
    sender.send_string(message)
    print(f"Sent: {message}")
#    time.sleep(100e-3)

PUSH exploiting Again

import zmq
import time

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) 
sender.setsockopt(zmq.SNDBUF, 20)
sender.connect("tcp://localhost:5560")

for i in range(1000):
    message = f"Message {i}"
    try:
        sender.send_string(message, zmq.NOBLOCK)
        print(f"Sent: {message}")
    except zmq.Again as again:
        print(again)
        time.sleep(50e-3)
        sender.send_string(message)
        print(f"Sent: {message}")
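
An alternative to sleeping for a fixed interval and then falling back to a blocking send is to wait on `POLLOUT` and retry the same message. The sketch below is only one possible shape for this: the helper name `send_with_backpressure` and its `poll_ms` and `max_polls` parameters are hypothetical, and the demo uses an `inproc` endpoint purely so it runs in a single process.

```python
import zmq

def send_with_backpressure(sock, text, poll_ms=1000, max_polls=10):
    """Send without blocking; on zmq.Again wait for POLLOUT and retry the same text.

    Returns the number of polls that were needed (0 if the first try succeeded).
    """
    polls = 0
    while True:
        try:
            sock.send_string(text, zmq.DONTWAIT)
            return polls
        except zmq.Again:
            if polls >= max_polls:
                raise  # give up and let the caller decide what to do
            # Block until the socket is writable again or poll_ms elapses
            sock.poll(timeout=poll_ms, flags=zmq.POLLOUT)
            polls += 1

# Small single-process demo over inproc (bind before connect)
ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://retry-demo")
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 2)
push.connect("inproc://retry-demo")

polls_needed = send_with_backpressure(push, "Message 0")
received = pull.recv_string()
print(f"received {received!r} after {polls_needed} polls")

ctx.destroy(linger=0)
```

Because the rejected message is retried rather than skipped, nothing is lost while the puller catches up.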

PULL

import zmq
import time

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.setsockopt(zmq.RCVHWM, 2)  # set high water mark to 2
receiver.setsockopt(zmq.RCVBUF, 20)
receiver.bind("tcp://*:5559")

while True:
    message = receiver.recv()
    print(f"Received: {message}")
    time.sleep(100e-3)
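
To see the HWM limit on its own, without kernel TCP buffers in the picture, the same pair of options can be tried over `inproc`. This is a sketch under assumptions: the function name `count_sends_until_full` is hypothetical, nothing reads from the PULL socket, and the exact number of messages accepted depends on how libzmq sizes the pipe from the two HWMs, so the code only reports the count rather than assuming one.

```python
import zmq

def count_sends_until_full(snd_hwm, rcv_hwm, attempts=1000):
    """Count non-blocking sends accepted before zmq.Again when nobody is receiving."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    push = ctx.socket(zmq.PUSH)
    try:
        # HWM options must be set before bind/connect to take effect
        pull.setsockopt(zmq.RCVHWM, rcv_hwm)
        push.setsockopt(zmq.SNDHWM, snd_hwm)
        pull.bind("inproc://hwm-demo")
        push.connect("inproc://hwm-demo")

        sent = 0
        for i in range(attempts):
            try:
                push.send_string(f"Message {i}", zmq.DONTWAIT)
                sent += 1
            except zmq.Again:
                break  # queue is full: this is the back-pressure signal
        return sent
    finally:
        # Drop queued messages so shutdown does not wait on them
        ctx.destroy(linger=0)

accepted = count_sends_until_full(2, 2)
print(f"accepted {accepted} of 1000 sends before zmq.Again")
```

With small HWMs on both ends, the sender is stopped after only a handful of messages instead of tens of thousands.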