Add ability to get only metadata
Adds the ability to retrieve only the metadata of a secret. This is
helpful in situations when the caller wants to know information about
the secret, but doesn't want to unnecessarily handle the secret data.
Change-Id: I63aec037973aad2555190ca3eb6bba765955399a
Kaitlin Farr
7 years ago
76 | 76 | specified. |
77 | 77 | """ |
78 | 78 | pass |
79 | ||
80 | def is_metadata_only(self): | |
81 | """Returns if the associated object is only metadata or not.""" | |
82 | return self.get_encoded() is None |
453 | 453 | else: |
454 | 454 | return secret.payload |
455 | 455 | |
456 | def _get_castellan_object(self, secret): | |
456 | def _get_castellan_object(self, secret, metadata_only=False): | |
457 | 457 | """Creates a Castellan managed object given the Barbican secret. |
458 | 458 | |
459 | :param secret: the secret from barbican with the payload of data | |
459 | The python barbicanclient lazy-loads the secret data, i.e. the secret | |
460 | data is not requested until secret.payload is called. If the user | |
461 | specifies metadata_only=True, secret.payload is never called, | |
462 | preventing unnecessary loading of secret data. | |
463 | ||
464 | :param secret: the barbican secret object | |
465 | :metadata_only: boolean indicating if the secret bytes should be | |
466 | included in the managed object | |
460 | 467 | :returns: the castellan object |
461 | 468 | """ |
462 | 469 | secret_type = op_data.OpaqueData |
464 | 471 | if barbican_type == secret.secret_type: |
465 | 472 | secret_type = castellan_type |
466 | 473 | |
467 | secret_data = self._get_secret_data(secret) | |
474 | if metadata_only: | |
475 | secret_data = None | |
476 | else: | |
477 | secret_data = self._get_secret_data(secret) | |
468 | 478 | |
469 | 479 | # convert created ISO8601 in Barbican to POSIX |
470 | 480 | if secret.created: |
513 | 523 | else: |
514 | 524 | return False |
515 | 525 | |
516 | def get(self, context, managed_object_id): | |
526 | def get(self, context, managed_object_id, metadata_only=False): | |
517 | 527 | """Retrieves the specified managed object. |
518 | 528 | |
519 | 529 | :param context: contains information of the user and the environment |
520 | 530 | for the request (castellan/context.py) |
521 | 531 | :param managed_object_id: the UUID of the object to retrieve |
532 | :param metadata_only: whether secret data should be included | |
522 | 533 | :return: ManagedObject representation of the managed object |
523 | 534 | :raises KeyManagerError: if object retrieval fails |
524 | 535 | :raises ManagedObjectNotFoundError: if object not found |
525 | 536 | """ |
526 | 537 | try: |
527 | 538 | secret = self._get_secret(context, managed_object_id) |
528 | return self._get_castellan_object(secret) | |
539 | return self._get_castellan_object(secret, metadata_only) | |
529 | 540 | except (barbican_exceptions.HTTPAuthError, |
530 | 541 | barbican_exceptions.HTTPClientError, |
531 | 542 | barbican_exceptions.HTTPServerError) as e: |
73 | 73 | pass |
74 | 74 | |
75 | 75 | @abc.abstractmethod |
76 | def get(self, context, managed_object_id): | |
76 | def get(self, context, managed_object_id, metadata_only=False): | |
77 | 77 | """Retrieves the specified managed object. |
78 | 78 | |
79 | 79 | Implementations should verify that the caller has permissions to |
80 | 80 | retrieve the managed object by checking the context object passed in |
81 | 81 | as context. If the user lacks permission then a NotAuthorized |
82 | 82 | exception is raised. |
83 | ||
84 | If the caller requests only metadata, then the object that is | |
85 | returned will contain only the secret metadata and no secret bytes. | |
83 | 86 | |
84 | 87 | If the specified object does not exist, then a KeyError should be |
85 | 88 | raised. Implementations should preclude users from discerning the |
135 | 135 | retrieved_object = self.key_mgr.get(self.ctxt, uuid) |
136 | 136 | self.assertEqual(managed_object.get_encoded(), |
137 | 137 | retrieved_object.get_encoded()) |
138 | self.assertFalse(managed_object.is_metadata_only()) | |
139 | ||
140 | @utils.parameterized_dataset({ | |
141 | 'symmetric_key': [_get_test_symmetric_key()], | |
142 | 'public_key': [_get_test_public_key()], | |
143 | 'private_key': [_get_test_private_key()], | |
144 | 'certificate': [_get_test_certificate()], | |
145 | 'passphrase': [_get_test_passphrase()], | |
146 | 'opaque_data': [_get_test_opaque_data()], | |
147 | }) | |
148 | def test_get_metadata(self, managed_object): | |
149 | uuid = self._get_valid_object_uuid(managed_object) | |
150 | self.addCleanup(self.key_mgr.delete, self.ctxt, uuid) | |
151 | ||
152 | retrieved_object = self.key_mgr.get(self.ctxt, | |
153 | uuid, | |
154 | metadata_only=True) | |
155 | self.assertFalse(managed_object.is_metadata_only()) | |
156 | self.assertTrue(retrieved_object.is_metadata_only()) | |
138 | 157 | |
139 | 158 | @utils.parameterized_dataset({ |
140 | 159 | 'symmetric_key': [_get_test_symmetric_key()], |
165 | 165 | |
166 | 166 | return key_id |
167 | 167 | |
168 | def get(self, context, managed_object_id, **kwargs): | |
168 | def get(self, context, managed_object_id, metadata_only=False, **kwargs): | |
169 | 169 | """Retrieves the key identified by the specified id. |
170 | 170 | |
171 | 171 | This implementation returns the key that is associated with the |
175 | 175 | if context is None: |
176 | 176 | raise exception.Forbidden() |
177 | 177 | |
178 | return self.keys[managed_object_id] | |
178 | obj = self.keys[managed_object_id] | |
179 | if metadata_only: | |
180 | if hasattr(obj, "_key"): | |
181 | obj._key = None | |
182 | if hasattr(obj, "_data"): | |
183 | obj._data = None | |
184 | if hasattr(obj, "_passphrase"): | |
185 | obj._passphrase = None | |
186 | return obj | |
179 | 187 | |
180 | 188 | def delete(self, context, managed_object_id, **kwargs): |
181 | 189 | """Deletes the object identified by the specified id. |
147 | 147 | actual_key = self.key_mgr.get(self.context, key_id) |
148 | 148 | self.assertEqual(_key, actual_key) |
149 | 149 | |
150 | def test_store_key_and_get_metadata(self): | |
151 | secret_key = bytes(b'0' * 64) | |
152 | _key = sym_key.SymmetricKey('AES', 64 * 8, secret_key) | |
153 | key_id = self.key_mgr.store(self.context, _key) | |
154 | ||
155 | actual_key = self.key_mgr.get(self.context, | |
156 | key_id, | |
157 | metadata_only=True) | |
158 | self.assertIsNone(actual_key.get_encoded()) | |
159 | self.assertTrue(actual_key.is_metadata_only()) | |
160 | ||
150 | 161 | def test_store_null_context(self): |
151 | 162 | self.assertRaises(exception.Forbidden, |
152 | 163 | self.key_mgr.store, None, None) |
33 | 33 | self.opaque_data = self._create_data() |
34 | 34 | |
35 | 35 | super(OpaqueDataTestCase, self).setUp() |
36 | ||
37 | def test_is_not_only_metadata(self): | |
38 | self.assertFalse(self.opaque_data.is_metadata_only()) | |
39 | ||
40 | def test_is_only_metadata(self): | |
41 | d = opaque_data.OpaqueData(None, self.name, self.created) | |
42 | self.assertTrue(d.is_metadata_only()) | |
36 | 43 | |
37 | 44 | def test_get_format(self): |
38 | 45 | self.assertEqual('Opaque', self.opaque_data.format) |
33 | 33 | self.passphrase = self._create_passphrase() |
34 | 34 | |
35 | 35 | super(PassphraseTestCase, self).setUp() |
36 | ||
37 | def test_is_not_only_metadata(self): | |
38 | self.assertFalse(self.passphrase.is_metadata_only()) | |
39 | ||
40 | def test_is_only_metadata(self): | |
41 | p = passphrase.Passphrase(None, self.name, self.created) | |
42 | self.assertTrue(p.is_metadata_only()) | |
36 | 43 | |
37 | 44 | def test_get_format(self): |
38 | 45 | self.assertEqual('RAW', self.passphrase.format) |
37 | 37 | self.created = 1448088699 |
38 | 38 | |
39 | 39 | super(PrivateKeyTestCase, self).setUp() |
40 | ||
41 | def test_is_not_only_metadata(self): | |
42 | self.assertFalse(self.key.is_metadata_only()) | |
43 | ||
44 | def test_is_only_metadata(self): | |
45 | k = private_key.PrivateKey(self.algorithm, | |
46 | self.bit_length, | |
47 | None, | |
48 | self.name, | |
49 | self.created) | |
50 | self.assertTrue(k.is_metadata_only()) | |
40 | 51 | |
41 | 52 | def test_get_algorithm(self): |
42 | 53 | self.assertEqual(self.algorithm, self.key.algorithm) |
37 | 37 | self.created = 1448088699 |
38 | 38 | |
39 | 39 | super(PublicKeyTestCase, self).setUp() |
40 | ||
41 | def test_is_not_only_metadata(self): | |
42 | self.assertFalse(self.key.is_metadata_only()) | |
43 | ||
44 | def test_is_only_metadata(self): | |
45 | k = public_key.PublicKey(self.algorithm, | |
46 | self.bit_length, | |
47 | None, | |
48 | self.name, | |
49 | self.created) | |
50 | self.assertTrue(k.is_metadata_only()) | |
40 | 51 | |
41 | 52 | def test_get_algorithm(self): |
42 | 53 | self.assertEqual(self.algorithm, self.key.algorithm) |
36 | 36 | self.created = 1448088699 |
37 | 37 | |
38 | 38 | super(SymmetricKeyTestCase, self).setUp() |
39 | ||
40 | def test_is_not_only_metadata(self): | |
41 | self.assertFalse(self.key.is_metadata_only()) | |
42 | ||
43 | def test_is_only_metadata(self): | |
44 | k = sym_key.SymmetricKey(self.algorithm, | |
45 | self.bit_length, | |
46 | None, | |
47 | self.name, | |
48 | self.created) | |
49 | self.assertTrue(k.is_metadata_only()) | |
39 | 50 | |
40 | 51 | def test_get_format(self): |
41 | 52 | self.assertEqual('RAW', self.key.format) |
33 | 33 | self.created = 1448088699 |
34 | 34 | |
35 | 35 | super(X509TestCase, self).setUp() |
36 | ||
37 | def test_is_not_only_metadata(self): | |
38 | self.assertFalse(self.cert.is_metadata_only()) | |
39 | ||
40 | def test_is_only_metadata(self): | |
41 | c = x_509.X509(None, self.name, self.created) | |
42 | self.assertTrue(c.is_metadata_only()) | |
36 | 43 | |
37 | 44 | def test_get_format(self): |
38 | 45 | self.assertEqual('X.509', self.cert.format) |