Merge "Add list capability"
Jenkins authored 6 years ago
Gerrit Code Review committed 6 years ago
571 | 571 | uuid=managed_object_id) |
572 | 572 | else: |
573 | 573 | raise exception.KeyManagerError(reason=e) |
574 | ||
575 | def list(self, context, object_type=None, metadata_only=False): | |
576 | """Retrieves a list of managed objects that match the criteria. | |
577 | ||
578 | If no search criteria is given, all objects are returned. | |
579 | ||
580 | :param context: contains information of the user and the environment | |
581 | for the request (castellan/context.py) | |
582 | :param object_type: the type of object to retrieve | |
583 | :param metadata_only: whether secret data should be included | |
584 | :raises KeyManagerError: if listing secrets fails | |
585 | """ | |
586 | objects = [] | |
587 | barbican_client = self._get_barbican_client(context) | |
588 | ||
589 | if object_type and object_type not in self._secret_type_dict: | |
590 | msg = _("Invalid secret type: %s") % object_type | |
591 | LOG.error(msg) | |
592 | raise exception.KeyManagerError(reason=msg) | |
593 | ||
594 | secret_type = self._secret_type_dict.get(object_type) | |
595 | ||
596 | try: | |
597 | secrets = barbican_client.secrets.list(secret_type=secret_type) | |
598 | except (barbican_exceptions.HTTPAuthError, | |
599 | barbican_exceptions.HTTPClientError, | |
600 | barbican_exceptions.HTTPServerError) as e: | |
601 | LOG.error(_("Error listing objects: %s"), e) | |
602 | raise exception.KeyManagerError(reason=e) | |
603 | ||
604 | for secret in secrets: | |
605 | try: | |
606 | obj = self._get_castellan_object(secret, metadata_only) | |
607 | objects.append(obj) | |
608 | except (barbican_exceptions.HTTPAuthError, | |
609 | barbican_exceptions.HTTPClientError, | |
610 | barbican_exceptions.HTTPServerError) as e: | |
611 | LOG.warn(_("Error occurred while retrieving object metadata," | |
612 | " not adding it to the list: %s"), e) | |
613 | ||
614 | return objects |
108 | 108 | considered "non-existent" and completely invisible. |
109 | 109 | """ |
110 | 110 | pass |
111 | ||
112 | @abc.abstractmethod | |
113 | def list(self, context, object_type=None, metadata_only=False): | |
114 | """Lists the managed objects given the criteria. | |
115 | ||
116 | Implementations should verify that the caller has permission to list | |
117 | the managed objects and should only list the objects the caller has | |
118 | access to by checking the context object (context). A NotAuthorized | |
119 | exception should be raised if the caller lacks permission. | |
120 | ||
121 | A list of managed objects or managed object metadata should be | |
122 | returned, depending on the metadata_only flag. If no objects are | |
123 | found, an empty list should be returned instead. | |
124 | """ | |
125 | pass |
44 | 44 | def get(self, context, managed_object_id, **kwargs): |
45 | 45 | raise NotImplementedError() |
46 | 46 | |
47 | def list(self, context, object_type=None): | |
48 | raise NotImplementedError() | |
49 | ||
47 | 50 | def delete(self, context, managed_object_id, **kwargs): |
48 | 51 | raise NotImplementedError() |
170 | 170 | retrieved_object = self.key_mgr.get(self.ctxt, uuid) |
171 | 171 | self.assertEqual(managed_object.get_encoded(), |
172 | 172 | retrieved_object.get_encoded()) |
173 | ||
174 | @utils.parameterized_dataset({ | |
175 | 'symmetric_key': [_get_test_symmetric_key()], | |
176 | 'public_key': [_get_test_public_key()], | |
177 | 'private_key': [_get_test_private_key()], | |
178 | 'certificate': [_get_test_certificate()], | |
179 | 'passphrase': [_get_test_passphrase()], | |
180 | 'opaque_data': [_get_test_opaque_data()], | |
181 | }) | |
182 | def test_list(self, managed_object): | |
183 | uuid = self.key_mgr.store(self.ctxt, managed_object) | |
184 | self.addCleanup(self.key_mgr.delete, self.ctxt, uuid) | |
185 | ||
186 | # the list command may return more objects than the one we just | |
187 | # created if older objects were not cleaned up, so we will simply | |
188 | # check if the object we created is in the list | |
189 | retrieved_objects = self.key_mgr.list(self.ctxt) | |
190 | self.assertTrue(managed_object in retrieved_objects) | |
191 | for obj in retrieved_objects: | |
192 | self.assertFalse(obj.is_metadata_only()) | |
193 | ||
194 | @utils.parameterized_dataset({ | |
195 | 'symmetric_key': [_get_test_symmetric_key()], | |
196 | 'public_key': [_get_test_public_key()], | |
197 | 'private_key': [_get_test_private_key()], | |
198 | 'certificate': [_get_test_certificate()], | |
199 | 'passphrase': [_get_test_passphrase()], | |
200 | 'opaque_data': [_get_test_opaque_data()], | |
201 | }) | |
202 | def test_list_metadata_only(self, managed_object): | |
203 | uuid = self.key_mgr.store(self.ctxt, managed_object) | |
204 | self.addCleanup(self.key_mgr.delete, self.ctxt, uuid) | |
205 | ||
206 | expected_obj = self.key_mgr.get(self.ctxt, uuid, metadata_only=True) | |
207 | ||
208 | # the list command may return more objects than the one we just | |
209 | # created if older objects were not cleaned up, so we will simply | |
210 | # check if the object we created is in the list | |
211 | retrieved_objects = self.key_mgr.list(self.ctxt, metadata_only=True) | |
212 | self.assertTrue(expected_obj in retrieved_objects) | |
213 | for obj in retrieved_objects: | |
214 | self.assertTrue(obj.is_metadata_only()) | |
215 | ||
216 | @utils.parameterized_dataset({ | |
217 | 'query_by_object_type': { | |
218 | 'object_1': _get_test_symmetric_key(), | |
219 | 'object_2': _get_test_public_key(), | |
220 | 'query_dict': dict(object_type=symmetric_key.SymmetricKey) | |
221 | }, | |
222 | }) | |
223 | def test_list_with_filter(self, object_1, object_2, query_dict): | |
224 | uuid1 = self.key_mgr.store(self.ctxt, object_1) | |
225 | uuid2 = self.key_mgr.store(self.ctxt, object_2) | |
226 | self.addCleanup(self.key_mgr.delete, self.ctxt, uuid1) | |
227 | self.addCleanup(self.key_mgr.delete, self.ctxt, uuid2) | |
228 | ||
229 | # the list command may return more objects than the one we just | |
230 | # created if older objects were not cleaned up, so we will simply | |
231 | # check that the returned objects have the expected type | |
232 | retrieved_objects = self.key_mgr.list(self.ctxt, **query_dict) | |
233 | for retrieved_object in retrieved_objects: | |
234 | self.assertEqual(type(object_1), type(retrieved_object)) | |
235 | self.assertTrue(object_1 in retrieved_objects) |
225 | 225 | random.shuffle(password) |
226 | 226 | |
227 | 227 | return ''.join(password) |
228 | ||
229 | def list(self, context, object_type=None, metadata_only=False): | |
230 | """Retrieves a list of managed objects that match the criteria. | |
231 | ||
232 | A Forbidden exception is raised if the context is None. | |
233 | If no search criteria is given, all objects are returned. | |
234 | """ | |
235 | if context is None: | |
236 | raise exception.Forbidden() | |
237 | ||
238 | objects = [] | |
239 | for obj_id in self.keys: | |
240 | obj = self.get(context, obj_id, metadata_only=metadata_only) | |
241 | if type(obj) == object_type or object_type is None: | |
242 | objects.append(obj) | |
243 | return objects |
71 | 71 | self.delete = self.mock_barbican.secrets.delete |
72 | 72 | self.store = self.mock_barbican.secrets.store |
73 | 73 | self.create = self.mock_barbican.secrets.create |
74 | self.list = self.mock_barbican.secrets.list | |
74 | 75 | |
75 | 76 | self.key_mgr._barbican_client = self.mock_barbican |
76 | 77 | self.key_mgr._current_context = self.ctxt |
347 | 348 | order_ref_url) |
348 | 349 | |
349 | 350 | self.assertEqual(1, self.mock_barbican.orders.get.call_count) |
351 | ||
352 | def test_list_null_context(self): | |
353 | self.key_mgr._barbican_client = None | |
354 | self.assertRaises(exception.Forbidden, | |
355 | self.key_mgr.list, None) | |
356 | ||
357 | def test_list(self): | |
358 | original_secret_metadata = mock.Mock() | |
359 | original_secret_metadata.algorithm = mock.sentinel.alg | |
360 | original_secret_metadata.bit_length = mock.sentinel.bit | |
361 | original_secret_metadata.secret_type = 'symmetric' | |
362 | ||
363 | created = timeutils.parse_isotime('2015-10-20 18:51:17+00:00') | |
364 | original_secret_metadata.created = created | |
365 | created_formatted = timeutils.parse_isotime(str(created)) | |
366 | created_posix = calendar.timegm(created_formatted.timetuple()) | |
367 | ||
368 | key_name = 'my key' | |
369 | original_secret_metadata.name = key_name | |
370 | ||
371 | original_secret_data = b'test key' | |
372 | original_secret_metadata.payload = original_secret_data | |
373 | ||
374 | self.mock_barbican.secrets.list.return_value = ( | |
375 | [original_secret_metadata]) | |
376 | ||
377 | # check metadata_only = False | |
378 | key_list = self.key_mgr.list(self.ctxt) | |
379 | self.assertEqual(1, len(key_list)) | |
380 | key = key_list[0] | |
381 | ||
382 | self.list.assert_called_once() | |
383 | self.assertEqual(key_name, key.name) | |
384 | self.assertEqual(original_secret_data, key.get_encoded()) | |
385 | self.assertEqual(created_posix, key.created) | |
386 | ||
387 | self.list.reset_mock() | |
388 | ||
389 | # check metadata_only = True | |
390 | key_list = self.key_mgr.list(self.ctxt, metadata_only=True) | |
391 | self.assertEqual(1, len(key_list)) | |
392 | key = key_list[0] | |
393 | ||
394 | self.list.assert_called_once() | |
395 | self.assertEqual(key_name, key.name) | |
396 | self.assertIsNone(key.get_encoded()) | |
397 | self.assertEqual(created_posix, key.created) | |
398 | ||
399 | def test_list_with_error(self): | |
400 | self.mock_barbican.secrets.list = mock.Mock( | |
401 | side_effect=barbican_exceptions.HTTPClientError('test error')) | |
402 | self.assertRaises(exception.KeyManagerError, | |
403 | self.key_mgr.list, self.ctxt) | |
404 | ||
405 | def test_list_with_invalid_object_type(self): | |
406 | self.assertRaises(exception.KeyManagerError, | |
407 | self.key_mgr.list, self.ctxt, "invalid_type") |
53 | 53 | |
54 | 54 | self.context = context.RequestContext('fake', 'fake') |
55 | 55 | |
56 | def cleanUp(self): | |
57 | super(MockKeyManagerTestCase, self).cleanUp() | |
58 | ||
59 | self.key_mgr.keys = {} | |
60 | ||
56 | 61 | def test_create_key(self): |
57 | 62 | key_id_1 = self.key_mgr.create_key(self.context) |
58 | 63 | key_id_2 = self.key_mgr.create_key(self.context) |
200 | 205 | def test_delete_unknown_key(self): |
201 | 206 | self.assertRaises(KeyError, self.key_mgr.delete, self.context, |
202 | 207 | None) |
208 | ||
209 | def test_list_null_context(self): | |
210 | self.assertRaises(exception.Forbidden, self.key_mgr.list, None) | |
211 | ||
212 | def test_list_keys(self): | |
213 | key1 = sym_key.SymmetricKey('AES', 64 * 8, bytes(b'0' * 64)) | |
214 | self.key_mgr.store(self.context, key1) | |
215 | key2 = sym_key.SymmetricKey('AES', 32 * 8, bytes(b'0' * 32)) | |
216 | self.key_mgr.store(self.context, key2) | |
217 | ||
218 | keys = self.key_mgr.list(self.context) | |
219 | self.assertEqual(2, len(keys)) | |
220 | self.assertTrue(key1 in keys) | |
221 | self.assertTrue(key2 in keys) | |
222 | ||
223 | def test_list_keys_metadata_only(self): | |
224 | key1 = sym_key.SymmetricKey('AES', 64 * 8, bytes(b'0' * 64)) | |
225 | self.key_mgr.store(self.context, key1) | |
226 | key2 = sym_key.SymmetricKey('AES', 32 * 8, bytes(b'0' * 32)) | |
227 | self.key_mgr.store(self.context, key2) | |
228 | ||
229 | keys = self.key_mgr.list(self.context, metadata_only=True) | |
230 | self.assertEqual(2, len(keys)) | |
231 | bit_length_list = [key1.bit_length, key2.bit_length] | |
232 | for key in keys: | |
233 | self.assertTrue(key.is_metadata_only()) | |
234 | self.assertTrue(key.bit_length in bit_length_list) |