Codebase list dnsviz / ddf89b94-ddfd-409c-85ed-27f58d40e7d7/upstream/sid
Import upstream version 0.9.3+git20210914.1.881485e Debian Janitor 2 years ago
32 changed file(s) with 2626 addition(s) and 2575 deletion(s). Raw diff Collapse all Expand all
2323 The remainer of this section covers other methods of installation, including a
2424 list of [dependencies](#dependencies), installation to a
2525 [virtual environment](#installation-in-a-virtual-environment), and installation
26 on [Fedora](#fedora-rpm-build-and-install) and
27 [RHEL7](#rhel7-rpm-build-and-install).
26 on [Fedora, RHEL 8, CentOS 8,](#fedora--rhel-8--centos-8-rpm-build-and-install) and
27 [RHEL 7](#rhel-7-rpm-build-and-install).
2828
2929 Instructions for running in a Docker container are also available
3030 [later in this document](#docker-container).
9393 ```
9494
9595
96 ### Fedora RPM Build and Install
96 ### Fedora / RHEL 8 / CentOS 8 RPM Build and Install
97
98 *RHEL 8 only*: Enable CodeReady Linux Builder by following the instructions [here](https://access.redhat.com/articles/4348511).
99
100 *CentOS 8 only*: Enable PowerTools and EPEL with the following two commands:
101 ```
102 $ sudo dnf config-manager --set-enabled powertools
103 $ sudo dnf install epel-release
104 ```
105
106 The remaining instructions are for Fedora, RHEL 8, and CentOS 8.
97107
98108 Install the tools for building an RPM, and set up the rpmbuild tree.
99109 ```
114124 ```
115125 $ sudo dnf install python3-dns python3-pygraphviz python3-m2crypto
116126 ```
117 (Note that as of Fedora 33, the latest version of M2Crypto is 0.35.2. If you
127 (Note that as of Fedora 33 / RHEL 8 / CentOS 8, the latest version of M2Crypto is 0.35.2. If you
118128 would like support for DNSSEC algorithms 15 (Ed25519) and 16 (Ed448), you will
119129 need to install M2Crypto using `pip3`. For example, see [installation to a
120130 virtual environment](#installation-in-a-virtual-environment).)
126136 ```
127137
128138
129 ### RHEL7 RPM Build and Install
139 ### RHEL 7 RPM Build and Install
130140
131141 Install pygraphviz, M2Crypto, and dnspython, after installing their build dependencies.
132142 ```
16281628 required_params = []
16291629
16301630 class MissingRRSIGForAlg(ResponseError):
1631 description_template = 'The %(source)s RRset for the zone included algorithm %(algorithm)s (%(algorithm_text)s), but no RRSIG with algorithm %(algorithm)d covering the RRset was returned in the response.'
1631 description_template = 'The %(source)s RRset for the zone included algorithm %(algorithm)d (%(algorithm_text)s), but no RRSIG with algorithm %(algorithm)d covering the RRset was returned in the response.'
16321632 references = ['RFC 4035, Sec. 2.2', 'RFC 6840, Sec. 5.11']
16331633 required_params = ['algorithm']
16341634 source = None
19021902
19031903 _abstract = False
19041904 code = 'MISSING_SEP_FOR_ALG'
1905 description_template = "The %(source)s RRset for the zone included algorithm %(algorithm)s (%(algorithm_text)s), but no %(source)s RR matched a DNSKEY with algorithm %(algorithm)d that signs the zone's DNSKEY RRset."
1905 description_template = "The %(source)s RRset for the zone included algorithm %(algorithm)d (%(algorithm_text)s), but no %(source)s RR matched a DNSKEY with algorithm %(algorithm)d that signs the zone's DNSKEY RRset."
19061906 references = ['RFC 4035, Sec. 2.2', 'RFC 6840, Sec. 5.11']
19071907 required_params = ['algorithm']
19081908
15741574 rrset_info.rrset.rdtype != dns.rdatatype.DS and \
15751575 rrsig_status.dnskey is not None:
15761576 if rrset_info.rrset.rdtype == dns.rdatatype.DNSKEY:
1577 self.ksks.add(rrsig_status.dnskey)
1577 if self.ksks is not None:
1578 self.ksks.add(rrsig_status.dnskey)
15781579 else:
1579 self.zsks.add(rrsig_status.dnskey)
1580 if self.zsks is not None:
1581 self.zsks.add(rrsig_status.dnskey)
15801582
15811583 key = rrsig_status.rrset, rrsig_status.rrsig
15821584 break
16841686 self.response_errors = {}
16851687 self.response_warnings = {}
16861688
1687 if self.is_zone():
1689 if (self.name, dns.rdatatype.DNSKEY) in self.queries:
16881690 self.zsks = set()
16891691 self.ksks = set()
16901692
17131715 self._populate_invalid_response_status(query)
17141716
17151717 def _finalize_key_roles(self):
1716 if self.is_zone():
1718 if (self.name, dns.rdatatype.DNSKEY) in self.queries:
17171719 self.published_keys = set(self.get_dnskeys()).difference(self.zsks.union(self.ksks))
17181720 self.revoked_keys = set([x for x in self.get_dnskeys() if x.rdata.flags & fmt.DNSKEY_FLAGS['revoke']])
17191721
10301030 for cname in self.cname_targets:
10311031 for target in self.cname_targets[cname]:
10321032 self.cname_targets[cname][target] = self.__class__.deserialize(target, d, cache=cache)
1033 # these are optional
10331034 for signer in self.external_signers:
1034 self.external_signers[signer] = self.__class__.deserialize(signer, d, cache=cache)
1035
1036 # these two are optional
1035 if lb2s(signer.canonicalize().to_text()) in d:
1036 self.external_signers[signer] = self.__class__.deserialize(signer, d, cache=cache)
10371037 for target in self.ns_dependencies:
10381038 if lb2s(target.canonicalize().to_text()) in d:
10391039 self.ns_dependencies[target] = self.__class__.deserialize(target, d, cache=cache)
17461746 if self.dns_cookies:
17471747 self.logger.debug('Preparing DNS cookie diagnostic query %s/%s...' % (fmt.humanize_name(name_obj.name), dns.rdatatype.to_text(dns.rdatatype.SOA)))
17481748 queries[(name_obj.name, -(dns.rdatatype.SOA+104))] = self.diagnostic_query_bad_server_cookie(name_obj.name, dns.rdatatype.SOA, self.rdclass, servers, bailiwick, self.client_ipv4, self.client_ipv6, odd_ports=odd_ports, cookie_bad=COOKIE_BAD)
1749
1750 # NSEC3PARAM
1751 self.logger.debug('Preparing query %s/%s...' % (fmt.humanize_name(name_obj.name), dns.rdatatype.to_text(dns.rdatatype.NSEC3PARAM)))
1752 queries[(name_obj.name, dns.rdatatype.NSEC3PARAM)] = self.diagnostic_query(name_obj.name, dns.rdatatype.NSEC3PARAM, self.rdclass, servers, bailiwick, self.client_ipv4, self.client_ipv6, odd_ports=odd_ports, cookie_bad=COOKIE_STANDIN)
17491753
17501754 # negative queries for all zones
17511755 self._set_negative_queries(name_obj)
278278 if self.validation_status == RRSIG_STATUS_VALID:
279279 self.validation_status = RRSIG_STATUS_EXPIRED
280280 self.errors.append(Errors.ExpirationInPast(expiration=fmt.timestamp_to_datetime(self.rrsig.expiration), reference_time=fmt.timestamp_to_datetime(self.reference_ts)))
281 elif self.reference_ts + min_ttl >= self.rrsig.expiration:
281 elif self.reference_ts + min_ttl > self.rrsig.expiration:
282282 self.errors.append(Errors.TTLBeyondExpiration(expiration=fmt.timestamp_to_datetime(self.rrsig.expiration), rrsig_ttl=min_ttl, reference_time=fmt.timestamp_to_datetime(self.reference_ts)))
283283 elif self.reference_ts + CLOCK_SKEW_WARNING >= self.rrsig.expiration:
284284 self.warnings.append(Errors.ExpirationWithinClockSkew(expiration=fmt.timestamp_to_datetime(self.rrsig.expiration), reference_time=fmt.timestamp_to_datetime(self.reference_ts)))
4343 from dnsviz.analysis import OfflineDomainNameAnalysis, DNS_RAW_VERSION
4444 from dnsviz.config import DNSVIZ_SHARE_PATH, JQUERY_PATH, JQUERY_UI_PATH, JQUERY_UI_CSS_PATH, RAPHAEL_PATH
4545 from dnsviz.format import latin1_binary_to_string as lb2s
46 from dnsviz.util import get_trusted_keys, get_default_trusted_keys
46 from dnsviz.util import get_trusted_keys, get_default_trusted_keys, io_try_buffered
4747
4848 # If the import of DNSAuthGraph fails because of the lack of pygraphviz, it
4949 # will be reported later
153153 self.parser = argparse.ArgumentParser(description='Graph the assessment of diagnostic DNS queries', prog=prog)
154154
155155 # python3/python2 dual compatibility
156 stdin_buffer = io.open(sys.stdin.fileno(), 'rb', closefd=False)
157 stdout_buffer = io.open(sys.stdout.fileno(), 'wb', closefd=False)
156 stdin_buffer = io_try_buffered(sys.stdin, 'rb', closefd=False)
157 stdout_buffer = io_try_buffered(sys.stdout, 'wb', closefd=False)
158158
159159 try:
160160 self.parser.add_argument('-f', '--names-file',
4242
4343 from dnsviz.analysis import OfflineDomainNameAnalysis, DNS_RAW_VERSION
4444 from dnsviz.format import latin1_binary_to_string as lb2s
45 from dnsviz.util import get_trusted_keys
45 from dnsviz.util import get_trusted_keys, io_try_buffered
4646
4747 # If the import of DNSAuthGraph fails because of the lack of pygraphviz, it
4848 # will be reported later
169169 self.parser = argparse.ArgumentParser(description='Assess diagnostic DNS queries', prog=prog)
170170
171171 # python3/python2 dual compatibility
172 stdin_buffer = io.open(sys.stdin.fileno(), 'rb', closefd=False)
173 stdout_buffer = io.open(sys.stdout.fileno(), 'wb', closefd=False)
172 stdin_buffer = io_try_buffered(sys.stdin, 'rb', closefd=False)
173 stdout_buffer = io_try_buffered(sys.stdout, 'wb', closefd=False)
174174
175175 try:
176176 self.parser.add_argument('-f', '--names-file',
4242
4343 from dnsviz.analysis import TTLAgnosticOfflineDomainNameAnalysis, DNS_RAW_VERSION
4444 from dnsviz.format import latin1_binary_to_string as lb2s
45 from dnsviz.util import get_trusted_keys, get_default_trusted_keys
45 from dnsviz.util import get_trusted_keys, get_default_trusted_keys, io_try_buffered
4646
4747 # If the import of DNSAuthGraph fails because of the lack of pygraphviz, it
4848 # will be reported later
311311 self.parser = argparse.ArgumentParser(description='Print the assessment of diagnostic DNS queries', prog=prog)
312312
313313 # python3/python2 dual compatibility
314 stdin_buffer = io.open(sys.stdin.fileno(), 'rb', closefd=False)
315 stdout_buffer = io.open(sys.stdout.fileno(), 'wb', closefd=False)
314 stdin_buffer = io_try_buffered(sys.stdin, 'rb', closefd=False)
315 stdout_buffer = io_try_buffered(sys.stdout, 'wb', closefd=False)
316316
317317 try:
318318 self.parser.add_argument('-f', '--names-file',
118118 global tm
119119 if tm is not None:
120120 tm.close()
121 tm = None
121122
122123 def _init_stub_resolver():
123124 global resolver
490491 _allow_stop_at = None
491492 _handle_file_arg = None
492493
493 def __init__(self, domain, stop_at, resolver):
494 _resolvers_initialized = False
495 _stub_resolver = None
496 _full_resolver = None
497
498 def __init__(self, domain, stop_at):
494499 if not (self._allow_file is not None and \
495500 self._allow_name_only is not None and \
496501 self._allow_addr_only is not None and \
502507 raise argparse.ArgumentTypeError('The "+" may not be specified with this option')
503508
504509 self.domain = domain
505 self._resolver = resolver
506510 self._nsi = 1
507511
508512 self.delegation_mapping = {}
511515 self.filename = None
512516
513517 self.delegation_mapping[(self.domain, dns.rdatatype.NS)] = dns.rrset.RRset(self.domain, dns.rdataclass.IN, dns.rdatatype.NS)
518
519 @classmethod
520 def init_resolvers(cls):
521 if not NameServerMappingsForDomain._resolvers_initialized:
522 tm = transport.DNSQueryTransportManager()
523 try:
524 NameServerMappingsForDomain._stub_resolver = Resolver.from_file(RESOLV_CONF, StandardRecursiveQueryCD, transport_manager=tm)
525 except ResolvConfError:
526 pass
527 NameServerMappingsForDomain._full_resolver = PrivateFullResolver(transport_manager=tm)
528 NameServerMappingsForDomain._resolvers_initialized = True
529
530 @classmethod
531 def cleanup_resolvers(cls):
532 NameServerMappingsForDomain._stub_resolver = None
533 NameServerMappingsForDomain._full_resolver = None
534 NameServerMappingsForDomain._resolvers_initialized = False
514535
515536 @classmethod
516537 def _strip_port(cls, s):
539560 self._handle_name_addr_mapping(name_addr)
540561
541562 def _handle_name_no_addr(self, name, port):
563 resolver = None
564 self.init_resolvers()
565 if self._stub_resolver is not None:
566 resolver = self._stub_resolver
567 else:
568 resolver = self._full_resolver
542569 query_tuples = ((name, dns.rdatatype.A, dns.rdataclass.IN), (name, dns.rdatatype.AAAA, dns.rdataclass.IN))
543 answer_map = self._resolver.query_multiple_for_answer(*query_tuples)
570 answer_map = resolver.query_multiple_for_answer(*query_tuples)
544571 found_answer = False
545572 for (n, rdtype, rdclass) in answer_map:
546573 a = answer_map[(n, rdtype, rdclass)]
761788 _handle_file_arg = None
762789
763790 class DSForDomain:
764 def __init__(self, domain, stop_at, resolver):
791 def __init__(self, domain, stop_at):
765792 self.domain = domain
766793
767794 if stop_at and not self._allow_stop_at:
816843 class DomainListArgHelper:
817844 STOP_RE = re.compile(r'^(.*)\+$')
818845
819 def __init__(self, resolver):
820 self._resolver = resolver
821
822846 @classmethod
823847 def _strip_stop_marker(cls, s):
824848 match = cls.STOP_RE.search(s)
854878 if list_arg is not None:
855879 list_arg = list_arg.strip()
856880
857 obj = cls(domain, stop_at, self._resolver)
881 obj = cls(domain, stop_at)
858882
859883 if list_arg:
860884 obj.handle_list_arg(list_arg)
861885 return obj
862886
863887 def _handle_list_arg(self, cls, list_arg):
864 obj = cls(WILDCARD_EXPLICIT_DELEGATION, False, self._resolver)
888 obj = cls(WILDCARD_EXPLICIT_DELEGATION, False)
865889 obj.handle_list_arg(list_arg)
866890 return obj
867891
880904 class ArgHelper:
881905 BRACKETS_RE = re.compile(r'^\[(.*)\]$')
882906
883 def __init__(self, resolver, logger):
884 self._resolver = resolver
907 def __init__(self, logger):
885908 self.parser = None
886909
887910 self.odd_ports = {}
905928 self.args = None
906929 self._arg_mapping = None
907930
908 self._resolver = resolver
909931 self._logger = logger
910932 self._zones_to_serve = []
911933
912934 def build_parser(self, prog):
913935 self.parser = argparse.ArgumentParser(description='Issue diagnostic DNS queries', prog=prog)
914 helper = DomainListArgHelper(self._resolver)
936 helper = DomainListArgHelper()
915937
916938 # python3/python2 dual compatibility
917939 stdout_buffer = io.open(sys.stdout.fileno(), 'wb', closefd=False)
12361258
12371259 def populate_recursive_servers(self):
12381260 if not self.args.authoritative_analysis and not self.args.recursive_servers:
1261 try:
1262 resolver = Resolver.from_file(RESOLV_CONF, StandardRecursiveQueryCD, transport_manager=tm)
1263 except ResolvConfError:
1264 raise argparse.ArgumentTypeError('If servers are not specified with the %s option, then %s must have valid nameserver entries.\n' % \
1265 (self._arg_mapping['recursive_servers'], RESOLV_CONF))
12391266 if (WILDCARD_EXPLICIT_DELEGATION, dns.rdatatype.NS) not in self.explicit_delegations:
12401267 self.explicit_delegations[(WILDCARD_EXPLICIT_DELEGATION, dns.rdatatype.NS)] = dns.rrset.RRset(WILDCARD_EXPLICIT_DELEGATION, dns.rdataclass.IN, dns.rdatatype.NS)
1241 for i, server in enumerate(self._resolver._servers):
1268 for i, server in enumerate(resolver._servers):
12421269 if IPAddr(server).version == 6:
12431270 rdtype = dns.rdatatype.AAAA
12441271 else:
14461473 zone.serve()
14471474
14481475 def build_helper(logger, cmd, subcmd):
1449 try:
1450 resolver = Resolver.from_file(RESOLV_CONF, StandardRecursiveQueryCD, transport_manager=tm)
1451 except ResolvConfError:
1452 sys.stderr.write('File %s not found or contains no nameserver entries.\n' % RESOLV_CONF)
1453 sys.exit(1)
1454
1455 arghelper = ArgHelper(resolver, logger)
1476 arghelper = ArgHelper(logger)
14561477 arghelper.build_parser('%s %s' % (cmd, subcmd))
14571478 return arghelper
14581479
14591480 def main(argv):
1460 global tm
14611481 global th_factories
14621482 global explicit_delegations
14631483 global odd_ports
14641484
14651485 try:
1466 _init_tm()
14671486 arghelper = build_helper(logger, sys.argv[0], argv[0])
14681487 arghelper.parse_args(argv[1:])
14691488 logger.setLevel(arghelper.get_log_level())
15121531 kwargs = {}
15131532 dnsviz_meta = { 'version': DNS_RAW_VERSION, 'names': [lb2s(n.to_text()) for n in arghelper.names] }
15141533
1534 NameServerMappingsForDomain.cleanup_resolvers()
1535
1536 _init_tm()
1537
15151538 name_objs = []
15161539 if arghelper.args.input_file:
15171540 cache = {}
15321555
15331556 name_objs = a.analyze(arghelper.names)
15341557
1558 _cleanup_tm()
1559
15351560 name_objs = [x for x in name_objs if x is not None]
15361561
15371562 if not name_objs:
15521577 logger.error('Interrupted.')
15531578 sys.exit(4)
15541579
1555 # tm is global (because of possible multiprocessing), so we need to
1556 # explicitly close it here
1557 finally:
1558 _cleanup_tm()
1559
15601580 if __name__ == "__main__":
15611581 main(sys.argv)
15411541 # If this was a network error, determine if it was a binding
15421542 # error
15431543 if err == RESPONSE_ERROR_NETWORK_ERROR:
1544 if errno1 == errno.EADDRNOTAVAIL:
1545 # Address not unavailable
1546 if qh._client is not None:
1547 raise SourceAddressBindError('Unable to bind to local address %s (%s)' % (qh._client, errno.errorcode[errno1]))
1548 else:
1549 raise SourceAddressBindError('Unable to bind to local address (%s)' % (errno.errorcode[errno1]))
1544 if errno1 == errno.EADDRNOTAVAIL and qh._client is not None:
1545 raise SourceAddressBindError('Unable to bind to local address %s (%s)' % (qh._client, errno.errorcode[errno1]))
15501546 elif errno1 == errno.EADDRINUSE or \
15511547 (errno1 == errno.EACCES and qtm.src is None):
15521548 # Address/port in use (EADDRINUSE) or insufficient
15551551 raise PortBindError('Unable to bind to local port %d (%s)' % (qh.params['sport'], errno.errorcode[errno1]))
15561552 else:
15571553 raise PortBindError('Unable to bind to local port (%s)' % (errno.errorcode[errno1]))
1558 elif qtm.src is None and errno1 not in (errno.EHOSTUNREACH, errno.ENETUNREACH, errno.EAFNOSUPPORT):
1559 # If source is None it didn't bind properly. If the
1560 # errno1 value after bind() is EHOSTUNREACH or
1561 # ENETUNREACH, it is because there was no proper IPv4
1562 # or IPv6 connectivity (which is handled elsewhere).
1563 # If socket() failed and resulted in an errno value of
1564 # EAFNOSUPPORT, then likewise there is not IPv6
1565 # support. In other cases, it was something unknown, so
1554 elif qtm.src is None and errno1 not in (errno.EHOSTUNREACH, errno.ENETUNREACH, errno.EAFNOSUPPORT, errno.EADDRNOTAVAIL):
1555 # If source is None it didn't bind properly. There are several sub-cases:
1556 # 1. If the bind() failed and resulted in an errno
1557 # value of EHOSTUNREACH, it is because there was no
1558 # proper IPv4 or IPv6 connectivity; the error for
1559 # this is handled elsewhere).
1560 # 2. If socket() failed and resulted in an errno value
1561 # of EAFNOSUPPORT, then there is no IPv6 support.
1562 # 3. If connect() failed and resulted in an errno value
1563 # of EADDRNOTAVAIL, then there is no IPv6 support.
1564 # In other cases, it was something unknown, so
15661565 # raise an error.
15671566 raise BindError('Unable to bind to local address (%s)' % (errno.errorcode.get(errno1, "unknown")))
15681567
14021402 self._event_map = {}
14031403
14041404 self._close = threading.Event()
1405 t = threading.Thread(target=self._loop)
1405 # python3/python2 dual compatibility
1406 try:
1407 # python 3
1408 t = threading.Thread(target=self._loop, daemon=True)
1409 except TypeError:
1410 # python 2
1411 t = threading.Thread(target=self._loop)
1412 t.daemon = True
14061413 t.start()
14071414
14081415 def close(self):
183183 ip = IPAddr(s.getsockname()[0])
184184 s.close()
185185 return ip
186
187 def io_try_buffered(f, mode, closefd=True):
188 """Try opening buffered reader, but allow unbuffered on failure.
189
190 Required to pass tests under pytest."""
191 try:
192 return io.open(f.fileno(), mode, closefd=closefd)
193 except io.UnsupportedOperation:
194 # raised by f.fileno()
195 return f
12951295 for signed_keys, rrset_info in name_obj.get_dnskey_sets():
12961296 for rrsig in name_obj.rrsig_status[rrset_info]:
12971297 signer_obj = name_obj.get_name(rrsig.signer)
1298 if rrsig.signer != name_obj.name and not is_dlv:
1299 self.graph_zone_auth(signer_obj, False)
1298 if signer_obj is not None:
1299 # if we have the analysis corresponding to the signer, then
1300 # graph it too, if it was different from what we were
1301 # expecting
1302 if rrsig.signer != name_obj.name and not is_dlv:
1303 self.graph_zone_auth(signer_obj, False)
13001304 for dnskey in name_obj.rrsig_status[rrset_info][rrsig]:
13011305 rrsig_status = name_obj.rrsig_status[rrset_info][rrsig][dnskey]
13021306 if dnskey is None:
126126 share/html/dnssec-template.html
127127 share/js/dnsviz.js
128128 share/trusted-keys/root.txt
129 tests/dnsviz_graph_options.py
130 tests/dnsviz_graph_run.py
131 tests/dnsviz_grok_options.py
132 tests/dnsviz_grok_run.py
133 tests/dnsviz_print_options.py
134 tests/dnsviz_print_run.py
135 tests/dnsviz_probe_options.py
136 tests/dnsviz_probe_run_offline.py
137 tests/dnsviz_probe_run_online.py
129 tests/test_dnsviz_graph_options.py
130 tests/test_dnsviz_graph_run.py
131 tests/test_dnsviz_grok_options.py
132 tests/test_dnsviz_grok_run.py
133 tests/test_dnsviz_print_options.py
134 tests/test_dnsviz_print_run.py
135 tests/test_dnsviz_probe_options.py
136 tests/test_dnsviz_probe_run_offline.py
137 tests/test_dnsviz_probe_run_online.py
138138 tests/data/example-authoritative.json.gz
139139 tests/data/example-recursive.json.gz
140140 tests/data/root-authoritative.json.gz
+0
-346
tests/dnsviz_graph_options.py less more
0 import argparse
1 import binascii
2 import datetime
3 import gzip
4 import importlib
5 import io
6 import logging
7 import os
8 import subprocess
9 import tempfile
10 import unittest
11
12 import dns.name, dns.rdatatype, dns.rrset, dns.zone
13
14 from dnsviz.format import utc
15 from dnsviz.util import get_default_trusted_keys
16
17 mod = importlib.import_module('dnsviz.commands.graph')
18 GraphArgHelper = getattr(mod, 'GraphArgHelper')
19 AnalysisInputError = getattr(mod, 'AnalysisInputError')
20
21 DATA_DIR = os.path.dirname(__file__)
22 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
23
24
25 class DNSVizGraphOptionsTestCase(unittest.TestCase):
26 def setUp(self):
27 self.logger = logging.getLogger()
28 for handler in self.logger.handlers:
29 self.logger.removeHandler(handler)
30 self.logger.addHandler(logging.NullHandler())
31
32 def test_rrtype_list(self):
33 arg1 = 'A,AAAA,MX,CNAME'
34 arg1_with_spaces = ' A , AAAA , MX , CNAME '
35 arg2 = 'A'
36 arg3 = 'A,BLAH'
37 arg4_empty = ''
38 arg4_empty_spaces = ' '
39
40 type_list1 = [dns.rdatatype.A, dns.rdatatype.AAAA, dns.rdatatype.MX, dns.rdatatype.CNAME]
41 type_list2 = [dns.rdatatype.A]
42 empty_list = []
43
44 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg1), type_list1)
45 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg1_with_spaces), type_list1)
46 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg2), type_list2)
47 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg4_empty), empty_list)
48 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg4_empty_spaces), empty_list)
49
50 # invalid schema
51 with self.assertRaises(argparse.ArgumentTypeError):
52 GraphArgHelper.comma_separated_dns_types(arg3)
53
54 def test_integer_list(self):
55 arg1 = '1,2,3,4,5'
56 arg1_with_spaces = ' 1 , 2 , 3 , 4 , 5 '
57 arg2 = '1'
58 arg3 = '1,A'
59 arg4_empty = ''
60 arg4_empty_spaces = ' '
61
62 int_list1 = [1,2,3,4,5]
63 int_list2 = [1]
64 empty_list = []
65
66 int_set1 = set([1,2,3,4,5])
67 int_set2 = set([1])
68 empty_set = set([])
69
70 self.assertEqual(GraphArgHelper.comma_separated_ints(arg1), int_list1)
71 self.assertEqual(GraphArgHelper.comma_separated_ints(arg1_with_spaces), int_list1)
72 self.assertEqual(GraphArgHelper.comma_separated_ints(arg2), int_list2)
73 self.assertEqual(GraphArgHelper.comma_separated_ints(arg4_empty), empty_list)
74 self.assertEqual(GraphArgHelper.comma_separated_ints(arg4_empty_spaces), empty_list)
75
76 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg1), int_set1)
77 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg1_with_spaces), int_set1)
78 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg2), int_set2)
79 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg4_empty), empty_set)
80 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg4_empty_spaces), empty_set)
81
82 # invalid schema
83 with self.assertRaises(argparse.ArgumentTypeError):
84 GraphArgHelper.comma_separated_ints(arg3)
85
86 def test_valid_domain_name(self):
87 arg1 = '.'
88 arg2 = 'www.example.com'
89 arg3 = 'www..example.com'
90
91 self.assertEqual(GraphArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
92 self.assertEqual(GraphArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
93
94 # invalid domain name
95 with self.assertRaises(argparse.ArgumentTypeError):
96 GraphArgHelper.valid_domain_name(arg3)
97
98 def test_ingest_input(self):
99 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
100 example_bad_json.write(b'{')
101
102 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
103 example_no_version.write(b'{}')
104
105 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
106 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
107
108 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
109 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
110
111 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
112 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
113 example_auth_out.write(example_auth_in.read())
114
115 try:
116 args = ['-r', example_auth_out.name]
117 arghelper = GraphArgHelper(self.logger)
118 arghelper.build_parser('graph')
119 arghelper.parse_args(args)
120 arghelper.ingest_input()
121
122 # Bad json
123 args = ['-r', example_bad_json.name]
124 arghelper = GraphArgHelper(self.logger)
125 arghelper.build_parser('graph')
126 arghelper.parse_args(args)
127 with self.assertRaises(AnalysisInputError):
128 arghelper.ingest_input()
129
130 # No version
131 args = ['-r', example_no_version.name]
132 arghelper = GraphArgHelper(self.logger)
133 arghelper.build_parser('graph')
134 arghelper.parse_args(args)
135 with self.assertRaises(AnalysisInputError):
136 arghelper.ingest_input()
137
138 # Invalid version
139 args = ['-r', example_invalid_version_1.name]
140 arghelper = GraphArgHelper(self.logger)
141 arghelper.build_parser('graph')
142 arghelper.parse_args(args)
143 with self.assertRaises(AnalysisInputError):
144 arghelper.ingest_input()
145
146 # Invalid version
147 args = ['-r', example_invalid_version_2.name]
148 arghelper = GraphArgHelper(self.logger)
149 arghelper.build_parser('graph')
150 arghelper.parse_args(args)
151 with self.assertRaises(AnalysisInputError):
152 arghelper.ingest_input()
153
154 finally:
155 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
156 example_invalid_version_1, example_invalid_version_2):
157 os.remove(tmpfile.name)
158
159 def test_ingest_names(self):
160 args = ['example.com', 'example.net']
161 arghelper = GraphArgHelper(self.logger)
162 arghelper.build_parser('graph')
163 arghelper.parse_args(args)
164 arghelper.ingest_names()
165 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
166
167 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
168 names_file.write(b'example.com\nexample.net\n')
169
170 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
171 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
172
173 try:
174 args = ['-f', names_file.name]
175 arghelper = GraphArgHelper(self.logger)
176 arghelper.build_parser('graph')
177 arghelper.parse_args(args)
178 arghelper.ingest_names()
179 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
180
181 args = ['-r', example_names_only.name]
182 arghelper = GraphArgHelper(self.logger)
183 arghelper.build_parser('graph')
184 arghelper.parse_args(args)
185 arghelper.ingest_input()
186 arghelper.ingest_names()
187 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
188
189 args = ['-r', example_names_only.name, 'example.com']
190 arghelper = GraphArgHelper(self.logger)
191 arghelper.build_parser('graph')
192 arghelper.parse_args(args)
193 arghelper.ingest_input()
194 arghelper.ingest_names()
195 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
196 finally:
197 for tmpfile in (names_file, example_names_only):
198 os.remove(tmpfile.name)
199
200 def test_trusted_keys_file(self):
201 tk1 = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
202 tk2 = 'example.com. IN DNSKEY 256 3 7 AwEAAaerI6CXvvG6U3UxkB0PXj+ORyGFtABYJ6JG3NL6w1KKlZl+73AS aPEEa7SXeuWmAWE1N3rsbnrMBvepBXkCbP609eoo2mJ8bsozT/NNwSSc FP1Ddw4wxpZAC/+/K736rF1HbI3ROS/rBTr7RW6rWzcyPbYFuUMVzrAM ZSJNJsTDcmyGc5Is3cFzNcrd3/Gmcjt8TKMmGq51HXWzFvxro7EH6aOl K6G4O4+mzaUKp91mg7DAVhX8yXnadXUZQ4yDfLzSleYQ2TroQqeSgI3X m/gUoACm3ELUOr84TmIKZ67X/zBTx8tHC5iBWY2tbIKqiJY7I4/aW4S4 NraCSRbDpbM='
203 tk1_rdata = ' '.join(tk1.split()[3:])
204 tk2_rdata = ' '.join(tk2.split()[3:])
205 tk_explicit = [(dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk1_rdata)),
206 (dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk2_rdata))]
207
208 now = datetime.datetime.now(utc)
209 tk_default = get_default_trusted_keys(now)
210
211 args = ['example.com']
212 arghelper = GraphArgHelper(self.logger)
213 arghelper.build_parser('graph')
214 arghelper.parse_args(args)
215 arghelper.aggregate_trusted_key_info()
216 self.assertEqual(arghelper.trusted_keys, None)
217 arghelper.update_trusted_key_info(now)
218 self.assertEqual(arghelper.trusted_keys, tk_default)
219
220 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk1_file:
221 tk1_file.write(tk1.encode('utf-8'))
222
223 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk2_file:
224 tk2_file.write(tk2.encode('utf-8'))
225
226 try:
227 args = ['-t', tk1_file.name, '-t', tk2_file.name, 'example.com']
228 arghelper = GraphArgHelper(self.logger)
229 arghelper.build_parser('graph')
230 arghelper.parse_args(args)
231 arghelper.aggregate_trusted_key_info()
232 arghelper.update_trusted_key_info(now)
233 self.assertEqual(arghelper.trusted_keys, tk_explicit)
234
235 args = ['-t', '/dev/null', 'example.com']
236 arghelper = GraphArgHelper(self.logger)
237 arghelper.build_parser('graph')
238 arghelper.parse_args(args)
239 arghelper.aggregate_trusted_key_info()
240 arghelper.update_trusted_key_info(now)
241 self.assertEqual(arghelper.trusted_keys, [])
242
243 finally:
244 for tmpfile in (tk1_file, tk2_file):
245 os.remove(tmpfile.name)
246
247 def test_option_combination_errors(self):
248
249 # Names file and command-line domain names are mutually exclusive
250 args = ['-f', '/dev/null', 'example.com']
251 arghelper = GraphArgHelper(self.logger)
252 arghelper.build_parser('graph')
253 arghelper.parse_args(args)
254 with self.assertRaises(argparse.ArgumentTypeError):
255 arghelper.check_args()
256
257 # Names file and command-line domain names are mutually exclusive
258 args = ['-O', '-o', '/dev/null']
259 arghelper = GraphArgHelper(self.logger)
260 arghelper.build_parser('graph')
261 arghelper.parse_args(args)
262 with self.assertRaises(argparse.ArgumentTypeError):
263 arghelper.check_args()
264
265 # But this is allowed
266 args = ['-o', '/dev/null']
267 arghelper = GraphArgHelper(self.logger)
268 arghelper.build_parser('graph')
269 arghelper.parse_args(args)
270 arghelper.check_args()
271
272 # So is this
273 args = ['-O']
274 arghelper = GraphArgHelper(self.logger)
275 arghelper.build_parser('graph')
276 arghelper.parse_args(args)
277 arghelper.check_args()
278
279 def test_output_format(self):
280
281 args = ['-T', 'png', '-o', 'foo.dot']
282 arghelper = GraphArgHelper(self.logger)
283 arghelper.build_parser('graph')
284 arghelper.parse_args(args)
285 arghelper.set_kwargs()
286 self.assertEqual(arghelper.output_format, 'png')
287
288 args = ['-o', 'foo.dot']
289 arghelper = GraphArgHelper(self.logger)
290 arghelper.build_parser('graph')
291 arghelper.parse_args(args)
292 arghelper.set_kwargs()
293 self.assertEqual(arghelper.output_format, 'dot')
294
295 args = ['-o', 'foo.png']
296 arghelper = GraphArgHelper(self.logger)
297 arghelper.build_parser('graph')
298 arghelper.parse_args(args)
299 arghelper.set_kwargs()
300 self.assertEqual(arghelper.output_format, 'png')
301
302 args = ['-o', 'foo.html']
303 arghelper = GraphArgHelper(self.logger)
304 arghelper.build_parser('graph')
305 arghelper.parse_args(args)
306 arghelper.set_kwargs()
307 self.assertEqual(arghelper.output_format, 'html')
308
309 args = ['-o', 'foo.svg']
310 arghelper = GraphArgHelper(self.logger)
311 arghelper.build_parser('graph')
312 arghelper.parse_args(args)
313 arghelper.set_kwargs()
314 self.assertEqual(arghelper.output_format, 'svg')
315
316 args = ['-o', 'foo.xyz']
317 arghelper = GraphArgHelper(self.logger)
318 arghelper.build_parser('graph')
319 arghelper.parse_args(args)
320 with self.assertRaises(argparse.ArgumentTypeError):
321 arghelper.set_kwargs()
322
323 args = ['-o', 'png']
324 arghelper = GraphArgHelper(self.logger)
325 arghelper.build_parser('graph')
326 arghelper.parse_args(args)
327 with self.assertRaises(argparse.ArgumentTypeError):
328 arghelper.set_kwargs()
329
330 args = ['-o', '-']
331 arghelper = GraphArgHelper(self.logger)
332 arghelper.build_parser('graph')
333 arghelper.parse_args(args)
334 arghelper.set_kwargs()
335 self.assertEqual(arghelper.output_format, 'dot')
336
337 args = []
338 arghelper = GraphArgHelper(self.logger)
339 arghelper.build_parser('graph')
340 arghelper.parse_args(args)
341 arghelper.set_kwargs()
342 self.assertEqual(arghelper.output_format, 'dot')
343
344 if __name__ == '__main__':
345 unittest.main()
+0
-157
tests/dnsviz_graph_run.py less more
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12
13 class DNSGraphRunTestCase(unittest.TestCase):
14 def setUp(self):
15 self.devnull = io.open('/dev/null', 'wb')
16 self.current_cwd = os.getcwd()
17 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
18
19 tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
20
21 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.tk_file:
22 self.tk_file.write(tk.encode('utf-8'))
23
24 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
25 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
26 self.example_auth_out.write(example_auth_in.read())
27
28 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
30 self.example_rec_out.write(example_rec_in.read())
31
32 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
33 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
34
35 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
36 self.output.close()
37
38 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
39
40 def tearDown(self):
41 self.devnull.close()
42 os.remove(self.tk_file.name)
43 os.remove(self.example_auth_out.name)
44 os.remove(self.example_rec_out.name)
45 os.remove(self.names_file.name)
46 os.remove(self.output.name)
47 subprocess.check_call(['rm', '-rf', self.run_cwd])
48
49 def test_dnsviz_graph_input(self):
50 with io.open(self.output.name, 'wb') as fh_out:
51 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
52 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
53 p.communicate(fh_in.read())
54 self.assertEqual(p.returncode, 0)
55
56 with io.open(self.output.name, 'wb') as fh_out:
57 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
58 p = subprocess.Popen([self.dnsviz_bin, 'graph', '-r', '-'], stdin=subprocess.PIPE, stdout=fh_out)
59 p.communicate(fh_in.read())
60 self.assertEqual(p.returncode, 0)
61
62 with io.open(self.output.name, 'wb') as fh:
63 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name], stdout=fh), 0)
64
65 def test_dnsviz_graph_names_input(self):
66 with io.open(self.output.name, 'wb') as fh:
67 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
68
69 with io.open(self.output.name, 'wb') as fh_out:
70 with io.open(self.names_file.name, 'rb') as fh_in:
71 p = subprocess.Popen([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
72 p.communicate(fh_in.read())
73 self.assertEqual(p.returncode, 0)
74
75 def test_dnsviz_graph_tk_input(self):
76 with io.open(self.output.name, 'wb') as fh:
77 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-t', self.tk_file.name], stdout=fh), 0)
78
79 with io.open(self.output.name, 'wb') as fh_out:
80 with io.open(self.tk_file.name, 'rb') as fh_in:
81 p = subprocess.Popen([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-t', '-'], stdin=subprocess.PIPE, stdout=fh_out)
82 p.communicate(fh_in.read())
83 self.assertEqual(p.returncode, 0)
84
85 def test_dnsviz_graph_output(self):
86 with io.open(self.output.name, 'wb') as fh:
87 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name], cwd=self.run_cwd, stdout=fh), 0)
88
89 with io.open(self.output.name, 'wb') as fh:
90 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-Tdot', '-o', '-'], cwd=self.run_cwd, stdout=fh), 0)
91
92 with io.open(self.output.name, 'wb') as fh:
93 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-o', 'all.dot'], cwd=self.run_cwd, stdout=fh), 0)
94 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.dot')))
95 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.com.dot')))
96 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.net.dot')))
97
98 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-O'], cwd=self.run_cwd), 0)
99 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.com.dot')))
100 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.net.dot')))
101
102 def test_dnsviz_graph_input_auth(self):
103 with io.open(self.output.name, 'wb') as fh_out:
104 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
105 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
106 p.communicate(fh_in.read())
107 self.assertEqual(p.returncode, 0)
108
109 with io.open(self.output.name, 'wb') as fh_out:
110 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
111 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
112 p.communicate(fh_in.read())
113 self.assertEqual(p.returncode, 0)
114
115 def test_dnsviz_graph_input_rec(self):
116 with io.open(self.output.name, 'wb') as fh_out:
117 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
118 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
119 p.communicate(fh_in.read())
120 self.assertEqual(p.returncode, 0)
121
122 with io.open(self.output.name, 'wb') as fh_out:
123 with gzip.open(ROOT_RECURSIVE) as fh_in:
124 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
125 p.communicate(fh_in.read())
126 self.assertEqual(p.returncode, 0)
127
128 def test_dnsviz_graph_output_format(self):
129 magic_codes_mapping = {
130 'dot': b'digraph',
131 'png': b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A',
132 'svg': b'<?xml ',
133 'html': b'<?xml ',
134 }
135
136 for fmt in ('dot', 'png', 'svg', 'html'):
137 with io.open(self.output.name, 'wb') as fh:
138 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-o', 'all.'+fmt], cwd=self.run_cwd, stdout=fh), 0)
139 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.'+fmt)))
140 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.com.' + fmt)))
141 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.net.' + fmt)))
142
143 with io.open(os.path.join(self.run_cwd, 'all.' + fmt), 'rb') as fh:
144 first_bytes = fh.read(len(magic_codes_mapping[fmt]))
145 self.assertEqual(first_bytes, magic_codes_mapping[fmt])
146
147 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-T', fmt, '-O'], cwd=self.run_cwd), 0)
148 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.com.' + fmt)))
149 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.net.' + fmt)))
150
151 with io.open(os.path.join(self.run_cwd, 'example.com.' + fmt), 'rb') as fh:
152 first_bytes = fh.read(len(magic_codes_mapping[fmt]))
153 self.assertEqual(first_bytes, magic_codes_mapping[fmt])
154
155 if __name__ == '__main__':
156 unittest.main()
+0
-264
tests/dnsviz_grok_options.py less more
0 import argparse
1 import datetime
2 import gzip
3 import importlib
4 import io
5 import logging
6 import os
7 import subprocess
8 import tempfile
9 import unittest
10
11 import dns.name, dns.rdatatype, dns.rrset, dns.zone
12
13 from dnsviz.format import utc
14 from dnsviz.util import get_default_trusted_keys
15
16 mod = importlib.import_module('dnsviz.commands.grok')
17 GrokArgHelper = getattr(mod, 'GrokArgHelper')
18 AnalysisInputError = getattr(mod, 'AnalysisInputError')
19
20 DATA_DIR = os.path.dirname(__file__)
21 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
22
23
24 class DNSVizGrokOptionsTestCase(unittest.TestCase):
25 def setUp(self):
26 self.logger = logging.getLogger()
27 for handler in self.logger.handlers:
28 self.logger.removeHandler(handler)
29 self.logger.addHandler(logging.NullHandler())
30
31 def test_integer_list(self):
32 arg1 = '1,2,3,4,5'
33 arg1_with_spaces = ' 1 , 2 , 3 , 4 , 5 '
34 arg2 = '1'
35 arg3 = '1,A'
36 arg4_empty = ''
37 arg4_empty_spaces = ' '
38
39 int_list1 = [1,2,3,4,5]
40 int_list2 = [1]
41 empty_list = []
42
43 int_set1 = set([1,2,3,4,5])
44 int_set2 = set([1])
45 empty_set = set([])
46
47 self.assertEqual(GrokArgHelper.comma_separated_ints(arg1), int_list1)
48 self.assertEqual(GrokArgHelper.comma_separated_ints(arg1_with_spaces), int_list1)
49 self.assertEqual(GrokArgHelper.comma_separated_ints(arg2), int_list2)
50 self.assertEqual(GrokArgHelper.comma_separated_ints(arg4_empty), empty_list)
51 self.assertEqual(GrokArgHelper.comma_separated_ints(arg4_empty_spaces), empty_list)
52
53 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg1), int_set1)
54 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg1_with_spaces), int_set1)
55 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg2), int_set2)
56 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg4_empty), empty_set)
57 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg4_empty_spaces), empty_set)
58
59 # invalid schema
60 with self.assertRaises(argparse.ArgumentTypeError):
61 GrokArgHelper.comma_separated_ints(arg3)
62
63 def test_valid_domain_name(self):
64 arg1 = '.'
65 arg2 = 'www.example.com'
66 arg3 = 'www..example.com'
67
68 self.assertEqual(GrokArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
69 self.assertEqual(GrokArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
70
71 # invalid domain name
72 with self.assertRaises(argparse.ArgumentTypeError):
73 GrokArgHelper.valid_domain_name(arg3)
74
75 def test_ingest_input(self):
76 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
77 example_bad_json.write(b'{')
78
79 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
80 example_no_version.write(b'{}')
81
82 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
83 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
84
85 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
86 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
87
88 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
89 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
90 example_auth_out.write(example_auth_in.read())
91
92 try:
93 args = ['-r', example_auth_out.name]
94 arghelper = GrokArgHelper(self.logger)
95 arghelper.build_parser('grok')
96 arghelper.parse_args(args)
97 arghelper.ingest_input()
98
99 # Bad json
100 args = ['-r', example_bad_json.name]
101 arghelper = GrokArgHelper(self.logger)
102 arghelper.build_parser('grok')
103 arghelper.parse_args(args)
104 with self.assertRaises(AnalysisInputError):
105 arghelper.ingest_input()
106
107 # No version
108 args = ['-r', example_no_version.name]
109 arghelper = GrokArgHelper(self.logger)
110 arghelper.build_parser('grok')
111 arghelper.parse_args(args)
112 with self.assertRaises(AnalysisInputError):
113 arghelper.ingest_input()
114
115 # Invalid version
116 args = ['-r', example_invalid_version_1.name]
117 arghelper = GrokArgHelper(self.logger)
118 arghelper.build_parser('grok')
119 arghelper.parse_args(args)
120 with self.assertRaises(AnalysisInputError):
121 arghelper.ingest_input()
122
123 # Invalid version
124 args = ['-r', example_invalid_version_2.name]
125 arghelper = GrokArgHelper(self.logger)
126 arghelper.build_parser('grok')
127 arghelper.parse_args(args)
128 with self.assertRaises(AnalysisInputError):
129 arghelper.ingest_input()
130
131 finally:
132 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
133 example_invalid_version_1, example_invalid_version_2):
134 os.remove(tmpfile.name)
135
136 def test_ingest_names(self):
137 args = ['example.com', 'example.net']
138 arghelper = GrokArgHelper(self.logger)
139 arghelper.build_parser('grok')
140 arghelper.parse_args(args)
141 arghelper.ingest_names()
142 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
143
144 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
145 names_file.write(b'example.com\nexample.net\n')
146
147 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
148 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
149
150 try:
151 args = ['-f', names_file.name]
152 arghelper = GrokArgHelper(self.logger)
153 arghelper.build_parser('grok')
154 arghelper.parse_args(args)
155 arghelper.ingest_names()
156 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
157
158 args = ['-r', example_names_only.name]
159 arghelper = GrokArgHelper(self.logger)
160 arghelper.build_parser('grok')
161 arghelper.parse_args(args)
162 arghelper.ingest_input()
163 arghelper.ingest_names()
164 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
165
166 args = ['-r', example_names_only.name, 'example.com']
167 arghelper = GrokArgHelper(self.logger)
168 arghelper.build_parser('grok')
169 arghelper.parse_args(args)
170 arghelper.ingest_input()
171 arghelper.ingest_names()
172 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
173 finally:
174 for tmpfile in (names_file, example_names_only):
175 os.remove(tmpfile.name)
176
177 def test_trusted_keys_file(self):
178 tk1 = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
179 tk2 = 'example.com. IN DNSKEY 256 3 7 AwEAAaerI6CXvvG6U3UxkB0PXj+ORyGFtABYJ6JG3NL6w1KKlZl+73AS aPEEa7SXeuWmAWE1N3rsbnrMBvepBXkCbP609eoo2mJ8bsozT/NNwSSc FP1Ddw4wxpZAC/+/K736rF1HbI3ROS/rBTr7RW6rWzcyPbYFuUMVzrAM ZSJNJsTDcmyGc5Is3cFzNcrd3/Gmcjt8TKMmGq51HXWzFvxro7EH6aOl K6G4O4+mzaUKp91mg7DAVhX8yXnadXUZQ4yDfLzSleYQ2TroQqeSgI3X m/gUoACm3ELUOr84TmIKZ67X/zBTx8tHC5iBWY2tbIKqiJY7I4/aW4S4 NraCSRbDpbM='
180 tk1_rdata = ' '.join(tk1.split()[3:])
181 tk2_rdata = ' '.join(tk2.split()[3:])
182 tk_explicit = [(dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk1_rdata)),
183 (dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk2_rdata))]
184
185 args = ['example.com']
186 arghelper = GrokArgHelper(self.logger)
187 arghelper.build_parser('grok')
188 arghelper.parse_args(args)
189 arghelper.aggregate_trusted_key_info()
190 self.assertEqual(arghelper.trusted_keys, None)
191 arghelper.update_trusted_key_info()
192 self.assertEqual(arghelper.trusted_keys, [])
193
194 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk1_file:
195 tk1_file.write(tk1.encode('utf-8'))
196
197 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk2_file:
198 tk2_file.write(tk2.encode('utf-8'))
199
200 try:
201 args = ['-t', tk1_file.name, '-t', tk2_file.name, 'example.com']
202 arghelper = GrokArgHelper(self.logger)
203 arghelper.build_parser('grok')
204 arghelper.parse_args(args)
205 arghelper.aggregate_trusted_key_info()
206 arghelper.update_trusted_key_info()
207 self.assertEqual(arghelper.trusted_keys, tk_explicit)
208
209 args = ['-t', '/dev/null', 'example.com']
210 arghelper = GrokArgHelper(self.logger)
211 arghelper.build_parser('grok')
212 arghelper.parse_args(args)
213 arghelper.aggregate_trusted_key_info()
214 arghelper.update_trusted_key_info()
215 self.assertEqual(arghelper.trusted_keys, [])
216
217 finally:
218 for tmpfile in (tk1_file, tk2_file):
219 os.remove(tmpfile.name)
220
221 def test_option_combination_errors(self):
222
223 # Names file and command-line domain names are mutually exclusive
224 args = ['-f', '/dev/null', 'example.com']
225 arghelper = GrokArgHelper(self.logger)
226 arghelper.build_parser('grok')
227 arghelper.parse_args(args)
228 with self.assertRaises(argparse.ArgumentTypeError):
229 arghelper.check_args()
230
231 def test_log_level(self):
232
233 # Names file and command-line domain names are mutually exclusive
234 args = []
235 arghelper = GrokArgHelper(self.logger)
236 arghelper.build_parser('grok')
237 arghelper.parse_args(args)
238 arghelper.set_kwargs()
239 self.assertEqual(arghelper.log_level, logging.DEBUG)
240
241 args = ['-l', 'info']
242 arghelper = GrokArgHelper(self.logger)
243 arghelper.build_parser('grok')
244 arghelper.parse_args(args)
245 arghelper.set_kwargs()
246 self.assertEqual(arghelper.log_level, logging.INFO)
247
248 args = ['-l', 'warning']
249 arghelper = GrokArgHelper(self.logger)
250 arghelper.build_parser('grok')
251 arghelper.parse_args(args)
252 arghelper.set_kwargs()
253 self.assertEqual(arghelper.log_level, logging.WARNING)
254
255 args = ['-l', 'error']
256 arghelper = GrokArgHelper(self.logger)
257 arghelper.build_parser('grok')
258 arghelper.parse_args(args)
259 arghelper.set_kwargs()
260 self.assertEqual(arghelper.log_level, logging.ERROR)
261
262 if __name__ == '__main__':
263 unittest.main()
+0
-127
tests/dnsviz_grok_run.py less more
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12
13 class DNSGrokRunTestCase(unittest.TestCase):
14 def setUp(self):
15 self.devnull = io.open('/dev/null', 'wb')
16 self.current_cwd = os.getcwd()
17 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
18
19 tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
20
21 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.tk_file:
22 self.tk_file.write(tk.encode('utf-8'))
23
24 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
25 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
26 self.example_auth_out.write(example_auth_in.read())
27
28 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
30 self.example_rec_out.write(example_rec_in.read())
31
32 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
33 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
34
35 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
36 self.output.close()
37
38 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
39
40 def tearDown(self):
41 self.devnull.close()
42 os.remove(self.tk_file.name)
43 os.remove(self.example_auth_out.name)
44 os.remove(self.example_rec_out.name)
45 os.remove(self.names_file.name)
46 os.remove(self.output.name)
47 subprocess.check_call(['rm', '-rf', self.run_cwd])
48
49 def test_dnsviz_grok_input(self):
50 with io.open(self.output.name, 'wb') as fh_out:
51 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
52 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
53 p.communicate(fh_in.read())
54 self.assertEqual(p.returncode, 0)
55
56 with io.open(self.output.name, 'wb') as fh_out:
57 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
58 p = subprocess.Popen([self.dnsviz_bin, 'grok', '-r', '-'], stdin=subprocess.PIPE, stdout=fh_out)
59 p.communicate(fh_in.read())
60 self.assertEqual(p.returncode, 0)
61
62 with io.open(self.output.name, 'wb') as fh:
63 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name], stdout=fh), 0)
64
65 def test_dnsviz_grok_names_input(self):
66 with io.open(self.output.name, 'wb') as fh:
67 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
68
69 with io.open(self.output.name, 'wb') as fh_out:
70 with io.open(self.names_file.name, 'rb') as fh_in:
71 p = subprocess.Popen([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
72 p.communicate(fh_in.read())
73 self.assertEqual(p.returncode, 0)
74
75 def test_dnsviz_grok_tk_input(self):
76 pass
77
78 #FIXME
79 #with io.open(self.output.name, 'wb') as fh:
80 # self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-t', self.tk_file.name], stdout=fh), 0)
81
82 #with io.open(self.output.name, 'wb') as fh_out:
83 # with io.open(self.tk_file.name, 'rb') as fh_in:
84 # p = subprocess.Popen([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-t', '-'], stdin=subprocess.PIPE, stdout=fh_out)
85 # p.communicate(fh_in.read())
86 # self.assertEqual(p.returncode, 0)
87
88 def test_dnsviz_grok_output(self):
89 with io.open(self.output.name, 'wb') as fh:
90 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name], cwd=self.run_cwd, stdout=fh), 0)
91
92 with io.open(self.output.name, 'wb') as fh:
93 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-o', '-'], cwd=self.run_cwd, stdout=fh), 0)
94
95 with io.open(self.output.name, 'wb') as fh:
96 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-o', 'all.json'], cwd=self.run_cwd, stdout=fh), 0)
97 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.json')))
98
99 def test_dnsviz_grok_input_auth(self):
100 with io.open(self.output.name, 'wb') as fh_out:
101 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
102 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
103 p.communicate(fh_in.read())
104 self.assertEqual(p.returncode, 0)
105
106 with io.open(self.output.name, 'wb') as fh_out:
107 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
108 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
109 p.communicate(fh_in.read())
110 self.assertEqual(p.returncode, 0)
111
112 def test_dnsviz_grok_input_rec(self):
113 with io.open(self.output.name, 'wb') as fh_out:
114 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
115 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
116 p.communicate(fh_in.read())
117 self.assertEqual(p.returncode, 0)
118
119 with io.open(self.output.name, 'wb') as fh_out:
120 with gzip.open(ROOT_RECURSIVE) as fh_in:
121 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
122 p.communicate(fh_in.read())
123 self.assertEqual(p.returncode, 0)
124
125 if __name__ == '__main__':
126 unittest.main()
+0
-281
tests/dnsviz_print_options.py less more
0 import argparse
1 import binascii
2 import datetime
3 import gzip
4 import importlib
5 import io
6 import logging
7 import os
8 import subprocess
9 import tempfile
10 import unittest
11
12 import dns.name, dns.rdatatype, dns.rrset, dns.zone
13
14 from dnsviz.format import utc
15 from dnsviz.util import get_default_trusted_keys
16
17 mod = importlib.import_module('dnsviz.commands.print')
18 PrintArgHelper = getattr(mod, 'PrintArgHelper')
19 AnalysisInputError = getattr(mod, 'AnalysisInputError')
20
21 DATA_DIR = os.path.dirname(__file__)
22 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
23
24
25 class DNSVizPrintOptionsTestCase(unittest.TestCase):
26 def setUp(self):
27 self.logger = logging.getLogger()
28 for handler in self.logger.handlers:
29 self.logger.removeHandler(handler)
30 self.logger.addHandler(logging.NullHandler())
31
32 def test_rrtype_list(self):
33 arg1 = 'A,AAAA,MX,CNAME'
34 arg1_with_spaces = ' A , AAAA , MX , CNAME '
35 arg2 = 'A'
36 arg3 = 'A,BLAH'
37 arg4_empty = ''
38 arg4_empty_spaces = ' '
39
40 type_list1 = [dns.rdatatype.A, dns.rdatatype.AAAA, dns.rdatatype.MX, dns.rdatatype.CNAME]
41 type_list2 = [dns.rdatatype.A]
42 empty_list = []
43
44 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg1), type_list1)
45 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg1_with_spaces), type_list1)
46 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg2), type_list2)
47 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg4_empty), empty_list)
48 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg4_empty_spaces), empty_list)
49
50 # invalid schema
51 with self.assertRaises(argparse.ArgumentTypeError):
52 PrintArgHelper.comma_separated_dns_types(arg3)
53
54 def test_integer_list(self):
55 arg1 = '1,2,3,4,5'
56 arg1_with_spaces = ' 1 , 2 , 3 , 4 , 5 '
57 arg2 = '1'
58 arg3 = '1,A'
59 arg4_empty = ''
60 arg4_empty_spaces = ' '
61
62 int_list1 = [1,2,3,4,5]
63 int_list2 = [1]
64 empty_list = []
65
66 int_set1 = set([1,2,3,4,5])
67 int_set2 = set([1])
68 empty_set = set([])
69
70 self.assertEqual(PrintArgHelper.comma_separated_ints(arg1), int_list1)
71 self.assertEqual(PrintArgHelper.comma_separated_ints(arg1_with_spaces), int_list1)
72 self.assertEqual(PrintArgHelper.comma_separated_ints(arg2), int_list2)
73 self.assertEqual(PrintArgHelper.comma_separated_ints(arg4_empty), empty_list)
74 self.assertEqual(PrintArgHelper.comma_separated_ints(arg4_empty_spaces), empty_list)
75
76 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg1), int_set1)
77 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg1_with_spaces), int_set1)
78 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg2), int_set2)
79 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg4_empty), empty_set)
80 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg4_empty_spaces), empty_set)
81
82 # invalid schema
83 with self.assertRaises(argparse.ArgumentTypeError):
84 PrintArgHelper.comma_separated_ints(arg3)
85
86 def test_valid_domain_name(self):
87 arg1 = '.'
88 arg2 = 'www.example.com'
89 arg3 = 'www..example.com'
90
91 self.assertEqual(PrintArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
92 self.assertEqual(PrintArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
93
94 # invalid domain name
95 with self.assertRaises(argparse.ArgumentTypeError):
96 PrintArgHelper.valid_domain_name(arg3)
97
98 def test_ingest_input(self):
99 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
100 example_bad_json.write(b'{')
101
102 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
103 example_no_version.write(b'{}')
104
105 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
106 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
107
108 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
109 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
110
111 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
112 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
113 example_auth_out.write(example_auth_in.read())
114
115 try:
116 args = ['-r', example_auth_out.name]
117 arghelper = PrintArgHelper(self.logger)
118 arghelper.build_parser('print')
119 arghelper.parse_args(args)
120 arghelper.ingest_input()
121
122 # Bad json
123 args = ['-r', example_bad_json.name]
124 arghelper = PrintArgHelper(self.logger)
125 arghelper.build_parser('print')
126 arghelper.parse_args(args)
127 with self.assertRaises(AnalysisInputError):
128 arghelper.ingest_input()
129
130 # No version
131 args = ['-r', example_no_version.name]
132 arghelper = PrintArgHelper(self.logger)
133 arghelper.build_parser('print')
134 arghelper.parse_args(args)
135 with self.assertRaises(AnalysisInputError):
136 arghelper.ingest_input()
137
138 # Invalid version
139 args = ['-r', example_invalid_version_1.name]
140 arghelper = PrintArgHelper(self.logger)
141 arghelper.build_parser('print')
142 arghelper.parse_args(args)
143 with self.assertRaises(AnalysisInputError):
144 arghelper.ingest_input()
145
146 # Invalid version
147 args = ['-r', example_invalid_version_2.name]
148 arghelper = PrintArgHelper(self.logger)
149 arghelper.build_parser('print')
150 arghelper.parse_args(args)
151 with self.assertRaises(AnalysisInputError):
152 arghelper.ingest_input()
153
154 finally:
155 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
156 example_invalid_version_1, example_invalid_version_2):
157 os.remove(tmpfile.name)
158
159 def test_ingest_names(self):
160 args = ['example.com', 'example.net']
161 arghelper = PrintArgHelper(self.logger)
162 arghelper.build_parser('print')
163 arghelper.parse_args(args)
164 arghelper.ingest_names()
165 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
166
167 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
168 names_file.write(b'example.com\nexample.net\n')
169
170 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
171 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
172
173 try:
174 args = ['-f', names_file.name]
175 arghelper = PrintArgHelper(self.logger)
176 arghelper.build_parser('print')
177 arghelper.parse_args(args)
178 arghelper.ingest_names()
179 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
180
181 args = ['-r', example_names_only.name]
182 arghelper = PrintArgHelper(self.logger)
183 arghelper.build_parser('print')
184 arghelper.parse_args(args)
185 arghelper.ingest_input()
186 arghelper.ingest_names()
187 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
188
189 args = ['-r', example_names_only.name, 'example.com']
190 arghelper = PrintArgHelper(self.logger)
191 arghelper.build_parser('print')
192 arghelper.parse_args(args)
193 arghelper.ingest_input()
194 arghelper.ingest_names()
195 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
196 finally:
197 for tmpfile in (names_file, example_names_only):
198 os.remove(tmpfile.name)
199
200 def test_trusted_keys_file(self):
201 tk1 = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
202 tk2 = 'example.com. IN DNSKEY 256 3 7 AwEAAaerI6CXvvG6U3UxkB0PXj+ORyGFtABYJ6JG3NL6w1KKlZl+73AS aPEEa7SXeuWmAWE1N3rsbnrMBvepBXkCbP609eoo2mJ8bsozT/NNwSSc FP1Ddw4wxpZAC/+/K736rF1HbI3ROS/rBTr7RW6rWzcyPbYFuUMVzrAM ZSJNJsTDcmyGc5Is3cFzNcrd3/Gmcjt8TKMmGq51HXWzFvxro7EH6aOl K6G4O4+mzaUKp91mg7DAVhX8yXnadXUZQ4yDfLzSleYQ2TroQqeSgI3X m/gUoACm3ELUOr84TmIKZ67X/zBTx8tHC5iBWY2tbIKqiJY7I4/aW4S4 NraCSRbDpbM='
203 tk1_rdata = ' '.join(tk1.split()[3:])
204 tk2_rdata = ' '.join(tk2.split()[3:])
205 tk_explicit = [(dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk1_rdata)),
206 (dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk2_rdata))]
207
208 now = datetime.datetime.now(utc)
209 tk_default = get_default_trusted_keys(now)
210
211 args = ['example.com']
212 arghelper = PrintArgHelper(self.logger)
213 arghelper.build_parser('print')
214 arghelper.parse_args(args)
215 arghelper.aggregate_trusted_key_info()
216 self.assertEqual(arghelper.trusted_keys, None)
217 arghelper.update_trusted_key_info(now)
218 self.assertEqual(arghelper.trusted_keys, tk_default)
219
220 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk1_file:
221 tk1_file.write(tk1.encode('utf-8'))
222
223 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk2_file:
224 tk2_file.write(tk2.encode('utf-8'))
225
226 try:
227 args = ['-t', tk1_file.name, '-t', tk2_file.name, 'example.com']
228 arghelper = PrintArgHelper(self.logger)
229 arghelper.build_parser('print')
230 arghelper.parse_args(args)
231 arghelper.aggregate_trusted_key_info()
232 arghelper.update_trusted_key_info(now)
233 self.assertEqual(arghelper.trusted_keys, tk_explicit)
234
235 args = ['-t', '/dev/null', 'example.com']
236 arghelper = PrintArgHelper(self.logger)
237 arghelper.build_parser('print')
238 arghelper.parse_args(args)
239 arghelper.aggregate_trusted_key_info()
240 arghelper.update_trusted_key_info(now)
241 self.assertEqual(arghelper.trusted_keys, [])
242
243 finally:
244 for tmpfile in (tk1_file, tk2_file):
245 os.remove(tmpfile.name)
246
247 def test_option_combination_errors(self):
248
249 # Names file and command-line domain names are mutually exclusive
250 args = ['-f', '/dev/null', 'example.com']
251 arghelper = PrintArgHelper(self.logger)
252 arghelper.build_parser('print')
253 arghelper.parse_args(args)
254 with self.assertRaises(argparse.ArgumentTypeError):
255 arghelper.check_args()
256
257 # Names file and command-line domain names are mutually exclusive
258 args = ['-O', '-o', '/dev/null']
259 arghelper = PrintArgHelper(self.logger)
260 arghelper.build_parser('print')
261 arghelper.parse_args(args)
262 with self.assertRaises(argparse.ArgumentTypeError):
263 arghelper.check_args()
264
265 # But this is allowed
266 args = ['-o', '/dev/null']
267 arghelper = PrintArgHelper(self.logger)
268 arghelper.build_parser('print')
269 arghelper.parse_args(args)
270 arghelper.check_args()
271
272 # So is this
273 args = ['-O']
274 arghelper = PrintArgHelper(self.logger)
275 arghelper.build_parser('print')
276 arghelper.parse_args(args)
277 arghelper.check_args()
278
279 if __name__ == '__main__':
280 unittest.main()
+0
-130
tests/dnsviz_print_run.py less more
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12
13 class DNSPrintRunTestCase(unittest.TestCase):
14 def setUp(self):
15 self.devnull = io.open('/dev/null', 'wb')
16 self.current_cwd = os.getcwd()
17 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
18
19 tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
20
21 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.tk_file:
22 self.tk_file.write(tk.encode('utf-8'))
23
24 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
25 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
26 self.example_auth_out.write(example_auth_in.read())
27
28 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
30 self.example_rec_out.write(example_rec_in.read())
31
32 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
33 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
34
35 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
36 self.output.close()
37
38 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
39
40 def tearDown(self):
41 self.devnull.close()
42 os.remove(self.tk_file.name)
43 os.remove(self.example_auth_out.name)
44 os.remove(self.example_rec_out.name)
45 os.remove(self.names_file.name)
46 os.remove(self.output.name)
47 subprocess.check_call(['rm', '-rf', self.run_cwd])
48
49 def test_dnsviz_print_input(self):
50 with io.open(self.output.name, 'wb') as fh_out:
51 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
52 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
53 p.communicate(fh_in.read())
54 self.assertEqual(p.returncode, 0)
55
56 with io.open(self.output.name, 'wb') as fh_out:
57 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
58 p = subprocess.Popen([self.dnsviz_bin, 'print', '-r', '-'], stdin=subprocess.PIPE, stdout=fh_out)
59 p.communicate(fh_in.read())
60 self.assertEqual(p.returncode, 0)
61
62 with io.open(self.output.name, 'wb') as fh:
63 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name], stdout=fh), 0)
64
65 def test_dnsviz_print_names_input(self):
66 with io.open(self.output.name, 'wb') as fh:
67 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
68
69 with io.open(self.output.name, 'wb') as fh_out:
70 with io.open(self.names_file.name, 'rb') as fh_in:
71 p = subprocess.Popen([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
72 p.communicate(fh_in.read())
73 self.assertEqual(p.returncode, 0)
74
75 def test_dnsviz_print_tk_input(self):
76 with io.open(self.output.name, 'wb') as fh:
77 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-t', self.tk_file.name], stdout=fh), 0)
78
79 with io.open(self.output.name, 'wb') as fh_out:
80 with io.open(self.tk_file.name, 'rb') as fh_in:
81 p = subprocess.Popen([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-t', '-'], stdin=subprocess.PIPE, stdout=fh_out)
82 p.communicate(fh_in.read())
83 self.assertEqual(p.returncode, 0)
84
85 def test_dnsviz_print_output(self):
86 with io.open(self.output.name, 'wb') as fh:
87 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name], cwd=self.run_cwd, stdout=fh), 0)
88
89 with io.open(self.output.name, 'wb') as fh:
90 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-o', '-'], cwd=self.run_cwd, stdout=fh), 0)
91
92 with io.open(self.output.name, 'wb') as fh:
93 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-o', 'all.txt'], cwd=self.run_cwd, stdout=fh), 0)
94 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.txt')))
95 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.com.txt')))
96 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.net.txt')))
97
98 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-O'], cwd=self.run_cwd), 0)
99 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.com.txt')))
100 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.net.txt')))
101
102 def test_dnsviz_print_input_auth(self):
103 with io.open(self.output.name, 'wb') as fh_out:
104 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
105 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
106 p.communicate(fh_in.read())
107 self.assertEqual(p.returncode, 0)
108
109 with io.open(self.output.name, 'wb') as fh_out:
110 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
111 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
112 p.communicate(fh_in.read())
113 self.assertEqual(p.returncode, 0)
114
115 def test_dnsviz_print_input_rec(self):
116 with io.open(self.output.name, 'wb') as fh_out:
117 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
118 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
119 p.communicate(fh_in.read())
120 self.assertEqual(p.returncode, 0)
121
122 with io.open(self.output.name, 'wb') as fh_out:
123 with gzip.open(ROOT_RECURSIVE) as fh_in:
124 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
125 p.communicate(fh_in.read())
126 self.assertEqual(p.returncode, 0)
127
128 if __name__ == '__main__':
129 unittest.main()
+0
-1042
tests/dnsviz_probe_options.py less more
0 # -*- coding: utf-8 -*-
1
2 import argparse
3 import binascii
4 import gzip
5 import logging
6 import os
7 import subprocess
8 import tempfile
9 import unittest
10
11 import dns.name, dns.rdatatype, dns.rrset, dns.zone
12
13 from dnsviz.commands.probe import ZoneFileToServe, ArgHelper, DomainListArgHelper, StandardRecursiveQueryCD, WILDCARD_EXPLICIT_DELEGATION, AnalysisInputError, CustomQueryMixin
14 from dnsviz import transport
15 from dnsviz.resolver import Resolver
16 from dnsviz.ipaddr import IPAddr
17
18 DATA_DIR = os.path.dirname(__file__)
19 EXAMPLE_COM_ZONE = os.path.join(DATA_DIR, 'zone', 'example.com.zone')
20 EXAMPLE_COM_DELEGATION = os.path.join(DATA_DIR, 'zone', 'example.com.zone-delegation')
21 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
22
23 class DNSVizProbeOptionsTestCase(unittest.TestCase):
24 def setUp(self):
25 self.tm = transport.DNSQueryTransportManager()
26 self.resolver = Resolver.from_file('/etc/resolv.conf', StandardRecursiveQueryCD, transport_manager=self.tm)
27 self.helper = DomainListArgHelper(self.resolver)
28 self.logger = logging.getLogger()
29 for handler in self.logger.handlers:
30 self.logger.removeHandler(handler)
31 self.logger.addHandler(logging.NullHandler())
32 try:
33 ArgHelper.bindable_ip('::1')
34 except argparse.ArgumentTypeError:
35 self.use_ipv6 = False
36 else:
37 self.use_ipv6 = True
38 self.first_port = ZoneFileToServe._next_free_port
39 self.custom_query_mixin_edns_options_orig = CustomQueryMixin.edns_options[:]
40
41 def tearDown(self):
42 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
43 if self.tm is not None:
44 self.tm.close()
45
46 def test_authoritative_option(self):
47 arg1 = 'example.com+:ns1.example.com=192.0.2.1:1234,ns1.example.com=[2001:db8::1],' + \
48 'ns1.example.com=192.0.2.2,ns2.example.com=[2001:db8::2],a.root-servers.net,192.0.2.3'
49
50 arg1_with_spaces = ' example.com+ : ns1.example.com = [192.0.2.1]:1234 , ns1.example.com = [2001:db8::1], ' + \
51 'ns1.example.com = [192.0.2.2] , ns2.example.com = [2001:db8::2] , a.root-servers.net , 192.0.2.3 '
52
53 arg2 = 'example.com:ns1.example.com=192.0.2.1'
54
55 arg3 = 'example.com:%s' % EXAMPLE_COM_ZONE
56
57 arg4 = 'example.com+:%s' % EXAMPLE_COM_ZONE
58
59 delegation_mapping1 = {
60 (dns.name.from_text('example.com'), dns.rdatatype.NS):
61 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
62 ['ns1.example.com', 'ns2.example.com', 'a.root-servers.net', 'ns1._dnsviz.example.com']),
63 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
64 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
65 ['192.0.2.1', '192.0.2.2']),
66 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
67 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
68 ['2001:db8::1']),
69 (dns.name.from_text('ns1._dnsviz.example.com'), dns.rdatatype.A):
70 dns.rrset.from_text_list(dns.name.from_text('ns1._dnsviz.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
71 ['192.0.2.3']),
72 (dns.name.from_text('ns2.example.com'), dns.rdatatype.AAAA):
73 dns.rrset.from_text_list(dns.name.from_text('ns2.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
74 ['2001:db8::2']),
75 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.A):
76 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.A,
77 ['198.41.0.4']),
78 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.AAAA):
79 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
80 ['2001:503:ba3e::2:30'])
81 }
82 stop_at1 = True
83 odd_ports1 = { (dns.name.from_text('example.com'), IPAddr('192.0.2.1')): 1234 }
84 zone_filename1 = None
85
86 delegation_mapping2 = {
87 (dns.name.from_text('example.com'), dns.rdatatype.NS):
88 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
89 ['ns1.example.com']),
90 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
91 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
92 ['192.0.2.1'])
93 }
94 stop_at2 = False
95 odd_ports2 = {}
96 zone_filename2 = None
97
98 delegation_mapping3 = {
99 (dns.name.from_text('example.com'), dns.rdatatype.NS):
100 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
101 []),
102 }
103 stop_at3 = False
104 odd_ports3 = {}
105 zone_filename3 = EXAMPLE_COM_ZONE
106
107 delegation_mapping4 = {
108 (dns.name.from_text('example.com'), dns.rdatatype.NS):
109 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
110 []),
111 }
112 stop_at4 = True
113 odd_ports4 = {}
114 zone_filename4 = EXAMPLE_COM_ZONE
115
116 obj = self.helper.authoritative_name_server_mappings(arg1)
117 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
118 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
119 self.assertEqual(obj.stop_at, stop_at1)
120 self.assertEqual(obj.odd_ports, odd_ports1)
121 self.assertEqual(obj.filename, zone_filename1)
122
123 obj = self.helper.authoritative_name_server_mappings(arg1_with_spaces)
124 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
125 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
126 self.assertEqual(obj.stop_at, stop_at1)
127 self.assertEqual(obj.odd_ports, odd_ports1)
128 self.assertEqual(obj.filename, zone_filename1)
129
130 obj = self.helper.authoritative_name_server_mappings(arg2)
131 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
132 self.assertEqual(obj.delegation_mapping, delegation_mapping2)
133 self.assertEqual(obj.stop_at, stop_at2)
134 self.assertEqual(obj.odd_ports, odd_ports2)
135 self.assertEqual(obj.filename, zone_filename2)
136
137 obj = self.helper.authoritative_name_server_mappings(arg3)
138 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
139 self.assertEqual(obj.delegation_mapping, delegation_mapping3)
140 self.assertEqual(obj.stop_at, stop_at3)
141 self.assertEqual(obj.odd_ports, odd_ports3)
142 self.assertEqual(obj.filename, zone_filename3)
143
144 obj = self.helper.authoritative_name_server_mappings(arg4)
145 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
146 self.assertEqual(obj.delegation_mapping, delegation_mapping4)
147 self.assertEqual(obj.stop_at, stop_at4)
148 self.assertEqual(obj.odd_ports, odd_ports4)
149 self.assertEqual(obj.filename, zone_filename4)
150
151 def test_authoritative_errors(self):
152 # no mapping
153 arg = 'example.com'
154 with self.assertRaises(argparse.ArgumentTypeError):
155 self.helper.authoritative_name_server_mappings(arg)
156
157 # bad domain name
158 arg = 'example.com:ns1..foo.com'
159 with self.assertRaises(argparse.ArgumentTypeError):
160 self.helper.authoritative_name_server_mappings(arg)
161
162 # bad IPv4 address
163 arg = 'example.com:ns1.foo.com=192'
164 with self.assertRaises(argparse.ArgumentTypeError):
165 self.helper.authoritative_name_server_mappings(arg)
166
167 # Bad IPv6 address
168 arg = 'example.com:ns1.foo.com=2001:db8'
169 with self.assertRaises(argparse.ArgumentTypeError):
170 self.helper.authoritative_name_server_mappings(arg)
171
172 # IPv6 address needs brackets (IP valid even with port stripped)
173 arg = 'example.com:ns1.foo.com=2001:db8::1:3'
174 with self.assertRaises(argparse.ArgumentTypeError):
175 self.helper.authoritative_name_server_mappings(arg)
176
177 # IPv6 address needs brackets (IP invalid with port stripped)
178 arg = 'example.com:ns1.foo.com=2001:db8::3'
179 with self.assertRaises(argparse.ArgumentTypeError):
180 self.helper.authoritative_name_server_mappings(arg)
181
182 # Name does not resolve properly
183 arg = 'example.com:ns1.does-not-exist-foo-bar-baz-123-abc-dnsviz.net'
184 with self.assertRaises(argparse.ArgumentTypeError):
185 self.helper.authoritative_name_server_mappings(arg)
186
187 def test_delegation_option(self):
188 arg1 = 'example.com:ns1.example.com=192.0.2.1:1234,ns1.example.com=[2001:db8::1],' + \
189 'ns1.example.com=192.0.2.2,ns2.example.com=[2001:db8::2]'
190
191 arg1_with_spaces = ' example.com : ns1.example.com = [192.0.2.1]:1234 , ns1.example.com = [2001:db8::1], ' + \
192 'ns1.example.com = [192.0.2.2] , ns2.example.com = [2001:db8::2] '
193
194 arg2 = 'example.com:%s' % EXAMPLE_COM_DELEGATION
195
196 delegation_mapping1 = {
197 (dns.name.from_text('example.com'), dns.rdatatype.NS):
198 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
199 ['ns1.example.com', 'ns2.example.com']),
200 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
201 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
202 ['192.0.2.1', '192.0.2.2']),
203 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
204 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
205 ['2001:db8::1']),
206 (dns.name.from_text('ns2.example.com'), dns.rdatatype.AAAA):
207 dns.rrset.from_text_list(dns.name.from_text('ns2.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
208 ['2001:db8::2']),
209 }
210 stop_at1 = False
211 odd_ports1 = { (dns.name.from_text('example.com'), IPAddr('192.0.2.1')): 1234 }
212
213 delegation_mapping2 = {
214 (dns.name.from_text('example.com'), dns.rdatatype.NS):
215 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
216 ['ns1.example.com']),
217 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
218 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
219 ['127.0.0.1'])
220 }
221 stop_at2 = False
222 odd_ports2 = {}
223
224 obj = self.helper.delegation_name_server_mappings(arg1)
225 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
226 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
227 self.assertEqual(obj.stop_at, stop_at1)
228 self.assertEqual(obj.odd_ports, odd_ports1)
229
230 obj = self.helper.delegation_name_server_mappings(arg1_with_spaces)
231 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
232 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
233 self.assertEqual(obj.stop_at, stop_at1)
234 self.assertEqual(obj.odd_ports, odd_ports1)
235
236 obj = self.helper.delegation_name_server_mappings(arg2)
237 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
238 self.assertEqual(obj.delegation_mapping, delegation_mapping2)
239 self.assertEqual(obj.stop_at, stop_at2)
240 self.assertEqual(obj.odd_ports, odd_ports2)
241
242 def test_delegation_errors(self):
243 # all the authoritative error tests as well
244
245 # requires name=addr mapping
246 arg = 'example.com:ns1.example.com'
247 with self.assertRaises(argparse.ArgumentTypeError):
248 self.helper.delegation_name_server_mappings(arg)
249
250 # requires name=addr mapping
251 arg = 'example.com:192.0.2.1'
252 with self.assertRaises(argparse.ArgumentTypeError):
253 self.helper.delegation_name_server_mappings(arg)
254
255 # doesn't allow +
256 arg = 'example.com+:ns1.example.com=192.0.2.1'
257 with self.assertRaises(argparse.ArgumentTypeError):
258 self.helper.delegation_name_server_mappings(arg)
259
260 # can't do this for root domain
261 arg = '.:ns1.example.com=192.0.2.1'
262 with self.assertRaises(argparse.ArgumentTypeError):
263 self.helper.delegation_name_server_mappings(arg)
264
265 def test_recursive_option(self):
266 arg1 = 'ns1.example.com=192.0.2.1:1234,ns1.example.com=[2001:db8::1],' + \
267 'ns1.example.com=192.0.2.2,ns2.example.com=[2001:db8::2],a.root-servers.net'
268
269 arg1_with_spaces = ' ns1.example.com = [192.0.2.1]:1234 , ns1.example.com = [2001:db8::1], ' + \
270 'ns1.example.com = [192.0.2.2] , ns2.example.com = [2001:db8::2] , a.root-servers.net '
271
272 delegation_mapping1 = {
273 (WILDCARD_EXPLICIT_DELEGATION, dns.rdatatype.NS):
274 dns.rrset.from_text_list(WILDCARD_EXPLICIT_DELEGATION, 0, dns.rdataclass.IN, dns.rdatatype.NS,
275 ['ns1.example.com', 'ns2.example.com', 'a.root-servers.net']),
276 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
277 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
278 ['192.0.2.1', '192.0.2.2']),
279 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
280 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
281 ['2001:db8::1']),
282 (dns.name.from_text('ns2.example.com'), dns.rdatatype.AAAA):
283 dns.rrset.from_text_list(dns.name.from_text('ns2.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
284 ['2001:db8::2']),
285 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.A):
286 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.A,
287 ['198.41.0.4']),
288 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.AAAA):
289 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
290 ['2001:503:ba3e::2:30'])
291 }
292 stop_at1 = False
293 odd_ports1 = { (WILDCARD_EXPLICIT_DELEGATION, IPAddr('192.0.2.1')): 1234 }
294
295 obj = self.helper.recursive_servers_for_domain(arg1)
296 self.assertEqual(obj.domain, WILDCARD_EXPLICIT_DELEGATION)
297 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
298 self.assertEqual(obj.stop_at, stop_at1)
299 self.assertEqual(obj.odd_ports, odd_ports1)
300
301 obj = self.helper.recursive_servers_for_domain(arg1_with_spaces)
302 self.assertEqual(obj.domain, WILDCARD_EXPLICIT_DELEGATION)
303 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
304 self.assertEqual(obj.stop_at, stop_at1)
305 self.assertEqual(obj.odd_ports, odd_ports1)
306
307 def test_recursive_errors(self):
308 # all the authoritative error tests as well
309
310 # doesn't accept file
311 arg = EXAMPLE_COM_DELEGATION
312 with self.assertRaises(argparse.ArgumentTypeError):
313 self.helper.recursive_servers_for_domain(arg)
314
315 def test_ds_option(self):
316 arg1 = 'example.com:34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418,' + \
317 '34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49'
318
319 delegation_mapping1 = {
320 (dns.name.from_text('example.com'), dns.rdatatype.DS):
321 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.DS,
322 ['34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418',
323 '34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49'])
324 }
325
326 arg1_with_spaces = ' example.com : 34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418, ' + \
327 ' 34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49 '
328
329 arg2 = 'example.com:%s' % EXAMPLE_COM_DELEGATION
330
331 delegation_mapping2 = {
332 (dns.name.from_text('example.com'), dns.rdatatype.DS):
333 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.DS,
334 ['34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418',
335 '34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49'])
336 }
337
338
339 obj = self.helper.ds_for_domain(arg1)
340 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
341 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
342
343 obj = self.helper.ds_for_domain(arg1_with_spaces)
344 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
345 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
346
347 obj = self.helper.ds_for_domain(arg2)
348 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
349 self.assertEqual(obj.delegation_mapping, delegation_mapping2)
350
351 def test_ds_error(self):
352 # bad DS record
353 arg = 'example.com:blah'
354 with self.assertRaises(argparse.ArgumentTypeError):
355 obj = self.helper.ds_for_domain(arg)
356
357 def test_positive_int(self):
358 self.assertEqual(ArgHelper.positive_int('1'), 1)
359 self.assertEqual(ArgHelper.positive_int('2'), 2)
360
361 # zero
362 with self.assertRaises(argparse.ArgumentTypeError):
363 ArgHelper.positive_int('0')
364
365 # negative
366 with self.assertRaises(argparse.ArgumentTypeError):
367 ArgHelper.positive_int('-1')
368
369 def test_bindable_ip(self):
370 self.assertEqual(ArgHelper.bindable_ip('127.0.0.1'), IPAddr('127.0.0.1'))
371 if self.use_ipv6:
372 self.assertEqual(ArgHelper.bindable_ip('::1'), IPAddr('::1'))
373
374 # invalid IPv4 address
375 with self.assertRaises(argparse.ArgumentTypeError):
376 ArgHelper.bindable_ip('192.')
377
378 # invalid IPv6 address
379 with self.assertRaises(argparse.ArgumentTypeError):
380 ArgHelper.bindable_ip('2001:')
381
382 # invalid IPv4 to bind to
383 with self.assertRaises(argparse.ArgumentTypeError):
384 ArgHelper.bindable_ip('192.0.2.1')
385
386 # invalid IPv6 to bind to
387 with self.assertRaises(argparse.ArgumentTypeError):
388 ArgHelper.bindable_ip('2001:db8::1')
389
390 def test_valid_url(self):
391 url1 = 'http://www.example.com/foo'
392 url2 = 'https://www.example.com/foo'
393 url3 = 'ws:///path/to/file'
394 url4 = 'ssh://user@example.com/foo'
395
396 self.assertEqual(ArgHelper.valid_url(url1), url1)
397 self.assertEqual(ArgHelper.valid_url(url2), url2)
398 self.assertEqual(ArgHelper.valid_url(url3), url3)
399 self.assertEqual(ArgHelper.valid_url(url4), url4)
400
401 # invalid schema
402 with self.assertRaises(argparse.ArgumentTypeError):
403 ArgHelper.valid_url('ftp://www.example.com/foo')
404
405 # ws with hostname
406 with self.assertRaises(argparse.ArgumentTypeError):
407 ArgHelper.valid_url('ws://www.example.com/foo')
408
409 def test_rrtype_list(self):
410 arg1 = 'A,AAAA,MX,CNAME'
411 arg1_with_spaces = ' A , AAAA , MX , CNAME '
412 arg2 = 'A'
413 arg3 = 'A,BLAH'
414 arg4_empty = ''
415 arg4_empty_spaces = ' '
416
417 type_list1 = [dns.rdatatype.A, dns.rdatatype.AAAA, dns.rdatatype.MX, dns.rdatatype.CNAME]
418 type_list2 = [dns.rdatatype.A]
419 empty_list = []
420
421 self.assertEqual(ArgHelper.comma_separated_dns_types(arg1), type_list1)
422 self.assertEqual(ArgHelper.comma_separated_dns_types(arg1_with_spaces), type_list1)
423 self.assertEqual(ArgHelper.comma_separated_dns_types(arg4_empty), empty_list)
424 self.assertEqual(ArgHelper.comma_separated_dns_types(arg4_empty_spaces), empty_list)
425
426 # invalid schema
427 with self.assertRaises(argparse.ArgumentTypeError):
428 ArgHelper.comma_separated_dns_types(arg3)
429
430 def test_valid_domain_name(self):
431 arg1 = '.'
432 arg2 = 'www.example.com'
433 arg3 = 'www..example.com'
434
435 self.assertEqual(ArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
436 self.assertEqual(ArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
437
438 # invalid domain name
439 with self.assertRaises(argparse.ArgumentTypeError):
440 ArgHelper.valid_domain_name(arg3)
441
442 def test_nsid_option(self):
443 self.assertEqual(ArgHelper.nsid_option(), dns.edns.GenericOption(3, b''))
444
445 def test_ecs_option(self):
446 arg1 = '192.0.2.0'
447 arg2 = '192.0.2.0/25'
448 arg3 = '192.0.2.255/25'
449 arg4 = '192.0.2.0/24'
450 arg5 = '2001:db8::'
451 arg6 = '2001:db8::/121'
452 arg7 = '2001:db8::ff/121'
453 arg8 = '2001:db8::/120'
454
455
456 ecs_option1 = dns.edns.GenericOption(8, binascii.unhexlify('00012000c0000200'))
457 ecs_option2 = dns.edns.GenericOption(8, binascii.unhexlify('00011900c0000200'))
458 ecs_option3 = dns.edns.GenericOption(8, binascii.unhexlify('00011900c0000280'))
459 ecs_option4 = dns.edns.GenericOption(8, binascii.unhexlify('00011800c00002'))
460 ecs_option5 = dns.edns.GenericOption(8, binascii.unhexlify('0002800020010db8000000000000000000000000'))
461 ecs_option6 = dns.edns.GenericOption(8, binascii.unhexlify('0002790020010db8000000000000000000000000'))
462 ecs_option7 = dns.edns.GenericOption(8, binascii.unhexlify('0002790020010db8000000000000000000000080'))
463 ecs_option8 = dns.edns.GenericOption(8, binascii.unhexlify('0002780020010db80000000000000000000000'))
464
465 self.assertEqual(ArgHelper.ecs_option(arg1), ecs_option1)
466 self.assertEqual(ArgHelper.ecs_option(arg2), ecs_option2)
467 self.assertEqual(ArgHelper.ecs_option(arg3), ecs_option3)
468 self.assertEqual(ArgHelper.ecs_option(arg4), ecs_option4)
469 self.assertEqual(ArgHelper.ecs_option(arg5), ecs_option5)
470 self.assertEqual(ArgHelper.ecs_option(arg6), ecs_option6)
471 self.assertEqual(ArgHelper.ecs_option(arg7), ecs_option7)
472 self.assertEqual(ArgHelper.ecs_option(arg8), ecs_option8)
473
474 # invalid IP address
475 with self.assertRaises(argparse.ArgumentTypeError):
476 ArgHelper.ecs_option('192')
477
478 # invalid length
479 with self.assertRaises(argparse.ArgumentTypeError):
480 ArgHelper.ecs_option('192.0.2.0/foo')
481
482 # invalid length
483 with self.assertRaises(argparse.ArgumentTypeError):
484 ArgHelper.ecs_option('192.0.2.0/33')
485
486 # invalid length
487 with self.assertRaises(argparse.ArgumentTypeError):
488 ArgHelper.ecs_option('2001:db8::/129')
489
490 def test_cookie_option(self):
491 arg1 = '0102030405060708'
492 arg2 = ''
493
494 cookie_option1 = dns.edns.GenericOption(10, binascii.unhexlify('0102030405060708'))
495 cookie_option2 = None
496
497 self.assertEqual(ArgHelper.dns_cookie_option(arg1), cookie_option1)
498 self.assertEqual(ArgHelper.dns_cookie_option(arg2), None)
499
500 self.assertIsInstance(ArgHelper.dns_cookie_rand(), dns.edns.GenericOption)
501
502 # too short
503 with self.assertRaises(argparse.ArgumentTypeError):
504 ArgHelper.dns_cookie_option('01')
505
506 # too long
507 with self.assertRaises(argparse.ArgumentTypeError):
508 ArgHelper.dns_cookie_option('010203040506070809')
509
510 # non-hexadecimal
511 with self.assertRaises(argparse.ArgumentTypeError):
512 ArgHelper.dns_cookie_option('010203040506070h')
513
514 def test_delegation_aggregation(self):
515 args1 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
516 '-N', 'example.com:ns1.example.com=192.0.2.4',
517 '-N', 'example.com:ns2.example.com=192.0.2.2',
518 '-N', 'example.com:ns3.example.com=192.0.2.3']
519 args2 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1',
520 '-D', 'example.com:34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418',
521 '-D', 'example.com:34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49']
522 args3 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1',
523 '-N', 'example1.com:ns1.example1.com=192.0.2.2']
524 args4 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1',
525 '-N', 'example.net:ns1.example.net=192.0.2.2']
526
527 explicit_delegations1 = {
528 (dns.name.from_text('com'), dns.rdatatype.NS):
529 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
530 ['localhost']),
531 }
532 explicit_delegations2 = {
533 (dns.name.from_text('com'), dns.rdatatype.NS):
534 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
535 ['localhost']),
536 }
537 explicit_delegations3 = {
538 (dns.name.from_text('com'), dns.rdatatype.NS):
539 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
540 ['localhost']),
541 }
542 explicit_delegations4 = {
543 (dns.name.from_text('com'), dns.rdatatype.NS):
544 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
545 ['localhost']),
546 (dns.name.from_text('net'), dns.rdatatype.NS):
547 dns.rrset.from_text_list(dns.name.from_text('net'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
548 ['localhost']),
549 }
550
551 for ex in (explicit_delegations1, explicit_delegations2, explicit_delegations3, explicit_delegations4):
552 if self.use_ipv6:
553 ex[(dns.name.from_text('localhost'), dns.rdatatype.AAAA)] = \
554 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
555 ['::1'])
556 loopback_ip = IPAddr('::1')
557 else:
558 ex[(dns.name.from_text('localhost'), dns.rdatatype.A)] = \
559 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.A,
560 ['127.0.0.1'])
561 loopback_ip = IPAddr('127.0.0.1')
562
563 odd_ports1 = { (dns.name.from_text('com'), loopback_ip): self.first_port }
564 odd_ports2 = { (dns.name.from_text('com'), loopback_ip): self.first_port }
565 odd_ports3 = { (dns.name.from_text('com'), loopback_ip): self.first_port }
566 odd_ports4 = {
567 (dns.name.from_text('com'), loopback_ip): self.first_port,
568 (dns.name.from_text('net'), loopback_ip): self.first_port + 1,
569 }
570
571 if self.use_ipv6:
572 rdata = b'AAAA ::1'
573 else:
574 rdata = b'A 127.0.0.1'
575
576 zone_contents1 = b'''@ 600 IN SOA localhost. root.localhost. 1 1800 900 86400 600
577 @ 600 IN NS @
578 @ 600 IN ''' + rdata + \
579 b'''
580 example 0 IN NS ns1.example
581 example 0 IN NS ns2.example
582 example 0 IN NS ns3.example
583 ns1.example 0 IN A 192.0.2.1
584 ns1.example 0 IN A 192.0.2.4
585 ns1.example 0 IN AAAA 2001:db8::1
586 ns2.example 0 IN A 192.0.2.2
587 ns3.example 0 IN A 192.0.2.3
588 '''
589 zone_contents2 = b'''@ 600 IN SOA localhost. root.localhost. 1 1800 900 86400 600
590 @ 600 IN NS @
591 @ 600 IN ''' + rdata + \
592 b'''
593 example 0 IN DS 34983 10 1 ec358cfaaec12266ef5acfc1feaf2caff083c418
594 example 0 IN DS 34983 10 2 608d3b089d79d554a1947bd10bec0a5b1bdbe67b4e60e34b1432ed0033f24b49
595 example 0 IN NS ns1.example
596 ns1.example 0 IN A 192.0.2.1
597 '''
598
599 ZoneFileToServe._next_free_port = self.first_port
600
601 arghelper1 = ArgHelper(self.resolver, self.logger)
602 arghelper1.build_parser('probe')
603 arghelper1.parse_args(args1)
604 arghelper1.aggregate_delegation_info()
605 zone_to_serve = arghelper1._zones_to_serve[0]
606 zone_obj = dns.zone.from_file(zone_to_serve.filename, dns.name.from_text('com'))
607 zone_obj_other = dns.zone.from_text(zone_contents1, dns.name.from_text('com'))
608 self.assertEqual(zone_obj, zone_obj_other)
609 self.assertEqual(arghelper1.explicit_delegations, explicit_delegations1)
610 self.assertEqual(arghelper1.odd_ports, odd_ports1)
611
612 ZoneFileToServe._next_free_port = self.first_port
613
614 arghelper2 = ArgHelper(self.resolver, self.logger)
615 arghelper2.build_parser('probe')
616 arghelper2.parse_args(args2)
617 arghelper2.aggregate_delegation_info()
618 zone_to_serve = arghelper2._zones_to_serve[0]
619 zone_obj = dns.zone.from_file(zone_to_serve.filename, dns.name.from_text('com'))
620 zone_obj_other = dns.zone.from_text(zone_contents2, dns.name.from_text('com'))
621 self.assertEqual(zone_obj, zone_obj_other)
622 self.assertEqual(arghelper2.explicit_delegations, explicit_delegations2)
623 self.assertEqual(arghelper2.odd_ports, odd_ports2)
624
625 ZoneFileToServe._next_free_port = self.first_port
626
627 arghelper3 = ArgHelper(self.resolver, self.logger)
628 arghelper3.build_parser('probe')
629 arghelper3.parse_args(args3)
630 arghelper3.aggregate_delegation_info()
631 self.assertEqual(arghelper3.explicit_delegations, explicit_delegations3)
632 self.assertEqual(arghelper3.odd_ports, odd_ports3)
633
634 ZoneFileToServe._next_free_port = self.first_port
635
636 arghelper4 = ArgHelper(self.resolver, self.logger)
637 arghelper4.build_parser('probe')
638 arghelper4.parse_args(args4)
639 arghelper4.aggregate_delegation_info()
640 self.assertEqual(arghelper4.explicit_delegations, explicit_delegations4)
641 self.assertEqual(arghelper4.odd_ports, odd_ports4)
642
643 def test_delegation_authoritative_aggregation(self):
644 args1 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
645 '-x', 'foo.com:ns1.foo.com=192.0.2.3:50503']
646
647 explicit_delegations1 = {
648 (dns.name.from_text('com'), dns.rdatatype.NS):
649 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
650 ['localhost']),
651 (dns.name.from_text('foo.com'), dns.rdatatype.NS):
652 dns.rrset.from_text_list(dns.name.from_text('foo.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
653 ['ns1.foo.com']),
654 (dns.name.from_text('ns1.foo.com'), dns.rdatatype.A):
655 dns.rrset.from_text_list(dns.name.from_text('ns1.foo.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
656 ['192.0.2.3']),
657 }
658
659 for ex in (explicit_delegations1,):
660 if self.use_ipv6:
661 ex[(dns.name.from_text('localhost'), dns.rdatatype.AAAA)] = \
662 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
663 ['::1'])
664 loopback_ip = IPAddr('::1')
665 else:
666 ex[(dns.name.from_text('localhost'), dns.rdatatype.A)] = \
667 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.A,
668 ['127.0.0.1'])
669 loopback_ip = IPAddr('127.0.0.1')
670
671 odd_ports1 = { (dns.name.from_text('com'), loopback_ip): self.first_port,
672 (dns.name.from_text('foo.com'), IPAddr('192.0.2.3')): 50503,
673 }
674
675 ZoneFileToServe._next_free_port = self.first_port
676
677 arghelper1 = ArgHelper(self.resolver, self.logger)
678 arghelper1.build_parser('probe')
679 arghelper1.parse_args(args1)
680 arghelper1.aggregate_delegation_info()
681 self.assertEqual(arghelper1.explicit_delegations, explicit_delegations1)
682 self.assertEqual(arghelper1.odd_ports, odd_ports1)
683
684 def test_delegation_authoritative_aggregation_errors(self):
685 args1 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
686 '-x', 'com:ns1.foo.com=192.0.2.3']
687
688 arghelper1 = ArgHelper(self.resolver, self.logger)
689 arghelper1.build_parser('probe')
690 arghelper1.parse_args(args1)
691
692 # com is specified with -x but example.com is specified with -N
693 with self.assertRaises(argparse.ArgumentTypeError):
694 arghelper1.aggregate_delegation_info()
695
696 def test_recursive_aggregation(self):
697 args1 = ['-s', 'ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
698 '-s', 'ns1.example.com=192.0.2.4,a.root-servers.net']
699
700 explicit_delegations1 = {
701 (WILDCARD_EXPLICIT_DELEGATION, dns.rdatatype.NS):
702 dns.rrset.from_text_list(WILDCARD_EXPLICIT_DELEGATION, 0, dns.rdataclass.IN, dns.rdatatype.NS,
703 ['ns1.example.com', 'a.root-servers.net']),
704 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
705 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
706 ['192.0.2.1', '192.0.2.4']),
707 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
708 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
709 ['2001:db8::1']),
710 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.A):
711 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.A,
712 ['198.41.0.4']),
713 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.AAAA):
714 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
715 ['2001:503:ba3e::2:30'])
716 }
717
718 odd_ports1 = {}
719
720 arghelper1 = ArgHelper(self.resolver, self.logger)
721 arghelper1.build_parser('probe')
722 arghelper1.parse_args(args1)
723 arghelper1.aggregate_delegation_info()
724 self.assertEqual(arghelper1.explicit_delegations, explicit_delegations1)
725 self.assertEqual(arghelper1.odd_ports, odd_ports1)
726
727 def test_option_combination_errors(self):
728
729 # Names, input file, or names file required
730 args = []
731 arghelper = ArgHelper(self.resolver, self.logger)
732 arghelper.build_parser('probe')
733 arghelper.parse_args(args)
734 with self.assertRaises(argparse.ArgumentTypeError):
735 arghelper.check_args()
736
737 # Names file and command-line domain names are mutually exclusive
738 args = ['-f', '/dev/null', 'example.com']
739 arghelper = ArgHelper(self.resolver, self.logger)
740 arghelper.build_parser('probe')
741 arghelper.parse_args(args)
742 with self.assertRaises(argparse.ArgumentTypeError):
743 arghelper.check_args()
744 arghelper.args.names_file.close()
745
746 # Authoritative analysis and recursive servers
747 args = ['-A', '-s', '192.0.2.1', 'example.com']
748 arghelper = ArgHelper(self.resolver, self.logger)
749 arghelper.build_parser('probe')
750 arghelper.parse_args(args)
751 with self.assertRaises(argparse.ArgumentTypeError):
752 arghelper.check_args()
753
754 # Authoritative servers with recursive analysis
755 args = ['-x', 'example.com:ns1.example.com=192.0.2.1', 'example.com']
756 arghelper = ArgHelper(self.resolver, self.logger)
757 arghelper.build_parser('probe')
758 arghelper.parse_args(args)
759 with self.assertRaises(argparse.ArgumentTypeError):
760 arghelper.check_args()
761
762 # Delegation information with recursive analysis
763 args = ['-N', 'example.com:ns1.example.com=192.0.2.1', 'example.com']
764 arghelper = ArgHelper(self.resolver, self.logger)
765 arghelper.build_parser('probe')
766 arghelper.parse_args(args)
767 with self.assertRaises(argparse.ArgumentTypeError):
768 arghelper.check_args()
769
770 # Delegation information with recursive analysis
771 args = [ '-D', 'example.com:34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418', 'example.com']
772 arghelper = ArgHelper(self.resolver, self.logger)
773 arghelper.build_parser('probe')
774 arghelper.parse_args(args)
775 with self.assertRaises(argparse.ArgumentTypeError):
776 arghelper.check_args()
777
778 def test_ceiling(self):
779 args = ['-a', 'com', 'example.com']
780 arghelper = ArgHelper(self.resolver, self.logger)
781 arghelper.build_parser('probe')
782 arghelper.parse_args(args)
783 arghelper.set_kwargs()
784 self.assertEqual(arghelper.ceiling, dns.name.from_text('com'))
785
786 args = ['example.com']
787 arghelper = ArgHelper(self.resolver, self.logger)
788 arghelper.build_parser('probe')
789 arghelper.parse_args(args)
790 arghelper.set_kwargs()
791 self.assertEqual(arghelper.ceiling, dns.name.root)
792
793 args = ['-A', 'example.com']
794 arghelper = ArgHelper(self.resolver, self.logger)
795 arghelper.build_parser('probe')
796 arghelper.parse_args(args)
797 arghelper.set_kwargs()
798 self.assertIsNone(arghelper.ceiling)
799
800 def test_ip4_ipv6(self):
801 args = []
802 arghelper = ArgHelper(self.resolver, self.logger)
803 arghelper.build_parser('probe')
804 arghelper.parse_args(args)
805 arghelper.set_kwargs()
806 self.assertEqual(arghelper.try_ipv4, True)
807 self.assertEqual(arghelper.try_ipv6, True)
808
809 args = ['-4', '-6']
810 arghelper = ArgHelper(self.resolver, self.logger)
811 arghelper.build_parser('probe')
812 arghelper.parse_args(args)
813 arghelper.set_kwargs()
814 self.assertEqual(arghelper.try_ipv4, True)
815 self.assertEqual(arghelper.try_ipv6, True)
816
817 args = ['-4']
818 arghelper = ArgHelper(self.resolver, self.logger)
819 arghelper.build_parser('probe')
820 arghelper.parse_args(args)
821 arghelper.set_kwargs()
822 self.assertEqual(arghelper.try_ipv4, True)
823 self.assertEqual(arghelper.try_ipv6, False)
824
825 args = ['-6']
826 arghelper = ArgHelper(self.resolver, self.logger)
827 arghelper.build_parser('probe')
828 arghelper.parse_args(args)
829 arghelper.set_kwargs()
830 self.assertEqual(arghelper.try_ipv4, False)
831 self.assertEqual(arghelper.try_ipv6, True)
832
833 def test_client_ip(self):
834 args = []
835 arghelper = ArgHelper(self.resolver, self.logger)
836 arghelper.build_parser('probe')
837 arghelper.parse_args(args)
838 arghelper.set_kwargs()
839 self.assertIsNone(arghelper.client_ipv4)
840 self.assertIsNone(arghelper.client_ipv6)
841
842 args = ['-b', '127.0.0.1']
843 if self.use_ipv6:
844 args.extend(['-b', '::1'])
845 arghelper = ArgHelper(self.resolver, self.logger)
846 arghelper.build_parser('probe')
847 arghelper.parse_args(args)
848 arghelper.set_kwargs()
849 self.assertEqual(arghelper.client_ipv4, IPAddr('127.0.0.1'))
850 if self.use_ipv6:
851 self.assertEqual(arghelper.client_ipv6, IPAddr('::1'))
852
853 def test_th_factories(self):
854 args = ['example.com']
855 arghelper = ArgHelper(self.resolver, self.logger)
856 arghelper.build_parser('probe')
857 arghelper.parse_args(args)
858 arghelper.set_kwargs()
859 self.assertIsNone(arghelper.th_factories)
860
861 args = ['-u', 'http://example.com/', 'example.com']
862 arghelper = ArgHelper(self.resolver, self.logger)
863 arghelper.build_parser('probe')
864 arghelper.parse_args(args)
865 arghelper.set_kwargs()
866 self.assertIsInstance(arghelper.th_factories[0], transport.DNSQueryTransportHandlerHTTPFactory)
867
868 args = ['-u', 'ws:///dev/null', 'example.com']
869 arghelper = ArgHelper(self.resolver, self.logger)
870 arghelper.build_parser('probe')
871 arghelper.parse_args(args)
872 arghelper.set_kwargs()
873 self.assertIsInstance(arghelper.th_factories[0], transport.DNSQueryTransportHandlerWebSocketServerFactory)
874
875 args = ['-u', 'ssh://example.com/', 'example.com']
876 arghelper = ArgHelper(self.resolver, self.logger)
877 arghelper.build_parser('probe')
878 arghelper.parse_args(args)
879 arghelper.set_kwargs()
880 self.assertIsInstance(arghelper.th_factories[0], transport.DNSQueryTransportHandlerRemoteCmdFactory)
881
882 def test_edns_options(self):
883 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
884
885 # None
886 args = ['-c', '', 'example.com']
887 arghelper = ArgHelper(self.resolver, self.logger)
888 arghelper.build_parser('probe')
889 arghelper.parse_args(args)
890 arghelper.set_kwargs()
891 self.assertEqual(len(CustomQueryMixin.edns_options), 0)
892
893 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
894
895 # Only DNS cookie
896 args = ['example.com']
897 arghelper = ArgHelper(self.resolver, self.logger)
898 arghelper.build_parser('probe')
899 arghelper.parse_args(args)
900 arghelper.set_kwargs()
901 self.assertEqual(set([o.otype for o in CustomQueryMixin.edns_options]), set([10]))
902
903 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
904
905 # All EDNS options
906 args = ['-n', '-e', '192.0.2.0/24', 'example.com']
907 arghelper = ArgHelper(self.resolver, self.logger)
908 arghelper.build_parser('probe')
909 arghelper.parse_args(args)
910 arghelper.set_kwargs()
911 self.assertEqual(set([o.otype for o in CustomQueryMixin.edns_options]), set([3, 8, 10]))
912
913 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
914
915 def test_ingest_input(self):
916 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
917 example_bad_json.write(b'{')
918
919 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
920 example_no_version.write(b'{}')
921
922 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
923 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
924
925 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
926 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
927
928 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
929 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
930 example_auth_out.write(example_auth_in.read())
931
932 try:
933 args = ['-r', example_auth_out.name]
934 arghelper = ArgHelper(self.resolver, self.logger)
935 arghelper.build_parser('probe')
936 arghelper.parse_args(args)
937 arghelper.ingest_input()
938
939 # Bad json
940 args = ['-r', example_bad_json.name]
941 arghelper = ArgHelper(self.resolver, self.logger)
942 arghelper.build_parser('probe')
943 arghelper.parse_args(args)
944 with self.assertRaises(AnalysisInputError):
945 arghelper.ingest_input()
946
947 # No version
948 args = ['-r', example_no_version.name]
949 arghelper = ArgHelper(self.resolver, self.logger)
950 arghelper.build_parser('probe')
951 arghelper.parse_args(args)
952 with self.assertRaises(AnalysisInputError):
953 arghelper.ingest_input()
954
955 # Invalid version
956 args = ['-r', example_invalid_version_1.name]
957 arghelper = ArgHelper(self.resolver, self.logger)
958 arghelper.build_parser('probe')
959 arghelper.parse_args(args)
960 with self.assertRaises(AnalysisInputError):
961 arghelper.ingest_input()
962
963 # Invalid version
964 args = ['-r', example_invalid_version_2.name]
965 arghelper = ArgHelper(self.resolver, self.logger)
966 arghelper.build_parser('probe')
967 arghelper.parse_args(args)
968 with self.assertRaises(AnalysisInputError):
969 arghelper.ingest_input()
970
971 finally:
972 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
973 example_invalid_version_1, example_invalid_version_2):
974 os.remove(tmpfile.name)
975
976 def test_ingest_names(self):
977 args = ['example.com', 'example.net']
978 arghelper = ArgHelper(self.resolver, self.logger)
979 arghelper.build_parser('probe')
980 arghelper.parse_args(args)
981 arghelper.ingest_names()
982 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
983
984 unicode_name = 'テスト'
985
986 args = [unicode_name]
987 arghelper = ArgHelper(self.resolver, self.logger)
988 arghelper.build_parser('probe')
989 arghelper.parse_args(args)
990 arghelper.ingest_names()
991 self.assertEqual(list(arghelper.names), [dns.name.from_text('xn--zckzah.')])
992
993 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
994 names_file.write('example.com\nexample.net\n'.encode('utf-8'))
995
996 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file_unicode:
997 try:
998 names_file_unicode.write(('%s\n' % (unicode_name)).encode('utf-8'))
999 # python3/python2 dual compatibility
1000 except UnicodeDecodeError:
1001 names_file_unicode.write(('%s\n' % (unicode_name)))
1002
1003 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
1004 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
1005
1006 try:
1007 args = ['-f', names_file.name]
1008 arghelper = ArgHelper(self.resolver, self.logger)
1009 arghelper.build_parser('probe')
1010 arghelper.parse_args(args)
1011 arghelper.ingest_names()
1012 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
1013
1014 args = ['-f', names_file_unicode.name]
1015 arghelper = ArgHelper(self.resolver, self.logger)
1016 arghelper.build_parser('probe')
1017 arghelper.parse_args(args)
1018 arghelper.ingest_names()
1019 self.assertEqual(list(arghelper.names), [dns.name.from_text('xn--zckzah.')])
1020
1021 args = ['-r', example_names_only.name]
1022 arghelper = ArgHelper(self.resolver, self.logger)
1023 arghelper.build_parser('probe')
1024 arghelper.parse_args(args)
1025 arghelper.ingest_input()
1026 arghelper.ingest_names()
1027 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
1028
1029 args = ['-r', example_names_only.name, 'example.com']
1030 arghelper = ArgHelper(self.resolver, self.logger)
1031 arghelper.build_parser('probe')
1032 arghelper.parse_args(args)
1033 arghelper.ingest_input()
1034 arghelper.ingest_names()
1035 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
1036 finally:
1037 for tmpfile in (names_file, names_file_unicode, example_names_only):
1038 os.remove(tmpfile.name)
1039
1040 if __name__ == '__main__':
1041 unittest.main()
+0
-112
tests/dnsviz_probe_run_offline.py less more
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12 EXAMPLE_COM_SIGNED = os.path.join(DATA_DIR, 'zone', 'example.com.zone.signed')
13 EXAMPLE_COM_ZONE = os.path.join(DATA_DIR, 'zone', 'example.com.zone')
14 EXAMPLE_COM_DELEGATION = os.path.join(DATA_DIR, 'zone', 'example.com.zone-delegation')
15
16 class DNSProbeRunOfflineTestCase(unittest.TestCase):
17 def setUp(self):
18 self.current_cwd = os.getcwd()
19 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
20
21 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
22 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
23 self.example_auth_out.write(example_auth_in.read())
24
25 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
26 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
27 self.example_rec_out.write(example_rec_in.read())
28
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
30 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
31
32 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
33 self.output.close()
34
35 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
36
37 def tearDown(self):
38 os.remove(self.example_auth_out.name)
39 os.remove(self.example_rec_out.name)
40 os.remove(self.names_file.name)
41 os.remove(self.output.name)
42 subprocess.check_call(['rm', '-rf', self.run_cwd])
43
44 def test_dnsviz_probe_input(self):
45 with io.open(self.output.name, 'wb') as fh_out:
46 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
47 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', 'example.com'], stdin=subprocess.PIPE, stdout=fh_out)
48 p.communicate(fh_in.read())
49 self.assertEqual(p.returncode, 0)
50
51 with io.open(self.output.name, 'wb') as fh:
52 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, 'example.com'], stdout=fh), 0)
53
54 def test_dnsviz_probe_names_input(self):
55 with io.open(self.output.name, 'wb') as fh:
56 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
57
58 with io.open(self.output.name, 'wb') as fh_out:
59 with io.open(self.names_file.name, 'rb') as fh_in:
60 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
61 p.communicate(fh_in.read())
62 self.assertEqual(p.returncode, 0)
63
64 def test_dnsviz_probe_output(self):
65 with io.open(self.output.name, 'wb') as fh:
66 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, 'example.com'], cwd=self.run_cwd, stdout=fh), 0)
67
68 with io.open(self.output.name, 'wb') as fh:
69 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-o', '-', 'example.com'], cwd=self.run_cwd, stdout=fh), 0)
70
71 with io.open(self.output.name, 'wb') as fh:
72 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-o', 'all.json', 'example.com'], cwd=self.run_cwd, stdout=fh), 0)
73 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.json')))
74
75 def test_dnsviz_probe_auth(self):
76 with io.open(self.output.name, 'wb') as fh_out:
77 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
78 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', 'example.com'], stdin=subprocess.PIPE, stdout=fh_out)
79 p.communicate(fh_in.read())
80 self.assertEqual(p.returncode, 0)
81
82 with io.open(self.output.name, 'wb') as fh_out:
83 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
84 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', '.'], stdin=subprocess.PIPE, stdout=fh_out)
85 p.communicate(fh_in.read())
86 self.assertEqual(p.returncode, 0)
87
88 def test_dnsviz_probe_rec(self):
89 with io.open(self.output.name, 'wb') as fh_out:
90 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
91 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', 'example.com'], stdin=subprocess.PIPE, stdout=fh_out)
92 p.communicate(fh_in.read())
93 self.assertEqual(p.returncode, 0)
94
95 with io.open(self.output.name, 'wb') as fh_out:
96 with gzip.open(ROOT_RECURSIVE) as fh_in:
97 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', '.'], stdin=subprocess.PIPE, stdout=fh_out)
98 p.communicate(fh_in.read())
99 self.assertEqual(p.returncode, 0)
100
101 def test_dnsviz_probe_auth_local(self):
102 with io.open(self.output.name, 'wb') as fh:
103 self.assertEqual(subprocess.call(
104 [self.dnsviz_bin, 'probe', '-d', '0', '-A',
105 '-x' 'example.com:%s' % EXAMPLE_COM_SIGNED,
106 '-N' 'example.com:%s' % EXAMPLE_COM_DELEGATION,
107 '-D' 'example.com:%s' % EXAMPLE_COM_DELEGATION,
108 'example.com'], stdout=fh), 0)
109
110 if __name__ == '__main__':
111 unittest.main()
+0
-38
tests/dnsviz_probe_run_online.py less more
0 import io
1 import os
2 import subprocess
3 import tempfile
4 import unittest
5
6 class DNSVizProbeRunOnlineTestCase(unittest.TestCase):
7 def setUp(self):
8 self.current_cwd = os.getcwd()
9 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
10
11 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
12 self.output.close()
13
14 def tearDown(self):
15 os.remove(self.output.name)
16
17 def test_dnsviz_probe_auth(self):
18 with io.open(self.output.name, 'wb') as fh:
19 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-A', '.'], stdout=fh), 0)
20
21 with io.open(self.output.name, 'wb') as fh:
22 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-A', 'example.com'], stdout=fh), 0)
23
24 def test_dnsviz_probe_rec(self):
25 with io.open(self.output.name, 'wb') as fh:
26 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '.'], stdout=fh), 0)
27
28 with io.open(self.output.name, 'wb') as fh:
29 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', 'example.com'], stdout=fh), 0)
30
31 def test_dnsviz_probe_rec_multi(self):
32 with io.open(self.output.name, 'wb') as fh:
33 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-t', '3', '.', 'example.com', 'example.net'], stdout=fh), 0)
34
35
36 if __name__ == '__main__':
37 unittest.main()
0 import argparse
1 import binascii
2 import datetime
3 import gzip
4 import importlib
5 import io
6 import logging
7 import os
8 import subprocess
9 import tempfile
10 import unittest
11
12 import dns.name, dns.rdatatype, dns.rrset, dns.zone
13
14 from dnsviz.format import utc
15 from dnsviz.util import get_default_trusted_keys
16
17 mod = importlib.import_module('dnsviz.commands.graph')
18 GraphArgHelper = getattr(mod, 'GraphArgHelper')
19 AnalysisInputError = getattr(mod, 'AnalysisInputError')
20
21 DATA_DIR = os.path.dirname(__file__)
22 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
23
24
25 class DNSVizGraphOptionsTestCase(unittest.TestCase):
26 def setUp(self):
27 self.logger = logging.getLogger()
28 for handler in self.logger.handlers:
29 self.logger.removeHandler(handler)
30 self.logger.addHandler(logging.NullHandler())
31
32 def test_rrtype_list(self):
33 arg1 = 'A,AAAA,MX,CNAME'
34 arg1_with_spaces = ' A , AAAA , MX , CNAME '
35 arg2 = 'A'
36 arg3 = 'A,BLAH'
37 arg4_empty = ''
38 arg4_empty_spaces = ' '
39
40 type_list1 = [dns.rdatatype.A, dns.rdatatype.AAAA, dns.rdatatype.MX, dns.rdatatype.CNAME]
41 type_list2 = [dns.rdatatype.A]
42 empty_list = []
43
44 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg1), type_list1)
45 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg1_with_spaces), type_list1)
46 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg2), type_list2)
47 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg4_empty), empty_list)
48 self.assertEqual(GraphArgHelper.comma_separated_dns_types(arg4_empty_spaces), empty_list)
49
50 # invalid schema
51 with self.assertRaises(argparse.ArgumentTypeError):
52 GraphArgHelper.comma_separated_dns_types(arg3)
53
54 def test_integer_list(self):
55 arg1 = '1,2,3,4,5'
56 arg1_with_spaces = ' 1 , 2 , 3 , 4 , 5 '
57 arg2 = '1'
58 arg3 = '1,A'
59 arg4_empty = ''
60 arg4_empty_spaces = ' '
61
62 int_list1 = [1,2,3,4,5]
63 int_list2 = [1]
64 empty_list = []
65
66 int_set1 = set([1,2,3,4,5])
67 int_set2 = set([1])
68 empty_set = set([])
69
70 self.assertEqual(GraphArgHelper.comma_separated_ints(arg1), int_list1)
71 self.assertEqual(GraphArgHelper.comma_separated_ints(arg1_with_spaces), int_list1)
72 self.assertEqual(GraphArgHelper.comma_separated_ints(arg2), int_list2)
73 self.assertEqual(GraphArgHelper.comma_separated_ints(arg4_empty), empty_list)
74 self.assertEqual(GraphArgHelper.comma_separated_ints(arg4_empty_spaces), empty_list)
75
76 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg1), int_set1)
77 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg1_with_spaces), int_set1)
78 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg2), int_set2)
79 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg4_empty), empty_set)
80 self.assertEqual(GraphArgHelper.comma_separated_ints_set(arg4_empty_spaces), empty_set)
81
82 # invalid schema
83 with self.assertRaises(argparse.ArgumentTypeError):
84 GraphArgHelper.comma_separated_ints(arg3)
85
86 def test_valid_domain_name(self):
87 arg1 = '.'
88 arg2 = 'www.example.com'
89 arg3 = 'www..example.com'
90
91 self.assertEqual(GraphArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
92 self.assertEqual(GraphArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
93
94 # invalid domain name
95 with self.assertRaises(argparse.ArgumentTypeError):
96 GraphArgHelper.valid_domain_name(arg3)
97
98 def test_ingest_input(self):
99 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
100 example_bad_json.write(b'{')
101
102 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
103 example_no_version.write(b'{}')
104
105 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
106 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
107
108 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
109 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
110
111 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
112 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
113 example_auth_out.write(example_auth_in.read())
114
115 try:
116 args = ['-r', example_auth_out.name]
117 arghelper = GraphArgHelper(self.logger)
118 arghelper.build_parser('graph')
119 arghelper.parse_args(args)
120 arghelper.ingest_input()
121
122 # Bad json
123 args = ['-r', example_bad_json.name]
124 arghelper = GraphArgHelper(self.logger)
125 arghelper.build_parser('graph')
126 arghelper.parse_args(args)
127 with self.assertRaises(AnalysisInputError):
128 arghelper.ingest_input()
129
130 # No version
131 args = ['-r', example_no_version.name]
132 arghelper = GraphArgHelper(self.logger)
133 arghelper.build_parser('graph')
134 arghelper.parse_args(args)
135 with self.assertRaises(AnalysisInputError):
136 arghelper.ingest_input()
137
138 # Invalid version
139 args = ['-r', example_invalid_version_1.name]
140 arghelper = GraphArgHelper(self.logger)
141 arghelper.build_parser('graph')
142 arghelper.parse_args(args)
143 with self.assertRaises(AnalysisInputError):
144 arghelper.ingest_input()
145
146 # Invalid version
147 args = ['-r', example_invalid_version_2.name]
148 arghelper = GraphArgHelper(self.logger)
149 arghelper.build_parser('graph')
150 arghelper.parse_args(args)
151 with self.assertRaises(AnalysisInputError):
152 arghelper.ingest_input()
153
154 finally:
155 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
156 example_invalid_version_1, example_invalid_version_2):
157 os.remove(tmpfile.name)
158
159 def test_ingest_names(self):
160 args = ['example.com', 'example.net']
161 arghelper = GraphArgHelper(self.logger)
162 arghelper.build_parser('graph')
163 arghelper.parse_args(args)
164 arghelper.ingest_names()
165 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
166
167 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
168 names_file.write(b'example.com\nexample.net\n')
169
170 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
171 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
172
173 try:
174 args = ['-f', names_file.name]
175 arghelper = GraphArgHelper(self.logger)
176 arghelper.build_parser('graph')
177 arghelper.parse_args(args)
178 arghelper.ingest_names()
179 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
180
181 args = ['-r', example_names_only.name]
182 arghelper = GraphArgHelper(self.logger)
183 arghelper.build_parser('graph')
184 arghelper.parse_args(args)
185 arghelper.ingest_input()
186 arghelper.ingest_names()
187 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
188
189 args = ['-r', example_names_only.name, 'example.com']
190 arghelper = GraphArgHelper(self.logger)
191 arghelper.build_parser('graph')
192 arghelper.parse_args(args)
193 arghelper.ingest_input()
194 arghelper.ingest_names()
195 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
196 finally:
197 for tmpfile in (names_file, example_names_only):
198 os.remove(tmpfile.name)
199
200 def test_trusted_keys_file(self):
201 tk1 = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
202 tk2 = 'example.com. IN DNSKEY 256 3 7 AwEAAaerI6CXvvG6U3UxkB0PXj+ORyGFtABYJ6JG3NL6w1KKlZl+73AS aPEEa7SXeuWmAWE1N3rsbnrMBvepBXkCbP609eoo2mJ8bsozT/NNwSSc FP1Ddw4wxpZAC/+/K736rF1HbI3ROS/rBTr7RW6rWzcyPbYFuUMVzrAM ZSJNJsTDcmyGc5Is3cFzNcrd3/Gmcjt8TKMmGq51HXWzFvxro7EH6aOl K6G4O4+mzaUKp91mg7DAVhX8yXnadXUZQ4yDfLzSleYQ2TroQqeSgI3X m/gUoACm3ELUOr84TmIKZ67X/zBTx8tHC5iBWY2tbIKqiJY7I4/aW4S4 NraCSRbDpbM='
203 tk1_rdata = ' '.join(tk1.split()[3:])
204 tk2_rdata = ' '.join(tk2.split()[3:])
205 tk_explicit = [(dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk1_rdata)),
206 (dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk2_rdata))]
207
208 now = datetime.datetime.now(utc)
209 tk_default = get_default_trusted_keys(now)
210
211 args = ['example.com']
212 arghelper = GraphArgHelper(self.logger)
213 arghelper.build_parser('graph')
214 arghelper.parse_args(args)
215 arghelper.aggregate_trusted_key_info()
216 self.assertEqual(arghelper.trusted_keys, None)
217 arghelper.update_trusted_key_info(now)
218 self.assertEqual(arghelper.trusted_keys, tk_default)
219
220 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk1_file:
221 tk1_file.write(tk1.encode('utf-8'))
222
223 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk2_file:
224 tk2_file.write(tk2.encode('utf-8'))
225
226 try:
227 args = ['-t', tk1_file.name, '-t', tk2_file.name, 'example.com']
228 arghelper = GraphArgHelper(self.logger)
229 arghelper.build_parser('graph')
230 arghelper.parse_args(args)
231 arghelper.aggregate_trusted_key_info()
232 arghelper.update_trusted_key_info(now)
233 self.assertEqual(arghelper.trusted_keys, tk_explicit)
234
235 args = ['-t', '/dev/null', 'example.com']
236 arghelper = GraphArgHelper(self.logger)
237 arghelper.build_parser('graph')
238 arghelper.parse_args(args)
239 arghelper.aggregate_trusted_key_info()
240 arghelper.update_trusted_key_info(now)
241 self.assertEqual(arghelper.trusted_keys, [])
242
243 finally:
244 for tmpfile in (tk1_file, tk2_file):
245 os.remove(tmpfile.name)
246
247 def test_option_combination_errors(self):
248
249 # Names file and command-line domain names are mutually exclusive
250 args = ['-f', '/dev/null', 'example.com']
251 arghelper = GraphArgHelper(self.logger)
252 arghelper.build_parser('graph')
253 arghelper.parse_args(args)
254 with self.assertRaises(argparse.ArgumentTypeError):
255 arghelper.check_args()
256
257 # Names file and command-line domain names are mutually exclusive
258 args = ['-O', '-o', '/dev/null']
259 arghelper = GraphArgHelper(self.logger)
260 arghelper.build_parser('graph')
261 arghelper.parse_args(args)
262 with self.assertRaises(argparse.ArgumentTypeError):
263 arghelper.check_args()
264
265 # But this is allowed
266 args = ['-o', '/dev/null']
267 arghelper = GraphArgHelper(self.logger)
268 arghelper.build_parser('graph')
269 arghelper.parse_args(args)
270 arghelper.check_args()
271
272 # So is this
273 args = ['-O']
274 arghelper = GraphArgHelper(self.logger)
275 arghelper.build_parser('graph')
276 arghelper.parse_args(args)
277 arghelper.check_args()
278
279 def test_output_format(self):
280
281 args = ['-T', 'png', '-o', 'foo.dot']
282 arghelper = GraphArgHelper(self.logger)
283 arghelper.build_parser('graph')
284 arghelper.parse_args(args)
285 arghelper.set_kwargs()
286 self.assertEqual(arghelper.output_format, 'png')
287
288 args = ['-o', 'foo.dot']
289 arghelper = GraphArgHelper(self.logger)
290 arghelper.build_parser('graph')
291 arghelper.parse_args(args)
292 arghelper.set_kwargs()
293 self.assertEqual(arghelper.output_format, 'dot')
294
295 args = ['-o', 'foo.png']
296 arghelper = GraphArgHelper(self.logger)
297 arghelper.build_parser('graph')
298 arghelper.parse_args(args)
299 arghelper.set_kwargs()
300 self.assertEqual(arghelper.output_format, 'png')
301
302 args = ['-o', 'foo.html']
303 arghelper = GraphArgHelper(self.logger)
304 arghelper.build_parser('graph')
305 arghelper.parse_args(args)
306 arghelper.set_kwargs()
307 self.assertEqual(arghelper.output_format, 'html')
308
309 args = ['-o', 'foo.svg']
310 arghelper = GraphArgHelper(self.logger)
311 arghelper.build_parser('graph')
312 arghelper.parse_args(args)
313 arghelper.set_kwargs()
314 self.assertEqual(arghelper.output_format, 'svg')
315
316 args = ['-o', 'foo.xyz']
317 arghelper = GraphArgHelper(self.logger)
318 arghelper.build_parser('graph')
319 arghelper.parse_args(args)
320 with self.assertRaises(argparse.ArgumentTypeError):
321 arghelper.set_kwargs()
322
323 args = ['-o', 'png']
324 arghelper = GraphArgHelper(self.logger)
325 arghelper.build_parser('graph')
326 arghelper.parse_args(args)
327 with self.assertRaises(argparse.ArgumentTypeError):
328 arghelper.set_kwargs()
329
330 args = ['-o', '-']
331 arghelper = GraphArgHelper(self.logger)
332 arghelper.build_parser('graph')
333 arghelper.parse_args(args)
334 arghelper.set_kwargs()
335 self.assertEqual(arghelper.output_format, 'dot')
336
337 args = []
338 arghelper = GraphArgHelper(self.logger)
339 arghelper.build_parser('graph')
340 arghelper.parse_args(args)
341 arghelper.set_kwargs()
342 self.assertEqual(arghelper.output_format, 'dot')
343
344 if __name__ == '__main__':
345 unittest.main()
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12
13 class DNSGraphRunTestCase(unittest.TestCase):
14 def setUp(self):
15 self.devnull = io.open('/dev/null', 'wb')
16 self.current_cwd = os.getcwd()
17 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
18
19 tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
20
21 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.tk_file:
22 self.tk_file.write(tk.encode('utf-8'))
23
24 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
25 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
26 self.example_auth_out.write(example_auth_in.read())
27
28 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
30 self.example_rec_out.write(example_rec_in.read())
31
32 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
33 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
34
35 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
36 self.output.close()
37
38 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
39
40 def tearDown(self):
41 self.devnull.close()
42 os.remove(self.tk_file.name)
43 os.remove(self.example_auth_out.name)
44 os.remove(self.example_rec_out.name)
45 os.remove(self.names_file.name)
46 os.remove(self.output.name)
47 subprocess.check_call(['rm', '-rf', self.run_cwd])
48
49 def test_dnsviz_graph_input(self):
50 with io.open(self.output.name, 'wb') as fh_out:
51 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
52 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
53 p.communicate(fh_in.read())
54 self.assertEqual(p.returncode, 0)
55
56 with io.open(self.output.name, 'wb') as fh_out:
57 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
58 p = subprocess.Popen([self.dnsviz_bin, 'graph', '-r', '-'], stdin=subprocess.PIPE, stdout=fh_out)
59 p.communicate(fh_in.read())
60 self.assertEqual(p.returncode, 0)
61
62 with io.open(self.output.name, 'wb') as fh:
63 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name], stdout=fh), 0)
64
65 def test_dnsviz_graph_names_input(self):
66 with io.open(self.output.name, 'wb') as fh:
67 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
68
69 with io.open(self.output.name, 'wb') as fh_out:
70 with io.open(self.names_file.name, 'rb') as fh_in:
71 p = subprocess.Popen([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
72 p.communicate(fh_in.read())
73 self.assertEqual(p.returncode, 0)
74
75 def test_dnsviz_graph_tk_input(self):
76 with io.open(self.output.name, 'wb') as fh:
77 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-t', self.tk_file.name], stdout=fh), 0)
78
79 with io.open(self.output.name, 'wb') as fh_out:
80 with io.open(self.tk_file.name, 'rb') as fh_in:
81 p = subprocess.Popen([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-t', '-'], stdin=subprocess.PIPE, stdout=fh_out)
82 p.communicate(fh_in.read())
83 self.assertEqual(p.returncode, 0)
84
85 def test_dnsviz_graph_output(self):
86 with io.open(self.output.name, 'wb') as fh:
87 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name], cwd=self.run_cwd, stdout=fh), 0)
88
89 with io.open(self.output.name, 'wb') as fh:
90 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-Tdot', '-o', '-'], cwd=self.run_cwd, stdout=fh), 0)
91
92 with io.open(self.output.name, 'wb') as fh:
93 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-o', 'all.dot'], cwd=self.run_cwd, stdout=fh), 0)
94 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.dot')))
95 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.com.dot')))
96 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.net.dot')))
97
98 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-O'], cwd=self.run_cwd), 0)
99 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.com.dot')))
100 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.net.dot')))
101
102 def test_dnsviz_graph_input_auth(self):
103 with io.open(self.output.name, 'wb') as fh_out:
104 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
105 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
106 p.communicate(fh_in.read())
107 self.assertEqual(p.returncode, 0)
108
109 with io.open(self.output.name, 'wb') as fh_out:
110 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
111 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
112 p.communicate(fh_in.read())
113 self.assertEqual(p.returncode, 0)
114
115 def test_dnsviz_graph_input_rec(self):
116 with io.open(self.output.name, 'wb') as fh_out:
117 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
118 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
119 p.communicate(fh_in.read())
120 self.assertEqual(p.returncode, 0)
121
122 with io.open(self.output.name, 'wb') as fh_out:
123 with gzip.open(ROOT_RECURSIVE) as fh_in:
124 p = subprocess.Popen([self.dnsviz_bin, 'graph'], stdin=subprocess.PIPE, stdout=fh_out)
125 p.communicate(fh_in.read())
126 self.assertEqual(p.returncode, 0)
127
128 def test_dnsviz_graph_output_format(self):
129 magic_codes_mapping = {
130 'dot': b'digraph',
131 'png': b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A',
132 'svg': b'<?xml ',
133 'html': b'<?xml ',
134 }
135
136 for fmt in ('dot', 'png', 'svg', 'html'):
137 with io.open(self.output.name, 'wb') as fh:
138 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-o', 'all.'+fmt], cwd=self.run_cwd, stdout=fh), 0)
139 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.'+fmt)))
140 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.com.' + fmt)))
141 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.net.' + fmt)))
142
143 with io.open(os.path.join(self.run_cwd, 'all.' + fmt), 'rb') as fh:
144 first_bytes = fh.read(len(magic_codes_mapping[fmt]))
145 self.assertEqual(first_bytes, magic_codes_mapping[fmt])
146
147 self.assertEqual(subprocess.call([self.dnsviz_bin, 'graph', '-r', self.example_auth_out.name, '-T', fmt, '-O'], cwd=self.run_cwd), 0)
148 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.com.' + fmt)))
149 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.net.' + fmt)))
150
151 with io.open(os.path.join(self.run_cwd, 'example.com.' + fmt), 'rb') as fh:
152 first_bytes = fh.read(len(magic_codes_mapping[fmt]))
153 self.assertEqual(first_bytes, magic_codes_mapping[fmt])
154
155 if __name__ == '__main__':
156 unittest.main()
0 import argparse
1 import datetime
2 import gzip
3 import importlib
4 import io
5 import logging
6 import os
7 import subprocess
8 import tempfile
9 import unittest
10
11 import dns.name, dns.rdatatype, dns.rrset, dns.zone
12
13 from dnsviz.format import utc
14 from dnsviz.util import get_default_trusted_keys
15
16 mod = importlib.import_module('dnsviz.commands.grok')
17 GrokArgHelper = getattr(mod, 'GrokArgHelper')
18 AnalysisInputError = getattr(mod, 'AnalysisInputError')
19
20 DATA_DIR = os.path.dirname(__file__)
21 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
22
23
24 class DNSVizGrokOptionsTestCase(unittest.TestCase):
25 def setUp(self):
26 self.logger = logging.getLogger()
27 for handler in self.logger.handlers:
28 self.logger.removeHandler(handler)
29 self.logger.addHandler(logging.NullHandler())
30
31 def test_integer_list(self):
32 arg1 = '1,2,3,4,5'
33 arg1_with_spaces = ' 1 , 2 , 3 , 4 , 5 '
34 arg2 = '1'
35 arg3 = '1,A'
36 arg4_empty = ''
37 arg4_empty_spaces = ' '
38
39 int_list1 = [1,2,3,4,5]
40 int_list2 = [1]
41 empty_list = []
42
43 int_set1 = set([1,2,3,4,5])
44 int_set2 = set([1])
45 empty_set = set([])
46
47 self.assertEqual(GrokArgHelper.comma_separated_ints(arg1), int_list1)
48 self.assertEqual(GrokArgHelper.comma_separated_ints(arg1_with_spaces), int_list1)
49 self.assertEqual(GrokArgHelper.comma_separated_ints(arg2), int_list2)
50 self.assertEqual(GrokArgHelper.comma_separated_ints(arg4_empty), empty_list)
51 self.assertEqual(GrokArgHelper.comma_separated_ints(arg4_empty_spaces), empty_list)
52
53 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg1), int_set1)
54 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg1_with_spaces), int_set1)
55 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg2), int_set2)
56 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg4_empty), empty_set)
57 self.assertEqual(GrokArgHelper.comma_separated_ints_set(arg4_empty_spaces), empty_set)
58
59 # invalid schema
60 with self.assertRaises(argparse.ArgumentTypeError):
61 GrokArgHelper.comma_separated_ints(arg3)
62
63 def test_valid_domain_name(self):
64 arg1 = '.'
65 arg2 = 'www.example.com'
66 arg3 = 'www..example.com'
67
68 self.assertEqual(GrokArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
69 self.assertEqual(GrokArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
70
71 # invalid domain name
72 with self.assertRaises(argparse.ArgumentTypeError):
73 GrokArgHelper.valid_domain_name(arg3)
74
75 def test_ingest_input(self):
76 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
77 example_bad_json.write(b'{')
78
79 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
80 example_no_version.write(b'{}')
81
82 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
83 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
84
85 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
86 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
87
88 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
89 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
90 example_auth_out.write(example_auth_in.read())
91
92 try:
93 args = ['-r', example_auth_out.name]
94 arghelper = GrokArgHelper(self.logger)
95 arghelper.build_parser('grok')
96 arghelper.parse_args(args)
97 arghelper.ingest_input()
98
99 # Bad json
100 args = ['-r', example_bad_json.name]
101 arghelper = GrokArgHelper(self.logger)
102 arghelper.build_parser('grok')
103 arghelper.parse_args(args)
104 with self.assertRaises(AnalysisInputError):
105 arghelper.ingest_input()
106
107 # No version
108 args = ['-r', example_no_version.name]
109 arghelper = GrokArgHelper(self.logger)
110 arghelper.build_parser('grok')
111 arghelper.parse_args(args)
112 with self.assertRaises(AnalysisInputError):
113 arghelper.ingest_input()
114
115 # Invalid version
116 args = ['-r', example_invalid_version_1.name]
117 arghelper = GrokArgHelper(self.logger)
118 arghelper.build_parser('grok')
119 arghelper.parse_args(args)
120 with self.assertRaises(AnalysisInputError):
121 arghelper.ingest_input()
122
123 # Invalid version
124 args = ['-r', example_invalid_version_2.name]
125 arghelper = GrokArgHelper(self.logger)
126 arghelper.build_parser('grok')
127 arghelper.parse_args(args)
128 with self.assertRaises(AnalysisInputError):
129 arghelper.ingest_input()
130
131 finally:
132 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
133 example_invalid_version_1, example_invalid_version_2):
134 os.remove(tmpfile.name)
135
136 def test_ingest_names(self):
137 args = ['example.com', 'example.net']
138 arghelper = GrokArgHelper(self.logger)
139 arghelper.build_parser('grok')
140 arghelper.parse_args(args)
141 arghelper.ingest_names()
142 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
143
144 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
145 names_file.write(b'example.com\nexample.net\n')
146
147 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
148 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
149
150 try:
151 args = ['-f', names_file.name]
152 arghelper = GrokArgHelper(self.logger)
153 arghelper.build_parser('grok')
154 arghelper.parse_args(args)
155 arghelper.ingest_names()
156 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
157
158 args = ['-r', example_names_only.name]
159 arghelper = GrokArgHelper(self.logger)
160 arghelper.build_parser('grok')
161 arghelper.parse_args(args)
162 arghelper.ingest_input()
163 arghelper.ingest_names()
164 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
165
166 args = ['-r', example_names_only.name, 'example.com']
167 arghelper = GrokArgHelper(self.logger)
168 arghelper.build_parser('grok')
169 arghelper.parse_args(args)
170 arghelper.ingest_input()
171 arghelper.ingest_names()
172 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
173 finally:
174 for tmpfile in (names_file, example_names_only):
175 os.remove(tmpfile.name)
176
177 def test_trusted_keys_file(self):
178 tk1 = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
179 tk2 = 'example.com. IN DNSKEY 256 3 7 AwEAAaerI6CXvvG6U3UxkB0PXj+ORyGFtABYJ6JG3NL6w1KKlZl+73AS aPEEa7SXeuWmAWE1N3rsbnrMBvepBXkCbP609eoo2mJ8bsozT/NNwSSc FP1Ddw4wxpZAC/+/K736rF1HbI3ROS/rBTr7RW6rWzcyPbYFuUMVzrAM ZSJNJsTDcmyGc5Is3cFzNcrd3/Gmcjt8TKMmGq51HXWzFvxro7EH6aOl K6G4O4+mzaUKp91mg7DAVhX8yXnadXUZQ4yDfLzSleYQ2TroQqeSgI3X m/gUoACm3ELUOr84TmIKZ67X/zBTx8tHC5iBWY2tbIKqiJY7I4/aW4S4 NraCSRbDpbM='
180 tk1_rdata = ' '.join(tk1.split()[3:])
181 tk2_rdata = ' '.join(tk2.split()[3:])
182 tk_explicit = [(dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk1_rdata)),
183 (dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk2_rdata))]
184
185 args = ['example.com']
186 arghelper = GrokArgHelper(self.logger)
187 arghelper.build_parser('grok')
188 arghelper.parse_args(args)
189 arghelper.aggregate_trusted_key_info()
190 self.assertEqual(arghelper.trusted_keys, None)
191 arghelper.update_trusted_key_info()
192 self.assertEqual(arghelper.trusted_keys, [])
193
194 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk1_file:
195 tk1_file.write(tk1.encode('utf-8'))
196
197 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk2_file:
198 tk2_file.write(tk2.encode('utf-8'))
199
200 try:
201 args = ['-t', tk1_file.name, '-t', tk2_file.name, 'example.com']
202 arghelper = GrokArgHelper(self.logger)
203 arghelper.build_parser('grok')
204 arghelper.parse_args(args)
205 arghelper.aggregate_trusted_key_info()
206 arghelper.update_trusted_key_info()
207 self.assertEqual(arghelper.trusted_keys, tk_explicit)
208
209 args = ['-t', '/dev/null', 'example.com']
210 arghelper = GrokArgHelper(self.logger)
211 arghelper.build_parser('grok')
212 arghelper.parse_args(args)
213 arghelper.aggregate_trusted_key_info()
214 arghelper.update_trusted_key_info()
215 self.assertEqual(arghelper.trusted_keys, [])
216
217 finally:
218 for tmpfile in (tk1_file, tk2_file):
219 os.remove(tmpfile.name)
220
221 def test_option_combination_errors(self):
222
223 # Names file and command-line domain names are mutually exclusive
224 args = ['-f', '/dev/null', 'example.com']
225 arghelper = GrokArgHelper(self.logger)
226 arghelper.build_parser('grok')
227 arghelper.parse_args(args)
228 with self.assertRaises(argparse.ArgumentTypeError):
229 arghelper.check_args()
230
231 def test_log_level(self):
232
233 # Names file and command-line domain names are mutually exclusive
234 args = []
235 arghelper = GrokArgHelper(self.logger)
236 arghelper.build_parser('grok')
237 arghelper.parse_args(args)
238 arghelper.set_kwargs()
239 self.assertEqual(arghelper.log_level, logging.DEBUG)
240
241 args = ['-l', 'info']
242 arghelper = GrokArgHelper(self.logger)
243 arghelper.build_parser('grok')
244 arghelper.parse_args(args)
245 arghelper.set_kwargs()
246 self.assertEqual(arghelper.log_level, logging.INFO)
247
248 args = ['-l', 'warning']
249 arghelper = GrokArgHelper(self.logger)
250 arghelper.build_parser('grok')
251 arghelper.parse_args(args)
252 arghelper.set_kwargs()
253 self.assertEqual(arghelper.log_level, logging.WARNING)
254
255 args = ['-l', 'error']
256 arghelper = GrokArgHelper(self.logger)
257 arghelper.build_parser('grok')
258 arghelper.parse_args(args)
259 arghelper.set_kwargs()
260 self.assertEqual(arghelper.log_level, logging.ERROR)
261
262 if __name__ == '__main__':
263 unittest.main()
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12
13 class DNSGrokRunTestCase(unittest.TestCase):
14 def setUp(self):
15 self.devnull = io.open('/dev/null', 'wb')
16 self.current_cwd = os.getcwd()
17 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
18
19 tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
20
21 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.tk_file:
22 self.tk_file.write(tk.encode('utf-8'))
23
24 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
25 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
26 self.example_auth_out.write(example_auth_in.read())
27
28 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
30 self.example_rec_out.write(example_rec_in.read())
31
32 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
33 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
34
35 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
36 self.output.close()
37
38 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
39
40 def tearDown(self):
41 self.devnull.close()
42 os.remove(self.tk_file.name)
43 os.remove(self.example_auth_out.name)
44 os.remove(self.example_rec_out.name)
45 os.remove(self.names_file.name)
46 os.remove(self.output.name)
47 subprocess.check_call(['rm', '-rf', self.run_cwd])
48
49 def test_dnsviz_grok_input(self):
50 with io.open(self.output.name, 'wb') as fh_out:
51 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
52 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
53 p.communicate(fh_in.read())
54 self.assertEqual(p.returncode, 0)
55
56 with io.open(self.output.name, 'wb') as fh_out:
57 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
58 p = subprocess.Popen([self.dnsviz_bin, 'grok', '-r', '-'], stdin=subprocess.PIPE, stdout=fh_out)
59 p.communicate(fh_in.read())
60 self.assertEqual(p.returncode, 0)
61
62 with io.open(self.output.name, 'wb') as fh:
63 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name], stdout=fh), 0)
64
65 def test_dnsviz_grok_names_input(self):
66 with io.open(self.output.name, 'wb') as fh:
67 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
68
69 with io.open(self.output.name, 'wb') as fh_out:
70 with io.open(self.names_file.name, 'rb') as fh_in:
71 p = subprocess.Popen([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
72 p.communicate(fh_in.read())
73 self.assertEqual(p.returncode, 0)
74
75 def test_dnsviz_grok_tk_input(self):
76 pass
77
78 #FIXME
79 #with io.open(self.output.name, 'wb') as fh:
80 # self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-t', self.tk_file.name], stdout=fh), 0)
81
82 #with io.open(self.output.name, 'wb') as fh_out:
83 # with io.open(self.tk_file.name, 'rb') as fh_in:
84 # p = subprocess.Popen([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-t', '-'], stdin=subprocess.PIPE, stdout=fh_out)
85 # p.communicate(fh_in.read())
86 # self.assertEqual(p.returncode, 0)
87
88 def test_dnsviz_grok_output(self):
89 with io.open(self.output.name, 'wb') as fh:
90 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name], cwd=self.run_cwd, stdout=fh), 0)
91
92 with io.open(self.output.name, 'wb') as fh:
93 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-o', '-'], cwd=self.run_cwd, stdout=fh), 0)
94
95 with io.open(self.output.name, 'wb') as fh:
96 self.assertEqual(subprocess.call([self.dnsviz_bin, 'grok', '-r', self.example_auth_out.name, '-o', 'all.json'], cwd=self.run_cwd, stdout=fh), 0)
97 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.json')))
98
99 def test_dnsviz_grok_input_auth(self):
100 with io.open(self.output.name, 'wb') as fh_out:
101 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
102 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
103 p.communicate(fh_in.read())
104 self.assertEqual(p.returncode, 0)
105
106 with io.open(self.output.name, 'wb') as fh_out:
107 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
108 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
109 p.communicate(fh_in.read())
110 self.assertEqual(p.returncode, 0)
111
112 def test_dnsviz_grok_input_rec(self):
113 with io.open(self.output.name, 'wb') as fh_out:
114 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
115 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
116 p.communicate(fh_in.read())
117 self.assertEqual(p.returncode, 0)
118
119 with io.open(self.output.name, 'wb') as fh_out:
120 with gzip.open(ROOT_RECURSIVE) as fh_in:
121 p = subprocess.Popen([self.dnsviz_bin, 'grok'], stdin=subprocess.PIPE, stdout=fh_out)
122 p.communicate(fh_in.read())
123 self.assertEqual(p.returncode, 0)
124
125 if __name__ == '__main__':
126 unittest.main()
0 import argparse
1 import binascii
2 import datetime
3 import gzip
4 import importlib
5 import io
6 import logging
7 import os
8 import subprocess
9 import tempfile
10 import unittest
11
12 import dns.name, dns.rdatatype, dns.rrset, dns.zone
13
14 from dnsviz.format import utc
15 from dnsviz.util import get_default_trusted_keys
16
17 mod = importlib.import_module('dnsviz.commands.print')
18 PrintArgHelper = getattr(mod, 'PrintArgHelper')
19 AnalysisInputError = getattr(mod, 'AnalysisInputError')
20
21 DATA_DIR = os.path.dirname(__file__)
22 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
23
24
25 class DNSVizPrintOptionsTestCase(unittest.TestCase):
26 def setUp(self):
27 self.logger = logging.getLogger()
28 for handler in self.logger.handlers:
29 self.logger.removeHandler(handler)
30 self.logger.addHandler(logging.NullHandler())
31
32 def test_rrtype_list(self):
33 arg1 = 'A,AAAA,MX,CNAME'
34 arg1_with_spaces = ' A , AAAA , MX , CNAME '
35 arg2 = 'A'
36 arg3 = 'A,BLAH'
37 arg4_empty = ''
38 arg4_empty_spaces = ' '
39
40 type_list1 = [dns.rdatatype.A, dns.rdatatype.AAAA, dns.rdatatype.MX, dns.rdatatype.CNAME]
41 type_list2 = [dns.rdatatype.A]
42 empty_list = []
43
44 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg1), type_list1)
45 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg1_with_spaces), type_list1)
46 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg2), type_list2)
47 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg4_empty), empty_list)
48 self.assertEqual(PrintArgHelper.comma_separated_dns_types(arg4_empty_spaces), empty_list)
49
50 # invalid schema
51 with self.assertRaises(argparse.ArgumentTypeError):
52 PrintArgHelper.comma_separated_dns_types(arg3)
53
54 def test_integer_list(self):
55 arg1 = '1,2,3,4,5'
56 arg1_with_spaces = ' 1 , 2 , 3 , 4 , 5 '
57 arg2 = '1'
58 arg3 = '1,A'
59 arg4_empty = ''
60 arg4_empty_spaces = ' '
61
62 int_list1 = [1,2,3,4,5]
63 int_list2 = [1]
64 empty_list = []
65
66 int_set1 = set([1,2,3,4,5])
67 int_set2 = set([1])
68 empty_set = set([])
69
70 self.assertEqual(PrintArgHelper.comma_separated_ints(arg1), int_list1)
71 self.assertEqual(PrintArgHelper.comma_separated_ints(arg1_with_spaces), int_list1)
72 self.assertEqual(PrintArgHelper.comma_separated_ints(arg2), int_list2)
73 self.assertEqual(PrintArgHelper.comma_separated_ints(arg4_empty), empty_list)
74 self.assertEqual(PrintArgHelper.comma_separated_ints(arg4_empty_spaces), empty_list)
75
76 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg1), int_set1)
77 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg1_with_spaces), int_set1)
78 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg2), int_set2)
79 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg4_empty), empty_set)
80 self.assertEqual(PrintArgHelper.comma_separated_ints_set(arg4_empty_spaces), empty_set)
81
82 # invalid schema
83 with self.assertRaises(argparse.ArgumentTypeError):
84 PrintArgHelper.comma_separated_ints(arg3)
85
86 def test_valid_domain_name(self):
87 arg1 = '.'
88 arg2 = 'www.example.com'
89 arg3 = 'www..example.com'
90
91 self.assertEqual(PrintArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
92 self.assertEqual(PrintArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
93
94 # invalid domain name
95 with self.assertRaises(argparse.ArgumentTypeError):
96 PrintArgHelper.valid_domain_name(arg3)
97
98 def test_ingest_input(self):
99 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
100 example_bad_json.write(b'{')
101
102 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
103 example_no_version.write(b'{}')
104
105 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
106 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
107
108 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
109 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
110
111 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
112 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
113 example_auth_out.write(example_auth_in.read())
114
115 try:
116 args = ['-r', example_auth_out.name]
117 arghelper = PrintArgHelper(self.logger)
118 arghelper.build_parser('print')
119 arghelper.parse_args(args)
120 arghelper.ingest_input()
121
122 # Bad json
123 args = ['-r', example_bad_json.name]
124 arghelper = PrintArgHelper(self.logger)
125 arghelper.build_parser('print')
126 arghelper.parse_args(args)
127 with self.assertRaises(AnalysisInputError):
128 arghelper.ingest_input()
129
130 # No version
131 args = ['-r', example_no_version.name]
132 arghelper = PrintArgHelper(self.logger)
133 arghelper.build_parser('print')
134 arghelper.parse_args(args)
135 with self.assertRaises(AnalysisInputError):
136 arghelper.ingest_input()
137
138 # Invalid version
139 args = ['-r', example_invalid_version_1.name]
140 arghelper = PrintArgHelper(self.logger)
141 arghelper.build_parser('print')
142 arghelper.parse_args(args)
143 with self.assertRaises(AnalysisInputError):
144 arghelper.ingest_input()
145
146 # Invalid version
147 args = ['-r', example_invalid_version_2.name]
148 arghelper = PrintArgHelper(self.logger)
149 arghelper.build_parser('print')
150 arghelper.parse_args(args)
151 with self.assertRaises(AnalysisInputError):
152 arghelper.ingest_input()
153
154 finally:
155 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
156 example_invalid_version_1, example_invalid_version_2):
157 os.remove(tmpfile.name)
158
159 def test_ingest_names(self):
160 args = ['example.com', 'example.net']
161 arghelper = PrintArgHelper(self.logger)
162 arghelper.build_parser('print')
163 arghelper.parse_args(args)
164 arghelper.ingest_names()
165 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
166
167 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
168 names_file.write(b'example.com\nexample.net\n')
169
170 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
171 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
172
173 try:
174 args = ['-f', names_file.name]
175 arghelper = PrintArgHelper(self.logger)
176 arghelper.build_parser('print')
177 arghelper.parse_args(args)
178 arghelper.ingest_names()
179 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
180
181 args = ['-r', example_names_only.name]
182 arghelper = PrintArgHelper(self.logger)
183 arghelper.build_parser('print')
184 arghelper.parse_args(args)
185 arghelper.ingest_input()
186 arghelper.ingest_names()
187 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
188
189 args = ['-r', example_names_only.name, 'example.com']
190 arghelper = PrintArgHelper(self.logger)
191 arghelper.build_parser('print')
192 arghelper.parse_args(args)
193 arghelper.ingest_input()
194 arghelper.ingest_names()
195 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
196 finally:
197 for tmpfile in (names_file, example_names_only):
198 os.remove(tmpfile.name)
199
200 def test_trusted_keys_file(self):
201 tk1 = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
202 tk2 = 'example.com. IN DNSKEY 256 3 7 AwEAAaerI6CXvvG6U3UxkB0PXj+ORyGFtABYJ6JG3NL6w1KKlZl+73AS aPEEa7SXeuWmAWE1N3rsbnrMBvepBXkCbP609eoo2mJ8bsozT/NNwSSc FP1Ddw4wxpZAC/+/K736rF1HbI3ROS/rBTr7RW6rWzcyPbYFuUMVzrAM ZSJNJsTDcmyGc5Is3cFzNcrd3/Gmcjt8TKMmGq51HXWzFvxro7EH6aOl K6G4O4+mzaUKp91mg7DAVhX8yXnadXUZQ4yDfLzSleYQ2TroQqeSgI3X m/gUoACm3ELUOr84TmIKZ67X/zBTx8tHC5iBWY2tbIKqiJY7I4/aW4S4 NraCSRbDpbM='
203 tk1_rdata = ' '.join(tk1.split()[3:])
204 tk2_rdata = ' '.join(tk2.split()[3:])
205 tk_explicit = [(dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk1_rdata)),
206 (dns.name.from_text('example.com'), dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, tk2_rdata))]
207
208 now = datetime.datetime.now(utc)
209 tk_default = get_default_trusted_keys(now)
210
211 args = ['example.com']
212 arghelper = PrintArgHelper(self.logger)
213 arghelper.build_parser('print')
214 arghelper.parse_args(args)
215 arghelper.aggregate_trusted_key_info()
216 self.assertEqual(arghelper.trusted_keys, None)
217 arghelper.update_trusted_key_info(now)
218 self.assertEqual(arghelper.trusted_keys, tk_default)
219
220 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk1_file:
221 tk1_file.write(tk1.encode('utf-8'))
222
223 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as tk2_file:
224 tk2_file.write(tk2.encode('utf-8'))
225
226 try:
227 args = ['-t', tk1_file.name, '-t', tk2_file.name, 'example.com']
228 arghelper = PrintArgHelper(self.logger)
229 arghelper.build_parser('print')
230 arghelper.parse_args(args)
231 arghelper.aggregate_trusted_key_info()
232 arghelper.update_trusted_key_info(now)
233 self.assertEqual(arghelper.trusted_keys, tk_explicit)
234
235 args = ['-t', '/dev/null', 'example.com']
236 arghelper = PrintArgHelper(self.logger)
237 arghelper.build_parser('print')
238 arghelper.parse_args(args)
239 arghelper.aggregate_trusted_key_info()
240 arghelper.update_trusted_key_info(now)
241 self.assertEqual(arghelper.trusted_keys, [])
242
243 finally:
244 for tmpfile in (tk1_file, tk2_file):
245 os.remove(tmpfile.name)
246
247 def test_option_combination_errors(self):
248
249 # Names file and command-line domain names are mutually exclusive
250 args = ['-f', '/dev/null', 'example.com']
251 arghelper = PrintArgHelper(self.logger)
252 arghelper.build_parser('print')
253 arghelper.parse_args(args)
254 with self.assertRaises(argparse.ArgumentTypeError):
255 arghelper.check_args()
256
257 # Names file and command-line domain names are mutually exclusive
258 args = ['-O', '-o', '/dev/null']
259 arghelper = PrintArgHelper(self.logger)
260 arghelper.build_parser('print')
261 arghelper.parse_args(args)
262 with self.assertRaises(argparse.ArgumentTypeError):
263 arghelper.check_args()
264
265 # But this is allowed
266 args = ['-o', '/dev/null']
267 arghelper = PrintArgHelper(self.logger)
268 arghelper.build_parser('print')
269 arghelper.parse_args(args)
270 arghelper.check_args()
271
272 # So is this
273 args = ['-O']
274 arghelper = PrintArgHelper(self.logger)
275 arghelper.build_parser('print')
276 arghelper.parse_args(args)
277 arghelper.check_args()
278
279 if __name__ == '__main__':
280 unittest.main()
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12
13 class DNSPrintRunTestCase(unittest.TestCase):
14 def setUp(self):
15 self.devnull = io.open('/dev/null', 'wb')
16 self.current_cwd = os.getcwd()
17 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
18
19 tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
20
21 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.tk_file:
22 self.tk_file.write(tk.encode('utf-8'))
23
24 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
25 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
26 self.example_auth_out.write(example_auth_in.read())
27
28 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
30 self.example_rec_out.write(example_rec_in.read())
31
32 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
33 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
34
35 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
36 self.output.close()
37
38 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
39
40 def tearDown(self):
41 self.devnull.close()
42 os.remove(self.tk_file.name)
43 os.remove(self.example_auth_out.name)
44 os.remove(self.example_rec_out.name)
45 os.remove(self.names_file.name)
46 os.remove(self.output.name)
47 subprocess.check_call(['rm', '-rf', self.run_cwd])
48
49 def test_dnsviz_print_input(self):
50 with io.open(self.output.name, 'wb') as fh_out:
51 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
52 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
53 p.communicate(fh_in.read())
54 self.assertEqual(p.returncode, 0)
55
56 with io.open(self.output.name, 'wb') as fh_out:
57 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
58 p = subprocess.Popen([self.dnsviz_bin, 'print', '-r', '-'], stdin=subprocess.PIPE, stdout=fh_out)
59 p.communicate(fh_in.read())
60 self.assertEqual(p.returncode, 0)
61
62 with io.open(self.output.name, 'wb') as fh:
63 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name], stdout=fh), 0)
64
65 def test_dnsviz_print_names_input(self):
66 with io.open(self.output.name, 'wb') as fh:
67 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
68
69 with io.open(self.output.name, 'wb') as fh_out:
70 with io.open(self.names_file.name, 'rb') as fh_in:
71 p = subprocess.Popen([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
72 p.communicate(fh_in.read())
73 self.assertEqual(p.returncode, 0)
74
75 def test_dnsviz_print_tk_input(self):
76 with io.open(self.output.name, 'wb') as fh:
77 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-t', self.tk_file.name], stdout=fh), 0)
78
79 with io.open(self.output.name, 'wb') as fh_out:
80 with io.open(self.tk_file.name, 'rb') as fh_in:
81 p = subprocess.Popen([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-t', '-'], stdin=subprocess.PIPE, stdout=fh_out)
82 p.communicate(fh_in.read())
83 self.assertEqual(p.returncode, 0)
84
85 def test_dnsviz_print_output(self):
86 with io.open(self.output.name, 'wb') as fh:
87 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name], cwd=self.run_cwd, stdout=fh), 0)
88
89 with io.open(self.output.name, 'wb') as fh:
90 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-o', '-'], cwd=self.run_cwd, stdout=fh), 0)
91
92 with io.open(self.output.name, 'wb') as fh:
93 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-o', 'all.txt'], cwd=self.run_cwd, stdout=fh), 0)
94 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.txt')))
95 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.com.txt')))
96 self.assertFalse(os.path.exists(os.path.join(self.run_cwd, 'example.net.txt')))
97
98 self.assertEqual(subprocess.call([self.dnsviz_bin, 'print', '-r', self.example_auth_out.name, '-O'], cwd=self.run_cwd), 0)
99 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.com.txt')))
100 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'example.net.txt')))
101
102 def test_dnsviz_print_input_auth(self):
103 with io.open(self.output.name, 'wb') as fh_out:
104 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
105 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
106 p.communicate(fh_in.read())
107 self.assertEqual(p.returncode, 0)
108
109 with io.open(self.output.name, 'wb') as fh_out:
110 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
111 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
112 p.communicate(fh_in.read())
113 self.assertEqual(p.returncode, 0)
114
115 def test_dnsviz_print_input_rec(self):
116 with io.open(self.output.name, 'wb') as fh_out:
117 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
118 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
119 p.communicate(fh_in.read())
120 self.assertEqual(p.returncode, 0)
121
122 with io.open(self.output.name, 'wb') as fh_out:
123 with gzip.open(ROOT_RECURSIVE) as fh_in:
124 p = subprocess.Popen([self.dnsviz_bin, 'print'], stdin=subprocess.PIPE, stdout=fh_out)
125 p.communicate(fh_in.read())
126 self.assertEqual(p.returncode, 0)
127
128 if __name__ == '__main__':
129 unittest.main()
0 # -*- coding: utf-8 -*-
1
2 import argparse
3 import binascii
4 import gzip
5 import logging
6 import os
7 import subprocess
8 import tempfile
9 import unittest
10
11 import dns.name, dns.rdatatype, dns.rrset, dns.zone
12
13 from dnsviz.commands.probe import ZoneFileToServe, ArgHelper, DomainListArgHelper, StandardRecursiveQueryCD, WILDCARD_EXPLICIT_DELEGATION, AnalysisInputError, CustomQueryMixin
14 from dnsviz.ipaddr import IPAddr
15 from dnsviz import transport
16
17 DATA_DIR = os.path.dirname(__file__)
18 EXAMPLE_COM_ZONE = os.path.join(DATA_DIR, 'zone', 'example.com.zone')
19 EXAMPLE_COM_DELEGATION = os.path.join(DATA_DIR, 'zone', 'example.com.zone-delegation')
20 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
21
22 class DNSVizProbeOptionsTestCase(unittest.TestCase):
23 def setUp(self):
24 self.helper = DomainListArgHelper()
25 self.logger = logging.getLogger()
26 for handler in self.logger.handlers:
27 self.logger.removeHandler(handler)
28 self.logger.addHandler(logging.NullHandler())
29 try:
30 ArgHelper.bindable_ip('::1')
31 except argparse.ArgumentTypeError:
32 self.use_ipv6 = False
33 else:
34 self.use_ipv6 = True
35 self.first_port = ZoneFileToServe._next_free_port
36 self.custom_query_mixin_edns_options_orig = CustomQueryMixin.edns_options[:]
37
38 def tearDown(self):
39 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
40
41 def test_authoritative_option(self):
42 arg1 = 'example.com+:ns1.example.com=192.0.2.1:1234,ns1.example.com=[2001:db8::1],' + \
43 'ns1.example.com=192.0.2.2,ns2.example.com=[2001:db8::2],a.root-servers.net,192.0.2.3'
44
45 arg1_with_spaces = ' example.com+ : ns1.example.com = [192.0.2.1]:1234 , ns1.example.com = [2001:db8::1], ' + \
46 'ns1.example.com = [192.0.2.2] , ns2.example.com = [2001:db8::2] , a.root-servers.net , 192.0.2.3 '
47
48 arg2 = 'example.com:ns1.example.com=192.0.2.1'
49
50 arg3 = 'example.com:%s' % EXAMPLE_COM_ZONE
51
52 arg4 = 'example.com+:%s' % EXAMPLE_COM_ZONE
53
54 delegation_mapping1 = {
55 (dns.name.from_text('example.com'), dns.rdatatype.NS):
56 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
57 ['ns1.example.com', 'ns2.example.com', 'a.root-servers.net', 'ns1._dnsviz.example.com']),
58 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
59 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
60 ['192.0.2.1', '192.0.2.2']),
61 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
62 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
63 ['2001:db8::1']),
64 (dns.name.from_text('ns1._dnsviz.example.com'), dns.rdatatype.A):
65 dns.rrset.from_text_list(dns.name.from_text('ns1._dnsviz.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
66 ['192.0.2.3']),
67 (dns.name.from_text('ns2.example.com'), dns.rdatatype.AAAA):
68 dns.rrset.from_text_list(dns.name.from_text('ns2.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
69 ['2001:db8::2']),
70 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.A):
71 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.A,
72 ['198.41.0.4']),
73 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.AAAA):
74 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
75 ['2001:503:ba3e::2:30'])
76 }
77 stop_at1 = True
78 odd_ports1 = { (dns.name.from_text('example.com'), IPAddr('192.0.2.1')): 1234 }
79 zone_filename1 = None
80
81 delegation_mapping2 = {
82 (dns.name.from_text('example.com'), dns.rdatatype.NS):
83 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
84 ['ns1.example.com']),
85 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
86 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
87 ['192.0.2.1'])
88 }
89 stop_at2 = False
90 odd_ports2 = {}
91 zone_filename2 = None
92
93 delegation_mapping3 = {
94 (dns.name.from_text('example.com'), dns.rdatatype.NS):
95 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
96 []),
97 }
98 stop_at3 = False
99 odd_ports3 = {}
100 zone_filename3 = EXAMPLE_COM_ZONE
101
102 delegation_mapping4 = {
103 (dns.name.from_text('example.com'), dns.rdatatype.NS):
104 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
105 []),
106 }
107 stop_at4 = True
108 odd_ports4 = {}
109 zone_filename4 = EXAMPLE_COM_ZONE
110
111 obj = self.helper.authoritative_name_server_mappings(arg1)
112 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
113 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
114 self.assertEqual(obj.stop_at, stop_at1)
115 self.assertEqual(obj.odd_ports, odd_ports1)
116 self.assertEqual(obj.filename, zone_filename1)
117
118 obj = self.helper.authoritative_name_server_mappings(arg1_with_spaces)
119 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
120 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
121 self.assertEqual(obj.stop_at, stop_at1)
122 self.assertEqual(obj.odd_ports, odd_ports1)
123 self.assertEqual(obj.filename, zone_filename1)
124
125 obj = self.helper.authoritative_name_server_mappings(arg2)
126 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
127 self.assertEqual(obj.delegation_mapping, delegation_mapping2)
128 self.assertEqual(obj.stop_at, stop_at2)
129 self.assertEqual(obj.odd_ports, odd_ports2)
130 self.assertEqual(obj.filename, zone_filename2)
131
132 obj = self.helper.authoritative_name_server_mappings(arg3)
133 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
134 self.assertEqual(obj.delegation_mapping, delegation_mapping3)
135 self.assertEqual(obj.stop_at, stop_at3)
136 self.assertEqual(obj.odd_ports, odd_ports3)
137 self.assertEqual(obj.filename, zone_filename3)
138
139 obj = self.helper.authoritative_name_server_mappings(arg4)
140 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
141 self.assertEqual(obj.delegation_mapping, delegation_mapping4)
142 self.assertEqual(obj.stop_at, stop_at4)
143 self.assertEqual(obj.odd_ports, odd_ports4)
144 self.assertEqual(obj.filename, zone_filename4)
145
146 def test_authoritative_errors(self):
147 # no mapping
148 arg = 'example.com'
149 with self.assertRaises(argparse.ArgumentTypeError):
150 self.helper.authoritative_name_server_mappings(arg)
151
152 # bad domain name
153 arg = 'example.com:ns1..foo.com'
154 with self.assertRaises(argparse.ArgumentTypeError):
155 self.helper.authoritative_name_server_mappings(arg)
156
157 # bad IPv4 address
158 arg = 'example.com:ns1.foo.com=192'
159 with self.assertRaises(argparse.ArgumentTypeError):
160 self.helper.authoritative_name_server_mappings(arg)
161
162 # Bad IPv6 address
163 arg = 'example.com:ns1.foo.com=2001:db8'
164 with self.assertRaises(argparse.ArgumentTypeError):
165 self.helper.authoritative_name_server_mappings(arg)
166
167 # IPv6 address needs brackets (IP valid even with port stripped)
168 arg = 'example.com:ns1.foo.com=2001:db8::1:3'
169 with self.assertRaises(argparse.ArgumentTypeError):
170 self.helper.authoritative_name_server_mappings(arg)
171
172 # IPv6 address needs brackets (IP invalid with port stripped)
173 arg = 'example.com:ns1.foo.com=2001:db8::3'
174 with self.assertRaises(argparse.ArgumentTypeError):
175 self.helper.authoritative_name_server_mappings(arg)
176
177 # Name does not resolve properly
178 arg = 'example.com:ns1.does-not-exist-foo-bar-baz-123-abc-dnsviz.net'
179 with self.assertRaises(argparse.ArgumentTypeError):
180 self.helper.authoritative_name_server_mappings(arg)
181
182 def test_delegation_option(self):
183 arg1 = 'example.com:ns1.example.com=192.0.2.1:1234,ns1.example.com=[2001:db8::1],' + \
184 'ns1.example.com=192.0.2.2,ns2.example.com=[2001:db8::2]'
185
186 arg1_with_spaces = ' example.com : ns1.example.com = [192.0.2.1]:1234 , ns1.example.com = [2001:db8::1], ' + \
187 'ns1.example.com = [192.0.2.2] , ns2.example.com = [2001:db8::2] '
188
189 arg2 = 'example.com:%s' % EXAMPLE_COM_DELEGATION
190
191 delegation_mapping1 = {
192 (dns.name.from_text('example.com'), dns.rdatatype.NS):
193 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
194 ['ns1.example.com', 'ns2.example.com']),
195 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
196 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
197 ['192.0.2.1', '192.0.2.2']),
198 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
199 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
200 ['2001:db8::1']),
201 (dns.name.from_text('ns2.example.com'), dns.rdatatype.AAAA):
202 dns.rrset.from_text_list(dns.name.from_text('ns2.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
203 ['2001:db8::2']),
204 }
205 stop_at1 = False
206 odd_ports1 = { (dns.name.from_text('example.com'), IPAddr('192.0.2.1')): 1234 }
207
208 delegation_mapping2 = {
209 (dns.name.from_text('example.com'), dns.rdatatype.NS):
210 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
211 ['ns1.example.com']),
212 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
213 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
214 ['127.0.0.1'])
215 }
216 stop_at2 = False
217 odd_ports2 = {}
218
219 obj = self.helper.delegation_name_server_mappings(arg1)
220 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
221 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
222 self.assertEqual(obj.stop_at, stop_at1)
223 self.assertEqual(obj.odd_ports, odd_ports1)
224
225 obj = self.helper.delegation_name_server_mappings(arg1_with_spaces)
226 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
227 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
228 self.assertEqual(obj.stop_at, stop_at1)
229 self.assertEqual(obj.odd_ports, odd_ports1)
230
231 obj = self.helper.delegation_name_server_mappings(arg2)
232 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
233 self.assertEqual(obj.delegation_mapping, delegation_mapping2)
234 self.assertEqual(obj.stop_at, stop_at2)
235 self.assertEqual(obj.odd_ports, odd_ports2)
236
237 def test_delegation_errors(self):
238 # all the authoritative error tests as well
239
240 # requires name=addr mapping
241 arg = 'example.com:ns1.example.com'
242 with self.assertRaises(argparse.ArgumentTypeError):
243 self.helper.delegation_name_server_mappings(arg)
244
245 # requires name=addr mapping
246 arg = 'example.com:192.0.2.1'
247 with self.assertRaises(argparse.ArgumentTypeError):
248 self.helper.delegation_name_server_mappings(arg)
249
250 # doesn't allow +
251 arg = 'example.com+:ns1.example.com=192.0.2.1'
252 with self.assertRaises(argparse.ArgumentTypeError):
253 self.helper.delegation_name_server_mappings(arg)
254
255 # can't do this for root domain
256 arg = '.:ns1.example.com=192.0.2.1'
257 with self.assertRaises(argparse.ArgumentTypeError):
258 self.helper.delegation_name_server_mappings(arg)
259
260 def test_recursive_option(self):
261 arg1 = 'ns1.example.com=192.0.2.1:1234,ns1.example.com=[2001:db8::1],' + \
262 'ns1.example.com=192.0.2.2,ns2.example.com=[2001:db8::2],a.root-servers.net'
263
264 arg1_with_spaces = ' ns1.example.com = [192.0.2.1]:1234 , ns1.example.com = [2001:db8::1], ' + \
265 'ns1.example.com = [192.0.2.2] , ns2.example.com = [2001:db8::2] , a.root-servers.net '
266
267 delegation_mapping1 = {
268 (WILDCARD_EXPLICIT_DELEGATION, dns.rdatatype.NS):
269 dns.rrset.from_text_list(WILDCARD_EXPLICIT_DELEGATION, 0, dns.rdataclass.IN, dns.rdatatype.NS,
270 ['ns1.example.com', 'ns2.example.com', 'a.root-servers.net']),
271 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
272 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
273 ['192.0.2.1', '192.0.2.2']),
274 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
275 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
276 ['2001:db8::1']),
277 (dns.name.from_text('ns2.example.com'), dns.rdatatype.AAAA):
278 dns.rrset.from_text_list(dns.name.from_text('ns2.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
279 ['2001:db8::2']),
280 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.A):
281 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.A,
282 ['198.41.0.4']),
283 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.AAAA):
284 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
285 ['2001:503:ba3e::2:30'])
286 }
287 stop_at1 = False
288 odd_ports1 = { (WILDCARD_EXPLICIT_DELEGATION, IPAddr('192.0.2.1')): 1234 }
289
290 obj = self.helper.recursive_servers_for_domain(arg1)
291 self.assertEqual(obj.domain, WILDCARD_EXPLICIT_DELEGATION)
292 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
293 self.assertEqual(obj.stop_at, stop_at1)
294 self.assertEqual(obj.odd_ports, odd_ports1)
295
296 obj = self.helper.recursive_servers_for_domain(arg1_with_spaces)
297 self.assertEqual(obj.domain, WILDCARD_EXPLICIT_DELEGATION)
298 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
299 self.assertEqual(obj.stop_at, stop_at1)
300 self.assertEqual(obj.odd_ports, odd_ports1)
301
302 def test_recursive_errors(self):
303 # all the authoritative error tests as well
304
305 # doesn't accept file
306 arg = EXAMPLE_COM_DELEGATION
307 with self.assertRaises(argparse.ArgumentTypeError):
308 self.helper.recursive_servers_for_domain(arg)
309
310 def test_ds_option(self):
311 arg1 = 'example.com:34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418,' + \
312 '34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49'
313
314 delegation_mapping1 = {
315 (dns.name.from_text('example.com'), dns.rdatatype.DS):
316 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.DS,
317 ['34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418',
318 '34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49'])
319 }
320
321 arg1_with_spaces = ' example.com : 34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418, ' + \
322 ' 34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49 '
323
324 arg2 = 'example.com:%s' % EXAMPLE_COM_DELEGATION
325
326 delegation_mapping2 = {
327 (dns.name.from_text('example.com'), dns.rdatatype.DS):
328 dns.rrset.from_text_list(dns.name.from_text('example.com'), 0, dns.rdataclass.IN, dns.rdatatype.DS,
329 ['34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418',
330 '34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49'])
331 }
332
333
334 obj = self.helper.ds_for_domain(arg1)
335 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
336 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
337
338 obj = self.helper.ds_for_domain(arg1_with_spaces)
339 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
340 self.assertEqual(obj.delegation_mapping, delegation_mapping1)
341
342 obj = self.helper.ds_for_domain(arg2)
343 self.assertEqual(obj.domain, dns.name.from_text('example.com'))
344 self.assertEqual(obj.delegation_mapping, delegation_mapping2)
345
346 def test_ds_error(self):
347 # bad DS record
348 arg = 'example.com:blah'
349 with self.assertRaises(argparse.ArgumentTypeError):
350 obj = self.helper.ds_for_domain(arg)
351
352 def test_positive_int(self):
353 self.assertEqual(ArgHelper.positive_int('1'), 1)
354 self.assertEqual(ArgHelper.positive_int('2'), 2)
355
356 # zero
357 with self.assertRaises(argparse.ArgumentTypeError):
358 ArgHelper.positive_int('0')
359
360 # negative
361 with self.assertRaises(argparse.ArgumentTypeError):
362 ArgHelper.positive_int('-1')
363
364 def test_bindable_ip(self):
365 self.assertEqual(ArgHelper.bindable_ip('127.0.0.1'), IPAddr('127.0.0.1'))
366 if self.use_ipv6:
367 self.assertEqual(ArgHelper.bindable_ip('::1'), IPAddr('::1'))
368
369 # invalid IPv4 address
370 with self.assertRaises(argparse.ArgumentTypeError):
371 ArgHelper.bindable_ip('192.')
372
373 # invalid IPv6 address
374 with self.assertRaises(argparse.ArgumentTypeError):
375 ArgHelper.bindable_ip('2001:')
376
377 # invalid IPv4 to bind to
378 with self.assertRaises(argparse.ArgumentTypeError):
379 ArgHelper.bindable_ip('192.0.2.1')
380
381 # invalid IPv6 to bind to
382 with self.assertRaises(argparse.ArgumentTypeError):
383 ArgHelper.bindable_ip('2001:db8::1')
384
385 def test_valid_url(self):
386 url1 = 'http://www.example.com/foo'
387 url2 = 'https://www.example.com/foo'
388 url3 = 'ws:///path/to/file'
389 url4 = 'ssh://user@example.com/foo'
390
391 self.assertEqual(ArgHelper.valid_url(url1), url1)
392 self.assertEqual(ArgHelper.valid_url(url2), url2)
393 self.assertEqual(ArgHelper.valid_url(url3), url3)
394 self.assertEqual(ArgHelper.valid_url(url4), url4)
395
396 # invalid schema
397 with self.assertRaises(argparse.ArgumentTypeError):
398 ArgHelper.valid_url('ftp://www.example.com/foo')
399
400 # ws with hostname
401 with self.assertRaises(argparse.ArgumentTypeError):
402 ArgHelper.valid_url('ws://www.example.com/foo')
403
404 def test_rrtype_list(self):
405 arg1 = 'A,AAAA,MX,CNAME'
406 arg1_with_spaces = ' A , AAAA , MX , CNAME '
407 arg2 = 'A'
408 arg3 = 'A,BLAH'
409 arg4_empty = ''
410 arg4_empty_spaces = ' '
411
412 type_list1 = [dns.rdatatype.A, dns.rdatatype.AAAA, dns.rdatatype.MX, dns.rdatatype.CNAME]
413 type_list2 = [dns.rdatatype.A]
414 empty_list = []
415
416 self.assertEqual(ArgHelper.comma_separated_dns_types(arg1), type_list1)
417 self.assertEqual(ArgHelper.comma_separated_dns_types(arg1_with_spaces), type_list1)
418 self.assertEqual(ArgHelper.comma_separated_dns_types(arg4_empty), empty_list)
419 self.assertEqual(ArgHelper.comma_separated_dns_types(arg4_empty_spaces), empty_list)
420
421 # invalid schema
422 with self.assertRaises(argparse.ArgumentTypeError):
423 ArgHelper.comma_separated_dns_types(arg3)
424
425 def test_valid_domain_name(self):
426 arg1 = '.'
427 arg2 = 'www.example.com'
428 arg3 = 'www..example.com'
429
430 self.assertEqual(ArgHelper.valid_domain_name(arg1), dns.name.from_text(arg1))
431 self.assertEqual(ArgHelper.valid_domain_name(arg2), dns.name.from_text(arg2))
432
433 # invalid domain name
434 with self.assertRaises(argparse.ArgumentTypeError):
435 ArgHelper.valid_domain_name(arg3)
436
437 def test_nsid_option(self):
438 self.assertEqual(ArgHelper.nsid_option(), dns.edns.GenericOption(3, b''))
439
440 def test_ecs_option(self):
441 arg1 = '192.0.2.0'
442 arg2 = '192.0.2.0/25'
443 arg3 = '192.0.2.255/25'
444 arg4 = '192.0.2.0/24'
445 arg5 = '2001:db8::'
446 arg6 = '2001:db8::/121'
447 arg7 = '2001:db8::ff/121'
448 arg8 = '2001:db8::/120'
449
450
451 ecs_option1 = dns.edns.GenericOption(8, binascii.unhexlify('00012000c0000200'))
452 ecs_option2 = dns.edns.GenericOption(8, binascii.unhexlify('00011900c0000200'))
453 ecs_option3 = dns.edns.GenericOption(8, binascii.unhexlify('00011900c0000280'))
454 ecs_option4 = dns.edns.GenericOption(8, binascii.unhexlify('00011800c00002'))
455 ecs_option5 = dns.edns.GenericOption(8, binascii.unhexlify('0002800020010db8000000000000000000000000'))
456 ecs_option6 = dns.edns.GenericOption(8, binascii.unhexlify('0002790020010db8000000000000000000000000'))
457 ecs_option7 = dns.edns.GenericOption(8, binascii.unhexlify('0002790020010db8000000000000000000000080'))
458 ecs_option8 = dns.edns.GenericOption(8, binascii.unhexlify('0002780020010db80000000000000000000000'))
459
460 self.assertEqual(ArgHelper.ecs_option(arg1), ecs_option1)
461 self.assertEqual(ArgHelper.ecs_option(arg2), ecs_option2)
462 self.assertEqual(ArgHelper.ecs_option(arg3), ecs_option3)
463 self.assertEqual(ArgHelper.ecs_option(arg4), ecs_option4)
464 self.assertEqual(ArgHelper.ecs_option(arg5), ecs_option5)
465 self.assertEqual(ArgHelper.ecs_option(arg6), ecs_option6)
466 self.assertEqual(ArgHelper.ecs_option(arg7), ecs_option7)
467 self.assertEqual(ArgHelper.ecs_option(arg8), ecs_option8)
468
469 # invalid IP address
470 with self.assertRaises(argparse.ArgumentTypeError):
471 ArgHelper.ecs_option('192')
472
473 # invalid length
474 with self.assertRaises(argparse.ArgumentTypeError):
475 ArgHelper.ecs_option('192.0.2.0/foo')
476
477 # invalid length
478 with self.assertRaises(argparse.ArgumentTypeError):
479 ArgHelper.ecs_option('192.0.2.0/33')
480
481 # invalid length
482 with self.assertRaises(argparse.ArgumentTypeError):
483 ArgHelper.ecs_option('2001:db8::/129')
484
485 def test_cookie_option(self):
486 arg1 = '0102030405060708'
487 arg2 = ''
488
489 cookie_option1 = dns.edns.GenericOption(10, binascii.unhexlify('0102030405060708'))
490 cookie_option2 = None
491
492 self.assertEqual(ArgHelper.dns_cookie_option(arg1), cookie_option1)
493 self.assertEqual(ArgHelper.dns_cookie_option(arg2), None)
494
495 self.assertIsInstance(ArgHelper.dns_cookie_rand(), dns.edns.GenericOption)
496
497 # too short
498 with self.assertRaises(argparse.ArgumentTypeError):
499 ArgHelper.dns_cookie_option('01')
500
501 # too long
502 with self.assertRaises(argparse.ArgumentTypeError):
503 ArgHelper.dns_cookie_option('010203040506070809')
504
505 # non-hexadecimal
506 with self.assertRaises(argparse.ArgumentTypeError):
507 ArgHelper.dns_cookie_option('010203040506070h')
508
509 def test_delegation_aggregation(self):
510 args1 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
511 '-N', 'example.com:ns1.example.com=192.0.2.4',
512 '-N', 'example.com:ns2.example.com=192.0.2.2',
513 '-N', 'example.com:ns3.example.com=192.0.2.3']
514 args2 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1',
515 '-D', 'example.com:34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418',
516 '-D', 'example.com:34983 10 2 608D3B089D79D554A1947BD10BEC0A5B1BDBE67B4E60E34B1432ED00 33F24B49']
517 args3 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1',
518 '-N', 'example1.com:ns1.example1.com=192.0.2.2']
519 args4 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1',
520 '-N', 'example.net:ns1.example.net=192.0.2.2']
521
522 explicit_delegations1 = {
523 (dns.name.from_text('com'), dns.rdatatype.NS):
524 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
525 ['localhost']),
526 }
527 explicit_delegations2 = {
528 (dns.name.from_text('com'), dns.rdatatype.NS):
529 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
530 ['localhost']),
531 }
532 explicit_delegations3 = {
533 (dns.name.from_text('com'), dns.rdatatype.NS):
534 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
535 ['localhost']),
536 }
537 explicit_delegations4 = {
538 (dns.name.from_text('com'), dns.rdatatype.NS):
539 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
540 ['localhost']),
541 (dns.name.from_text('net'), dns.rdatatype.NS):
542 dns.rrset.from_text_list(dns.name.from_text('net'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
543 ['localhost']),
544 }
545
546 for ex in (explicit_delegations1, explicit_delegations2, explicit_delegations3, explicit_delegations4):
547 if self.use_ipv6:
548 ex[(dns.name.from_text('localhost'), dns.rdatatype.AAAA)] = \
549 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
550 ['::1'])
551 loopback_ip = IPAddr('::1')
552 else:
553 ex[(dns.name.from_text('localhost'), dns.rdatatype.A)] = \
554 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.A,
555 ['127.0.0.1'])
556 loopback_ip = IPAddr('127.0.0.1')
557
558 odd_ports1 = { (dns.name.from_text('com'), loopback_ip): self.first_port }
559 odd_ports2 = { (dns.name.from_text('com'), loopback_ip): self.first_port }
560 odd_ports3 = { (dns.name.from_text('com'), loopback_ip): self.first_port }
561 odd_ports4 = {
562 (dns.name.from_text('com'), loopback_ip): self.first_port,
563 (dns.name.from_text('net'), loopback_ip): self.first_port + 1,
564 }
565
566 if self.use_ipv6:
567 rdata = b'AAAA ::1'
568 else:
569 rdata = b'A 127.0.0.1'
570
571 zone_contents1 = b'''@ 600 IN SOA localhost. root.localhost. 1 1800 900 86400 600
572 @ 600 IN NS @
573 @ 600 IN ''' + rdata + \
574 b'''
575 example 0 IN NS ns1.example
576 example 0 IN NS ns2.example
577 example 0 IN NS ns3.example
578 ns1.example 0 IN A 192.0.2.1
579 ns1.example 0 IN A 192.0.2.4
580 ns1.example 0 IN AAAA 2001:db8::1
581 ns2.example 0 IN A 192.0.2.2
582 ns3.example 0 IN A 192.0.2.3
583 '''
584 zone_contents2 = b'''@ 600 IN SOA localhost. root.localhost. 1 1800 900 86400 600
585 @ 600 IN NS @
586 @ 600 IN ''' + rdata + \
587 b'''
588 example 0 IN DS 34983 10 1 ec358cfaaec12266ef5acfc1feaf2caff083c418
589 example 0 IN DS 34983 10 2 608d3b089d79d554a1947bd10bec0a5b1bdbe67b4e60e34b1432ed0033f24b49
590 example 0 IN NS ns1.example
591 ns1.example 0 IN A 192.0.2.1
592 '''
593
594 ZoneFileToServe._next_free_port = self.first_port
595
596 arghelper1 = ArgHelper(self.logger)
597 arghelper1.build_parser('probe')
598 arghelper1.parse_args(args1)
599 arghelper1.aggregate_delegation_info()
600 zone_to_serve = arghelper1._zones_to_serve[0]
601 zone_obj = dns.zone.from_file(zone_to_serve.filename, dns.name.from_text('com'))
602 zone_obj_other = dns.zone.from_text(zone_contents1, dns.name.from_text('com'))
603 self.assertEqual(zone_obj, zone_obj_other)
604 self.assertEqual(arghelper1.explicit_delegations, explicit_delegations1)
605 self.assertEqual(arghelper1.odd_ports, odd_ports1)
606
607 ZoneFileToServe._next_free_port = self.first_port
608
609 arghelper2 = ArgHelper(self.logger)
610 arghelper2.build_parser('probe')
611 arghelper2.parse_args(args2)
612 arghelper2.aggregate_delegation_info()
613 zone_to_serve = arghelper2._zones_to_serve[0]
614 zone_obj = dns.zone.from_file(zone_to_serve.filename, dns.name.from_text('com'))
615 zone_obj_other = dns.zone.from_text(zone_contents2, dns.name.from_text('com'))
616 self.assertEqual(zone_obj, zone_obj_other)
617 self.assertEqual(arghelper2.explicit_delegations, explicit_delegations2)
618 self.assertEqual(arghelper2.odd_ports, odd_ports2)
619
620 ZoneFileToServe._next_free_port = self.first_port
621
622 arghelper3 = ArgHelper(self.logger)
623 arghelper3.build_parser('probe')
624 arghelper3.parse_args(args3)
625 arghelper3.aggregate_delegation_info()
626 self.assertEqual(arghelper3.explicit_delegations, explicit_delegations3)
627 self.assertEqual(arghelper3.odd_ports, odd_ports3)
628
629 ZoneFileToServe._next_free_port = self.first_port
630
631 arghelper4 = ArgHelper(self.logger)
632 arghelper4.build_parser('probe')
633 arghelper4.parse_args(args4)
634 arghelper4.aggregate_delegation_info()
635 self.assertEqual(arghelper4.explicit_delegations, explicit_delegations4)
636 self.assertEqual(arghelper4.odd_ports, odd_ports4)
637
638 def test_delegation_authoritative_aggregation(self):
639 args1 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
640 '-x', 'foo.com:ns1.foo.com=192.0.2.3:50503']
641
642 explicit_delegations1 = {
643 (dns.name.from_text('com'), dns.rdatatype.NS):
644 dns.rrset.from_text_list(dns.name.from_text('com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
645 ['localhost']),
646 (dns.name.from_text('foo.com'), dns.rdatatype.NS):
647 dns.rrset.from_text_list(dns.name.from_text('foo.com'), 0, dns.rdataclass.IN, dns.rdatatype.NS,
648 ['ns1.foo.com']),
649 (dns.name.from_text('ns1.foo.com'), dns.rdatatype.A):
650 dns.rrset.from_text_list(dns.name.from_text('ns1.foo.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
651 ['192.0.2.3']),
652 }
653
654 for ex in (explicit_delegations1,):
655 if self.use_ipv6:
656 ex[(dns.name.from_text('localhost'), dns.rdatatype.AAAA)] = \
657 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
658 ['::1'])
659 loopback_ip = IPAddr('::1')
660 else:
661 ex[(dns.name.from_text('localhost'), dns.rdatatype.A)] = \
662 dns.rrset.from_text_list(dns.name.from_text('localhost'), 0, dns.rdataclass.IN, dns.rdatatype.A,
663 ['127.0.0.1'])
664 loopback_ip = IPAddr('127.0.0.1')
665
666 odd_ports1 = { (dns.name.from_text('com'), loopback_ip): self.first_port,
667 (dns.name.from_text('foo.com'), IPAddr('192.0.2.3')): 50503,
668 }
669
670 ZoneFileToServe._next_free_port = self.first_port
671
672 arghelper1 = ArgHelper(self.logger)
673 arghelper1.build_parser('probe')
674 arghelper1.parse_args(args1)
675 arghelper1.aggregate_delegation_info()
676 self.assertEqual(arghelper1.explicit_delegations, explicit_delegations1)
677 self.assertEqual(arghelper1.odd_ports, odd_ports1)
678
679 def test_delegation_authoritative_aggregation_errors(self):
680 args1 = ['-A', '-N', 'example.com:ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
681 '-x', 'com:ns1.foo.com=192.0.2.3']
682
683 arghelper1 = ArgHelper(self.logger)
684 arghelper1.build_parser('probe')
685 arghelper1.parse_args(args1)
686
687 # com is specified with -x but example.com is specified with -N
688 with self.assertRaises(argparse.ArgumentTypeError):
689 arghelper1.aggregate_delegation_info()
690
691 def test_recursive_aggregation(self):
692 args1 = ['-s', 'ns1.example.com=192.0.2.1,ns1.example.com=[2001:db8::1]',
693 '-s', 'ns1.example.com=192.0.2.4,a.root-servers.net']
694
695 explicit_delegations1 = {
696 (WILDCARD_EXPLICIT_DELEGATION, dns.rdatatype.NS):
697 dns.rrset.from_text_list(WILDCARD_EXPLICIT_DELEGATION, 0, dns.rdataclass.IN, dns.rdatatype.NS,
698 ['ns1.example.com', 'a.root-servers.net']),
699 (dns.name.from_text('ns1.example.com'), dns.rdatatype.A):
700 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.A,
701 ['192.0.2.1', '192.0.2.4']),
702 (dns.name.from_text('ns1.example.com'), dns.rdatatype.AAAA):
703 dns.rrset.from_text_list(dns.name.from_text('ns1.example.com'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
704 ['2001:db8::1']),
705 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.A):
706 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.A,
707 ['198.41.0.4']),
708 (dns.name.from_text('a.root-servers.net'), dns.rdatatype.AAAA):
709 dns.rrset.from_text_list(dns.name.from_text('a.root-servers.net'), 0, dns.rdataclass.IN, dns.rdatatype.AAAA,
710 ['2001:503:ba3e::2:30'])
711 }
712
713 odd_ports1 = {}
714
715 arghelper1 = ArgHelper(self.logger)
716 arghelper1.build_parser('probe')
717 arghelper1.parse_args(args1)
718 arghelper1.aggregate_delegation_info()
719 self.assertEqual(arghelper1.explicit_delegations, explicit_delegations1)
720 self.assertEqual(arghelper1.odd_ports, odd_ports1)
721
722 def test_option_combination_errors(self):
723
724 # Names, input file, or names file required
725 args = []
726 arghelper = ArgHelper(self.logger)
727 arghelper.build_parser('probe')
728 arghelper.parse_args(args)
729 with self.assertRaises(argparse.ArgumentTypeError):
730 arghelper.check_args()
731
732 # Names file and command-line domain names are mutually exclusive
733 args = ['-f', '/dev/null', 'example.com']
734 arghelper = ArgHelper(self.logger)
735 arghelper.build_parser('probe')
736 arghelper.parse_args(args)
737 with self.assertRaises(argparse.ArgumentTypeError):
738 arghelper.check_args()
739 arghelper.args.names_file.close()
740
741 # Authoritative analysis and recursive servers
742 args = ['-A', '-s', '192.0.2.1', 'example.com']
743 arghelper = ArgHelper(self.logger)
744 arghelper.build_parser('probe')
745 arghelper.parse_args(args)
746 with self.assertRaises(argparse.ArgumentTypeError):
747 arghelper.check_args()
748
749 # Authoritative servers with recursive analysis
750 args = ['-x', 'example.com:ns1.example.com=192.0.2.1', 'example.com']
751 arghelper = ArgHelper(self.logger)
752 arghelper.build_parser('probe')
753 arghelper.parse_args(args)
754 with self.assertRaises(argparse.ArgumentTypeError):
755 arghelper.check_args()
756
757 # Delegation information with recursive analysis
758 args = ['-N', 'example.com:ns1.example.com=192.0.2.1', 'example.com']
759 arghelper = ArgHelper(self.logger)
760 arghelper.build_parser('probe')
761 arghelper.parse_args(args)
762 with self.assertRaises(argparse.ArgumentTypeError):
763 arghelper.check_args()
764
765 # Delegation information with recursive analysis
766 args = [ '-D', 'example.com:34983 10 1 EC358CFAAEC12266EF5ACFC1FEAF2CAFF083C418', 'example.com']
767 arghelper = ArgHelper(self.logger)
768 arghelper.build_parser('probe')
769 arghelper.parse_args(args)
770 with self.assertRaises(argparse.ArgumentTypeError):
771 arghelper.check_args()
772
773 def test_ceiling(self):
774 args = ['-a', 'com', 'example.com']
775 arghelper = ArgHelper(self.logger)
776 arghelper.build_parser('probe')
777 arghelper.parse_args(args)
778 arghelper.set_kwargs()
779 self.assertEqual(arghelper.ceiling, dns.name.from_text('com'))
780
781 args = ['example.com']
782 arghelper = ArgHelper(self.logger)
783 arghelper.build_parser('probe')
784 arghelper.parse_args(args)
785 arghelper.set_kwargs()
786 self.assertEqual(arghelper.ceiling, dns.name.root)
787
788 args = ['-A', 'example.com']
789 arghelper = ArgHelper(self.logger)
790 arghelper.build_parser('probe')
791 arghelper.parse_args(args)
792 arghelper.set_kwargs()
793 self.assertIsNone(arghelper.ceiling)
794
795 def test_ip4_ipv6(self):
796 args = []
797 arghelper = ArgHelper(self.logger)
798 arghelper.build_parser('probe')
799 arghelper.parse_args(args)
800 arghelper.set_kwargs()
801 self.assertEqual(arghelper.try_ipv4, True)
802 self.assertEqual(arghelper.try_ipv6, True)
803
804 args = ['-4', '-6']
805 arghelper = ArgHelper(self.logger)
806 arghelper.build_parser('probe')
807 arghelper.parse_args(args)
808 arghelper.set_kwargs()
809 self.assertEqual(arghelper.try_ipv4, True)
810 self.assertEqual(arghelper.try_ipv6, True)
811
812 args = ['-4']
813 arghelper = ArgHelper(self.logger)
814 arghelper.build_parser('probe')
815 arghelper.parse_args(args)
816 arghelper.set_kwargs()
817 self.assertEqual(arghelper.try_ipv4, True)
818 self.assertEqual(arghelper.try_ipv6, False)
819
820 args = ['-6']
821 arghelper = ArgHelper(self.logger)
822 arghelper.build_parser('probe')
823 arghelper.parse_args(args)
824 arghelper.set_kwargs()
825 self.assertEqual(arghelper.try_ipv4, False)
826 self.assertEqual(arghelper.try_ipv6, True)
827
828 def test_client_ip(self):
829 args = []
830 arghelper = ArgHelper(self.logger)
831 arghelper.build_parser('probe')
832 arghelper.parse_args(args)
833 arghelper.set_kwargs()
834 self.assertIsNone(arghelper.client_ipv4)
835 self.assertIsNone(arghelper.client_ipv6)
836
837 args = ['-b', '127.0.0.1']
838 if self.use_ipv6:
839 args.extend(['-b', '::1'])
840 arghelper = ArgHelper(self.logger)
841 arghelper.build_parser('probe')
842 arghelper.parse_args(args)
843 arghelper.set_kwargs()
844 self.assertEqual(arghelper.client_ipv4, IPAddr('127.0.0.1'))
845 if self.use_ipv6:
846 self.assertEqual(arghelper.client_ipv6, IPAddr('::1'))
847
848 def test_th_factories(self):
849 args = ['example.com']
850 arghelper = ArgHelper(self.logger)
851 arghelper.build_parser('probe')
852 arghelper.parse_args(args)
853 arghelper.set_kwargs()
854 self.assertIsNone(arghelper.th_factories)
855
856 args = ['-u', 'http://example.com/', 'example.com']
857 arghelper = ArgHelper(self.logger)
858 arghelper.build_parser('probe')
859 arghelper.parse_args(args)
860 arghelper.set_kwargs()
861 self.assertIsInstance(arghelper.th_factories[0], transport.DNSQueryTransportHandlerHTTPFactory)
862
863 args = ['-u', 'ws:///dev/null', 'example.com']
864 arghelper = ArgHelper(self.logger)
865 arghelper.build_parser('probe')
866 arghelper.parse_args(args)
867 arghelper.set_kwargs()
868 self.assertIsInstance(arghelper.th_factories[0], transport.DNSQueryTransportHandlerWebSocketServerFactory)
869
870 args = ['-u', 'ssh://example.com/', 'example.com']
871 arghelper = ArgHelper(self.logger)
872 arghelper.build_parser('probe')
873 arghelper.parse_args(args)
874 arghelper.set_kwargs()
875 self.assertIsInstance(arghelper.th_factories[0], transport.DNSQueryTransportHandlerRemoteCmdFactory)
876
877 def test_edns_options(self):
878 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
879
880 # None
881 args = ['-c', '', 'example.com']
882 arghelper = ArgHelper(self.logger)
883 arghelper.build_parser('probe')
884 arghelper.parse_args(args)
885 arghelper.set_kwargs()
886 self.assertEqual(len(CustomQueryMixin.edns_options), 0)
887
888 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
889
890 # Only DNS cookie
891 args = ['example.com']
892 arghelper = ArgHelper(self.logger)
893 arghelper.build_parser('probe')
894 arghelper.parse_args(args)
895 arghelper.set_kwargs()
896 self.assertEqual(set([o.otype for o in CustomQueryMixin.edns_options]), set([10]))
897
898 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
899
900 # All EDNS options
901 args = ['-n', '-e', '192.0.2.0/24', 'example.com']
902 arghelper = ArgHelper(self.logger)
903 arghelper.build_parser('probe')
904 arghelper.parse_args(args)
905 arghelper.set_kwargs()
906 self.assertEqual(set([o.otype for o in CustomQueryMixin.edns_options]), set([3, 8, 10]))
907
908 CustomQueryMixin.edns_options = self.custom_query_mixin_edns_options_orig[:]
909
910 def test_ingest_input(self):
911 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_bad_json:
912 example_bad_json.write(b'{')
913
914 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_no_version:
915 example_no_version.write(b'{}')
916
917 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_1:
918 example_invalid_version_1.write(b'{ "_meta._dnsviz.": { "version": 1.11 } }')
919
920 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_invalid_version_2:
921 example_invalid_version_2.write(b'{ "_meta._dnsviz.": { "version": 5.0 } }')
922
923 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
924 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_auth_out:
925 example_auth_out.write(example_auth_in.read())
926
927 try:
928 args = ['-r', example_auth_out.name]
929 arghelper = ArgHelper(self.logger)
930 arghelper.build_parser('probe')
931 arghelper.parse_args(args)
932 arghelper.ingest_input()
933
934 # Bad json
935 args = ['-r', example_bad_json.name]
936 arghelper = ArgHelper(self.logger)
937 arghelper.build_parser('probe')
938 arghelper.parse_args(args)
939 with self.assertRaises(AnalysisInputError):
940 arghelper.ingest_input()
941
942 # No version
943 args = ['-r', example_no_version.name]
944 arghelper = ArgHelper(self.logger)
945 arghelper.build_parser('probe')
946 arghelper.parse_args(args)
947 with self.assertRaises(AnalysisInputError):
948 arghelper.ingest_input()
949
950 # Invalid version
951 args = ['-r', example_invalid_version_1.name]
952 arghelper = ArgHelper(self.logger)
953 arghelper.build_parser('probe')
954 arghelper.parse_args(args)
955 with self.assertRaises(AnalysisInputError):
956 arghelper.ingest_input()
957
958 # Invalid version
959 args = ['-r', example_invalid_version_2.name]
960 arghelper = ArgHelper(self.logger)
961 arghelper.build_parser('probe')
962 arghelper.parse_args(args)
963 with self.assertRaises(AnalysisInputError):
964 arghelper.ingest_input()
965
966 finally:
967 for tmpfile in (example_auth_out, example_bad_json, example_no_version, \
968 example_invalid_version_1, example_invalid_version_2):
969 os.remove(tmpfile.name)
970
971 def test_ingest_names(self):
972 args = ['example.com', 'example.net']
973 arghelper = ArgHelper(self.logger)
974 arghelper.build_parser('probe')
975 arghelper.parse_args(args)
976 arghelper.ingest_names()
977 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
978
979 unicode_name = 'テスト'
980
981 args = [unicode_name]
982 arghelper = ArgHelper(self.logger)
983 arghelper.build_parser('probe')
984 arghelper.parse_args(args)
985 arghelper.ingest_names()
986 self.assertEqual(list(arghelper.names), [dns.name.from_text('xn--zckzah.')])
987
988 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file:
989 names_file.write('example.com\nexample.net\n'.encode('utf-8'))
990
991 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as names_file_unicode:
992 try:
993 names_file_unicode.write(('%s\n' % (unicode_name)).encode('utf-8'))
994 # python3/python2 dual compatibility
995 except UnicodeDecodeError:
996 names_file_unicode.write(('%s\n' % (unicode_name)))
997
998 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as example_names_only:
999 example_names_only.write(b'{ "_meta._dnsviz.": { "version": 1.2, "names": [ "example.com.", "example.net.", "example.org." ] } }')
1000
1001 try:
1002 args = ['-f', names_file.name]
1003 arghelper = ArgHelper(self.logger)
1004 arghelper.build_parser('probe')
1005 arghelper.parse_args(args)
1006 arghelper.ingest_names()
1007 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net')])
1008
1009 args = ['-f', names_file_unicode.name]
1010 arghelper = ArgHelper(self.logger)
1011 arghelper.build_parser('probe')
1012 arghelper.parse_args(args)
1013 arghelper.ingest_names()
1014 self.assertEqual(list(arghelper.names), [dns.name.from_text('xn--zckzah.')])
1015
1016 args = ['-r', example_names_only.name]
1017 arghelper = ArgHelper(self.logger)
1018 arghelper.build_parser('probe')
1019 arghelper.parse_args(args)
1020 arghelper.ingest_input()
1021 arghelper.ingest_names()
1022 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com'), dns.name.from_text('example.net'), dns.name.from_text('example.org')])
1023
1024 args = ['-r', example_names_only.name, 'example.com']
1025 arghelper = ArgHelper(self.logger)
1026 arghelper.build_parser('probe')
1027 arghelper.parse_args(args)
1028 arghelper.ingest_input()
1029 arghelper.ingest_names()
1030 self.assertEqual(list(arghelper.names), [dns.name.from_text('example.com')])
1031 finally:
1032 for tmpfile in (names_file, names_file_unicode, example_names_only):
1033 os.remove(tmpfile.name)
1034
1035 if __name__ == '__main__':
1036 unittest.main()
0 import gzip
1 import io
2 import os
3 import subprocess
4 import tempfile
5 import unittest
6
7 DATA_DIR = os.path.dirname(__file__)
8 EXAMPLE_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'example-authoritative.json.gz')
9 EXAMPLE_RECURSIVE = os.path.join(DATA_DIR, 'data', 'example-recursive.json.gz')
10 ROOT_AUTHORITATIVE = os.path.join(DATA_DIR, 'data', 'root-authoritative.json.gz')
11 ROOT_RECURSIVE = os.path.join(DATA_DIR, 'data', 'root-recursive.json.gz')
12 EXAMPLE_COM_SIGNED = os.path.join(DATA_DIR, 'zone', 'example.com.zone.signed')
13 EXAMPLE_COM_ZONE = os.path.join(DATA_DIR, 'zone', 'example.com.zone')
14 EXAMPLE_COM_DELEGATION = os.path.join(DATA_DIR, 'zone', 'example.com.zone-delegation')
15
16 class DNSProbeRunOfflineTestCase(unittest.TestCase):
17 def setUp(self):
18 self.current_cwd = os.getcwd()
19 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
20
21 with gzip.open(EXAMPLE_AUTHORITATIVE, 'rb') as example_auth_in:
22 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_auth_out:
23 self.example_auth_out.write(example_auth_in.read())
24
25 with gzip.open(EXAMPLE_RECURSIVE, 'rb') as example_rec_in:
26 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.example_rec_out:
27 self.example_rec_out.write(example_rec_in.read())
28
29 with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as self.names_file:
30 self.names_file.write('example.com\nexample.net\n'.encode('utf-8'))
31
32 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
33 self.output.close()
34
35 self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
36
37 def tearDown(self):
38 os.remove(self.example_auth_out.name)
39 os.remove(self.example_rec_out.name)
40 os.remove(self.names_file.name)
41 os.remove(self.output.name)
42 subprocess.check_call(['rm', '-rf', self.run_cwd])
43
44 def test_dnsviz_probe_input(self):
45 with io.open(self.output.name, 'wb') as fh_out:
46 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
47 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', 'example.com'], stdin=subprocess.PIPE, stdout=fh_out)
48 p.communicate(fh_in.read())
49 self.assertEqual(p.returncode, 0)
50
51 with io.open(self.output.name, 'wb') as fh:
52 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, 'example.com'], stdout=fh), 0)
53
54 def test_dnsviz_probe_names_input(self):
55 with io.open(self.output.name, 'wb') as fh:
56 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-f', self.names_file.name], stdout=fh), 0)
57
58 with io.open(self.output.name, 'wb') as fh_out:
59 with io.open(self.names_file.name, 'rb') as fh_in:
60 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-f', '-'], stdin=subprocess.PIPE, stdout=fh_out)
61 p.communicate(fh_in.read())
62 self.assertEqual(p.returncode, 0)
63
64 def test_dnsviz_probe_output(self):
65 with io.open(self.output.name, 'wb') as fh:
66 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, 'example.com'], cwd=self.run_cwd, stdout=fh), 0)
67
68 with io.open(self.output.name, 'wb') as fh:
69 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-o', '-', 'example.com'], cwd=self.run_cwd, stdout=fh), 0)
70
71 with io.open(self.output.name, 'wb') as fh:
72 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-r', self.example_auth_out.name, '-o', 'all.json', 'example.com'], cwd=self.run_cwd, stdout=fh), 0)
73 self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.json')))
74
75 def test_dnsviz_probe_auth(self):
76 with io.open(self.output.name, 'wb') as fh_out:
77 with gzip.open(EXAMPLE_AUTHORITATIVE) as fh_in:
78 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', 'example.com'], stdin=subprocess.PIPE, stdout=fh_out)
79 p.communicate(fh_in.read())
80 self.assertEqual(p.returncode, 0)
81
82 with io.open(self.output.name, 'wb') as fh_out:
83 with gzip.open(ROOT_AUTHORITATIVE) as fh_in:
84 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', '.'], stdin=subprocess.PIPE, stdout=fh_out)
85 p.communicate(fh_in.read())
86 self.assertEqual(p.returncode, 0)
87
88 def test_dnsviz_probe_rec(self):
89 with io.open(self.output.name, 'wb') as fh_out:
90 with gzip.open(EXAMPLE_RECURSIVE) as fh_in:
91 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', 'example.com'], stdin=subprocess.PIPE, stdout=fh_out)
92 p.communicate(fh_in.read())
93 self.assertEqual(p.returncode, 0)
94
95 with io.open(self.output.name, 'wb') as fh_out:
96 with gzip.open(ROOT_RECURSIVE) as fh_in:
97 p = subprocess.Popen([self.dnsviz_bin, 'probe', '-d', '0', '-r', '-', '.'], stdin=subprocess.PIPE, stdout=fh_out)
98 p.communicate(fh_in.read())
99 self.assertEqual(p.returncode, 0)
100
101 def test_dnsviz_probe_auth_local(self):
102 with io.open(self.output.name, 'wb') as fh:
103 self.assertEqual(subprocess.call(
104 [self.dnsviz_bin, 'probe', '-d', '0', '-A',
105 '-x' 'example.com:%s' % EXAMPLE_COM_SIGNED,
106 '-N' 'example.com:%s' % EXAMPLE_COM_DELEGATION,
107 '-D' 'example.com:%s' % EXAMPLE_COM_DELEGATION,
108 'example.com'], stdout=fh), 0)
109
110 if __name__ == '__main__':
111 unittest.main()
0 import io
1 import os
2 import subprocess
3 import tempfile
4 import unittest
5
6 class DNSVizProbeRunOnlineTestCase(unittest.TestCase):
7 def setUp(self):
8 self.current_cwd = os.getcwd()
9 self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')
10
11 self.output = tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False)
12 self.output.close()
13
14 def tearDown(self):
15 os.remove(self.output.name)
16
17 def test_dnsviz_probe_auth(self):
18 with io.open(self.output.name, 'wb') as fh:
19 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-A', '.'], stdout=fh), 0)
20
21 with io.open(self.output.name, 'wb') as fh:
22 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-A', 'example.com'], stdout=fh), 0)
23
24 def test_dnsviz_probe_rec(self):
25 with io.open(self.output.name, 'wb') as fh:
26 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '.'], stdout=fh), 0)
27
28 with io.open(self.output.name, 'wb') as fh:
29 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', 'example.com'], stdout=fh), 0)
30
31 def test_dnsviz_probe_rec_multi(self):
32 with io.open(self.output.name, 'wb') as fh:
33 self.assertEqual(subprocess.call([self.dnsviz_bin, 'probe', '-d', '0', '-t', '3', '.', 'example.com', 'example.net'], stdout=fh), 0)
34
35
36 if __name__ == '__main__':
37 unittest.main()