zeromq / pyzmq

PyZMQ: Python bindings for zeromq
http://zguide.zeromq.org/py:all
BSD 3-Clause "New" or "Revised" License

BUG: Memory leak on receiver side in asyncio #1928

Closed Azmisov closed 6 months ago

Azmisov commented 6 months ago

What pyzmq version?

25.1.0

What libzmq version?

4.3.4

Python version (and how it was installed)

3.11.5

OS

Ubuntu 22.04

What happened?

Memory usage climbs quickly on the receiver to > 8GiB, at which point I kill it before it can crash my system. Memory usage is constant for the sending side.

Client (receiver):

import zmq.asyncio, gc, asyncio

async def client():
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.PAIR)
    sock.setsockopt(zmq.RCVBUF, 1024*10)
    sock.setsockopt(zmq.RCVHWM, 1)
    sock.connect("ipc:///tmp/memtest")

    print("client ready")
    i = 0
    while True:
        data = await sock.recv(copy=True)
        print("client recv", i)
        del data
        gc.collect()
        i += 1

asyncio.run(client())

Server (sender):

import zmq.asyncio, asyncio

async def server():
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.PAIR)
    sock.setsockopt(zmq.SNDBUF, 1024*10)
    sock.setsockopt(zmq.SNDHWM, 1)
    sock.bind("ipc:///tmp/memtest")

    # random data
    data = b'0'*1000000

    print("server ready")
    i = 0
    while True:
        await sock.send(data, copy=True)
        print("server send", i)
        i += 1

asyncio.run(server())

Code to reproduce bug

See above

Traceback, if applicable

None

More info

None

minrk commented 6 months ago

Confirming this is specific to asyncio and an always-busy socket. Trading zmq.asyncio.Context() for zmq.Context() in the receiver results in no memory growth beyond ~45MB. Similarly, if there's ever a pause between messages, references will be cleaned up.

The underlying issue is that in the example given, the event loop is never idle, so Future done_callbacks are never called (that's the event loop's job). Those callbacks clean up references to the Future, which in turn holds a reference to the message.

Adding an asyncio.sleep(0) (e.g. every 1000 messages) gives the event loop a moment to do its usual cleanup. I'm surprised asyncio never calls the done_callbacks in the await, but that's a question for asyncio. pyzmq isn't in control of the disposal of Futures; that's the event loop's job.
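
For illustration, a sketch of the receiver loop above with that periodic yield added (the 1000-message interval is arbitrary):

import zmq.asyncio, asyncio

async def client():
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.PAIR)
    sock.setsockopt(zmq.RCVBUF, 1024*10)
    sock.setsockopt(zmq.RCVHWM, 1)
    sock.connect("ipc:///tmp/memtest")

    i = 0
    while True:
        data = await sock.recv(copy=True)
        i += 1
        if i % 1000 == 0:
            # let the event loop run pending done_callbacks, which release
            # the references held by already-completed Futures
            await asyncio.sleep(0)

asyncio.run(client())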

Ultimately, this is a result of the optimization that pyzmq doesn't use coroutines; it uses Futures directly. When pyzmq is busy (i.e. there's always a message to recv), these Futures are immediately complete, meaning the event loop never idles if a zmq socket is the only thing that's ever awaited and zmq never actually has to wait for a message. If there were any genuinely asynchronous calls, cleanup would occur, which is why asyncio.sleep(0) works.

This issue exists in general, but this particular reference can be avoided by not registering the done callback in this case (#1929). But if you actually have any other coroutines waiting to run, this busy coroutine will never yield to them as long as the socket has messages to receive.
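
As a sketch (not from the original report) of that starvation, with the sender above still running: a heartbeat task that should print once per second never gets a turn while the socket always has data:

import zmq.asyncio, asyncio

async def heartbeat():
    while True:
        await asyncio.sleep(1)
        print("heartbeat")  # starved: never runs while recv() always has data

async def busy_client():
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.PAIR)
    sock.connect("ipc:///tmp/memtest")
    while True:
        # under constant traffic recv() returns an already-completed Future,
        # so this await never yields control back to the event loop
        await sock.recv()

async def main():
    hb = asyncio.create_task(heartbeat())  # keep a reference to the task
    await busy_client()

asyncio.run(main())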

You can see the same effect with pure asyncio:

import asyncio
from functools import partial

def printer(msg, _f):
    print(msg)

async def main():
    print("start")
    for i in range(10):
        digit = str(i % 10)
        data = digit.encode("ascii") * 1_000_000
        f = asyncio.Future()
        f.add_done_callback(partial(printer, f"done_callback {i}"))
        f.set_result(data)
        print(f"await {i}")
        # f is already done! this await doesn't yield to anything
        await f
    print("end")

asyncio.run(main())

which produces:

start
await 0
await 1
await 2
await 3
await 4
await 5
await 6
await 7
await 8
await 9
end
done_callback 0
done_callback 1
done_callback 2
done_callback 3
done_callback 4
done_callback 5
done_callback 6
done_callback 7
done_callback 8
done_callback 9

Azmisov commented 6 months ago

Okay, that's very interesting; I didn't know that was how Python Futures behave.

In the Python docs, it does say a Future's callbacks are scheduled with call_soon, so they wouldn't get called when you await an already-completed result; so apparently this is the intended behavior. I opened a discussion on discuss.python.org proposing to change it to match how Tasks behave instead (where the done callback has completed by the time await returns). It probably won't come to anything.
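
A small sketch (not from the thread) of that difference: awaiting an already-completed plain Future returns before its callback runs, while awaiting a Task runs the callback before the await returns:

import asyncio
from functools import partial

def printer(msg, _f):
    print(msg)

async def produce():
    return b"x"

async def main():
    # plain Future that is already done: await returns before the callback fires
    f = asyncio.Future()
    f.add_done_callback(partial(printer, "future done_callback"))
    f.set_result(b"x")
    await f
    print("after await future")

    # Task: its done_callback has run by the time await returns
    t = asyncio.ensure_future(produce())
    t.add_done_callback(partial(printer, "task done_callback"))
    await t
    print("after await task")

asyncio.run(main())

# prints:
# after await future
# future done_callback
# task done_callback
# after await task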

minrk commented 6 months ago

Yeah, I'm just surprised that the presence of await doesn't give call_soon any chance to fire. I thought you would have to avoid the await, like:

f = s.recv()
if f.done():
    result = f.result()
else:
    result = await f

to starve other tasks.

This isn't the first time starvation has been an issue, but the performance benefit is substantial, if I recall correctly. It's a tricky trade-off. I've thought a flag on the sockets enabling a more polite, cooperative per-call yield could be desirable, but folks would have to find it first if it's opt-in.
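
Purely as a sketch of what an opt-in version could look like from user code today (the wrapper class and its every parameter are hypothetical, not a pyzmq API):

import asyncio

class PoliteSocket:
    """Hypothetical wrapper: yield to the event loop every `every` receives."""
    def __init__(self, sock, every=1000):
        self._sock = sock
        self._every = every
        self._count = 0

    async def recv(self, *args, **kwargs):
        msg = await self._sock.recv(*args, **kwargs)
        self._count += 1
        if self._count % self._every == 0:
            # cooperative yield so other tasks and done_callbacks can run
            await asyncio.sleep(0)
        return msg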