zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.57k stars 2.34k forks source link

Integer overflow in ZMQ_HEARTBEAT_TTL #4226

Open g4b1nagy opened 3 years ago

g4b1nagy commented 3 years ago

Issue description

Setting this on a server socket works: ZMQ_HEARTBEAT_TTL == 65599 while this causes clients to disconnect at every PING heartbeat: ZMQ_HEARTBEAT_TTL == 65600

Since the max INT value on 2 bytes is 65535 and since the value is rounded down to the nearest decisecond in libzmq, this seems to point to an integer overflow. Other values such as ZMQ_HEARTBEAT_IVL, ZMQ_HEARTBEAT_TIMEOUT might be affected as well, but I couldn't test for these as easily.

Environment

Tested on Linux x86_64 using both:

Minimal test code / Steps to reproduce the issue

# server.py

import logging
import threading
import time

import zmq
from zmq.utils.monitor import recv_monitor_message

SERVER_ADDRESS = 'tcp://*:8080'
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(message)s')
ZMQ_EVENT_ID_NAME_MAP = {getattr(zmq, name): name for name in dir(zmq) if name.startswith('EVENT_')}

def event_monitor(monitor):
    while monitor.poll():
        event = recv_monitor_message(monitor)
        event['event'] = ZMQ_EVENT_ID_NAME_MAP[event['event']]
        logging.info(event)

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
monitor = socket.get_monitor_socket()
monitor_thread = threading.Thread(target=event_monitor, args=(monitor,))
monitor_thread.start()

socket.setsockopt(zmq.HEARTBEAT_IVL, 1000)

# This works just fine:
# socket.setsockopt(zmq.HEARTBEAT_TTL, 65599)

# This causes clients to disconnect every second:
# socket.setsockopt(zmq.HEARTBEAT_TTL, 65600)

socket.bind(SERVER_ADDRESS)
time.sleep(1000)
# client.py

import logging
import threading

import zmq
from zmq.utils.monitor import recv_monitor_message

SERVER_ADDRESS = f'tcp://127.0.0.1:8080'
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(message)s')
ZMQ_EVENT_ID_NAME_MAP = {getattr(zmq, name): name for name in dir(zmq) if name.startswith('EVENT_')}

def event_monitor(monitor):
    while monitor.poll():
        event = recv_monitor_message(monitor)
        event['event'] = ZMQ_EVENT_ID_NAME_MAP[event['event']]
        logging.info(event)

context = zmq.Context()
socket = context.socket(zmq.DEALER)
monitor = socket.get_monitor_socket()
monitor_thread = threading.Thread(target=event_monitor, args=(monitor,))
monitor_thread.start()
socket.setsockopt_string(zmq.IDENTITY, 'client')
socket.connect(SERVER_ADDRESS)
while True:
    data = socket.recv()
    logging.info(f'Received: {data}')

What's the actual result? (include assertion message & call stack if applicable)

Every second:
2021-06-28 14:14:01,743 - {'event': 'EVENT_DISCONNECTED', 'value': 13, 'endpoint': b'tcp://127.0.0.1:8080'}
2021-06-28 14:14:01,868 - {'event': 'EVENT_ACCEPTED', 'value': 13, 'endpoint': b'tcp://127.0.0.1:8080'}
2021-06-28 14:14:01,868 - {'event': 'EVENT_HANDSHAKE_SUCCEEDED', 'value': 0, 'endpoint': b'tcp://127.0.0.1:8080'}

What's the expected result?

A stable connection.

ljluestc commented 3 months ago

import logging
import threading
import time
import zmq
from zmq.utils.monitor import recv_monitor_message

SERVER_ADDRESS = 'tcp://*:8080'
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(message)s')
ZMQ_EVENT_ID_NAME_MAP = {getattr(zmq, name): name for name in dir(zmq) if name.startswith('EVENT_')}

def event_monitor(monitor):
    while monitor.poll():
        event = recv_monitor_message(monitor)
        event['event'] = ZMQ_EVENT_ID_NAME_MAP[event['event']]
        logging.info(event)

def set_heartbeat_ttl(socket, ttl):
    # Limit the value of ZMQ_HEARTBEAT_TTL to a maximum allowed value
    max_ttl = 65535
    if ttl > max_ttl:
        ttl = max_ttl
    socket.setsockopt(zmq.HEARTBEAT_TTL, ttl)

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
monitor = socket.get_monitor_socket()
monitor_thread = threading.Thread(target=event_monitor, args=(monitor,))
monitor_thread.start()

socket.setsockopt(zmq.HEARTBEAT_IVL, 1000)

# Set heartbeat TTL with a value within the allowed range
set_heartbeat_ttl(socket, 65599)

socket.bind(SERVER_ADDRESS)
time.sleep(1000)