zeromq / pyzmq

PyZMQ: Python bindings for zeromq
http://zguide.zeromq.org/py:all

BUG: zmq.asyncio.Poller does not return POLLPRI events before timeout #1865

Closed by chrisjbillington 1 year ago

chrisjbillington commented 1 year ago

What pyzmq version?

25.0.2

What libzmq version?

4.3.4

Python version (and how it was installed)

Python 3.11.3 from Arch Linux repositories

OS

Arch Linux

What happened?

I'm using zmq.asyncio.Poller.poll() to wait for POLLPRI events on non-zmq file descriptors, which is how GPIO edges are reported on, e.g., Raspberry Pis. The non-asyncio version works fine:

poller = zmq.Poller()
poller.register(f, zmq.POLLPRI) # f is an open file for a GPIO configured for edge triggering
events = poller.poll(5000) # returns as expected when there's a GPIO edge

However, with the async version, poll() does not return until the timeout has expired:

poller = zmq.asyncio.Poller()
poller.register(f, zmq.POLLPRI)
events = await poller.poll(5000) # Returns the expected event, but only after the 5000ms timeout.

Below is code that reproduces the bug on a regular computer (my Arch Linux laptop), using a TCP socket instead of GPIOs to generate the POLLPRI events. The outcome is the same. I first hit the problem with GPIOs on an embedded ARM computer running Ubuntu Core 20.04 with Python 3.8.10, pyzmq 18.1.1, and zmq 4.3.2 from the Ubuntu repositories, so it looks like the problem is not specific to one environment and is not a recent regression in pyzmq.

Code to reproduce bug

import zmq, zmq.asyncio
import asyncio
import threading
import time
import socket

def server():
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.bind(('localhost', 12345))
    server_sock.listen(1)

    client_sock, _ = server_sock.accept()
    while True:
        time.sleep(1)
        # Send some out-of-band data to generate a POLLPRI event
        client_sock.send(b'1', socket.MSG_OOB)

def pollpri_test():

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', 12345))

    poller = zmq.Poller()
    poller.register(sock, zmq.POLLPRI)

    while True:
        events = poller.poll(5000)
        if events:
            data = sock.recv(1, socket.MSG_OOB)
            print(data)
        else:
            print("Timed out.")

async def async_pollpri_test():

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', 12345))

    poller = zmq.asyncio.Poller()
    poller.register(sock, zmq.POLLPRI)

    while True:
        events = await poller.poll(5000)
        if events:
            data = sock.recv(1, socket.MSG_OOB)
            print(data)
        else:
            print("Timed out.")

threading.Thread(target=server, daemon=True).start()
time.sleep(1) # allow time to start up

SYNCHRONOUS = True

if SYNCHRONOUS:
    pollpri_test()  # prints b'1' every 1s
else:
    asyncio.run(async_pollpri_test())  # prints b'1' every 5s

Traceback, if applicable

No response

More info

No response

chrisjbillington commented 1 year ago

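Right, so it turns out this is just not supported by asyncio: its event loop only watches file descriptors for readability and writability, with no equivalent of POLLPRI. You have to turn the POLLPRI events into POLLIN events by wrapping the file in a select.epoll object, whose own file descriptor becomes readable whenever an event it is watching is pending: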

epoll = select.epoll()
epoll.register(f, select.POLLPRI) # f is an open file for a GPIO configured for edge triggering

poller = zmq.asyncio.Poller()
poller.register(epoll, zmq.POLLIN) # watch the epoll object instead
events = await poller.poll(5000) # returns as expected when there's a GPIO edge
epoll.poll(0) # Don't forget to clear the epoll object
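
For anyone who wants to check the same trick without GPIO hardware, here is a minimal sketch that runs on a regular Linux machine. It reuses the TCP out-of-band approach from the reproduction script, but as an assumption for illustration it calls asyncio's loop.add_reader directly in place of zmq.asyncio.Poller, so it needs only the standard library. The names wait_readable and demo are hypothetical helpers, not part of pyzmq or asyncio.

```python
import asyncio
import select
import socket


async def wait_readable(fd, timeout):
    """Return True once fd is readable, or False if the timeout expires first."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def on_readable():
        if not fut.done():
            fut.set_result(True)

    loop.add_reader(fd, on_readable)
    try:
        return await asyncio.wait_for(fut, timeout)
    except asyncio.TimeoutError:
        return False
    finally:
        loop.remove_reader(fd)


async def demo():
    # Connected TCP pair on an ephemeral loopback port.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)
    client = socket.create_connection(listener.getsockname())
    peer, _ = listener.accept()

    # Wrap the client socket in an epoll object that watches for priority data.
    epoll = select.epoll()
    epoll.register(client.fileno(), select.EPOLLPRI)

    # Send one byte of out-of-band data shortly after we start waiting.
    loop = asyncio.get_running_loop()
    loop.call_later(0.2, peer.send, b"1", socket.MSG_OOB)

    # Wait on the epoll object's own fd, which asyncio sees as plain readability.
    start = loop.time()
    ready = await wait_readable(epoll.fileno(), timeout=5.0)
    elapsed = loop.time() - start

    # Ask the epoll object which event fired, then read the out-of-band byte.
    events = epoll.poll(0)
    mask = events[0][1] if events else 0
    data = client.recv(1, socket.MSG_OOB) if events else b""

    epoll.close()
    for s in (client, peer, listener):
        s.close()
    return ready, elapsed, mask, data


if __name__ == "__main__":
    ready, elapsed, mask, data = asyncio.run(demo())
    print(f"ready={ready} after {elapsed:.2f}s, mask={mask}, data={data!r}")
```

The wait should end shortly after the out-of-band byte is sent instead of at the 5 s timeout. Swapping wait_readable for a zmq.asyncio.Poller registered on the epoll object with zmq.POLLIN gives the workaround shown above.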

Not much pyzmq can do about this. Closing.