# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import asyncio
import pytest
import dns.asyncbackend
import dns.asyncquery
import dns.message
import dns.query
import dns.tsigkeyring
import dns.versioned
import dns.xfr
# Some tests use a "nano nameserver" for testing. It requires trio
# and threading, so try to import it and if it doesn't work, skip
# those tests.
try:
from .nanonameserver import Server
_nanonameserver_available = True
except ImportError:
_nanonameserver_available = False
class Server(object):
pass
axfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN AXFR
;ANSWER
@ 3600 IN SOA foo bar 1 2 3 4 5
@ 3600 IN NS ns1
@ 3600 IN NS ns2
bar.foo 300 IN MX 0 blaz.foo
ns1 3600 IN A 10.0.0.1
ns2 3600 IN A 10.0.0.2
@ 3600 IN SOA foo bar 1 2 3 4 5
'''
axfr1 = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN AXFR
;ANSWER
@ 3600 IN SOA foo bar 1 2 3 4 5
@ 3600 IN NS ns1
@ 3600 IN NS ns2
'''
axfr2 = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;ANSWER
bar.foo 300 IN MX 0 blaz.foo
ns1 3600 IN A 10.0.0.1
ns2 3600 IN A 10.0.0.2
@ 3600 IN SOA foo bar 1 2 3 4 5
'''
base = """@ 3600 IN SOA foo bar 1 2 3 4 5
@ 3600 IN NS ns1
@ 3600 IN NS ns2
bar.foo 300 IN MX 0 blaz.foo
ns1 3600 IN A 10.0.0.1
ns2 3600 IN A 10.0.0.2
"""
axfr_unexpected_origin = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN AXFR
;ANSWER
@ 3600 IN SOA foo bar 1 2 3 4 5
@ 3600 IN SOA foo bar 1 2 3 4 7
'''
ixfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN SOA foo bar 1 2 3 4 5
bar.foo 300 IN MX 0 blaz.foo
ns2 3600 IN A 10.0.0.2
@ 3600 IN SOA foo bar 2 2 3 4 5
ns2 3600 IN A 10.0.0.4
@ 3600 IN SOA foo bar 2 2 3 4 5
@ 3600 IN SOA foo bar 3 2 3 4 5
ns3 3600 IN A 10.0.0.3
@ 3600 IN SOA foo bar 3 2 3 4 5
@ 3600 IN NS ns2
@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN SOA foo bar 4 2 3 4 5
'''
compressed_ixfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN SOA foo bar 1 2 3 4 5
bar.foo 300 IN MX 0 blaz.foo
ns2 3600 IN A 10.0.0.2
@ 3600 IN NS ns2
@ 3600 IN SOA foo bar 4 2 3 4 5
ns2 3600 IN A 10.0.0.4
ns3 3600 IN A 10.0.0.3
@ 3600 IN SOA foo bar 4 2 3 4 5
'''
ixfr_expected = """@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN NS ns1
ns1 3600 IN A 10.0.0.1
ns2 3600 IN A 10.0.0.4
ns3 3600 IN A 10.0.0.3
"""
ixfr_first_message = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 4 2 3 4 5
'''
ixfr_header = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;ANSWER
'''
ixfr_body = [
'@ 3600 IN SOA foo bar 1 2 3 4 5',
'bar.foo 300 IN MX 0 blaz.foo',
'ns2 3600 IN A 10.0.0.2',
'@ 3600 IN SOA foo bar 2 2 3 4 5',
'ns2 3600 IN A 10.0.0.4',
'@ 3600 IN SOA foo bar 2 2 3 4 5',
'@ 3600 IN SOA foo bar 3 2 3 4 5',
'ns3 3600 IN A 10.0.0.3',
'@ 3600 IN SOA foo bar 3 2 3 4 5',
'@ 3600 IN NS ns2',
'@ 3600 IN SOA foo bar 4 2 3 4 5',
'@ 3600 IN SOA foo bar 4 2 3 4 5',
]
ixfrs = [ixfr_first_message]
ixfrs.extend([ixfr_header + l for l in ixfr_body])
good_empty_ixfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 1 2 3 4 5
'''
retry_tcp_ixfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 5 2 3 4 5
'''
bad_empty_ixfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN SOA foo bar 4 2 3 4 5
'''
unexpected_end_ixfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN SOA foo bar 1 2 3 4 5
bar.foo 300 IN MX 0 blaz.foo
ns2 3600 IN A 10.0.0.2
@ 3600 IN NS ns2
@ 3600 IN SOA foo bar 3 2 3 4 5
ns2 3600 IN A 10.0.0.4
ns3 3600 IN A 10.0.0.3
@ 3600 IN SOA foo bar 4 2 3 4 5
'''
unexpected_end_ixfr_2 = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN SOA foo bar 1 2 3 4 5
bar.foo 300 IN MX 0 blaz.foo
ns2 3600 IN A 10.0.0.2
@ 3600 IN NS ns2
'''
bad_serial_ixfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 4 2 3 4 5
@ 3600 IN SOA foo bar 2 2 3 4 5
bar.foo 300 IN MX 0 blaz.foo
ns2 3600 IN A 10.0.0.2
@ 3600 IN NS ns2
@ 3600 IN SOA foo bar 4 2 3 4 5
ns2 3600 IN A 10.0.0.4
ns3 3600 IN A 10.0.0.3
@ 3600 IN SOA foo bar 4 2 3 4 5
'''
ixfr_axfr = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 3600 IN SOA foo bar 1 2 3 4 5
@ 3600 IN NS ns1
@ 3600 IN NS ns2
bar.foo 300 IN MX 0 blaz.foo
ns1 3600 IN A 10.0.0.1
ns2 3600 IN A 10.0.0.2
@ 3600 IN SOA foo bar 1 2 3 4 5
'''
def test_basic_axfr():
z = dns.versioned.Zone('example.')
m = dns.message.from_text(axfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(base, 'example.')
assert z == ez
def test_basic_axfr_unversioned():
z = dns.zone.Zone('example.')
m = dns.message.from_text(axfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(base, 'example.')
assert z == ez
def test_basic_axfr_two_parts():
z = dns.versioned.Zone('example.')
m1 = dns.message.from_text(axfr1, origin=z.origin,
one_rr_per_rrset=True)
m2 = dns.message.from_text(axfr2, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
done = xfr.process_message(m1)
assert not done
done = xfr.process_message(m2)
assert done
ez = dns.zone.from_text(base, 'example.')
assert z == ez
def test_axfr_unexpected_origin():
z = dns.versioned.Zone('example.')
m = dns.message.from_text(axfr_unexpected_origin, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_basic_ixfr():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(ixfr_expected, 'example.')
assert z == ez
def test_basic_ixfr_unversioned():
z = dns.zone.from_text(base, 'example.')
m = dns.message.from_text(ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(ixfr_expected, 'example.')
assert z == ez
def test_compressed_ixfr():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(compressed_ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(ixfr_expected, 'example.')
assert z == ez
def test_basic_ixfr_many_parts():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
done = False
for text in ixfrs:
assert not done
m = dns.message.from_text(text, origin=z.origin,
one_rr_per_rrset=True)
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(ixfr_expected, 'example.')
assert z == ez
def test_good_empty_ixfr():
z = dns.zone.from_text(ixfr_expected, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(good_empty_ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(ixfr_expected, 'example.')
assert z == ez
def test_retry_tcp_ixfr():
z = dns.zone.from_text(ixfr_expected, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(retry_tcp_ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1, is_udp=True) as xfr:
with pytest.raises(dns.xfr.UseTCP):
xfr.process_message(m)
def test_bad_empty_ixfr():
z = dns.zone.from_text(ixfr_expected, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(bad_empty_ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=3) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_serial_went_backwards_ixfr():
z = dns.zone.from_text(ixfr_expected, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(bad_empty_ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=5) as xfr:
with pytest.raises(dns.xfr.SerialWentBackwards):
xfr.process_message(m)
def test_ixfr_is_axfr():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(ixfr_axfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=0xffffffff) as xfr:
done = xfr.process_message(m)
assert done
ez = dns.zone.from_text(base, 'example.')
assert z == ez
def test_ixfr_requires_serial():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
with pytest.raises(ValueError):
dns.xfr.Inbound(z, dns.rdatatype.IXFR)
def test_ixfr_unexpected_end_bad_diff_sequence():
# This is where we get the end serial, but haven't seen all of
# the expected diffs
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(unexpected_end_ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_udp_ixfr_unexpected_end_just_stops():
# This is where everything looks good, but the IXFR just stops
# in the middle.
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(unexpected_end_ixfr_2, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1, is_udp=True) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_ixfr_bad_serial():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(bad_serial_ixfr, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_no_udp_with_axfr():
z = dns.versioned.Zone('example.')
with pytest.raises(ValueError):
with dns.xfr.Inbound(z, dns.rdatatype.AXFR, is_udp=True) as xfr:
pass
refused = '''id 1
opcode QUERY
rcode REFUSED
flags AA
;QUESTION
example. IN AXFR
'''
bad_qname = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
not-example. IN IXFR
'''
bad_qtype = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN AXFR
'''
soa_not_first = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
bar.foo 300 IN MX 0 blaz.foo
'''
soa_not_first_2 = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ANSWER
@ 300 IN MX 0 blaz.foo
'''
no_answer = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN IXFR
;ADDITIONAL
bar.foo 300 IN MX 0 blaz.foo
'''
axfr_answers_after_final_soa = '''id 1
opcode QUERY
rcode NOERROR
flags AA
;QUESTION
example. IN AXFR
;ANSWER
@ 3600 IN SOA foo bar 1 2 3 4 5
@ 3600 IN NS ns1
@ 3600 IN NS ns2
bar.foo 300 IN MX 0 blaz.foo
ns1 3600 IN A 10.0.0.1
ns2 3600 IN A 10.0.0.2
@ 3600 IN SOA foo bar 1 2 3 4 5
ns3 3600 IN A 10.0.0.3
'''
def test_refused():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(refused, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.xfr.TransferError):
xfr.process_message(m)
def test_bad_qname():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(bad_qname, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_bad_qtype():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(bad_qtype, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_soa_not_first():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(soa_not_first, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
m = dns.message.from_text(soa_not_first_2, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_no_answer():
z = dns.zone.from_text(base, 'example.',
zone_factory=dns.versioned.Zone)
m = dns.message.from_text(no_answer, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.IXFR, serial=1) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
def test_axfr_answers_after_final_soa():
z = dns.versioned.Zone('example.')
m = dns.message.from_text(axfr_answers_after_final_soa, origin=z.origin,
one_rr_per_rrset=True)
with dns.xfr.Inbound(z, dns.rdatatype.AXFR) as xfr:
with pytest.raises(dns.exception.FormError):
xfr.process_message(m)
keyring = dns.tsigkeyring.from_text(
{
'keyname.': 'NjHwPsMKjdN++dOfE5iAiQ=='
}
)
keyname = dns.name.from_text('keyname')
def test_make_query_basic():
z = dns.versioned.Zone('example.')
(q, s) = dns.xfr.make_query(z)
assert q.question[0].rdtype == dns.rdatatype.AXFR
assert s is None
(q, s) = dns.xfr.make_query(z, serial=None)
assert q.question[0].rdtype == dns.rdatatype.AXFR
assert s is None
(q, s) = dns.xfr.make_query(z, serial=10)
assert q.question[0].rdtype == dns.rdatatype.IXFR
assert q.authority[0].rdtype == dns.rdatatype.SOA
assert q.authority[0][0].serial == 10
assert s == 10
with z.writer() as txn:
txn.add('@', 300, dns.rdata.from_text('in', 'soa', '. . 1 2 3 4 5'))
(q, s) = dns.xfr.make_query(z)
assert q.question[0].rdtype == dns.rdatatype.IXFR
assert q.authority[0].rdtype == dns.rdatatype.SOA
assert q.authority[0][0].serial == 1
assert s == 1
(q, s) = dns.xfr.make_query(z, keyring=keyring, keyname=keyname)
assert q.question[0].rdtype == dns.rdatatype.IXFR
assert q.authority[0].rdtype == dns.rdatatype.SOA
assert q.authority[0][0].serial == 1
assert s == 1
assert q.keyname == keyname
def test_make_query_bad_serial():
z = dns.versioned.Zone('example.')
with pytest.raises(ValueError):
dns.xfr.make_query(z, serial='hi')
with pytest.raises(ValueError):
dns.xfr.make_query(z, serial=-1)
with pytest.raises(ValueError):
dns.xfr.make_query(z, serial=4294967296)
def test_extract_serial_from_query():
z = dns.versioned.Zone('example.')
(q, s) = dns.xfr.make_query(z)
xs = dns.xfr.extract_serial_from_query(q)
assert s is None
assert s == xs
(q, s) = dns.xfr.make_query(z, serial=10)
xs = dns.xfr.extract_serial_from_query(q)
assert s == 10
assert s == xs
q = dns.message.make_query('example', 'a')
with pytest.raises(ValueError):
dns.xfr.extract_serial_from_query(q)
class XFRNanoNameserver(Server):
def __init__(self):
super().__init__(origin=dns.name.from_text('example'))
def handle(self, request):
try:
if request.message.question[0].rdtype == dns.rdatatype.IXFR:
text = ixfr
else:
text = axfr
r = dns.message.from_text(text, one_rr_per_rrset=True,
origin=self.origin)
r.id = request.message.id
return r
except Exception:
pass
@pytest.mark.skipif(not _nanonameserver_available,
reason="requires nanonameserver")
def test_sync_inbound_xfr():
with XFRNanoNameserver() as ns:
zone = dns.versioned.Zone('example')
dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
expected = dns.zone.from_text(ixfr_expected, 'example')
assert zone == expected
async def async_inbound_xfr():
with XFRNanoNameserver() as ns:
zone = dns.versioned.Zone('example')
await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
expected = dns.zone.from_text(ixfr_expected, 'example')
assert zone == expected
@pytest.mark.skipif(not _nanonameserver_available,
reason="requires nanonameserver")
def test_asyncio_inbound_xfr():
dns.asyncbackend.set_default_backend('asyncio')
async def run():
await async_inbound_xfr()
try:
runner = asyncio.run
except AttributeError:
# this is only needed for 3.6
def old_runner(awaitable):
loop = asyncio.get_event_loop()
return loop.run_until_complete(awaitable)
runner = old_runner
runner(run())
#
# We don't need to do this as it's all generic code, but
# just for extra caution we do it for each backend.
#
try:
import trio
@pytest.mark.skipif(not _nanonameserver_available,
reason="requires nanonameserver")
def test_trio_inbound_xfr():
dns.asyncbackend.set_default_backend('trio')
async def run():
await async_inbound_xfr()
trio.run(run)
except ImportError:
pass
try:
import curio
@pytest.mark.skipif(not _nanonameserver_available,
reason="requires nanonameserver")
def test_curio_inbound_xfr():
dns.asyncbackend.set_default_backend('curio')
async def run():
await async_inbound_xfr()
curio.run(run)
except ImportError:
pass
class UDPXFRNanoNameserver(Server):
def __init__(self):
super().__init__(origin=dns.name.from_text('example'))
self.did_truncation = False
def handle(self, request):
try:
if request.message.question[0].rdtype == dns.rdatatype.IXFR:
if self.did_truncation:
text = ixfr
else:
text = retry_tcp_ixfr
self.did_truncation = True
else:
text = axfr
r = dns.message.from_text(text, one_rr_per_rrset=True,
origin=self.origin)
r.id = request.message.id
return r
except Exception:
pass
@pytest.mark.skipif(not _nanonameserver_available,
reason="requires nanonameserver")
def test_sync_retry_tcp_inbound_xfr():
with UDPXFRNanoNameserver() as ns:
zone = dns.versioned.Zone('example')
dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
dns.query.inbound_xfr(ns.tcp_address[0], zone, port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
expected = dns.zone.from_text(ixfr_expected, 'example')
assert zone == expected
async def udp_async_inbound_xfr():
with UDPXFRNanoNameserver() as ns:
zone = dns.versioned.Zone('example')
await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
await dns.asyncquery.inbound_xfr(ns.tcp_address[0], zone,
port=ns.tcp_address[1],
udp_mode=dns.query.UDPMode.TRY_FIRST)
expected = dns.zone.from_text(ixfr_expected, 'example')
assert zone == expected
@pytest.mark.skipif(not _nanonameserver_available,
reason="requires nanonameserver")
def test_asyncio_retry_tcp_inbound_xfr():
dns.asyncbackend.set_default_backend('asyncio')
async def run():
await udp_async_inbound_xfr()
try:
runner = asyncio.run
except AttributeError:
def old_runner(awaitable):
loop = asyncio.get_event_loop()
return loop.run_until_complete(awaitable)
runner = old_runner
runner(run())