Merge "Add to_dict and from_dict conversions to managed objects"
Zuul authored 3 years ago
Gerrit Code Review committed 3 years ago
63 | 63 | message = _("Key not found, uuid: %(uuid)s") |
64 | 64 | |
65 | 65 | |
66 | class InvalidManagedObjectDictError(CastellanException): | |
67 | message = _("Dict has no field '%(field)s'.") | |
68 | ||
69 | ||
70 | class UnknownManagedObjectTypeError(CastellanException): | |
71 | message = _("Type not found, type: %(type)s") | |
72 | ||
73 | ||
66 | 74 | class AuthTypeInvalidError(CastellanException): |
67 | 75 | message = _("Invalid auth_type was specified, auth_type: %(type)s") |
68 | 76 |
0 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | |
1 | # not use this file except in compliance with the License. You may obtain | |
2 | # a copy of the License at | |
3 | # | |
4 | # http://www.apache.org/licenses/LICENSE-2.0 | |
5 | # | |
6 | # Unless required by applicable law or agreed to in writing, software | |
7 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
8 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
9 | # License for the specific language governing permissions and limitations | |
10 | # under the License. | |
11 | ||
12 | from castellan.common import exception | |
13 | from castellan.common.objects import opaque_data | |
14 | from castellan.common.objects import passphrase | |
15 | from castellan.common.objects import private_key | |
16 | from castellan.common.objects import public_key | |
17 | from castellan.common.objects import symmetric_key | |
18 | from castellan.common.objects import x_509 | |
19 | ||
20 | _managed_objects_by_type = { | |
21 | cls.managed_type(): cls for cls in [ | |
22 | opaque_data.OpaqueData, | |
23 | passphrase.Passphrase, | |
24 | private_key.PrivateKey, | |
25 | public_key.PublicKey, | |
26 | symmetric_key.SymmetricKey, | |
27 | x_509.X509, | |
28 | ] | |
29 | } | |
30 | ||
31 | ||
32 | def from_dict(obj, id=None): | |
33 | try: | |
34 | managed_object_type = obj["type"] | |
35 | except KeyError: | |
36 | raise exception.InvalidManagedObjectDictError(field="type") | |
37 | ||
38 | try: | |
39 | cls = _managed_objects_by_type[managed_object_type] | |
40 | except KeyError: | |
41 | raise exception.UnknownManagedObjectTypeError(type=managed_object_type) | |
42 | ||
43 | try: | |
44 | managed_object = cls.from_dict(obj, id) | |
45 | except KeyError as e: | |
46 | raise exception.InvalidManagedObjectDictError(field=str(e)) | |
47 | ||
48 | return managed_object |
21 | 21 | """ |
22 | 22 | |
23 | 23 | import abc |
24 | import binascii | |
24 | 25 | |
26 | from castellan.common.objects import exception | |
25 | 27 | from castellan.common.objects import managed_object |
26 | 28 | |
27 | 29 | |
28 | 30 | class Key(managed_object.ManagedObject): |
29 | 31 | """Base class to represent all keys.""" |
30 | 32 | |
31 | @abc.abstractproperty | |
33 | @property | |
34 | @abc.abstractmethod | |
32 | 35 | def algorithm(self): |
33 | 36 | """Returns the key's algorithm. |
34 | 37 | |
37 | 40 | """ |
38 | 41 | pass |
39 | 42 | |
40 | @abc.abstractproperty | |
43 | @property | |
44 | @abc.abstractmethod | |
41 | 45 | def bit_length(self): |
42 | 46 | """Returns the key's bit length. |
43 | 47 | |
46 | 50 | the length of the modulus. |
47 | 51 | """ |
48 | 52 | pass |
53 | ||
54 | def to_dict(self): | |
55 | dict_fields = super().to_dict() | |
56 | ||
57 | dict_fields["algorithm"] = self.algorithm | |
58 | dict_fields["bit_length"] = self.bit_length | |
59 | ||
60 | return dict_fields | |
61 | ||
62 | @classmethod | |
63 | def from_dict(cls, dict_fields, id=None, metadata_only=False): | |
64 | try: | |
65 | value = None | |
66 | ||
67 | # NOTE(moguimar): the managed object's value is exported as | |
68 | # a hex string. For now, this is a compatibility thing with | |
69 | # the already existent vault_key_manager backend. | |
70 | if not metadata_only and dict_fields["value"] is not None: | |
71 | value = binascii.unhexlify(dict_fields["value"]) | |
72 | ||
73 | return cls( | |
74 | algorithm=dict_fields["algorithm"], | |
75 | bit_length=dict_fields["bit_length"], | |
76 | key=value, | |
77 | name=dict_fields["name"], | |
78 | created=dict_fields["created"], | |
79 | id=id, | |
80 | ) | |
81 | except KeyError as e: | |
82 | raise exception.InvalidManagedObjectDictError(field=str(e)) |
18 | 18 | This module defines the ManagedObject class. The ManagedObject class |
19 | 19 | is the base class to represent all objects managed by the key manager. |
20 | 20 | """ |
21 | ||
21 | 22 | import abc |
23 | import binascii | |
24 | ||
25 | from castellan.common import exception | |
22 | 26 | |
23 | 27 | |
24 | 28 | class ManagedObject(object, metaclass=abc.ABCMeta): |
68 | 72 | """ |
69 | 73 | return self._created |
70 | 74 | |
71 | @abc.abstractproperty | |
75 | @property | |
76 | @abc.abstractmethod | |
72 | 77 | def format(self): |
73 | 78 | """Returns the encoding format. |
74 | 79 | |
76 | 81 | encoded. |
77 | 82 | """ |
78 | 83 | pass |
84 | ||
85 | @property | |
86 | def value(self): | |
87 | """Returns the managed object value.""" | |
88 | return self.get_encoded() | |
79 | 89 | |
80 | 90 | @abc.abstractmethod |
81 | 91 | def get_encoded(self): |
89 | 99 | def is_metadata_only(self): |
90 | 100 | """Returns if the associated object is only metadata or not.""" |
91 | 101 | return self.get_encoded() is None |
102 | ||
103 | @classmethod | |
104 | @abc.abstractmethod | |
105 | def managed_type(cls): | |
106 | """Returns the managed object type identifier. | |
107 | ||
108 | Returns the object's type identifier for serialization purpose. | |
109 | """ | |
110 | pass | |
111 | ||
112 | @classmethod | |
113 | def from_dict(cls, dict_fields, id=None, metadata_only=False): | |
114 | """Returns an instance of this class based on a dict object. | |
115 | ||
116 | :param dict_fields: The dictionary containing all necessary params | |
117 | to create one instance. | |
118 | :param id: The optional param 'id' to be passed to the constructor. | |
119 | :param metadata_only: A switch to create an instance with metadata | |
120 | only, without the secret itself. | |
121 | """ | |
122 | try: | |
123 | value = None | |
124 | ||
125 | # NOTE(moguimar): the managed object's value is exported as | |
126 | # a hex string. For now, this is a compatibility thing with | |
127 | # the already existent vault_key_manager backend. | |
128 | if not metadata_only and dict_fields["value"] is not None: | |
129 | value = binascii.unhexlify(dict_fields["value"]) | |
130 | ||
131 | return cls( | |
132 | value, | |
133 | name=dict_fields["name"], | |
134 | created=dict_fields["created"], | |
135 | id=id, | |
136 | ) | |
137 | except KeyError as e: | |
138 | raise exception.InvalidManagedObjectDictError(field=str(e)) | |
139 | ||
140 | def to_dict(self, metadata_only=False): | |
141 | """Returns a dict that can be used with the from_dict() method. | |
142 | ||
143 | :param metadata_only: A switch to create an dictionary with metadata | |
144 | only, without the secret itself. | |
145 | ||
146 | :rtype: dict | |
147 | """ | |
148 | value = None | |
149 | ||
150 | # NOTE(moguimar): the managed object's value is exported as | |
151 | # a hex string. For now, this is a compatibility thing with | |
152 | # the already existent vault_key_manager backend. | |
153 | if not metadata_only and self.value is not None: | |
154 | value = binascii.hexlify(self.value).decode("utf-8") | |
155 | ||
156 | return { | |
157 | "type": self.managed_type(), | |
158 | "name": self.name, | |
159 | "created": self.created, | |
160 | "value": value, | |
161 | } |
30 | 30 | Expected type for data is a bytestring. |
31 | 31 | """ |
32 | 32 | self._data = data |
33 | super(OpaqueData, self).__init__(name=name, created=created, id=id) | |
33 | super().__init__(name=name, created=created, id=id) | |
34 | ||
35 | @classmethod | |
36 | def managed_type(cls): | |
37 | return "opaque" | |
34 | 38 | |
35 | 39 | @property |
36 | 40 | def format(self): |
37 | """This method returns 'Opaque'.""" | |
38 | 41 | return "Opaque" |
39 | 42 | |
40 | 43 | def get_encoded(self): |
41 | """Returns the data in its original format.""" | |
42 | 44 | return self._data |
43 | 45 | |
44 | 46 | def __eq__(self, other): |
30 | 30 | The expected type for the passphrase is a bytestring. |
31 | 31 | """ |
32 | 32 | self._passphrase = passphrase |
33 | super(Passphrase, self).__init__(name=name, created=created, id=id) | |
33 | super().__init__(name=name, created=created, id=id) | |
34 | ||
35 | @classmethod | |
36 | def managed_type(cls): | |
37 | return "passphrase" | |
34 | 38 | |
35 | 39 | @property |
36 | 40 | def format(self): |
37 | """This method returns 'RAW'.""" | |
38 | 41 | return "RAW" |
39 | 42 | |
40 | 43 | def get_encoded(self): |
41 | """Returns the data in a bytestring.""" | |
42 | 44 | return self._passphrase |
43 | 45 | |
44 | 46 | def __eq__(self, other): |
34 | 34 | self._alg = algorithm |
35 | 35 | self._bit_length = bit_length |
36 | 36 | self._key = key |
37 | super(PrivateKey, self).__init__(name=name, created=created, id=id) | |
37 | super().__init__(name=name, created=created, id=id) | |
38 | ||
39 | @classmethod | |
40 | def managed_type(cls): | |
41 | return "private" | |
38 | 42 | |
39 | 43 | @property |
40 | 44 | def algorithm(self): |
41 | """Returns the algorithm for asymmetric encryption.""" | |
42 | 45 | return self._alg |
43 | 46 | |
44 | 47 | @property |
45 | 48 | def format(self): |
46 | """This method returns 'PKCS8'.""" | |
47 | 49 | return "PKCS8" |
48 | 50 | |
49 | 51 | @property |
50 | 52 | def bit_length(self): |
51 | """Returns the key length.""" | |
52 | 53 | return self._bit_length |
53 | 54 | |
54 | 55 | def get_encoded(self): |
55 | """Returns the key in DER encoded format.""" | |
56 | 56 | return self._key |
57 | 57 | |
58 | 58 | def __eq__(self, other): |
35 | 35 | self._alg = algorithm |
36 | 36 | self._bit_length = bit_length |
37 | 37 | self._key = key |
38 | super(PublicKey, self).__init__(name=name, created=created, id=id) | |
38 | super().__init__(name=name, created=created, id=id) | |
39 | ||
40 | @classmethod | |
41 | def managed_type(cls): | |
42 | return "public" | |
39 | 43 | |
40 | 44 | @property |
41 | 45 | def algorithm(self): |
42 | """Returns the algorithm for asymmetric encryption.""" | |
43 | 46 | return self._alg |
44 | 47 | |
45 | 48 | @property |
46 | 49 | def format(self): |
47 | """This method returns 'SubjectPublicKeyInfo'.""" | |
48 | 50 | return "SubjectPublicKeyInfo" |
49 | 51 | |
50 | 52 | def get_encoded(self): |
51 | """Returns the key in its encoded format.""" | |
52 | 53 | return self._key |
53 | 54 | |
54 | 55 | @property |
55 | 56 | def bit_length(self): |
56 | """Returns the key length.""" | |
57 | 57 | return self._bit_length |
58 | 58 | |
59 | 59 | def __eq__(self, other): |
34 | 34 | self._alg = algorithm |
35 | 35 | self._bit_length = bit_length |
36 | 36 | self._key = key |
37 | super(SymmetricKey, self).__init__(name=name, created=created, id=id) | |
37 | super().__init__(name=name, created=created, id=id) | |
38 | ||
39 | @classmethod | |
40 | def managed_type(cls): | |
41 | return "symmetric" | |
38 | 42 | |
39 | 43 | @property |
40 | 44 | def algorithm(self): |
41 | """Returns the algorithm for symmetric encryption.""" | |
42 | 45 | return self._alg |
43 | 46 | |
44 | 47 | @property |
45 | 48 | def format(self): |
46 | """This method returns 'RAW'.""" | |
47 | 49 | return "RAW" |
48 | 50 | |
49 | 51 | def get_encoded(self): |
50 | """Returns the key in its encoded format.""" | |
51 | 52 | return self._key |
52 | 53 | |
53 | 54 | @property |
54 | 55 | def bit_length(self): |
55 | """Returns the key length.""" | |
56 | 56 | return self._bit_length |
57 | 57 | |
58 | 58 | def __eq__(self, other): |
30 | 30 | The data should be in a bytestring. |
31 | 31 | """ |
32 | 32 | self._data = data |
33 | super(X509, self).__init__(name=name, created=created, id=id) | |
33 | super().__init__(name=name, created=created, id=id) | |
34 | ||
35 | @classmethod | |
36 | def managed_type(cls): | |
37 | return "certificate" | |
34 | 38 | |
35 | 39 | @property |
36 | 40 | def format(self): |
37 | """This method returns 'X.509'.""" | |
38 | 41 | return "X.509" |
39 | 42 | |
40 | 43 | def get_encoded(self): |
41 | """Returns the data in its encoded format.""" | |
42 | 44 | return self._data |
43 | 45 | |
44 | 46 | def __eq__(self, other): |
0 | # Copyright 2020 Red Hat, Inc. | |
1 | # All Rights Reserved. | |
2 | # | |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | |
4 | # not use this file except in compliance with the License. You may obtain | |
5 | # a copy of the License at | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
12 | # License for the specific language governing permissions and limitations | |
13 | # under the License. | |
14 | ||
15 | """ | |
16 | Test cases for Managed Objects. | |
17 | """ | |
18 | from castellan.common import exception | |
19 | from castellan.common import objects | |
20 | from castellan.tests import base | |
21 | ||
22 | ||
23 | class ManagedObjectFromDictTestCase(base.TestCase): | |
24 | def test_invalid_dict(self): | |
25 | self.assertRaises( | |
26 | exception.InvalidManagedObjectDictError, | |
27 | objects.from_dict, | |
28 | {}, | |
29 | ) | |
30 | ||
31 | def test_unknown_type(self): | |
32 | self.assertRaises( | |
33 | exception.UnknownManagedObjectTypeError, | |
34 | objects.from_dict, | |
35 | {"type": "non-existing-managed-object-type"}, | |
36 | ) |
15 | 15 | """ |
16 | 16 | Test cases for the opaque data class. |
17 | 17 | """ |
18 | from castellan.common import objects | |
18 | 19 | from castellan.common.objects import opaque_data |
19 | 20 | from castellan.tests import base |
20 | 21 | |
79 | 80 | def test___ne___data(self): |
80 | 81 | other_opaque = opaque_data.OpaqueData(b'other data', self.name) |
81 | 82 | self.assertTrue(self.opaque_data != other_opaque) |
83 | ||
84 | def test_to_and_from_dict(self): | |
85 | other = objects.from_dict(self.opaque_data.to_dict()) | |
86 | self.assertEqual(self.opaque_data, other) |
15 | 15 | """ |
16 | 16 | Test cases for the passphrase class. |
17 | 17 | """ |
18 | from castellan.common import objects | |
18 | 19 | from castellan.common.objects import passphrase |
19 | 20 | from castellan.tests import base |
20 | 21 | |
79 | 80 | def test___ne___data(self): |
80 | 81 | other_phrase = passphrase.Passphrase(b"other passphrase", self.name) |
81 | 82 | self.assertTrue(self.passphrase != other_phrase) |
83 | ||
84 | def test_to_and_from_dict(self): | |
85 | other = objects.from_dict(self.passphrase.to_dict()) | |
86 | self.assertEqual(self.passphrase, other) |
15 | 15 | """ |
16 | 16 | Test cases for the private key class. |
17 | 17 | """ |
18 | from castellan.common import objects | |
18 | 19 | from castellan.common.objects import private_key |
19 | 20 | from castellan.tests import base |
20 | 21 | from castellan.tests import utils |
115 | 116 | different_encoded, |
116 | 117 | self.name) |
117 | 118 | self.assertTrue(self.key != other_key) |
119 | ||
120 | def test_to_and_from_dict(self): | |
121 | other = objects.from_dict(self.key.to_dict()) | |
122 | self.assertEqual(self.key, other) |
15 | 15 | """ |
16 | 16 | Test cases for the public key class. |
17 | 17 | """ |
18 | from castellan.common import objects | |
18 | 19 | from castellan.common.objects import public_key |
19 | 20 | from castellan.tests import base |
20 | 21 | from castellan.tests import utils |
115 | 116 | different_encoded, |
116 | 117 | self.name) |
117 | 118 | self.assertTrue(self.key != other_key) |
119 | ||
120 | def test_to_and_from_dict(self): | |
121 | other = objects.from_dict(self.key.to_dict()) | |
122 | self.assertEqual(self.key, other) |
15 | 15 | """ |
16 | 16 | Test cases for the symmetric key class. |
17 | 17 | """ |
18 | from castellan.common import objects | |
18 | 19 | from castellan.common.objects import symmetric_key as sym_key |
19 | 20 | from castellan.tests import base |
20 | 21 | |
114 | 115 | different_encoded, |
115 | 116 | self.name) |
116 | 117 | self.assertTrue(self.key != other_key) |
118 | ||
119 | def test_to_and_from_dict(self): | |
120 | other = objects.from_dict(self.key.to_dict()) | |
121 | self.assertEqual(self.key, other) |
15 | 15 | """ |
16 | 16 | Test cases for the X.509 certificate class. |
17 | 17 | """ |
18 | from castellan.common import objects | |
18 | 19 | from castellan.common.objects import x_509 |
19 | 20 | from castellan.tests import base |
20 | 21 | from castellan.tests import utils |
79 | 80 | def test___ne___data(self): |
80 | 81 | other_x509 = x_509.X509(b'\x00\x00\x00', self.name) |
81 | 82 | self.assertTrue(self.cert != other_x509) |
83 | ||
84 | def test_to_and_from_dict(self): | |
85 | other = objects.from_dict(self.cert.to_dict()) | |
86 | self.assertEqual(self.cert, other) |
+10
-0
0 | --- | |
1 | features: | |
2 | - | | |
3 | Historically, the vault key manager backend converts its managed objects | |
4 | to dictionaries in order to send them as a json object. To promote | |
5 | cross-backend compatibility, suck feature should be migrated to managed | |
6 | objects. Methods from_dict() and to_dict() added to class ManagedObject. | |
7 | The Method from_dict() is a class method to create instances based on a | |
8 | dictionary while the method to_dict() is an instance method to translate | |
9 | an instance to a dictionary. |