Can't re-use asyncio UDP socket with multiple outstanding queries #930

Open
aldem opened this issue May 8, 2023 · 11 comments
Labels
Enhancement Request Future A longer term issue, possibly addressed in the future.

Comments

@aldem

aldem commented May 8, 2023

Attempting to send (and await) multiple asynchronous queries over one UDP socket consistently fails with an assertion error, as in #843.

To reproduce:

import asyncio
import socket

import dns.asyncbackend
import dns.asyncquery


async def main() -> None:
    sock = await dns.asyncbackend.get_backend("asyncio").make_socket(
        socket.AF_INET, socket.SOCK_DGRAM
    )

    tasks = []
    for _ in range(5):
        query = dns.message.make_query(dns.name.from_text("example.com."), dns.rdatatype.A)
        tasks.append(asyncio.create_task(dns.asyncquery.udp(query, "1.1.1.1", timeout=5, sock=sock)))

    await asyncio.gather(*tasks)

asyncio.run(main())

Output:

Traceback (most recent call last):
  File "/home/developer/src/python/dnsbug.py", line 20, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/developer/src/python/dnsbug.py", line 18, in main
    await asyncio.gather(*tasks)
  File "/home/developer/src/python/pvenv/lib/python3.9/site-packages/dns/asyncquery.py", line 199, in udp
    (r, received_time, _) = await receive_udp(
  File "/home/developer/src/python/pvenv/lib/python3.9/site-packages/dns/asyncquery.py", line 138, in receive_udp
    (wire, from_address) = await sock.recvfrom(65535, _timeout(expiration))
  File "/home/developer/src/python/pvenv/lib/python3.9/site-packages/dns/_asyncio_backend.py", line 72, in recvfrom
    assert self.protocol.recvfrom is None
AssertionError

Environment: dnspython 2.3.0 on Python 3.9.2 (packaged with Debian 11) and 3.11.3 (compiled from source).

I believe that this is related to the fact that the method DatagramSocket.recvfrom() is non-reentrant.

@rthalley
Owner

rthalley commented May 8, 2023

None of dnspython's I/O methods are intended to be reentrant in this way. If you want to do a "gather" style like in your example, then not specifying the socket is the way to go. Each query would use its own socket and reentrancy issues would be avoided. It's probably possible to build something that would multiplex and demultiplex on one async socket, and wake up various futures when the associated I/O completed, but this is not something we're planning on writing at the moment.

@aldem
Author

aldem commented May 8, 2023

Not specifying a socket leads to thousands of open descriptors, which is less than optimal (I run periodic SOA/DNSSEC checks at large scale, >100k zones).

Even if you do not plan to write it yourselves, would you accept a PR if I implemented this? No promises though, it could take time :)

I really like dnspython and so far this is the only downside.

@bwelling
Collaborator

bwelling commented May 8, 2023

It's hard to say whether a PR would be accepted; it would depend a lot on the PR. I think it would be a lot more likely if this behavior were optional and configurable, though.

@rthalley
Owner

rthalley commented May 9, 2023

I agree with Brian that how it is done would matter a lot. We should discuss the idea and see if there is agreement on how best to do it. I'd be looking for some sort of demultiplexing socket listener task that wakes up futures and uses the async receive_udp. (Sending would be via send_udp.) I wouldn't want changes in the general async query code, as this is a very special use case.

@aldem
Author

aldem commented May 10, 2023

The more I look at the code, the more I lean toward making a dedicated class like asyncresolver that uses only a single UDP socket.

To handle such a case we have to remember all requests sent (query id & destination), check incoming responses against them, handle timeouts, and filter out garbage along the way.

I have yet to find an effective way to do this in Python, but in any case it will be less expensive resource-wise compared to opening and closing a new socket for every request.

EDIT: As to the special use case: yes, my case is special, but sending multiple queries in a row is not so special. Opening and closing a socket for each request is expensive anyway and should be avoided if possible.
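
As an illustration of the bookkeeping described in this comment (this is not dnspython code), here is a minimal asyncio sketch using only the standard library. `UdpDemux` and its `query` method are hypothetical names. The sketch assumes IPv4 peers, treats the first two bytes of each datagram as the DNS message ID, and leaves retries and validation of the response to the caller.

```python
import asyncio
from typing import Dict, Optional, Tuple

Address = Tuple[str, int]  # assumption: IPv4 (host, port) pairs only


class UdpDemux(asyncio.DatagramProtocol):
    """Routes each datagram to the task waiting for (query id, sender)."""

    def __init__(self) -> None:
        self.transport: Optional[asyncio.DatagramTransport] = None
        self.pending: Dict[Tuple[int, Address], asyncio.Future] = {}

    def connection_made(self, transport) -> None:
        self.transport = transport

    def datagram_received(self, data: bytes, addr) -> None:
        if len(data) < 2:
            return  # too short to carry a DNS message ID
        qid = int.from_bytes(data[:2], "big")
        fut = self.pending.pop((qid, addr), None)
        if fut is not None and not fut.done():
            fut.set_result(data)
        # Anything else is unsolicited or late and is silently dropped.

    async def query(self, wire: bytes, addr: Address, timeout: float) -> bytes:
        key = (int.from_bytes(wire[:2], "big"), addr)
        if key in self.pending:
            # Query IDs are only 16 bits, so collisions must be handled.
            raise ValueError("query id already outstanding for this server")
        fut = asyncio.get_running_loop().create_future()
        self.pending[key] = fut
        try:
            self.transport.sendto(wire, addr)
            return await asyncio.wait_for(fut, timeout)
        finally:
            # Runs on success, timeout, and cancellation alike.
            self.pending.pop(key, None)
```

Such a protocol could be attached to a single socket with `loop.create_datagram_endpoint(UdpDemux, local_addr=("0.0.0.0", 0))`, after which any number of tasks can await `query()` concurrently.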

@bwelling
Collaborator

I don't think it would need to be a new class like asyncresolver; I assume that it would either be a modification to the methods in dns.asyncquery dealing with UDP, or new methods in that module. None of the resolver code would need to change.

The typical way that something like this would be done in a non-async environment involves a map of outstanding requests, the client waiting on an event instead of directly waiting for the packet, and a separate thread reading from the socket and dispatching responses. I assume there's a better solution than "separate thread" that better integrates into an async environment, but I'm not all that familiar with the async models. There is some additional complexity related to the fact that there's no guarantee that query ids are unique.

Also, I disagree with @rthalley that this is a "very special use case". It's not needed for normal DNS resolution, but it's not that uncommon for someone to want to use dnspython to send many queries in parallel.
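
For comparison, the non-async pattern described above (a map of outstanding requests, callers waiting on an event, one reader thread dispatching responses) might look roughly like this. It is a standard-library sketch with hypothetical names (`ThreadedDemux`, `PendingRequest`), not dnspython code. `sock` is assumed to be any object with `sendto` and `recvfrom`, such as a UDP socket, and the first two bytes of each datagram are taken as the query ID.

```python
import threading


class PendingRequest:
    def __init__(self) -> None:
        self.event = threading.Event()
        self.response = None


class ThreadedDemux:
    def __init__(self, sock) -> None:
        self.sock = sock
        self.lock = threading.Lock()
        self.pending = {}  # (query id, server address) -> PendingRequest
        self.reader = threading.Thread(target=self._read_loop, daemon=True)
        self.reader.start()

    def _read_loop(self) -> None:
        while True:
            try:
                wire, addr = self.sock.recvfrom(65535)
            except OSError:
                return  # socket was closed
            if len(wire) < 2:
                continue  # too short to carry a query ID
            key = (int.from_bytes(wire[:2], "big"), addr)
            with self.lock:
                req = self.pending.pop(key, None)
            if req is not None:
                req.response = wire
                req.event.set()  # wake the caller blocked in query()

    def query(self, wire: bytes, addr, timeout: float) -> bytes:
        key = (int.from_bytes(wire[:2], "big"), addr)
        req = PendingRequest()
        with self.lock:
            if key in self.pending:
                raise ValueError("query id already outstanding for this server")
            self.pending[key] = req
        try:
            self.sock.sendto(wire, addr)
            if not req.event.wait(timeout):
                raise TimeoutError("no response before the deadline")
            return req.response
        finally:
            with self.lock:
                self.pending.pop(key, None)
```

The duplicate-key check is one simple way to deal with non-unique query IDs: the caller has to pick a different ID (or wait) when one is already in flight to the same server.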

@rthalley
Owner

I'm not against creating this kind of thing, I just didn't want the regular query API to be altered by supporting this. Take the following as brainstorming for more consideration:

What if the nameserver abstraction:

  1. gained persistence
  2. got separate send and receive methods that ensured locking when needed

?

Then you could make two variations of almost the same code (sync and async), which would have a listener task / thread calling the nameserver receive method, and it would try to dispatch things it read to interested parties. Dispatching is basically calling some closure with the received message, and the closure could either wake up a sync/async future or just act like a callback and handle the response directly. There'd also be code to make an addition to the pending queries dictionary and then send the message (with suitable locking if needed). There would be a query API which did a send with a closure with a future and then blocked on the future. The pending queries dictionary would be keyed by (qid, rdtype, rdclass, qname).

Note that the QUIC code already works like this, in the sense that there is a listener that wakes up futures when it finishes a read on a QUIC stream. Also when I wrote the QUIC code I discovered I couldn't have just one common bit of code doing the receive as a non-Trio thread couldn't wake up a Trio condition.

This approach is a decent amount of work, but would be super useful for many things. I went for "expand on the nameserver abstraction" because this lets us have protocol independent dispatcher code. So if you decided you wanted to switch from regular DNS-over-UDP-port-53 to say DNS-over-TLS, you could just change the nameserver object.
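
To make the "dispatching is basically calling some closure" idea concrete, here is a small sketch under stated assumptions. `Dispatcher`, `expect`, `dispatch`, and `query` are hypothetical names, the key is a plain `(qid, rdtype, rdclass, qname)` tuple as brainstormed above, and `send` stands in for whatever actually transmits the message. Nothing here is an existing dnspython API.

```python
import asyncio
from typing import Callable, Dict, Tuple

Key = Tuple[int, int, int, str]  # (qid, rdtype, rdclass, qname)


class Dispatcher:
    def __init__(self) -> None:
        self.pending: Dict[Key, Callable[[bytes], None]] = {}

    def expect(self, key: Key, on_response: Callable[[bytes], None]) -> None:
        """Register a closure to call when a response for `key` arrives."""
        self.pending[key] = on_response

    def dispatch(self, key: Key, message: bytes) -> bool:
        """Called by the listener; returns False if nobody was waiting."""
        on_response = self.pending.pop(key, None)
        if on_response is None:
            return False
        on_response(message)
        return True


async def query(dispatcher: Dispatcher, key: Key,
                send: Callable[[bytes], None], message: bytes) -> bytes:
    # Here the closure is the future's set_result, so dispatching a
    # response wakes this coroutine.
    fut = asyncio.get_running_loop().create_future()
    dispatcher.expect(key, fut.set_result)
    send(message)
    return await fut
```

A callback-style caller would pass its own function to `expect` instead of a future's `set_result`, and the listener code would not need to know the difference.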

@rthalley
Owner

rthalley commented Jun 4, 2023

Brian pointed out that my dictionary scheme above isn't right, as it doesn't work if you don't get a question section in the reply. Also, while my "do it with nameserver objects" idea has nice generality, it's also a LOT of work. It may be that the problem can be split up into separate simpler problems. I'll keep thinking on this. I would like such a feature in dnspython, though with the 2.4 release process starting pretty soon, it will be an "after 2.4" thing.

@grigi

grigi commented Jun 16, 2023

I have a similar problem and worked around it by using a generic pool manager (https://pypi.org/project/generic-connection-pool/):

import socket

import dns.asyncbackend
import dns.asyncquery
from dns._asyncbackend import DatagramSocket
from generic_connection_pool.asyncio import BaseConnectionManager, ConnectionPool

class UdpSocketConnectionManager(BaseConnectionManager[int, DatagramSocket]):
    def __init__(self) -> None:
        self.backend = dns.asyncbackend.get_backend("asyncio")

    async def create(self, endpoint: int) -> DatagramSocket:
        return await self.backend.make_socket(socket.AF_INET, socket.SOCK_DGRAM)

    async def dispose(self, endpoint: int, conn: DatagramSocket) -> None:
        await conn.close()

udp_pool = ConnectionPool[int, DatagramSocket](
    UdpSocketConnectionManager(),
    idle_timeout=30.0,
    max_lifetime=600.0,
    min_idle=0,
    max_size=1000,
    total_max_size=1000,
)

Then each query runs in its own task that borrows a socket from the pool:

async def query_udp(query):
    async with udp_pool.connection(53) as udp_sock:
        return await dns.asyncquery.udp_with_fallback(query, '1.1.1.1', timeout=5, sock=udp_sock)

And you can gather on each of those. Setting the pool's size works effectively as a concurrency limiter.
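
For readers who would rather not add a dependency, the same "pool size as concurrency limiter" effect can be sketched with the standard library alone. `BoundedPool` is a hypothetical name, `create` is assumed to be an async factory (for dnspython it might wrap the backend's `make_socket` call), and unlike the pool above there is no idle timeout or maximum lifetime.

```python
import asyncio
from contextlib import asynccontextmanager


class BoundedPool:
    """Lends out at most `max_size` resources, reusing idle ones."""

    def __init__(self, create, max_size: int) -> None:
        self._create = create  # async factory, e.g. one that opens a socket
        self._idle = []        # resources waiting to be reused
        self._slots = asyncio.Semaphore(max_size)

    @asynccontextmanager
    async def connection(self):
        # The semaphore caps how many borrowers run at once, so it also
        # caps how many resources are ever created.
        async with self._slots:
            resource = self._idle.pop() if self._idle else await self._create()
            try:
                yield resource
            finally:
                # Return the resource before the slot is released, so the
                # next borrower finds it instead of creating a new one.
                self._idle.append(resource)
```

Each task then wraps its query in `async with pool.connection() as sock:`. Creating the pool inside the running event loop avoids loop-binding problems on older Python versions.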

@rthalley
Owner

rthalley commented Jul 2, 2023

Just an update that I've made some progress on this, though it will still not be in 2.4. I'm not sure I'll get to quite the vision I described, but I definitely have something that works for UDP.

@rthalley
Owner

rthalley commented Nov 6, 2023

A further update. The UDP solution I mentioned back in July works just fine, and much like the way Brian described. It's still hard to bring to dnspython, however, because it requires background tasks to service the receive ports. The background tasks would have to be written three different ways (sync, asyncio, and trio), and there is also an issue with trio about who would own the nursery. Also, this kind of scheme is really only good for UDP; for everything else you really want more of a pooling-type solution, as the communication channels are inherently bound to a destination with TCP, DoT, DoH, and DoQ. So, still pondering!

@rthalley rthalley added the Future A longer term issue, possibly addressed in the future. label Nov 6, 2023
4 participants