
sending DNS queries asynchronously with asyncio and dnspython

Say I want to send various types of DNS queries (A, AAAA, NS, SOA, DNSKEY, NSEC3, DS, etc) for Alexa top 1M sites using dnspython.

Doing this one by one would take quite a while, since I'm sending multiple queries for each site. So I'd like to get some parallelism using asyncio in Python 3.

I went through David Beazley's generator/coroutine talk trilogy (http://www.dabeaz.com/talks.html), but I'm still not sure how to accomplish this simple task...

More specifically,

results = dns.resolver.query('google.com','AAAA')

is a blocking function call, waiting for the DNS reply to come back.

How can I send other queries during this waiting time without using threads? Since DNS queries are usually single UDP packets, I thought asyncio might help.

pycares doesn't seem to support all the record types I need, so pycares and aiodns won't work for my case.

Any references and ideas would be helpful.

asked Nov 28 '25 by Eniaczz

1 Answer

Recent versions of dnspython (2.0+) have native asyncio support, though the documentation is a little lacking.

Nevertheless, it's now possible to query asynchronously with dnspython without needing hacky thread-pool workarounds.

AsyncIO DNSPython Example

Below is a simple example that wraps dnspython's asyncio Resolver class in a small coroutine, then uses asyncio.gather for efficient bulk querying:

from dns.asyncresolver import Resolver
import dns.resolver
import dns.rrset
import asyncio
from typing import Tuple


async def dns_query(domain: str, rtype: str = 'A', **kwargs) -> dns.rrset.RRset:
    kwargs, res_cfg = dict(kwargs), {}
    # extract 'filename' and 'configure' from kwargs if they're present
    # to be passed to Resolver. we pop them to avoid conflicts passing kwargs
    # to .resolve().
    if 'filename' in kwargs: res_cfg['filename'] = kwargs.pop('filename')
    if 'configure' in kwargs: res_cfg['configure'] = kwargs.pop('configure')

    # create an asyncio Resolver instance
    rs = Resolver(**res_cfg)

    # call and asynchronously await .resolve() to obtain the DNS results
    res: dns.resolver.Answer = await rs.resolve(domain, rdtype=rtype, **kwargs)

    # we return the most useful part of Answer: the RRset, which contains
    # the individual records that were found.
    return res.rrset


async def dns_bulk(*queries: Tuple[str, str], **kwargs):
    ret_ex = kwargs.pop('return_exceptions', True)

    # Iterate over the queries and call (but don't await) the dns_query coroutine
    # with each query.
    # Without 'await', they won't properly execute until we await the coroutines
    # either individually, or in bulk using asyncio.gather
    coros = [dns_query(dom, rt, **kwargs) for dom, rt in queries]

    # using asyncio.gather, we can effectively run all of the coroutines
    # in 'coros' at the same time, instead of awaiting them one-by-one.
    #
    # return_exceptions controls whether gather() should immediately
    # fail and re-raise as soon as it detects an exception,
    # or whether it should just capture any exceptions, and simply
    # return them within the results.
    #
    # in this example function, return_exceptions is set to True,
    # which means if one or more of the queries fail, it'll simply
    # store the exceptions and continue running the remaining coros,
    # and return the exceptions inside of the tuple/list of results.
    return await asyncio.gather(*coros, return_exceptions=ret_ex)


async def main():
    queries = [
        ('privex.io', 'AAAA'),
        ('privex.io', 'TXT'),
        ('google.com', 'A'),
        ('google.com', 'AAAA'),
        ('examplesitedoesnotexist.test', 'A'),
    ]
    print(f"\n [...] Sending {len(queries)} bulk queries\n")
    res = await dns_bulk(*queries)
    print(f"\n [+++] Got {len(res)} results! :)\n\n")

    for i, a in enumerate(res):
        print("\n------------------------------------------------------------\n")
        if isinstance(a, Exception):
            print(f" [!!!] Error: Result {i} is an exception! Original query: {queries[i]} || Exception is: {type(a)} - {a!s} \n")
            continue
        print(f" [+++] Got result for query {i} ( {queries[i]} )\n")
        print(f"  >>>  Representation: {a!r}")
        print(f"  >>>  As string:")
        print(f"    {a!s}")
        print()
    print("\n------------------------------------------------------------\n")

asyncio.run(main())

Here's what the output looks like when you run the above script:


 [...] Sending 5 bulk queries


 [+++] Got 5 results! :)



------------------------------------------------------------

 [+++] Got result for query 0 ( ('privex.io', 'AAAA') )

  >>>  Representation: <DNS privex.io. IN AAAA RRset: [<2a07:e00::abc>]>
  >>>  As string:
    privex.io. 221 IN AAAA 2a07:e00::abc


------------------------------------------------------------

 [+++] Got result for query 1 ( ('privex.io', 'TXT') )

  >>>  Representation: <DNS privex.io. IN TXT RRset: [<"v=spf1 include:spf.messagingengine.com include:smtp.privex.io -all">, <"google-site-verification=_0OlLdacq3GAc4NkhOd0pBcLsNya3KApS0iAc6MtbYU">]>
  >>>  As string:
    privex.io. 300 IN TXT "v=spf1 include:spf.messagingengine.com include:smtp.privex.io -all"
privex.io. 300 IN TXT "google-site-verification=_0OlLdacq3GAc4NkhOd0pBcLsNya3KApS0iAc6MtbYU"


------------------------------------------------------------

 [+++] Got result for query 2 ( ('google.com', 'A') )

  >>>  Representation: <DNS google.com. IN A RRset: [<216.58.205.46>]>
  >>>  As string:
    google.com. 143 IN A 216.58.205.46


------------------------------------------------------------

 [+++] Got result for query 3 ( ('google.com', 'AAAA') )

  >>>  Representation: <DNS google.com. IN AAAA RRset: [<2a00:1450:4009:80f::200e>]>
  >>>  As string:
    google.com. 221 IN AAAA 2a00:1450:4009:80f::200e


------------------------------------------------------------

 [!!!] Error: Result 4 is an exception! Original query: ('examplesitedoesnotexist.test', 'A') || Exception is: <class 'dns.resolver.NXDOMAIN'> - The DNS query name does not exist: examplesitedoesnotexist.test. 


------------------------------------------------------------
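One caveat at the Alexa top-1M scale mentioned in the question: firing every query at once with asyncio.gather can exhaust file descriptors and overwhelm your resolver. A common remedy is to cap in-flight queries with asyncio.Semaphore. Below is a minimal sketch of that pattern; `limited_query` is a hypothetical placeholder standing in for the `dns_query` coroutine above (it just sleeps instead of hitting the network):

```python
import asyncio

async def limited_query(sem: asyncio.Semaphore, domain: str, rtype: str):
    # 'async with sem' blocks while 'max_concurrent' queries are already
    # in flight, so only a bounded number run at any one time.
    async with sem:
        # placeholder for: await dns_query(domain, rtype)
        await asyncio.sleep(0.01)
        return (domain, rtype)

async def bulk_limited(queries, max_concurrent: int = 100):
    sem = asyncio.Semaphore(max_concurrent)
    coros = [limited_query(sem, dom, rt) for dom, rt in queries]
    # gather preserves the order of 'queries' in its results
    return await asyncio.gather(*coros, return_exceptions=True)

results = asyncio.run(bulk_limited([('google.com', 'A'), ('privex.io', 'AAAA')]))
print(results)
```

Swapping the `asyncio.sleep` for the real `dns_query` call gives you throttled bulk resolution without any changes to the gather-based structure.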

Background tasks using AsyncIO

If your application is purely asyncio-based, it's possible to run coroutines in the background without needing threads:

import asyncio

async def hello():
    for i in range(10):
        print("hello world")
        await asyncio.sleep(2.0)

async def lorem():
    for i in range(20):
        print("lorem ipsum dolor")
        await asyncio.sleep(1.0)


async def my_app():
    print(" [...] creating tsk_hello and tsk_lorem")
    tsk_hello = asyncio.create_task(hello())
    tsk_lorem = asyncio.create_task(lorem())

    # let them both run for 5 seconds
    print(" [...] waiting 5 secs")
    await asyncio.sleep(5.0)

    # now, assuming you wanted to cancel a looping task before it's finished
    # (or tasks that are endless 'while True' loops)
    # we can use the tsk_x task objects to ask them to stop immediately.
    print(" [...] stopping tsk_hello")
    tsk_hello.cancel()
    print(" [...] waiting 4 secs")
    await asyncio.sleep(4.0)
    print(" [...] stopping tsk_lorem")
    tsk_lorem.cancel()

asyncio.run(my_app())

If you run the above example code for AsyncIO background tasks, the output will look like this, showing that both lorem and hello are able to run side-by-side, along with the main entrypoint function:

 [...] waiting 5 secs
hello world
lorem ipsum dolor
lorem ipsum dolor
hello world
lorem ipsum dolor
lorem ipsum dolor
hello world
lorem ipsum dolor
 [...] stopping tsk_hello
 [...] waiting 4 secs
lorem ipsum dolor
lorem ipsum dolor
lorem ipsum dolor
lorem ipsum dolor
 [...] stopping tsk_lorem
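One subtlety worth knowing: `.cancel()` only *requests* cancellation; asyncio raises `CancelledError` inside the task at its next `await` point, so the task isn't guaranteed to be gone the instant `.cancel()` returns. A minimal sketch (the `endless` task name here is illustrative) showing how to await a cancelled task and confirm it actually stopped:

```python
import asyncio

async def endless():
    # an endless loop that only stops when cancelled
    while True:
        await asyncio.sleep(0.1)

async def main():
    tsk = asyncio.create_task(endless())
    await asyncio.sleep(0.3)

    # request cancellation, then await the task so the CancelledError
    # can actually propagate and the task can finish cleaning up
    tsk.cancel()
    try:
        await tsk
    except asyncio.CancelledError:
        pass
    return tsk.cancelled()

was_cancelled = asyncio.run(main())
print(was_cancelled)
```

If the task needs to release resources on shutdown, it can catch `asyncio.CancelledError` itself, clean up, and re-raise.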
answered Nov 29 '25 by Someguy123