rthalley / dnspython

a powerful DNS toolkit for python
http://www.dnspython.org

Can't re-use asyncio UDP socket with multiple outstanding queries #930

Open aldem opened 1 year ago

aldem commented 1 year ago

Attempting to send (and await) multiple asynchronous queries over one UDP socket consistently fails with an assertion error, as in #843.

To reproduce:

import asyncio
import socket

import dns.asyncbackend
import dns.asyncquery
import dns.message
import dns.name
import dns.rdatatype

async def main() -> None:
    sock = await dns.asyncbackend.get_backend("asyncio").make_socket(
        socket.AF_INET, socket.SOCK_DGRAM
    )

    tasks = []
    for _ in range(5):
        query = dns.message.make_query(dns.name.from_text("example.com."), dns.rdatatype.A)
        tasks.append(asyncio.create_task(dns.asyncquery.udp(query, "1.1.1.1", timeout=5, sock=sock)))

    await asyncio.gather(*tasks)

asyncio.run(main())

Output:

Traceback (most recent call last):
  File "/home/developer/src/python/dnsbug.py", line 20, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/developer/src/python/dnsbug.py", line 18, in main
    await asyncio.gather(*tasks)
  File "/home/developer/src/python/pvenv/lib/python3.9/site-packages/dns/asyncquery.py", line 199, in udp
    (r, received_time, _) = await receive_udp(
  File "/home/developer/src/python/pvenv/lib/python3.9/site-packages/dns/asyncquery.py", line 138, in receive_udp
    (wire, from_address) = await sock.recvfrom(65535, _timeout(expiration))
  File "/home/developer/src/python/pvenv/lib/python3.9/site-packages/dns/_asyncio_backend.py", line 72, in recvfrom
    assert self.protocol.recvfrom is None
AssertionError

Environment: dnspython 2.3.0 on Python 3.9.2 (packaged with Debian 11) and 3.11.3 (compiled from source).

I believe that this is related to the fact that the method DatagramSocket.recvfrom() is non-reentrant.

rthalley commented 1 year ago

None of dnspython's I/O methods are intended to be reentrant in this way. If you want to do a "gather" style like in your example, then not specifying the socket is the way to go. Each query would use its own socket and reentrancy issues would be avoided. It's probably possible to build something that would multiplex and demultiplex on one async socket, and wake up various futures when the associated I/O completed, but this is not something we're planning on writing at the moment.

aldem commented 1 year ago

Not specifying a socket leads to thousands of open descriptors, which is less than optimal (I do periodic SOA/DNSSEC checks at mass scale, >100k zones).

Even if you do not plan to - would you accept a PR if I implement this? No promises though, it could take time :)

I really like dnspython and so far this is the only downside.

bwelling commented 1 year ago

It's hard to say whether a PR would be accepted; it would depend a lot on the PR. I think it would be a lot more likely if this behavior were optional and configurable, though.

rthalley commented 1 year ago

I agree with Brian that how it is done would matter a lot. We should discuss the idea and see if there is agreement on how best to do it. I'd be looking for some sort of demultiplexing socket listener task that wakes up futures, and uses the async receive_udp. (And sending would be via send_udp.). I wouldn't want changes in the general async query code, as this is a very special use case.

aldem commented 1 year ago

The more I look at the code, the more I lean toward making a dedicated class like asyncresolver, but one using only a single UDP socket.

To handle such a case we have to remember all requests sent (query id & destination) and then match incoming responses against that record, handling timeouts and filtering out garbage along the way.

I have yet to find an efficient way to do this in Python, but in any case it will be less expensive resource-wise than opening/closing a new socket for every request.

EDIT: As to the special use case - yes, my case is special, but sending multiple queries in a row is not so special; opening/closing sockets for each request is expensive anyway and should be avoided if possible.

bwelling commented 1 year ago

I don't think it would need to be a new class like asyncresolver; I assume that it would either be a modification to the methods in dns.asyncquery dealing with UDP, or new methods in that module. None of the resolver code would need to change.

The typical way that something like this would be done in a non-async environment involves a map of outstanding requests, the client waiting on an event instead of directly waiting for the packet, and a separate thread reading from the socket and dispatching responses. I assume there's a better solution than "separate thread" that better integrates into an async environment, but I'm not all that familiar with the async models. There is some additional complexity related to the fact that there's no guarantee that query ids are unique.
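A stdlib-only sketch of that non-async pattern, against a local UDP echo server standing in for a nameserver (all names here are invented, not dnspython API; a 2-byte prefix stands in for the query id, and real code would need more than the id since, as noted, ids are not guaranteed unique):

```python
import socket
import threading
from concurrent.futures import ThreadPoolExecutor

class SyncDispatcher:
    """One shared UDP socket: a map of outstanding requests, callers waiting
    on an Event, and a reader thread dispatching replies by request id."""

    def __init__(self) -> None:
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(("127.0.0.1", 0))
        self.lock = threading.Lock()
        self.pending = {}  # id bytes -> (Event, response slot)
        threading.Thread(target=self._reader, daemon=True).start()

    def _reader(self) -> None:
        while True:
            data, _ = self.sock.recvfrom(65535)
            with self.lock:
                entry = self.pending.pop(data[:2], None)
            if entry is not None:  # unknown id: unexpected reply, drop it
                event, slot = entry
                slot.append(data)
                event.set()  # wake the caller blocked in query()

    def query(self, payload: bytes, addr, timeout: float = 5.0) -> bytes:
        event, slot = threading.Event(), []
        with self.lock:
            self.pending[payload[:2]] = (event, slot)
        self.sock.sendto(payload, addr)
        if not event.wait(timeout):
            raise TimeoutError("no reply")
        return slot[0]

# Local echo server standing in for a nameserver.
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("127.0.0.1", 0))

def echo() -> None:
    while True:
        data, addr = server.recvfrom(65535)
        server.sendto(data, addr)

threading.Thread(target=echo, daemon=True).start()

dispatcher = SyncDispatcher()
with ThreadPoolExecutor(max_workers=5) as pool:
    replies = list(pool.map(
        lambda i: dispatcher.query(i.to_bytes(2, "big") + b"question", server.getsockname()),
        range(5),
    ))
```

Five caller threads share one socket; only the reader thread ever calls `recvfrom` on it, which sidesteps the reentrancy assertion entirely.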

Also, I disagree with @rthalley that this is a "very special use case". It's not needed for normal DNS resolution, but it's not that uncommon for someone to want to use dnspython to send many queries in parallel.

rthalley commented 1 year ago

I'm not against creating this kind of thing, I just didn't want the regular query API to be altered by supporting this. Take the following as brainstorming for more consideration:

What if the nameserver abstraction:

1) gained persistence
2) got separate send and receive methods that ensured locking when needed

?

Then you could make two variations of almost the same code (sync and async), which would have a listener task / thread calling the nameserver receive method and trying to dispatch what it read to interested parties. Dispatching is basically calling some closure with the received message; the closure could either wake up a sync/async future or just act like a callback and handle the response directly. There'd also be code to add an entry to the pending-queries dictionary and then send the message (with suitable locking if needed).

There would be a query API which did a send with a closure wrapping a future and then blocked on the future. The pending-queries dictionary would be keyed by (qid, rdtype, rdclass, qname).
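The asyncio flavor of that listener-wakes-futures design can be sketched with stdlib protocols alone (everything here is hypothetical illustration, not dnspython code; a 2-byte prefix stands in for the (qid, ...) dictionary key, and a local echo endpoint stands in for the nameserver):

```python
import asyncio

class Demultiplexer(asyncio.DatagramProtocol):
    """One shared datagram socket; replies wake per-request futures."""

    def __init__(self) -> None:
        self.pending = {}  # key -> Future awaiting the reply
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        fut = self.pending.pop(data[:2], None)  # first 2 bytes stand in for the qid key
        if fut is not None and not fut.done():
            fut.set_result(data)  # wake the coroutine blocked in query()
        # else: unexpected/garbage reply, drop it

    async def query(self, payload: bytes, addr) -> bytes:
        fut = asyncio.get_running_loop().create_future()
        self.pending[payload[:2]] = fut  # register before sending
        self.transport.sendto(payload, addr)
        return await fut

class Echo(asyncio.DatagramProtocol):
    """Local stand-in for a nameserver: echoes every datagram back."""

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data, addr)

async def main() -> list:
    loop = asyncio.get_running_loop()
    server_transport, _ = await loop.create_datagram_endpoint(Echo, local_addr=("127.0.0.1", 0))
    server_addr = server_transport.get_extra_info("sockname")
    client_transport, client = await loop.create_datagram_endpoint(
        Demultiplexer, local_addr=("127.0.0.1", 0)
    )
    # Five concurrent "queries" share one socket; the protocol demultiplexes replies.
    results = await asyncio.gather(
        *(client.query(i.to_bytes(2, "big") + b"payload", server_addr) for i in range(5))
    )
    client_transport.close()
    server_transport.close()
    return results

replies = asyncio.run(main())
```

Real code would also need per-request timeouts and garbage filtering, and as discussed above the receive loop would have to be written separately for sync, asyncio, and Trio.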

Note that the QUIC code already works like this, in the sense that there is a listener that wakes up futures when it finishes a read on a QUIC stream. Also when I wrote the QUIC code I discovered I couldn't have just one common bit of code doing the receive as a non-Trio thread couldn't wake up a Trio condition.

This approach is a decent amount of work, but would be super useful for many things. I went for "expand on the nameserver abstraction" because this lets us have protocol independent dispatcher code. So if you decided you wanted to switch from regular DNS-over-UDP-port-53 to say DNS-over-TLS, you could just change the nameserver object.

rthalley commented 1 year ago

Brian pointed out that my dictionary scheme above isn't right, as it doesn't work if you don't get a question section in the reply. Also, while my "do it with nameserver objects" idea has nice generality, it's also a LOT of work. It may be that the problem can be split up into separate simpler problems. I'll keep thinking on this. I would like such a feature in dnspython, though with the 2.4 release process starting pretty soon, it will be an "after 2.4" thing.

grigi commented 1 year ago

I have a similar problem and worked around it by using a generic connection-pool manager (https://pypi.org/project/generic-connection-pool/):

import socket

import dns.asyncbackend
import dns.asyncquery
from dns._asyncbackend import DatagramSocket
from generic_connection_pool.asyncio import BaseConnectionManager, ConnectionPool

class UdpSocketConnectionManager(BaseConnectionManager[int, DatagramSocket]):
    def __init__(self) -> None:
        self.backend = dns.asyncbackend.get_backend("asyncio")

    async def create(self, endpoint: int) -> DatagramSocket:
        return await self.backend.make_socket(socket.AF_INET, socket.SOCK_DGRAM)

    async def dispose(self, endpoint: int, conn: DatagramSocket) -> None:
        await conn.close()

udp_pool = ConnectionPool[int, DatagramSocket](
    UdpSocketConnectionManager(),
    idle_timeout=30.0,
    max_lifetime=600.0,
    min_idle=0,
    max_size=1000,
    total_max_size=1000,
)

And then for each connection you have a task:

async def query_udp(query):
    async with udp_pool.connection(53) as udp_sock:
        # udp_with_fallback takes udp_sock= (and tcp_sock=), not sock=
        return await dns.asyncquery.udp_with_fallback(query, '1.1.1.1', timeout=5, udp_sock=udp_sock)

And you can gather on each of those. Setting the pool's size works effectively as a concurrency limiter.

rthalley commented 1 year ago

Just an update that I've made some progress on this, though it will still not be in 2.4. I'm not sure I'll get to quite the vision I described, but I definitely have something that works for UDP.

rthalley commented 10 months ago

A further update. The UDP solution I mentioned back in July works just fine, and much like the way Brian described. It's still hard to bring to dnspython, however, because it requires background tasks to service the receive ports. The background tasks would have to be written three different ways (sync, asyncio, and Trio), and there is also an issue with Trio about who would own the nursery. Also, this kind of scheme is really only good for UDP; for everything else you really want more of a pooling-type solution, as the communication channels are inherently bound to a destination with TCP, DoT, DoH, and DoQ. So, still pondering!