Codebase list python-certbot-dns-rfc2136 / upstream/1.3.0
New upstream version 1.3.0 Harlan Lieberman-Berg 3 years ago
15 changed file(s) with 485 addition(s) and 452 deletion(s). Raw diff Collapse all Expand all
00 include LICENSE.txt
11 include README.rst
22 recursive-include docs *
3 recursive-include tests *
4 global-exclude __pycache__
5 global-exclude *.py[cod]
00 Metadata-Version: 2.1
11 Name: certbot-dns-rfc2136
2 Version: 0.35.1
2 Version: 1.3.0
33 Summary: RFC 2136 DNS Authenticator plugin for Certbot
44 Home-page: https://github.com/certbot/certbot
55 Author: Certbot Project
77 License: Apache License 2.0
88 Description: UNKNOWN
99 Platform: UNKNOWN
10 Classifier: Development Status :: 3 - Alpha
10 Classifier: Development Status :: 5 - Production/Stable
1111 Classifier: Environment :: Plugins
1212 Classifier: Intended Audience :: System Administrators
1313 Classifier: License :: OSI Approved :: Apache Software License
1616 Classifier: Programming Language :: Python :: 2
1717 Classifier: Programming Language :: Python :: 2.7
1818 Classifier: Programming Language :: Python :: 3
19 Classifier: Programming Language :: Python :: 3.4
2019 Classifier: Programming Language :: Python :: 3.5
2120 Classifier: Programming Language :: Python :: 3.6
2221 Classifier: Programming Language :: Python :: 3.7
22 Classifier: Programming Language :: Python :: 3.8
2323 Classifier: Topic :: Internet :: WWW/HTTP
2424 Classifier: Topic :: Security
2525 Classifier: Topic :: System :: Installation/Setup
2626 Classifier: Topic :: System :: Networking
2727 Classifier: Topic :: System :: Systems Administration
2828 Classifier: Topic :: Utilities
29 Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*
29 Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*
3030 Provides-Extra: docs
0 """Internal implementation of `~certbot_dns_rfc2136.dns_rfc2136` plugin."""
0 """DNS Authenticator using RFC 2136 Dynamic Updates."""
1 import logging
2
3 import dns.flags
4 import dns.message
5 import dns.name
6 import dns.query
7 import dns.rdataclass
8 import dns.rdatatype
9 import dns.tsig
10 import dns.tsigkeyring
11 import dns.update
12 import zope.interface
13
14 from certbot import errors
15 from certbot import interfaces
16 from certbot.plugins import dns_common
17
18 logger = logging.getLogger(__name__)
19
20
21 @zope.interface.implementer(interfaces.IAuthenticator)
22 @zope.interface.provider(interfaces.IPluginFactory)
23 class Authenticator(dns_common.DNSAuthenticator):
24 """DNS Authenticator using RFC 2136 Dynamic Updates
25
26 This Authenticator uses RFC 2136 Dynamic Updates to fulfull a dns-01 challenge.
27 """
28
29 ALGORITHMS = {
30 'HMAC-MD5': dns.tsig.HMAC_MD5,
31 'HMAC-SHA1': dns.tsig.HMAC_SHA1,
32 'HMAC-SHA224': dns.tsig.HMAC_SHA224,
33 'HMAC-SHA256': dns.tsig.HMAC_SHA256,
34 'HMAC-SHA384': dns.tsig.HMAC_SHA384,
35 'HMAC-SHA512': dns.tsig.HMAC_SHA512
36 }
37
38 PORT = 53
39
40 description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).'
41 ttl = 120
42
43 def __init__(self, *args, **kwargs):
44 super(Authenticator, self).__init__(*args, **kwargs)
45 self.credentials = None
46
47 @classmethod
48 def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
49 super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=60)
50 add('credentials', help='RFC 2136 credentials INI file.')
51
52 def more_info(self): # pylint: disable=missing-function-docstring
53 return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
54 'RFC 2136 Dynamic Updates.'
55
56 def _validate_algorithm(self, credentials):
57 algorithm = credentials.conf('algorithm')
58 if algorithm:
59 if not self.ALGORITHMS.get(algorithm.upper()):
60 raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm))
61
62 def _setup_credentials(self):
63 self.credentials = self._configure_credentials(
64 'credentials',
65 'RFC 2136 credentials INI file',
66 {
67 'name': 'TSIG key name',
68 'secret': 'TSIG key secret',
69 'server': 'The target DNS server'
70 },
71 self._validate_algorithm
72 )
73
74 def _perform(self, _domain, validation_name, validation):
75 self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl)
76
77 def _cleanup(self, _domain, validation_name, validation):
78 self._get_rfc2136_client().del_txt_record(validation_name, validation)
79
80 def _get_rfc2136_client(self):
81 return _RFC2136Client(self.credentials.conf('server'),
82 int(self.credentials.conf('port') or self.PORT),
83 self.credentials.conf('name'),
84 self.credentials.conf('secret'),
85 self.ALGORITHMS.get(self.credentials.conf('algorithm'),
86 dns.tsig.HMAC_MD5))
87
88
89 class _RFC2136Client(object):
90 """
91 Encapsulates all communication with the target DNS server.
92 """
93 def __init__(self, server, port, key_name, key_secret, key_algorithm):
94 self.server = server
95 self.port = port
96 self.keyring = dns.tsigkeyring.from_text({
97 key_name: key_secret
98 })
99 self.algorithm = key_algorithm
100
101 def add_txt_record(self, record_name, record_content, record_ttl):
102 """
103 Add a TXT record using the supplied information.
104
105 :param str record_name: The record name (typically beginning with '_acme-challenge.').
106 :param str record_content: The record content (typically the challenge validation).
107 :param int record_ttl: The record TTL (number of seconds that the record may be cached).
108 :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server
109 """
110
111 domain = self._find_domain(record_name)
112
113 n = dns.name.from_text(record_name)
114 o = dns.name.from_text(domain)
115 rel = n.relativize(o)
116
117 update = dns.update.Update(
118 domain,
119 keyring=self.keyring,
120 keyalgorithm=self.algorithm)
121 update.add(rel, record_ttl, dns.rdatatype.TXT, record_content)
122
123 try:
124 response = dns.query.tcp(update, self.server, port=self.port)
125 except Exception as e:
126 raise errors.PluginError('Encountered error adding TXT record: {0}'
127 .format(e))
128 rcode = response.rcode()
129
130 if rcode == dns.rcode.NOERROR:
131 logger.debug('Successfully added TXT record %s', record_name)
132 else:
133 raise errors.PluginError('Received response from server: {0}'
134 .format(dns.rcode.to_text(rcode)))
135
136 def del_txt_record(self, record_name, record_content):
137 """
138 Delete a TXT record using the supplied information.
139
140 :param str record_name: The record name (typically beginning with '_acme-challenge.').
141 :param str record_content: The record content (typically the challenge validation).
142 :param int record_ttl: The record TTL (number of seconds that the record may be cached).
143 :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server
144 """
145
146 domain = self._find_domain(record_name)
147
148 n = dns.name.from_text(record_name)
149 o = dns.name.from_text(domain)
150 rel = n.relativize(o)
151
152 update = dns.update.Update(
153 domain,
154 keyring=self.keyring,
155 keyalgorithm=self.algorithm)
156 update.delete(rel, dns.rdatatype.TXT, record_content)
157
158 try:
159 response = dns.query.tcp(update, self.server, port=self.port)
160 except Exception as e:
161 raise errors.PluginError('Encountered error deleting TXT record: {0}'
162 .format(e))
163 rcode = response.rcode()
164
165 if rcode == dns.rcode.NOERROR:
166 logger.debug('Successfully deleted TXT record %s', record_name)
167 else:
168 raise errors.PluginError('Received response from server: {0}'
169 .format(dns.rcode.to_text(rcode)))
170
171 def _find_domain(self, record_name):
172 """
173 Find the closest domain with an SOA record for a given domain name.
174
175 :param str record_name: The record name for which to find the closest SOA record.
176 :returns: The domain, if found.
177 :rtype: str
178 :raises certbot.errors.PluginError: if no SOA record can be found.
179 """
180
181 domain_name_guesses = dns_common.base_domain_name_guesses(record_name)
182
183 # Loop through until we find an authoritative SOA record
184 for guess in domain_name_guesses:
185 if self._query_soa(guess):
186 return guess
187
188 raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
189 .format(record_name, domain_name_guesses))
190
191 def _query_soa(self, domain_name):
192 """
193 Query a domain name for an authoritative SOA record.
194
195 :param str domain_name: The domain name to query for an SOA record.
196 :returns: True if found, False otherwise.
197 :rtype: bool
198 :raises certbot.errors.PluginError: if no response is received.
199 """
200
201 domain = dns.name.from_text(domain_name)
202
203 request = dns.message.make_query(domain, dns.rdatatype.SOA, dns.rdataclass.IN)
204 # Turn off Recursion Desired bit in query
205 request.flags ^= dns.flags.RD
206
207 try:
208 try:
209 response = dns.query.tcp(request, self.server, port=self.port)
210 except OSError as e:
211 logger.debug('TCP query failed, fallback to UDP: %s', e)
212 response = dns.query.udp(request, self.server, port=self.port)
213 rcode = response.rcode()
214
215 # Authoritative Answer bit should be set
216 if (rcode == dns.rcode.NOERROR and response.get_rrset(response.answer,
217 domain, dns.rdataclass.IN, dns.rdatatype.SOA) and response.flags & dns.flags.AA):
218 logger.debug('Received authoritative SOA response for %s', domain_name)
219 return True
220
221 logger.debug('No authoritative SOA record found for %s', domain_name)
222 return False
223 except Exception as e:
224 raise errors.PluginError('Encountered error when making query: {0}'
225 .format(e))
+0
-222
certbot_dns_rfc2136/dns_rfc2136.py less more
0 """DNS Authenticator using RFC 2136 Dynamic Updates."""
1 import logging
2
3 import dns.flags
4 import dns.message
5 import dns.name
6 import dns.query
7 import dns.rdataclass
8 import dns.rdatatype
9 import dns.tsig
10 import dns.tsigkeyring
11 import dns.update
12 import zope.interface
13
14 from certbot import errors
15 from certbot import interfaces
16 from certbot.plugins import dns_common
17
18 logger = logging.getLogger(__name__)
19
20
21 @zope.interface.implementer(interfaces.IAuthenticator)
22 @zope.interface.provider(interfaces.IPluginFactory)
23 class Authenticator(dns_common.DNSAuthenticator):
24 """DNS Authenticator using RFC 2136 Dynamic Updates
25
26 This Authenticator uses RFC 2136 Dynamic Updates to fulfull a dns-01 challenge.
27 """
28
29 ALGORITHMS = {
30 'HMAC-MD5': dns.tsig.HMAC_MD5,
31 'HMAC-SHA1': dns.tsig.HMAC_SHA1,
32 'HMAC-SHA224': dns.tsig.HMAC_SHA224,
33 'HMAC-SHA256': dns.tsig.HMAC_SHA256,
34 'HMAC-SHA384': dns.tsig.HMAC_SHA384,
35 'HMAC-SHA512': dns.tsig.HMAC_SHA512
36 }
37
38 PORT = 53
39
40 description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).'
41 ttl = 120
42
43 def __init__(self, *args, **kwargs):
44 super(Authenticator, self).__init__(*args, **kwargs)
45 self.credentials = None
46
47 @classmethod
48 def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
49 super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=60)
50 add('credentials', help='RFC 2136 credentials INI file.')
51
52 def more_info(self): # pylint: disable=missing-docstring,no-self-use
53 return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
54 'RFC 2136 Dynamic Updates.'
55
56 def _validate_algorithm(self, credentials):
57 algorithm = credentials.conf('algorithm')
58 if algorithm:
59 if not self.ALGORITHMS.get(algorithm.upper()):
60 raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm))
61
62 def _setup_credentials(self):
63 self.credentials = self._configure_credentials(
64 'credentials',
65 'RFC 2136 credentials INI file',
66 {
67 'name': 'TSIG key name',
68 'secret': 'TSIG key secret',
69 'server': 'The target DNS server'
70 },
71 self._validate_algorithm
72 )
73
74 def _perform(self, _domain, validation_name, validation):
75 self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl)
76
77 def _cleanup(self, _domain, validation_name, validation):
78 self._get_rfc2136_client().del_txt_record(validation_name, validation)
79
80 def _get_rfc2136_client(self):
81 return _RFC2136Client(self.credentials.conf('server'),
82 int(self.credentials.conf('port') or self.PORT),
83 self.credentials.conf('name'),
84 self.credentials.conf('secret'),
85 self.ALGORITHMS.get(self.credentials.conf('algorithm'),
86 dns.tsig.HMAC_MD5))
87
88
89 class _RFC2136Client(object):
90 """
91 Encapsulates all communication with the target DNS server.
92 """
93 def __init__(self, server, port, key_name, key_secret, key_algorithm):
94 self.server = server
95 self.port = port
96 self.keyring = dns.tsigkeyring.from_text({
97 key_name: key_secret
98 })
99 self.algorithm = key_algorithm
100
101 def add_txt_record(self, record_name, record_content, record_ttl):
102 """
103 Add a TXT record using the supplied information.
104
105 :param str record_name: The record name (typically beginning with '_acme-challenge.').
106 :param str record_content: The record content (typically the challenge validation).
107 :param int record_ttl: The record TTL (number of seconds that the record may be cached).
108 :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server
109 """
110
111 domain = self._find_domain(record_name)
112
113 n = dns.name.from_text(record_name)
114 o = dns.name.from_text(domain)
115 rel = n.relativize(o)
116
117 update = dns.update.Update(
118 domain,
119 keyring=self.keyring,
120 keyalgorithm=self.algorithm)
121 update.add(rel, record_ttl, dns.rdatatype.TXT, record_content)
122
123 try:
124 response = dns.query.tcp(update, self.server, port=self.port)
125 except Exception as e:
126 raise errors.PluginError('Encountered error adding TXT record: {0}'
127 .format(e))
128 rcode = response.rcode()
129
130 if rcode == dns.rcode.NOERROR:
131 logger.debug('Successfully added TXT record')
132 else:
133 raise errors.PluginError('Received response from server: {0}'
134 .format(dns.rcode.to_text(rcode)))
135
136 def del_txt_record(self, record_name, record_content):
137 """
138 Delete a TXT record using the supplied information.
139
140 :param str record_name: The record name (typically beginning with '_acme-challenge.').
141 :param str record_content: The record content (typically the challenge validation).
142 :param int record_ttl: The record TTL (number of seconds that the record may be cached).
143 :raises certbot.errors.PluginError: if an error occurs communicating with the DNS server
144 """
145
146 domain = self._find_domain(record_name)
147
148 n = dns.name.from_text(record_name)
149 o = dns.name.from_text(domain)
150 rel = n.relativize(o)
151
152 update = dns.update.Update(
153 domain,
154 keyring=self.keyring,
155 keyalgorithm=self.algorithm)
156 update.delete(rel, dns.rdatatype.TXT, record_content)
157
158 try:
159 response = dns.query.tcp(update, self.server, port=self.port)
160 except Exception as e:
161 raise errors.PluginError('Encountered error deleting TXT record: {0}'
162 .format(e))
163 rcode = response.rcode()
164
165 if rcode == dns.rcode.NOERROR:
166 logger.debug('Successfully deleted TXT record')
167 else:
168 raise errors.PluginError('Received response from server: {0}'
169 .format(dns.rcode.to_text(rcode)))
170
171 def _find_domain(self, record_name):
172 """
173 Find the closest domain with an SOA record for a given domain name.
174
175 :param str record_name: The record name for which to find the closest SOA record.
176 :returns: The domain, if found.
177 :rtype: str
178 :raises certbot.errors.PluginError: if no SOA record can be found.
179 """
180
181 domain_name_guesses = dns_common.base_domain_name_guesses(record_name)
182
183 # Loop through until we find an authoritative SOA record
184 for guess in domain_name_guesses:
185 if self._query_soa(guess):
186 return guess
187
188 raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
189 .format(record_name, domain_name_guesses))
190
191 def _query_soa(self, domain_name):
192 """
193 Query a domain name for an authoritative SOA record.
194
195 :param str domain_name: The domain name to query for an SOA record.
196 :returns: True if found, False otherwise.
197 :rtype: bool
198 :raises certbot.errors.PluginError: if no response is received.
199 """
200
201 domain = dns.name.from_text(domain_name)
202
203 request = dns.message.make_query(domain, dns.rdatatype.SOA, dns.rdataclass.IN)
204 # Turn off Recursion Desired bit in query
205 request.flags ^= dns.flags.RD
206
207 try:
208 response = dns.query.udp(request, self.server, port=self.port)
209 rcode = response.rcode()
210
211 # Authoritative Answer bit should be set
212 if (rcode == dns.rcode.NOERROR and response.get_rrset(response.answer,
213 domain, dns.rdataclass.IN, dns.rdatatype.SOA) and response.flags & dns.flags.AA):
214 logger.debug('Received authoritative SOA response for %s', domain_name)
215 return True
216
217 logger.debug('No authoritative SOA record found for %s', domain_name)
218 return False
219 except Exception as e:
220 raise errors.PluginError('Encountered error when making query: {0}'
221 .format(e))
+0
-198
certbot_dns_rfc2136/dns_rfc2136_test.py less more
0 """Tests for certbot_dns_rfc2136.dns_rfc2136."""
1
2 import unittest
3
4 import dns.flags
5 import dns.rcode
6 import dns.tsig
7 import mock
8
9 from certbot import errors
10 from certbot.compat import os
11 from certbot.plugins import dns_test_common
12 from certbot.plugins.dns_test_common import DOMAIN
13 from certbot.tests import util as test_util
14
15 SERVER = '192.0.2.1'
16 PORT = 53
17 NAME = 'a-tsig-key.'
18 SECRET = 'SSB3b25kZXIgd2hvIHdpbGwgYm90aGVyIHRvIGRlY29kZSB0aGlzIHRleHQK'
19 VALID_CONFIG = {"rfc2136_server": SERVER, "rfc2136_name": NAME, "rfc2136_secret": SECRET}
20
21
22 class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest):
23
24 def setUp(self):
25 from certbot_dns_rfc2136.dns_rfc2136 import Authenticator
26
27 super(AuthenticatorTest, self).setUp()
28
29 path = os.path.join(self.tempdir, 'file.ini')
30 dns_test_common.write(VALID_CONFIG, path)
31
32 self.config = mock.MagicMock(rfc2136_credentials=path,
33 rfc2136_propagation_seconds=0) # don't wait during tests
34
35 self.auth = Authenticator(self.config, "rfc2136")
36
37 self.mock_client = mock.MagicMock()
38 # _get_rfc2136_client | pylint: disable=protected-access
39 self.auth._get_rfc2136_client = mock.MagicMock(return_value=self.mock_client)
40
41 def test_perform(self):
42 self.auth.perform([self.achall])
43
44 expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
45 self.assertEqual(expected, self.mock_client.mock_calls)
46
47 def test_cleanup(self):
48 # _attempt_cleanup | pylint: disable=protected-access
49 self.auth._attempt_cleanup = True
50 self.auth.cleanup([self.achall])
51
52 expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
53 self.assertEqual(expected, self.mock_client.mock_calls)
54
55 def test_invalid_algorithm_raises(self):
56 config = VALID_CONFIG.copy()
57 config["rfc2136_algorithm"] = "INVALID"
58 dns_test_common.write(config, self.config.rfc2136_credentials)
59
60 self.assertRaises(errors.PluginError,
61 self.auth.perform,
62 [self.achall])
63
64 def test_valid_algorithm_passes(self):
65 config = VALID_CONFIG.copy()
66 config["rfc2136_algorithm"] = "HMAC-sha512"
67 dns_test_common.write(config, self.config.rfc2136_credentials)
68
69 self.auth.perform([self.achall])
70
71
72 class RFC2136ClientTest(unittest.TestCase):
73
74 def setUp(self):
75 from certbot_dns_rfc2136.dns_rfc2136 import _RFC2136Client
76
77 self.rfc2136_client = _RFC2136Client(SERVER, PORT, NAME, SECRET, dns.tsig.HMAC_MD5)
78
79 @mock.patch("dns.query.tcp")
80 def test_add_txt_record(self, query_mock):
81 query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
82 # _find_domain | pylint: disable=protected-access
83 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
84
85 self.rfc2136_client.add_txt_record("bar", "baz", 42)
86
87 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
88 self.assertTrue("bar. 42 IN TXT \"baz\"" in str(query_mock.call_args[0][0]))
89
90 @mock.patch("dns.query.tcp")
91 def test_add_txt_record_wraps_errors(self, query_mock):
92 query_mock.side_effect = Exception
93 # _find_domain | pylint: disable=protected-access
94 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
95
96 self.assertRaises(
97 errors.PluginError,
98 self.rfc2136_client.add_txt_record,
99 "bar", "baz", 42)
100
101 @mock.patch("dns.query.tcp")
102 def test_add_txt_record_server_error(self, query_mock):
103 query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
104 # _find_domain | pylint: disable=protected-access
105 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
106
107 self.assertRaises(
108 errors.PluginError,
109 self.rfc2136_client.add_txt_record,
110 "bar", "baz", 42)
111
112 @mock.patch("dns.query.tcp")
113 def test_del_txt_record(self, query_mock):
114 query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
115 # _find_domain | pylint: disable=protected-access
116 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
117
118 self.rfc2136_client.del_txt_record("bar", "baz")
119
120 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
121 self.assertTrue("bar. 0 NONE TXT \"baz\"" in str(query_mock.call_args[0][0]))
122
123 @mock.patch("dns.query.tcp")
124 def test_del_txt_record_wraps_errors(self, query_mock):
125 query_mock.side_effect = Exception
126 # _find_domain | pylint: disable=protected-access
127 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
128
129 self.assertRaises(
130 errors.PluginError,
131 self.rfc2136_client.del_txt_record,
132 "bar", "baz")
133
134 @mock.patch("dns.query.tcp")
135 def test_del_txt_record_server_error(self, query_mock):
136 query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
137 # _find_domain | pylint: disable=protected-access
138 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
139
140 self.assertRaises(
141 errors.PluginError,
142 self.rfc2136_client.del_txt_record,
143 "bar", "baz")
144
145 def test_find_domain(self):
146 # _query_soa | pylint: disable=protected-access
147 self.rfc2136_client._query_soa = mock.MagicMock(side_effect=[False, False, True])
148
149 # _find_domain | pylint: disable=protected-access
150 domain = self.rfc2136_client._find_domain('foo.bar.'+DOMAIN)
151
152 self.assertTrue(domain == DOMAIN)
153
154 def test_find_domain_wraps_errors(self):
155 # _query_soa | pylint: disable=protected-access
156 self.rfc2136_client._query_soa = mock.MagicMock(return_value=False)
157
158 self.assertRaises(
159 errors.PluginError,
160 # _find_domain | pylint: disable=protected-access
161 self.rfc2136_client._find_domain,
162 'foo.bar.'+DOMAIN)
163
164 @mock.patch("dns.query.udp")
165 def test_query_soa_found(self, query_mock):
166 query_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA)
167 query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
168
169 # _query_soa | pylint: disable=protected-access
170 result = self.rfc2136_client._query_soa(DOMAIN)
171
172 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
173 self.assertTrue(result)
174
175 @mock.patch("dns.query.udp")
176 def test_query_soa_not_found(self, query_mock):
177 query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
178
179 # _query_soa | pylint: disable=protected-access
180 result = self.rfc2136_client._query_soa(DOMAIN)
181
182 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
183 self.assertFalse(result)
184
185 @mock.patch("dns.query.udp")
186 def test_query_soa_wraps_errors(self, query_mock):
187 query_mock.side_effect = Exception
188
189 self.assertRaises(
190 errors.PluginError,
191 # _query_soa | pylint: disable=protected-access
192 self.rfc2136_client._query_soa,
193 DOMAIN)
194
195
196 if __name__ == "__main__":
197 unittest.main() # pragma: no cover
00 Metadata-Version: 2.1
11 Name: certbot-dns-rfc2136
2 Version: 0.35.1
2 Version: 1.3.0
33 Summary: RFC 2136 DNS Authenticator plugin for Certbot
44 Home-page: https://github.com/certbot/certbot
55 Author: Certbot Project
77 License: Apache License 2.0
88 Description: UNKNOWN
99 Platform: UNKNOWN
10 Classifier: Development Status :: 3 - Alpha
10 Classifier: Development Status :: 5 - Production/Stable
1111 Classifier: Environment :: Plugins
1212 Classifier: Intended Audience :: System Administrators
1313 Classifier: License :: OSI Approved :: Apache Software License
1616 Classifier: Programming Language :: Python :: 2
1717 Classifier: Programming Language :: Python :: 2.7
1818 Classifier: Programming Language :: Python :: 3
19 Classifier: Programming Language :: Python :: 3.4
2019 Classifier: Programming Language :: Python :: 3.5
2120 Classifier: Programming Language :: Python :: 3.6
2221 Classifier: Programming Language :: Python :: 3.7
22 Classifier: Programming Language :: Python :: 3.8
2323 Classifier: Topic :: Internet :: WWW/HTTP
2424 Classifier: Topic :: Security
2525 Classifier: Topic :: System :: Installation/Setup
2626 Classifier: Topic :: System :: Networking
2727 Classifier: Topic :: System :: Systems Administration
2828 Classifier: Topic :: Utilities
29 Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*
29 Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*
3030 Provides-Extra: docs
33 setup.cfg
44 setup.py
55 certbot_dns_rfc2136/__init__.py
6 certbot_dns_rfc2136/dns_rfc2136.py
7 certbot_dns_rfc2136/dns_rfc2136_test.py
86 certbot_dns_rfc2136.egg-info/PKG-INFO
97 certbot_dns_rfc2136.egg-info/SOURCES.txt
108 certbot_dns_rfc2136.egg-info/dependency_links.txt
119 certbot_dns_rfc2136.egg-info/entry_points.txt
1210 certbot_dns_rfc2136.egg-info/requires.txt
1311 certbot_dns_rfc2136.egg-info/top_level.txt
12 certbot_dns_rfc2136/_internal/__init__.py
13 certbot_dns_rfc2136/_internal/dns_rfc2136.py
1414 docs/.gitignore
1515 docs/Makefile
1616 docs/api.rst
1717 docs/conf.py
1818 docs/index.rst
1919 docs/make.bat
20 docs/api/dns_rfc2136.rst
20 tests/dns_rfc2136_test.py
00 [certbot.plugins]
1 dns-rfc2136 = certbot_dns_rfc2136.dns_rfc2136:Authenticator
1 dns-rfc2136 = certbot_dns_rfc2136._internal.dns_rfc2136:Authenticator
22
00 acme>=0.29.0
1 certbot>=0.34.0
1 certbot>=1.1.0
22 dnspython
33 mock
44 setuptools
+0
-5
docs/api/dns_rfc2136.rst less more
0 :mod:`certbot_dns_rfc2136.dns_rfc2136`
1 --------------------------------------
2
3 .. automodule:: certbot_dns_rfc2136.dns_rfc2136
4 :members:
11 API Documentation
22 =================
33
4 .. toctree::
5 :glob:
6
7 api/**
4 Certbot plugins implement the Certbot plugins API, and do not otherwise have an external API.
1616 # documentation root, use os.path.abspath to make it absolute, like shown here.
1717 #
1818 import os
19
1920 # import sys
2021 # sys.path.insert(0, os.path.abspath('.'))
2122
3637 'sphinx.ext.viewcode']
3738
3839 autodoc_member_order = 'bysource'
39 autodoc_default_flags = ['show-inheritance', 'private-members']
40 autodoc_default_flags = ['show-inheritance']
4041
4142 # Add any paths that contain templates here, relative to this directory.
4243 templates_path = ['_templates']
8283 pygments_style = 'sphinx'
8384
8485 # If true, `todo` and `todoList` produce output, else they produce nothing.
85 todo_include_todos = True
86 todo_include_todos = False
8687
8788
8889 # -- Options for HTML output ----------------------------------------------
0 import sys
1
2 from setuptools import find_packages
03 from setuptools import setup
1 from setuptools import find_packages
4 from setuptools.command.test import test as TestCommand
25
3
4 version = '0.35.1'
6 version = '1.3.0'
57
68 # Remember to update local-oldest-requirements.txt when changing the minimum
79 # acme/certbot version.
810 install_requires = [
911 'acme>=0.29.0',
10 'certbot>=0.34.0',
12 'certbot>=1.1.0',
1113 'dnspython',
1214 'mock',
1315 'setuptools',
1921 'sphinx_rtd_theme',
2022 ]
2123
24 class PyTest(TestCommand):
25 user_options = []
26
27 def initialize_options(self):
28 TestCommand.initialize_options(self)
29 self.pytest_args = ''
30
31 def run_tests(self):
32 import shlex
33 # import here, cause outside the eggs aren't loaded
34 import pytest
35 errno = pytest.main(shlex.split(self.pytest_args))
36 sys.exit(errno)
37
2238 setup(
2339 name='certbot-dns-rfc2136',
2440 version=version,
2743 author="Certbot Project",
2844 author_email='client-dev@letsencrypt.org',
2945 license='Apache License 2.0',
30 python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*',
46 python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*',
3147 classifiers=[
32 'Development Status :: 3 - Alpha',
48 'Development Status :: 5 - Production/Stable',
3349 'Environment :: Plugins',
3450 'Intended Audience :: System Administrators',
3551 'License :: OSI Approved :: Apache Software License',
3854 'Programming Language :: Python :: 2',
3955 'Programming Language :: Python :: 2.7',
4056 'Programming Language :: Python :: 3',
41 'Programming Language :: Python :: 3.4',
4257 'Programming Language :: Python :: 3.5',
4358 'Programming Language :: Python :: 3.6',
4459 'Programming Language :: Python :: 3.7',
60 'Programming Language :: Python :: 3.8',
4561 'Topic :: Internet :: WWW/HTTP',
4662 'Topic :: Security',
4763 'Topic :: System :: Installation/Setup',
5874 },
5975 entry_points={
6076 'certbot.plugins': [
61 'dns-rfc2136 = certbot_dns_rfc2136.dns_rfc2136:Authenticator',
77 'dns-rfc2136 = certbot_dns_rfc2136._internal.dns_rfc2136:Authenticator',
6278 ],
6379 },
80 tests_require=["pytest"],
6481 test_suite='certbot_dns_rfc2136',
82 cmdclass={"test": PyTest},
6583 )
0 """Tests for certbot_dns_rfc2136._internal.dns_rfc2136."""
1
2 import unittest
3
4 import dns.flags
5 import dns.rcode
6 import dns.tsig
7 import mock
8
9 from certbot import errors
10 from certbot.compat import os
11 from certbot.plugins import dns_test_common
12 from certbot.plugins.dns_test_common import DOMAIN
13 from certbot.tests import util as test_util
14
15 SERVER = '192.0.2.1'
16 PORT = 53
17 NAME = 'a-tsig-key.'
18 SECRET = 'SSB3b25kZXIgd2hvIHdpbGwgYm90aGVyIHRvIGRlY29kZSB0aGlzIHRleHQK'
19 VALID_CONFIG = {"rfc2136_server": SERVER, "rfc2136_name": NAME, "rfc2136_secret": SECRET}
20
21
22 class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest):
23
24 def setUp(self):
25 from certbot_dns_rfc2136._internal.dns_rfc2136 import Authenticator
26
27 super(AuthenticatorTest, self).setUp()
28
29 path = os.path.join(self.tempdir, 'file.ini')
30 dns_test_common.write(VALID_CONFIG, path)
31
32 self.config = mock.MagicMock(rfc2136_credentials=path,
33 rfc2136_propagation_seconds=0) # don't wait during tests
34
35 self.auth = Authenticator(self.config, "rfc2136")
36
37 self.mock_client = mock.MagicMock()
38 # _get_rfc2136_client | pylint: disable=protected-access
39 self.auth._get_rfc2136_client = mock.MagicMock(return_value=self.mock_client)
40
41 def test_perform(self):
42 self.auth.perform([self.achall])
43
44 expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
45 self.assertEqual(expected, self.mock_client.mock_calls)
46
47 def test_cleanup(self):
48 # _attempt_cleanup | pylint: disable=protected-access
49 self.auth._attempt_cleanup = True
50 self.auth.cleanup([self.achall])
51
52 expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
53 self.assertEqual(expected, self.mock_client.mock_calls)
54
55 def test_invalid_algorithm_raises(self):
56 config = VALID_CONFIG.copy()
57 config["rfc2136_algorithm"] = "INVALID"
58 dns_test_common.write(config, self.config.rfc2136_credentials)
59
60 self.assertRaises(errors.PluginError,
61 self.auth.perform,
62 [self.achall])
63
64 def test_valid_algorithm_passes(self):
65 config = VALID_CONFIG.copy()
66 config["rfc2136_algorithm"] = "HMAC-sha512"
67 dns_test_common.write(config, self.config.rfc2136_credentials)
68
69 self.auth.perform([self.achall])
70
71
72 class RFC2136ClientTest(unittest.TestCase):
73
74 def setUp(self):
75 from certbot_dns_rfc2136._internal.dns_rfc2136 import _RFC2136Client
76
77 self.rfc2136_client = _RFC2136Client(SERVER, PORT, NAME, SECRET, dns.tsig.HMAC_MD5)
78
79 @mock.patch("dns.query.tcp")
80 def test_add_txt_record(self, query_mock):
81 query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
82 # _find_domain | pylint: disable=protected-access
83 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
84
85 self.rfc2136_client.add_txt_record("bar", "baz", 42)
86
87 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
88 self.assertTrue("bar. 42 IN TXT \"baz\"" in str(query_mock.call_args[0][0]))
89
90 @mock.patch("dns.query.tcp")
91 def test_add_txt_record_wraps_errors(self, query_mock):
92 query_mock.side_effect = Exception
93 # _find_domain | pylint: disable=protected-access
94 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
95
96 self.assertRaises(
97 errors.PluginError,
98 self.rfc2136_client.add_txt_record,
99 "bar", "baz", 42)
100
101 @mock.patch("dns.query.tcp")
102 def test_add_txt_record_server_error(self, query_mock):
103 query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
104 # _find_domain | pylint: disable=protected-access
105 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
106
107 self.assertRaises(
108 errors.PluginError,
109 self.rfc2136_client.add_txt_record,
110 "bar", "baz", 42)
111
112 @mock.patch("dns.query.tcp")
113 def test_del_txt_record(self, query_mock):
114 query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
115 # _find_domain | pylint: disable=protected-access
116 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
117
118 self.rfc2136_client.del_txt_record("bar", "baz")
119
120 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
121 self.assertTrue("bar. 0 NONE TXT \"baz\"" in str(query_mock.call_args[0][0]))
122
123 @mock.patch("dns.query.tcp")
124 def test_del_txt_record_wraps_errors(self, query_mock):
125 query_mock.side_effect = Exception
126 # _find_domain | pylint: disable=protected-access
127 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
128
129 self.assertRaises(
130 errors.PluginError,
131 self.rfc2136_client.del_txt_record,
132 "bar", "baz")
133
134 @mock.patch("dns.query.tcp")
135 def test_del_txt_record_server_error(self, query_mock):
136 query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
137 # _find_domain | pylint: disable=protected-access
138 self.rfc2136_client._find_domain = mock.MagicMock(return_value="example.com")
139
140 self.assertRaises(
141 errors.PluginError,
142 self.rfc2136_client.del_txt_record,
143 "bar", "baz")
144
145 def test_find_domain(self):
146 # _query_soa | pylint: disable=protected-access
147 self.rfc2136_client._query_soa = mock.MagicMock(side_effect=[False, False, True])
148
149 # _find_domain | pylint: disable=protected-access
150 domain = self.rfc2136_client._find_domain('foo.bar.'+DOMAIN)
151
152 self.assertTrue(domain == DOMAIN)
153
154 def test_find_domain_wraps_errors(self):
155 # _query_soa | pylint: disable=protected-access
156 self.rfc2136_client._query_soa = mock.MagicMock(return_value=False)
157
158 self.assertRaises(
159 errors.PluginError,
160 # _find_domain | pylint: disable=protected-access
161 self.rfc2136_client._find_domain,
162 'foo.bar.'+DOMAIN)
163
164 @mock.patch("dns.query.tcp")
165 def test_query_soa_found(self, query_mock):
166 query_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA)
167 query_mock.return_value.rcode.return_value = dns.rcode.NOERROR
168
169 # _query_soa | pylint: disable=protected-access
170 result = self.rfc2136_client._query_soa(DOMAIN)
171
172 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
173 self.assertTrue(result)
174
175 @mock.patch("dns.query.tcp")
176 def test_query_soa_not_found(self, query_mock):
177 query_mock.return_value.rcode.return_value = dns.rcode.NXDOMAIN
178
179 # _query_soa | pylint: disable=protected-access
180 result = self.rfc2136_client._query_soa(DOMAIN)
181
182 query_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
183 self.assertFalse(result)
184
185 @mock.patch("dns.query.tcp")
186 def test_query_soa_wraps_errors(self, query_mock):
187 query_mock.side_effect = Exception
188
189 self.assertRaises(
190 errors.PluginError,
191 # _query_soa | pylint: disable=protected-access
192 self.rfc2136_client._query_soa,
193 DOMAIN)
194
195 @mock.patch("dns.query.udp")
196 @mock.patch("dns.query.tcp")
197 def test_query_soa_fallback_to_udp(self, tcp_mock, udp_mock):
198 tcp_mock.side_effect = OSError
199 udp_mock.return_value = mock.MagicMock(answer=[mock.MagicMock()], flags=dns.flags.AA)
200 udp_mock.return_value.rcode.return_value = dns.rcode.NOERROR
201
202 # _query_soa | pylint: disable=protected-access
203 result = self.rfc2136_client._query_soa(DOMAIN)
204
205 tcp_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
206 udp_mock.assert_called_with(mock.ANY, SERVER, port=PORT)
207 self.assertTrue(result)
208
209
210 if __name__ == "__main__":
211 unittest.main() # pragma: no cover