Merge tag 'upstream/1.3.0'
Upstream version 1.3.0
Harlan Lieberman-Berg
3 years ago
0 | 0 | include LICENSE.txt |
1 | 1 | include README.rst |
2 | 2 | recursive-include docs * |
3 | recursive-include tests * | |
4 | global-exclude __pycache__ | |
5 | global-exclude *.py[cod] |
0 | 0 | Metadata-Version: 2.1 |
1 | 1 | Name: certbot-dns-rfc2136 |
2 | Version: 0.35.1 | |
2 | Version: 1.3.0 | |
3 | 3 | Summary: RFC 2136 DNS Authenticator plugin for Certbot |
4 | 4 | Home-page: https://github.com/certbot/certbot |
5 | 5 | Author: Certbot Project |
7 | 7 | License: Apache License 2.0 |
8 | 8 | Description: UNKNOWN |
9 | 9 | Platform: UNKNOWN |
10 | Classifier: Development Status :: 3 - Alpha | |
10 | Classifier: Development Status :: 5 - Production/Stable | |
11 | 11 | Classifier: Environment :: Plugins |
12 | 12 | Classifier: Intended Audience :: System Administrators |
13 | 13 | Classifier: License :: OSI Approved :: Apache Software License |
16 | 16 | Classifier: Programming Language :: Python :: 2 |
17 | 17 | Classifier: Programming Language :: Python :: 2.7 |
18 | 18 | Classifier: Programming Language :: Python :: 3 |
19 | Classifier: Programming Language :: Python :: 3.4 | |
20 | 19 | Classifier: Programming Language :: Python :: 3.5 |
21 | 20 | Classifier: Programming Language :: Python :: 3.6 |
22 | 21 | Classifier: Programming Language :: Python :: 3.7 |
22 | Classifier: Programming Language :: Python :: 3.8 | |
23 | 23 | Classifier: Topic :: Internet :: WWW/HTTP |
24 | 24 | Classifier: Topic :: Security |
25 | 25 | Classifier: Topic :: System :: Installation/Setup |
26 | 26 | Classifier: Topic :: System :: Networking |
27 | 27 | Classifier: Topic :: System :: Systems Administration |
28 | 28 | Classifier: Topic :: Utilities |
29 | Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.* | |
29 | Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.* | |
30 | 30 | Provides-Extra: docs |
0 | """Internal implementation of `~certbot_dns_rfc2136.dns_rfc2136` plugin.""" |
0 | """DNS Authenticator using RFC 2136 Dynamic Updates.""" | |
1 | import logging | |
2 | ||
3 | import dns.flags | |
4 | import dns.message | |
5 | import dns.name | |
6 | import dns.query | |
7 | import dns.rdataclass | |
8 | import dns.rdatatype | |
9 | import dns.tsig | |
10 | import dns.tsigkeyring | |
11 | import dns.update | |
12 | import zope.interface | |
13 | ||
14 | from certbot import errors | |
15 | from certbot import interfaces | |
16 | from certbot.plugins import dns_common | |
17 | ||
18 | logger = logging.getLogger(__name__) | |
19 | ||
20 | ||
21 | @zope.interface.implementer(interfaces.IAuthenticator) | |
22 | @zope.interface.provider(interfaces.IPluginFactory) | |
23 | class Authenticator(dns_common.DNSAuthenticator): | |
24 | """DNS Authenticator using RFC 2136 Dynamic Updates | |
25 | ||
26 | This Authenticator uses RFC 2136 Dynamic Updates to fulfull a dns-01 challenge. | |
27 | """ | |
28 | ||
29 | ALGORITHMS = { | |
30 | 'HMAC-MD5': dns.tsig.HMAC_MD5, | |
31 | 'HMAC-SHA1': dns.tsig.HMAC_SHA1, | |
32 | 'HMAC-SHA224': dns.tsig.HMAC_SHA224, | |
33 | 'HMAC-SHA256': dns.tsig.HMAC_SHA256, | |
34 | 'HMAC-SHA384': dns.tsig.HMAC_SHA384, | |
35 | 'HMAC-SHA512': dns.tsig.HMAC_SHA512 | |
36 | } | |
37 | ||
38 | PORT = 53 | |
39 | ||
40 | description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).' | |
41 | ttl = 120 | |
42 | ||
43 | def __init__(self, *args, **kwargs): | |
44 | super(Authenticator, self).__init__(*args, **kwargs) | |
45 | self.credentials = None | |
46 | ||
47 | @classmethod | |
48 | def add_parser_arguments(cls, add): # pylint: disable=arguments-differ | |
49 | super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=60) | |
50 | add('credentials', help='RFC 2136 credentials INI file.') | |
51 | ||
52 | def more_info(self): # pylint: disable=missing-function-docstring | |
53 | return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \ | |
54 | 'RFC 2136 Dynamic Updates.' | |
55 | ||
56 | def _validate_algorithm(self, credentials): | |
57 | algorithm = credentials.conf('algorithm') | |
58 | if algorithm: | |
59 | if not self.ALGORITHMS.get(algorithm.upper()): | |
60 | raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm)) | |
61 | ||
62 | def _setup_credentials(self): | |
63 | self.credentials = self._configure_credentials( | |
64 | 'credentials', | |
65 | 'RFC 2136 credentials INI file', | |
66 | { | |
67 | 'name': 'TSIG key name', | |
68 | 'secret': 'TSIG key secret', | |
69 | 'server': 'The target DNS server' | |
70 | }, | |
71 | self._validate_algorithm | |
72 | ) | |
73 | ||
74 | def _perform(self, _domain, validation_name, validation): | |
75 | self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl) | |
76 | ||
77 | def _cleanup(self, _domain, validation_name, validation): | |
78 | self._get_rfc2136_client().del_txt_record(validation_name, validation) | |
79 | ||
80 | def _get_rfc2136_client(self): | |
81 | return _RFC2136Client(self.credentials.conf('server'), | |
82 | int(self.credentials.conf('port') or self.PORT), | |
83 | self.credentials.conf('name'), | |
84 | self.credentials.conf('secret'), | |
85 | self.ALGORITHMS.get(self.credentials.conf('algorithm'), | |
86 | dns.tsig.HMAC_MD5)) | |
87 | ||
88 | ||
89 | class _RFC2136Client(object): | |
90 | """ | |
91 | Encapsulates all communication with the target DNS server. | |
92 | """ | |
93 | def __init__(self, server, port, key_name, key_secret, key_algorithm): | |
94 | self.server = server | |
95 | self.port = port | |
96 | self.keyring = dns.tsigkeyring.from_text({ | |
97 | key_name: key_secret | |
98 | }) | |
99 | self.algorithm = key_algorithm | |
100 | ||
101 | def add_txt_record(self, record_name, record_content, record_ttl): | |
102 | """ | |
103 | Add a TXT record using the supplied information. | |
104 | ||
105 | :param str record_name: The record name (typically beginning with '_acme-challenge.'). | |
106 | :param str record_content: The record content (typically the challenge validation). | |
107 | :param int record_ttl: The record TTL (number of seconds that the record may be cached). | |
108 | :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server | |
109 | """ | |
110 | ||
111 | domain = self._find_domain(record_name) | |
112 | ||
113 | n = dns.name.from_text(record_name) | |
114 | o = dns.name.from_text(domain) | |
115 | rel = n.relativize(o) | |
116 | ||
117 | update = dns.update.Update( | |
118 | domain, | |
119 | keyring=self.keyring, | |
120 | keyalgorithm=self.algorithm) | |
121 | update.add(rel, record_ttl, dns.rdatatype.TXT, record_content) | |
122 | ||
123 | try: | |
124 | response = dns.query.tcp(update, self.server, port=self.port) | |
125 | except Exception as e: | |
126 | raise errors.PluginError('Encountered error adding TXT record: {0}' | |
127 | .format(e)) | |
128 | rcode = response.rcode() | |
129 | ||
130 | if rcode == dns.rcode.NOERROR: | |
131 | logger.debug('Successfully added TXT record %s', record_name) | |
132 | else: | |
133 | raise errors.PluginError('Received response from server: {0}' | |
134 | .format(dns.rcode.to_text(rcode))) | |
135 | ||
136 | def del_txt_record(self, record_name, record_content): | |
137 | """ | |
138 | Delete a TXT record using the supplied information. | |
139 | ||
140 | :param str record_name: The record name (typically beginning with '_acme-challenge.'). | |
141 | :param str record_content: The record content (typically the challenge validation). | |
142 | :param int record_ttl: The record TTL (number of seconds that the record may be cached). | |
143 | :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server | |
144 | """ | |
145 | ||
146 | domain = self._find_domain(record_name) | |
147 | ||
148 | n = dns.name.from_text(record_name) | |
149 | o = dns.name.from_text(domain) | |
150 | rel = n.relativize(o) | |
151 | ||
152 | update = dns.update.Update( | |
153 | domain, | |
154 | keyring=self.keyring, | |
155 | keyalgorithm=self.algorithm) | |
156 | update.delete(rel, dns.rdatatype.TXT, record_content) | |
157 | ||
158 | try: | |
159 | response = dns.query.tcp(update, self.server, port=self.port) | |
160 | except Exception as e: | |
161 | raise errors.PluginError('Encountered error deleting TXT record: {0}' | |
162 | .format(e)) | |
163 | rcode = response.rcode() | |
164 | ||
165 | if rcode == dns.rcode.NOERROR: | |
166 | logger.debug('Successfully deleted TXT record %s', record_name) | |
167 | else: | |
168 | raise errors.PluginError('Received response from server: {0}' | |
169 | .format(dns.rcode.to_text(rcode))) | |
170 | ||
171 | def _find_domain(self, record_name): | |
172 | """ | |
173 | Find the closest domain with an SOA record for a given domain name. | |
174 | ||
175 | :param str record_name: The record name for which to find the closest SOA record. | |
176 | :returns: The domain, if found. | |
177 | :rtype: str | |
178 | :raises certbot.errors.PluginError: if no SOA record can be found. | |
179 | """ | |
180 | ||
181 | domain_name_guesses = dns_common.base_domain_name_guesses(record_name) | |
182 | ||
183 | # Loop through until we find an authoritative SOA record | |
184 | for guess in domain_name_guesses: | |
185 | if self._query_soa(guess): | |
186 | return guess | |
187 | ||
188 | raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.' | |
189 | .format(record_name, domain_name_guesses)) | |
190 | ||
191 | def _query_soa(self, domain_name): | |
192 | """ | |
193 | Query a domain name for an authoritative SOA record. | |
194 | ||
195 | :param str domain_name: The domain name to query for an SOA record. | |
196 | :returns: True if found, False otherwise. | |
197 | :rtype: bool | |
198 | :raises certbot.errors.PluginError: if no response is received. | |
199 | """ | |
200 | ||
201 | domain = dns.name.from_text(domain_name) | |
202 | ||
203 | request = dns.message.make_query(domain, dns.rdatatype.SOA, dns.rdataclass.IN) | |
204 | # Turn off Recursion Desired bit in query | |
205 | request.flags ^= dns.flags.RD | |
206 | ||
207 | try: | |
208 | try: | |
209 | response = dns.query.tcp(request, self.server, port=self.port) | |
210 | except OSError as e: | |
211 | logger.debug('TCP query failed, fallback to UDP: %s', e) | |
212 | response = dns.query.udp(request, self.server, port=self.port) | |
213 | rcode = response.rcode() | |
214 | ||
215 | # Authoritative Answer bit should be set | |
216 | if (rcode == dns.rcode.NOERROR and response.get_rrset(response.answer, | |
217 | domain, dns.rdataclass.IN, dns.rdatatype.SOA) and response.flags & dns.flags.AA): | |
218 | logger.debug('Received authoritative SOA response for %s', domain_name) | |
219 | return True | |
220 | ||
221 | logger.debug('No authoritative SOA record found for %s', domain_name) | |
222 | return False | |
223 | except Exception as e: | |
224 | raise errors.PluginError('Encountered error when making query: {0}' | |
225 | .format(e)) |
0 | """DNS Authenticator using RFC 2136 Dynamic Updates.""" | |
1 | import logging | |
2 | ||
3 | import dns.flags | |
4 | import dns.message | |
5 | import dns.name | |
6 | import dns.query | |
7 | import dns.rdataclass | |
8 | import dns.rdatatype | |
9 | import dns.tsig | |
10 | import dns.tsigkeyring | |
11 | import dns.update | |
12 | import zope.interface | |
13 | ||
14 | from certbot import errors | |
15 | from certbot import interfaces | |
16 | from certbot.plugins import dns_common | |
17 | ||
18 | logger = logging.getLogger(__name__) | |
19 | ||
20 | ||
21 | @zope.interface.implementer(interfaces.IAuthenticator) | |
22 | @zope.interface.provider(interfaces.IPluginFactory) | |
23 | class Authenticator(dns_common.DNSAuthenticator): | |
24 | """DNS Authenticator using RFC 2136 Dynamic Updates | |
25 | ||
26 | This Authenticator uses RFC 2136 Dynamic Updates to fulfull a dns-01 challenge. | |
27 | """ | |
28 | ||
29 | ALGORITHMS = { | |
30 | 'HMAC-MD5': dns.tsig.HMAC_MD5, | |
31 | 'HMAC-SHA1': dns.tsig.HMAC_SHA1, | |
32 | 'HMAC-SHA224': dns.tsig.HMAC_SHA224, | |
33 | 'HMAC-SHA256': dns.tsig.HMAC_SHA256, | |
34 | 'HMAC-SHA384': dns.tsig.HMAC_SHA384, | |
35 | 'HMAC-SHA512': dns.tsig.HMAC_SHA512 | |
36 | } | |
37 | ||
38 | PORT = 53 | |
39 | ||
40 | description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).' | |
41 | ttl = 120 | |
42 | ||
43 | def __init__(self, *args, **kwargs): | |
44 | super(Authenticator, self).__init__(*args, **kwargs) | |
45 | self.credentials = None | |
46 | ||
47 | @classmethod | |
48 | def add_parser_arguments(cls, add): # pylint: disable=arguments-differ | |
49 | super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=60) | |
50 | add('credentials', help='RFC 2136 credentials INI file.') | |
51 | ||
52 | def more_info(self): # pylint: disable=missing-docstring,no-self-use | |
53 | return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \ | |
54 | 'RFC 2136 Dynamic Updates.' | |
55 | ||
56 | def _validate_algorithm(self, credentials): | |
57 | algorithm = credentials.conf('algorithm') | |
58 | if algorithm: | |
59 | if not self.ALGORITHMS.get(algorithm.upper()): | |
60 | raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm)) | |
61 | ||
62 | def _setup_credentials(self): | |
63 | self.credentials = self._configure_credentials( | |
64 | 'credentials', | |
65 | 'RFC 2136 credentials INI file', | |
66 | { | |
67 | 'name': 'TSIG key name', | |
68 | 'secret': 'TSIG key secret', | |
69 | 'server': 'The target DNS server' | |
70 | }, | |
71 | self._validate_algorithm | |
72 | ) | |
73 | ||
74 | def _perform(self, _domain, validation_name, validation): | |
75 | self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl) | |
76 | ||
77 | def _cleanup(self, _domain, validation_name, validation): | |
78 | self._get_rfc2136_client().del_txt_record(validation_name, validation) | |
79 | ||
80 | def _get_rfc2136_client(self): | |
81 | return _RFC2136Client(self.credentials.conf('server'), | |
82 | int(self.credentials.conf('port') or self.PORT), | |
83 | self.credentials.conf('name'), | |
84 | self.credentials.conf('secret'), | |
85 | self.ALGORITHMS.get(self.credentials.conf('algorithm'), | |
86 | dns.tsig.HMAC_MD5)) | |
87 | ||
88 | ||
89 | class _RFC2136Client(object): | |
90 | """ | |
91 | Encapsulates all communication with the target DNS server. | |
92 | """ | |
93 | def __init__(self, server, port, key_name, key_secret, key_algorithm): | |
94 | self.server = server | |
95 | self.port = port | |
96 | self.keyring = dns.tsigkeyring.from_text({ | |
97 | key_name: key_secret | |
98 | }) | |
99 | self.algorithm = key_algorithm | |
100 | ||
101 | def add_txt_record(self, record_name, record_content, record_ttl): | |
102 | """ | |
103 | Add a TXT record using the supplied information. | |
104 | ||
105 | :param str record_name: The record name (typically beginning with '_acme-challenge.'). | |
106 | :param str record_content: The record content (typically the challenge validation). | |
107 | :param int record_ttl: The record TTL (number of seconds that the record may be cached). | |
108 | :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server | |
109 | """ | |
110 | ||
111 | domain = self._find_domain(record_name) | |
112 | ||
113 | n = dns.name.from_text(record_name) | |
114 | o = dns.name.from_text(domain) | |
115 | rel = n.relativize(o) | |
116 | ||
117 | update = dns.update.Update( | |
118 | domain, | |
119 | keyring=self.keyring, | |
120 | keyalgorithm=self.algorithm) | |
121 | update.add(rel, record_ttl, dns.rdatatype.TXT, record_content) | |
122 | ||
123 | try: | |
124 | response = dns.query.tcp(update, self.server, port=self.port) | |
125 | except Exception as e: | |
126 | raise errors.PluginError('Encountered error adding TXT record: {0}' | |
127 | .format(e)) | |
128 | rcode = response.rcode() | |
129 | ||
130 | if rcode == dns.rcode.NOERROR: | |
131 | logger.debug('Successfully added TXT record') | |
132 | else: | |
133 | raise errors.PluginError('Received response from server: {0}' | |
134 | .format(dns.rcode.to_text(rcode))) | |
135 | ||
136 | def del_txt_record(self, record_name, record_content): | |
137 | """ | |
138 | Delete a TXT record using the supplied information. | |
139 | ||
140 | :param str record_name: The record name (typically beginning with '_acme-challenge.'). | |
141 | :param str record_content: The record content (typically the challenge validation). | |
142 | :param int record_ttl: The record TTL (number of seconds that the record may be cached). | |
143 | :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server | |
144 | """ | |
145 | ||
146 | domain = self._find_domain(record_name) | |
147 | ||
148 | n = dns.name.from_text(record_name) | |
149 | o = dns.name.from_text(domain) | |
150 | rel = n.relativize(o) | |
151 | ||
152 | update = dns.update.Update( | |
153 | domain, | |
154 | keyring=self.keyring, | |
155 | keyalgorithm=self.algorithm) | |
156 | update.delete(rel, dns.rdatatype.TXT, record_content) | |
157 | ||
158 | try: | |
159 | response = dns.query.tcp(update, self.server, port=self.port) | |
160 | except Exception as e: | |
161 | raise errors.PluginError('Encountered error deleting TXT record: {0}' | |
162 | .format(e)) | |
163 | rcode = response.rcode() | |
164 | ||
165 | if rcode == dns.rcode.NOERROR: | |
166 | logger.debug('Successfully deleted TXT record') | |
167 | else: | |
168 | raise errors.PluginError('Received response from server: {0}' | |
169 | .format(dns.rcode.to_text(rcode))) | |
170 | ||
171 | def _find_domain(self, record_name): | |
172 | """ | |
173 | Find the closest domain with an SOA record for a given domain name. | |
174 | ||
175 | :param str record_name: The record name for which to find the closest SOA record. | |
176 | :returns: The domain, if found. | |
177 | :rtype: str | |
178 | :raises certbot.errors.PluginError: if no SOA record can be found. | |
179 | """ | |
180 | ||
181 | domain_name_guesses = dns_common.base_domain_name_guesses(record_name) | |
182 | ||
183 | # Loop through until we find an authoritative SOA record | |
184 | for guess in domain_name_guesses: | |
185 | if self._query_soa(guess): | |
186 | return guess | |
187 | ||
188 | raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.' | |
189 | .format(record_name, domain_name_guesses)) | |
190 | ||
191 | def _query_soa(self, domain_name): | |
192 | """ | |
193 | Query a domain name for an authoritative SOA record. | |
194 | ||
195 | :param str domain_name: The domain name to query for an SOA record. | |
196 | :returns: True if found, False otherwise. | |
197 | :rtype: bool | |
198 | :raises certbot.errors.PluginError: if no response is received. | |
199 | """ | |
200 | ||
201 | domain = dns.name.from_text(domain_name) | |
202 | ||
203 | request = dns.message.make_query(domain, dns.rdatatype.SOA, dns.rdataclass.IN) | |
204 | # Turn off Recursion Desired bit in query | |
205 | request.flags ^= dns.flags.RD | |
206 | ||
207 | try: | |
208 | response = dns.query.udp(request, self.server, port=self.port) | |
209 | rcode = response.rcode() | |
210 | ||
211 | # Authoritative Answer bit should be set | |
212 | if (rcode == dns.rcode.NOERROR and response.get_rrset(response.answer, | |
213 | domain, dns.rdataclass.IN, dns.rdatatype.SOA) and response.flags & dns.flags.AA): | |
214 | logger.debug('Received authoritative SOA response for %s', domain_name) | |
215 | return True | |
216 | ||
217 | logger.debug('No authoritative SOA record found for %s', domain_name) | |
218 | return False | |
219 | except Exception as e: | |
220 | raise errors.PluginError('Encountered error when making query: {0}' | |
221 | .format(e)) |
0 | """Tests for certbot_dns_rfc2136.dns_rfc2136.""" | |
1 | ||
2 | import unittest | |
3 | ||
4 | import dns.flags | |
5 | import dns.rcode | |
6 | import dns.tsig | |
7 | import mock | |
8 | ||
9 | from certbot import errors | |
10 | from certbot.compat import os | |
11 | from certbot.plugins import dns_test_common | |
12 | from certbot.plugins.dns_test_common import DOMAIN | |
13 | from certbot.tests import util as test_util | |
14 | ||
15 | SERVER = '192.0.2.1' | |
16 | PORT = 53 | |
17 | NAME = 'a-tsig-key.' | |
18 | SECRET = 'SSB3b25kZXIgd2hvIHdpbGwgYm90aGVyIHRvIGRlY29kZSB0aGlzIHRleHQK' | |
19 | VALID_CONFIG = {"rfc2136_server": SERVER, "rfc2136_name": NAME, "rfc2136_secret": SECRET} | |
20 | ||
21 | ||
22 | class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): | |
23 | ||
24 | def setUp(self): | |
25 | from certbot_dns_rfc2136.dns_rfc2136 import Authenticator | |
26 | ||
27 | super(AuthenticatorTest, self).setUp() | |
28 | ||
29 | path = os.path.join(self.tempdir, 'file.ini') | |
30 | dns_test_common.write(VALID_CONFIG, path) | |
31 | ||
32 | self.config = mock.MagicMock(rfc2136_credentials=path, | |
33 | rfc2136_propagation_seconds=0) # don't wait during tests | |
34 | ||
35 | self.auth = Authenticator(self.config, "rfc2136") | |
36 | ||
37 | self.mock_client = mock.MagicMock() | |
38 | # _get_rfc2136_client | pylint: disable=protected-access | |
39 | self.auth._get_rfc2136_client = mock.MagicMock(return_value=self.mock_client) | |
40 | ||
41 | def test_perform(self): | |
42 | self.auth.perform([self.achall]) | |
43 | ||
44 | expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)] | |
45 | self.assertEqual(expected, self.mock_client.mock_calls) | |
46 | ||
47 | def test_cleanup(self): | |
48 | # _attempt_cleanup | pylint: disable=protected-access | |
49 | self.auth._attempt_cleanup = True | |
50 | self.auth.cleanup([self.achall]) | |
51 | ||
52 | expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)] | |
53 | self.assertEqual(expected, self.mock_client.mock_calls) | |
54 | ||
55 | def test_invalid_algorithm_raises(self): | |
56 | config = VALID_CONFIG.copy() | |
57 | config["rfc2136_algorithm"] = "INVALID" | |
58 | dns_test_common.write(config, self.config.rfc2136_credentials) | |
59 | ||
60 | self.assertRaises(errors.PluginError, | |
61 | self.auth.perform, | |
62 | [self.achall]) | |
63 | ||
64 | def test_valid_algorithm_passes(self): | |
65 | config = VALID_CONFIG.copy() | |
66 | config["rfc2136_algorithm"] = "HMAC-sha512" | |
67 | dns_test_common.write(config, self.config.rfc2136_credentials) | |
68 | ||
69 | self.auth.perform([self.achall]) | |
70 | ||
71 | ||
72 | class RFC2136ClientTest(unittest.TestCase): | |
73 | ||
74 | def setUp(self): | |
75 | from certbot_dns_rfc2136.dns_rfc2136 import _RFC2136Client | |
76 | ||
77 | self.rfc2136_client = _RFC2136Client(SERVER, PORT, NAME, SECRET, dns.tsig.HMAC_MD5) | |
78 | ||
79 | @mock.patch("dns.query.tcp") | |
80 | def test_add_txt_record(self, query_mock): | |
81 | query_mock.return_value.rcode.return_value = dns.rcode.NOERROR | |
82 | # _find_domain | pylint: disable=protected-access | |
83 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
84 | ||
85 | self.rfc2136_client.add_txt_record("bar", "baz", 42) | |
86 | ||
87 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
88 | self.assertTrue("bar. 42 IN TXT \"baz\"" in str(query_mock.call_args[0][0])) | |
89 | ||
90 | @mock.patch("dns.query.tcp") | |
91 | def test_add_txt_record_wraps_errors(self, query_mock): | |
92 | query_mock.side_effect = Exception | |
93 | # _find_domain | pylint: disable=protected-access | |
94 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
95 | ||
96 | self.assertRaises( | |
97 | errors.PluginError, | |
98 | self.rfc2136_client.add_txt_record, | |
99 | "bar", "baz", 42) | |
100 | ||
101 | @mock.patch("dns.query.tcp") | |
102 | def test_add_txt_record_server_error(self, query_mock): | |
103 | query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN | |
104 | # _find_domain | pylint: disable=protected-access | |
105 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
106 | ||
107 | self.assertRaises( | |
108 | errors.PluginError, | |
109 | self.rfc2136_client.add_txt_record, | |
110 | "bar", "baz", 42) | |
111 | ||
112 | @mock.patch("dns.query.tcp") | |
113 | def test_del_txt_record(self, query_mock): | |
114 | query_mock.return_value.rcode.return_value = dns.rcode.NOERROR | |
115 | # _find_domain | pylint: disable=protected-access | |
116 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
117 | ||
118 | self.rfc2136_client.del_txt_record("bar", "baz") | |
119 | ||
120 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
121 | self.assertTrue("bar. 0 NONE TXT \"baz\"" in str(query_mock.call_args[0][0])) | |
122 | ||
123 | @mock.patch("dns.query.tcp") | |
124 | def test_del_txt_record_wraps_errors(self, query_mock): | |
125 | query_mock.side_effect = Exception | |
126 | # _find_domain | pylint: disable=protected-access | |
127 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
128 | ||
129 | self.assertRaises( | |
130 | errors.PluginError, | |
131 | self.rfc2136_client.del_txt_record, | |
132 | "bar", "baz") | |
133 | ||
134 | @mock.patch("dns.query.tcp") | |
135 | def test_del_txt_record_server_error(self, query_mock): | |
136 | query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN | |
137 | # _find_domain | pylint: disable=protected-access | |
138 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
139 | ||
140 | self.assertRaises( | |
141 | errors.PluginError, | |
142 | self.rfc2136_client.del_txt_record, | |
143 | "bar", "baz") | |
144 | ||
145 | def test_find_domain(self): | |
146 | # _query_soa | pylint: disable=protected-access | |
147 | self.rfc2136_client._query_soa = mock.MagicMock(side_effect=[False, False, True]) | |
148 | ||
149 | # _find_domain | pylint: disable=protected-access | |
150 | domain = self.rfc2136_client._find_domain('foo.bar.'+DOMAIN) | |
151 | ||
152 | self.assertTrue(domain == DOMAIN) | |
153 | ||
154 | def test_find_domain_wraps_errors(self): | |
155 | # _query_soa | pylint: disable=protected-access | |
156 | self.rfc2136_client._query_soa = mock.MagicMock(return_value=False) | |
157 | ||
158 | self.assertRaises( | |
159 | errors.PluginError, | |
160 | # _find_domain | pylint: disable=protected-access | |
161 | self.rfc2136_client._find_domain, | |
162 | 'foo.bar.'+DOMAIN) | |
163 | ||
164 | @mock.patch("dns.query.udp") | |
165 | def test_query_soa_found(self, query_mock): | |
166 | query_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA) | |
167 | query_mock.return_value.rcode.return_value = dns.rcode.NOERROR | |
168 | ||
169 | # _query_soa | pylint: disable=protected-access | |
170 | result = self.rfc2136_client._query_soa(DOMAIN) | |
171 | ||
172 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
173 | self.assertTrue(result) | |
174 | ||
175 | @mock.patch("dns.query.udp") | |
176 | def test_query_soa_not_found(self, query_mock): | |
177 | query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN | |
178 | ||
179 | # _query_soa | pylint: disable=protected-access | |
180 | result = self.rfc2136_client._query_soa(DOMAIN) | |
181 | ||
182 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
183 | self.assertFalse(result) | |
184 | ||
185 | @mock.patch("dns.query.udp") | |
186 | def test_query_soa_wraps_errors(self, query_mock): | |
187 | query_mock.side_effect = Exception | |
188 | ||
189 | self.assertRaises( | |
190 | errors.PluginError, | |
191 | # _query_soa | pylint: disable=protected-access | |
192 | self.rfc2136_client._query_soa, | |
193 | DOMAIN) | |
194 | ||
195 | ||
196 | if __name__ == "__main__": | |
197 | unittest.main() # pragma: no cover |
0 | 0 | Metadata-Version: 2.1 |
1 | 1 | Name: certbot-dns-rfc2136 |
2 | Version: 0.35.1 | |
2 | Version: 1.3.0 | |
3 | 3 | Summary: RFC 2136 DNS Authenticator plugin for Certbot |
4 | 4 | Home-page: https://github.com/certbot/certbot |
5 | 5 | Author: Certbot Project |
7 | 7 | License: Apache License 2.0 |
8 | 8 | Description: UNKNOWN |
9 | 9 | Platform: UNKNOWN |
10 | Classifier: Development Status :: 3 - Alpha | |
10 | Classifier: Development Status :: 5 - Production/Stable | |
11 | 11 | Classifier: Environment :: Plugins |
12 | 12 | Classifier: Intended Audience :: System Administrators |
13 | 13 | Classifier: License :: OSI Approved :: Apache Software License |
16 | 16 | Classifier: Programming Language :: Python :: 2 |
17 | 17 | Classifier: Programming Language :: Python :: 2.7 |
18 | 18 | Classifier: Programming Language :: Python :: 3 |
19 | Classifier: Programming Language :: Python :: 3.4 | |
20 | 19 | Classifier: Programming Language :: Python :: 3.5 |
21 | 20 | Classifier: Programming Language :: Python :: 3.6 |
22 | 21 | Classifier: Programming Language :: Python :: 3.7 |
22 | Classifier: Programming Language :: Python :: 3.8 | |
23 | 23 | Classifier: Topic :: Internet :: WWW/HTTP |
24 | 24 | Classifier: Topic :: Security |
25 | 25 | Classifier: Topic :: System :: Installation/Setup |
26 | 26 | Classifier: Topic :: System :: Networking |
27 | 27 | Classifier: Topic :: System :: Systems Administration |
28 | 28 | Classifier: Topic :: Utilities |
29 | Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.* | |
29 | Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.* | |
30 | 30 | Provides-Extra: docs |
3 | 3 | setup.cfg |
4 | 4 | setup.py |
5 | 5 | certbot_dns_rfc2136/__init__.py |
6 | certbot_dns_rfc2136/dns_rfc2136.py | |
7 | certbot_dns_rfc2136/dns_rfc2136_test.py | |
8 | 6 | certbot_dns_rfc2136.egg-info/PKG-INFO |
9 | 7 | certbot_dns_rfc2136.egg-info/SOURCES.txt |
10 | 8 | certbot_dns_rfc2136.egg-info/dependency_links.txt |
11 | 9 | certbot_dns_rfc2136.egg-info/entry_points.txt |
12 | 10 | certbot_dns_rfc2136.egg-info/requires.txt |
13 | 11 | certbot_dns_rfc2136.egg-info/top_level.txt |
12 | certbot_dns_rfc2136/_internal/__init__.py | |
13 | certbot_dns_rfc2136/_internal/dns_rfc2136.py | |
14 | 14 | docs/.gitignore |
15 | 15 | docs/Makefile |
16 | 16 | docs/api.rst |
17 | 17 | docs/conf.py |
18 | 18 | docs/index.rst |
19 | 19 | docs/make.bat |
20 | docs/api/dns_rfc2136.rst⏎ | |
20 | tests/dns_rfc2136_test.py⏎ |
0 | 0 | [certbot.plugins] |
1 | dns-rfc2136 = certbot_dns_rfc2136.dns_rfc2136:Authenticator | |
1 | dns-rfc2136 = certbot_dns_rfc2136._internal.dns_rfc2136:Authenticator | |
2 | 2 |
0 | :mod:`certbot_dns_rfc2136.dns_rfc2136` | |
1 | -------------------------------------- | |
2 | ||
3 | .. automodule:: certbot_dns_rfc2136.dns_rfc2136 | |
4 | :members: |
1 | 1 | API Documentation |
2 | 2 | ================= |
3 | 3 | |
4 | .. toctree:: | |
5 | :glob: | |
6 | ||
7 | api/** | |
4 | Certbot plugins implement the Certbot plugins API, and do not otherwise have an external API. |
16 | 16 | # documentation root, use os.path.abspath to make it absolute, like shown here. |
17 | 17 | # |
18 | 18 | import os |
19 | ||
19 | 20 | # import sys |
20 | 21 | # sys.path.insert(0, os.path.abspath('.')) |
21 | 22 | |
36 | 37 | 'sphinx.ext.viewcode'] |
37 | 38 | |
38 | 39 | autodoc_member_order = 'bysource' |
39 | autodoc_default_flags = ['show-inheritance', 'private-members'] | |
40 | autodoc_default_flags = ['show-inheritance'] | |
40 | 41 | |
41 | 42 | # Add any paths that contain templates here, relative to this directory. |
42 | 43 | templates_path = ['_templates'] |
82 | 83 | pygments_style = 'sphinx' |
83 | 84 | |
84 | 85 | # If true, `todo` and `todoList` produce output, else they produce nothing. |
85 | todo_include_todos = True | |
86 | todo_include_todos = False | |
86 | 87 | |
87 | 88 | |
88 | 89 | # -- Options for HTML output ---------------------------------------------- |
0 | import sys | |
1 | ||
2 | from setuptools import find_packages | |
0 | 3 | from setuptools import setup |
1 | from setuptools import find_packages | |
4 | from setuptools.command.test import test as TestCommand | |
2 | 5 | |
3 | ||
4 | version = '0.35.1' | |
6 | version = '1.3.0' | |
5 | 7 | |
6 | 8 | # Remember to update local-oldest-requirements.txt when changing the minimum |
7 | 9 | # acme/certbot version. |
8 | 10 | install_requires = [ |
9 | 11 | 'acme>=0.29.0', |
10 | 'certbot>=0.34.0', | |
12 | 'certbot>=1.1.0', | |
11 | 13 | 'dnspython', |
12 | 14 | 'mock', |
13 | 15 | 'setuptools', |
19 | 21 | 'sphinx_rtd_theme', |
20 | 22 | ] |
21 | 23 | |
24 | class PyTest(TestCommand): | |
25 | user_options = [] | |
26 | ||
27 | def initialize_options(self): | |
28 | TestCommand.initialize_options(self) | |
29 | self.pytest_args = '' | |
30 | ||
31 | def run_tests(self): | |
32 | import shlex | |
33 | # import here, cause outside the eggs aren't loaded | |
34 | import pytest | |
35 | errno = pytest.main(shlex.split(self.pytest_args)) | |
36 | sys.exit(errno) | |
37 | ||
22 | 38 | setup( |
23 | 39 | name='certbot-dns-rfc2136', |
24 | 40 | version=version, |
27 | 43 | author="Certbot Project", |
28 | 44 | author_email='client-dev@letsencrypt.org', |
29 | 45 | license='Apache License 2.0', |
30 | python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*', | |
46 | python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*', | |
31 | 47 | classifiers=[ |
32 | 'Development Status :: 3 - Alpha', | |
48 | 'Development Status :: 5 - Production/Stable', | |
33 | 49 | 'Environment :: Plugins', |
34 | 50 | 'Intended Audience :: System Administrators', |
35 | 51 | 'License :: OSI Approved :: Apache Software License', |
38 | 54 | 'Programming Language :: Python :: 2', |
39 | 55 | 'Programming Language :: Python :: 2.7', |
40 | 56 | 'Programming Language :: Python :: 3', |
41 | 'Programming Language :: Python :: 3.4', | |
42 | 57 | 'Programming Language :: Python :: 3.5', |
43 | 58 | 'Programming Language :: Python :: 3.6', |
44 | 59 | 'Programming Language :: Python :: 3.7', |
60 | 'Programming Language :: Python :: 3.8', | |
45 | 61 | 'Topic :: Internet :: WWW/HTTP', |
46 | 62 | 'Topic :: Security', |
47 | 63 | 'Topic :: System :: Installation/Setup', |
58 | 74 | }, |
59 | 75 | entry_points={ |
60 | 76 | 'certbot.plugins': [ |
61 | 'dns-rfc2136 = certbot_dns_rfc2136.dns_rfc2136:Authenticator', | |
77 | 'dns-rfc2136 = certbot_dns_rfc2136._internal.dns_rfc2136:Authenticator', | |
62 | 78 | ], |
63 | 79 | }, |
80 | tests_require=["pytest"], | |
64 | 81 | test_suite='certbot_dns_rfc2136', |
82 | cmdclass={"test": PyTest}, | |
65 | 83 | ) |
0 | """Tests for certbot_dns_rfc2136._internal.dns_rfc2136.""" | |
1 | ||
2 | import unittest | |
3 | ||
4 | import dns.flags | |
5 | import dns.rcode | |
6 | import dns.tsig | |
7 | import mock | |
8 | ||
9 | from certbot import errors | |
10 | from certbot.compat import os | |
11 | from certbot.plugins import dns_test_common | |
12 | from certbot.plugins.dns_test_common import DOMAIN | |
13 | from certbot.tests import util as test_util | |
14 | ||
15 | SERVER = '192.0.2.1' | |
16 | PORT = 53 | |
17 | NAME = 'a-tsig-key.' | |
18 | SECRET = 'SSB3b25kZXIgd2hvIHdpbGwgYm90aGVyIHRvIGRlY29kZSB0aGlzIHRleHQK' | |
19 | VALID_CONFIG = {"rfc2136_server": SERVER, "rfc2136_name": NAME, "rfc2136_secret": SECRET} | |
20 | ||
21 | ||
22 | class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): | |
23 | ||
24 | def setUp(self): | |
25 | from certbot_dns_rfc2136._internal.dns_rfc2136 import Authenticator | |
26 | ||
27 | super(AuthenticatorTest, self).setUp() | |
28 | ||
29 | path = os.path.join(self.tempdir, 'file.ini') | |
30 | dns_test_common.write(VALID_CONFIG, path) | |
31 | ||
32 | self.config = mock.MagicMock(rfc2136_credentials=path, | |
33 | rfc2136_propagation_seconds=0) # don't wait during tests | |
34 | ||
35 | self.auth = Authenticator(self.config, "rfc2136") | |
36 | ||
37 | self.mock_client = mock.MagicMock() | |
38 | # _get_rfc2136_client | pylint: disable=protected-access | |
39 | self.auth._get_rfc2136_client = mock.MagicMock(return_value=self.mock_client) | |
40 | ||
41 | def test_perform(self): | |
42 | self.auth.perform([self.achall]) | |
43 | ||
44 | expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)] | |
45 | self.assertEqual(expected, self.mock_client.mock_calls) | |
46 | ||
47 | def test_cleanup(self): | |
48 | # _attempt_cleanup | pylint: disable=protected-access | |
49 | self.auth._attempt_cleanup = True | |
50 | self.auth.cleanup([self.achall]) | |
51 | ||
52 | expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)] | |
53 | self.assertEqual(expected, self.mock_client.mock_calls) | |
54 | ||
55 | def test_invalid_algorithm_raises(self): | |
56 | config = VALID_CONFIG.copy() | |
57 | config["rfc2136_algorithm"] = "INVALID" | |
58 | dns_test_common.write(config, self.config.rfc2136_credentials) | |
59 | ||
60 | self.assertRaises(errors.PluginError, | |
61 | self.auth.perform, | |
62 | [self.achall]) | |
63 | ||
64 | def test_valid_algorithm_passes(self): | |
65 | config = VALID_CONFIG.copy() | |
66 | config["rfc2136_algorithm"] = "HMAC-sha512" | |
67 | dns_test_common.write(config, self.config.rfc2136_credentials) | |
68 | ||
69 | self.auth.perform([self.achall]) | |
70 | ||
71 | ||
72 | class RFC2136ClientTest(unittest.TestCase): | |
73 | ||
74 | def setUp(self): | |
75 | from certbot_dns_rfc2136._internal.dns_rfc2136 import _RFC2136Client | |
76 | ||
77 | self.rfc2136_client = _RFC2136Client(SERVER, PORT, NAME, SECRET, dns.tsig.HMAC_MD5) | |
78 | ||
79 | @mock.patch("dns.query.tcp") | |
80 | def test_add_txt_record(self, query_mock): | |
81 | query_mock.return_value.rcode.return_value = dns.rcode.NOERROR | |
82 | # _find_domain | pylint: disable=protected-access | |
83 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
84 | ||
85 | self.rfc2136_client.add_txt_record("bar", "baz", 42) | |
86 | ||
87 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
88 | self.assertTrue("bar. 42 IN TXT \"baz\"" in str(query_mock.call_args[0][0])) | |
89 | ||
90 | @mock.patch("dns.query.tcp") | |
91 | def test_add_txt_record_wraps_errors(self, query_mock): | |
92 | query_mock.side_effect = Exception | |
93 | # _find_domain | pylint: disable=protected-access | |
94 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
95 | ||
96 | self.assertRaises( | |
97 | errors.PluginError, | |
98 | self.rfc2136_client.add_txt_record, | |
99 | "bar", "baz", 42) | |
100 | ||
101 | @mock.patch("dns.query.tcp") | |
102 | def test_add_txt_record_server_error(self, query_mock): | |
103 | query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN | |
104 | # _find_domain | pylint: disable=protected-access | |
105 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
106 | ||
107 | self.assertRaises( | |
108 | errors.PluginError, | |
109 | self.rfc2136_client.add_txt_record, | |
110 | "bar", "baz", 42) | |
111 | ||
112 | @mock.patch("dns.query.tcp") | |
113 | def test_del_txt_record(self, query_mock): | |
114 | query_mock.return_value.rcode.return_value = dns.rcode.NOERROR | |
115 | # _find_domain | pylint: disable=protected-access | |
116 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
117 | ||
118 | self.rfc2136_client.del_txt_record("bar", "baz") | |
119 | ||
120 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
121 | self.assertTrue("bar. 0 NONE TXT \"baz\"" in str(query_mock.call_args[0][0])) | |
122 | ||
123 | @mock.patch("dns.query.tcp") | |
124 | def test_del_txt_record_wraps_errors(self, query_mock): | |
125 | query_mock.side_effect = Exception | |
126 | # _find_domain | pylint: disable=protected-access | |
127 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
128 | ||
129 | self.assertRaises( | |
130 | errors.PluginError, | |
131 | self.rfc2136_client.del_txt_record, | |
132 | "bar", "baz") | |
133 | ||
134 | @mock.patch("dns.query.tcp") | |
135 | def test_del_txt_record_server_error(self, query_mock): | |
136 | query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN | |
137 | # _find_domain | pylint: disable=protected-access | |
138 | self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com") | |
139 | ||
140 | self.assertRaises( | |
141 | errors.PluginError, | |
142 | self.rfc2136_client.del_txt_record, | |
143 | "bar", "baz") | |
144 | ||
145 | def test_find_domain(self): | |
146 | # _query_soa | pylint: disable=protected-access | |
147 | self.rfc2136_client._query_soa = mock.MagicMock(side_effect=[False, False, True]) | |
148 | ||
149 | # _find_domain | pylint: disable=protected-access | |
150 | domain = self.rfc2136_client._find_domain('foo.bar.'+DOMAIN) | |
151 | ||
152 | self.assertTrue(domain == DOMAIN) | |
153 | ||
154 | def test_find_domain_wraps_errors(self): | |
155 | # _query_soa | pylint: disable=protected-access | |
156 | self.rfc2136_client._query_soa = mock.MagicMock(return_value=False) | |
157 | ||
158 | self.assertRaises( | |
159 | errors.PluginError, | |
160 | # _find_domain | pylint: disable=protected-access | |
161 | self.rfc2136_client._find_domain, | |
162 | 'foo.bar.'+DOMAIN) | |
163 | ||
164 | @mock.patch("dns.query.tcp") | |
165 | def test_query_soa_found(self, query_mock): | |
166 | query_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA) | |
167 | query_mock.return_value.rcode.return_value = dns.rcode.NOERROR | |
168 | ||
169 | # _query_soa | pylint: disable=protected-access | |
170 | result = self.rfc2136_client._query_soa(DOMAIN) | |
171 | ||
172 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
173 | self.assertTrue(result) | |
174 | ||
175 | @mock.patch("dns.query.tcp") | |
176 | def test_query_soa_not_found(self, query_mock): | |
177 | query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN | |
178 | ||
179 | # _query_soa | pylint: disable=protected-access | |
180 | result = self.rfc2136_client._query_soa(DOMAIN) | |
181 | ||
182 | query_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
183 | self.assertFalse(result) | |
184 | ||
185 | @mock.patch("dns.query.tcp") | |
186 | def test_query_soa_wraps_errors(self, query_mock): | |
187 | query_mock.side_effect = Exception | |
188 | ||
189 | self.assertRaises( | |
190 | errors.PluginError, | |
191 | # _query_soa | pylint: disable=protected-access | |
192 | self.rfc2136_client._query_soa, | |
193 | DOMAIN) | |
194 | ||
195 | @mock.patch("dns.query.udp") | |
196 | @mock.patch("dns.query.tcp") | |
197 | def test_query_soa_fallback_to_udp(self, tcp_mock, udp_mock): | |
198 | tcp_mock.side_effect = OSError | |
199 | udp_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA) | |
200 | udp_mock.return_value.rcode.return_value = dns.rcode.NOERROR | |
201 | ||
202 | # _query_soa | pylint: disable=protected-access | |
203 | result = self.rfc2136_client._query_soa(DOMAIN) | |
204 | ||
205 | tcp_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
206 | udp_mock.assert_called_with(mock.ANY, SERVER, port=PORT) | |
207 | self.assertTrue(result) | |
208 | ||
209 | ||
210 | if __name__ == "__main__": | |
211 | unittest.main() # pragma: no cover |