Package list python-castellan / 1a13c2b
Add list capability Adds ability to list secrets, and adds initial filtering ability. Can filter by secret_type. Depends-On: I583f27f91cb3c6bdb23438dff6b539407b4005ed Depends-On: I99cd72724e11bab362bcaaeb773f33b2abfe815c Change-Id: I245d5846aa8d3b9586bea6dc4e0b24db86c911c9 Kaitlin Farr 4 years ago
8 changed file(s) with 232 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
571571 uuid=managed_object_id)
572572 else:
573573 raise exception.KeyManagerError(reason=e)
574
575 def list(self, context, object_type=None, metadata_only=False):
576 """Retrieves a list of managed objects that match the criteria.
577
578 If no search criteria is given, all objects are returned.
579
580 :param context: contains information of the user and the environment
581 for the request (castellan/context.py)
582 :param object_type: the type of object to retrieve
583 :param metadata_only: whether secret data should be included
584 :raises KeyManagerError: if listing secrets fails
585 """
586 objects = []
587 barbican_client = self._get_barbican_client(context)
588
589 if object_type and object_type not in self._secret_type_dict:
590 msg = _("Invalid secret type: %s") % object_type
591 LOG.error(msg)
592 raise exception.KeyManagerError(reason=msg)
593
594 secret_type = self._secret_type_dict.get(object_type)
595
596 try:
597 secrets = barbican_client.secrets.list(secret_type=secret_type)
598 except (barbican_exceptions.HTTPAuthError,
599 barbican_exceptions.HTTPClientError,
600 barbican_exceptions.HTTPServerError) as e:
601 LOG.error(_("Error listing objects: %s"), e)
602 raise exception.KeyManagerError(reason=e)
603
604 for secret in secrets:
605 try:
606 obj = self._get_castellan_object(secret, metadata_only)
607 objects.append(obj)
608 except (barbican_exceptions.HTTPAuthError,
609 barbican_exceptions.HTTPClientError,
610 barbican_exceptions.HTTPServerError) as e:
611 LOG.warn(_("Error occurred while retrieving object metadata,"
612 " not adding it to the list: %s"), e)
613
614 return objects
108108 considered "non-existent" and completely invisible.
109109 """
110110 pass
111
112 @abc.abstractmethod
113 def list(self, context, object_type=None, metadata_only=False):
114 """Lists the managed objects given the criteria.
115
116 Implementations should verify that the caller has permission to list
117 the managed objects and should only list the objects the caller has
118 access to by checking the context object (context). A NotAuthorized
119 exception should be raised if the caller lacks permission.
120
121 A list of managed objects or managed object metadata should be
122 returned, depending on the metadata_only flag. If no objects are
123 found, an empty list should be returned instead.
124 """
125 pass
4444 def get(self, context, managed_object_id, **kwargs):
4545 raise NotImplementedError()
4646
47 def list(self, context, object_type=None):
48 raise NotImplementedError()
49
4750 def delete(self, context, managed_object_id, **kwargs):
4851 raise NotImplementedError()
170170 retrieved_object = self.key_mgr.get(self.ctxt, uuid)
171171 self.assertEqual(managed_object.get_encoded(),
172172 retrieved_object.get_encoded())
173
174 @utils.parameterized_dataset({
175 'symmetric_key': [_get_test_symmetric_key()],
176 'public_key': [_get_test_public_key()],
177 'private_key': [_get_test_private_key()],
178 'certificate': [_get_test_certificate()],
179 'passphrase': [_get_test_passphrase()],
180 'opaque_data': [_get_test_opaque_data()],
181 })
182 def test_list(self, managed_object):
183 uuid = self.key_mgr.store(self.ctxt, managed_object)
184 self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
185
186 # the list command may return more objects than the one we just
187 # created if older objects were not cleaned up, so we will simply
188 # check if the object we created is in the list
189 retrieved_objects = self.key_mgr.list(self.ctxt)
190 self.assertTrue(managed_object in retrieved_objects)
191 for obj in retrieved_objects:
192 self.assertFalse(obj.is_metadata_only())
193
194 @utils.parameterized_dataset({
195 'symmetric_key': [_get_test_symmetric_key()],
196 'public_key': [_get_test_public_key()],
197 'private_key': [_get_test_private_key()],
198 'certificate': [_get_test_certificate()],
199 'passphrase': [_get_test_passphrase()],
200 'opaque_data': [_get_test_opaque_data()],
201 })
202 def test_list_metadata_only(self, managed_object):
203 uuid = self.key_mgr.store(self.ctxt, managed_object)
204 self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
205
206 expected_obj = self.key_mgr.get(self.ctxt, uuid, metadata_only=True)
207
208 # the list command may return more objects than the one we just
209 # created if older objects were not cleaned up, so we will simply
210 # check if the object we created is in the list
211 retrieved_objects = self.key_mgr.list(self.ctxt, metadata_only=True)
212 self.assertTrue(expected_obj in retrieved_objects)
213 for obj in retrieved_objects:
214 self.assertTrue(obj.is_metadata_only())
215
216 @utils.parameterized_dataset({
217 'query_by_object_type': {
218 'object_1': _get_test_symmetric_key(),
219 'object_2': _get_test_public_key(),
220 'query_dict': dict(object_type=symmetric_key.SymmetricKey)
221 },
222 })
223 def test_list_with_filter(self, object_1, object_2, query_dict):
224 uuid1 = self.key_mgr.store(self.ctxt, object_1)
225 uuid2 = self.key_mgr.store(self.ctxt, object_2)
226 self.addCleanup(self.key_mgr.delete, self.ctxt, uuid1)
227 self.addCleanup(self.key_mgr.delete, self.ctxt, uuid2)
228
229 # the list command may return more objects than the one we just
230 # created if older objects were not cleaned up, so we will simply
231 # check that the returned objects have the expected type
232 retrieved_objects = self.key_mgr.list(self.ctxt, **query_dict)
233 for retrieved_object in retrieved_objects:
234 self.assertEqual(type(object_1), type(retrieved_object))
235 self.assertTrue(object_1 in retrieved_objects)
225225 random.shuffle(password)
226226
227227 return ''.join(password)
228
229 def list(self, context, object_type=None, metadata_only=False):
230 """Retrieves a list of managed objects that match the criteria.
231
232 A Forbidden exception is raised if the context is None.
233 If no search criteria is given, all objects are returned.
234 """
235 if context is None:
236 raise exception.Forbidden()
237
238 objects = []
239 for obj_id in self.keys:
240 obj = self.get(context, obj_id, metadata_only=metadata_only)
241 if type(obj) == object_type or object_type is None:
242 objects.append(obj)
243 return objects
7171 self.delete = self.mock_barbican.secrets.delete
7272 self.store = self.mock_barbican.secrets.store
7373 self.create = self.mock_barbican.secrets.create
74 self.list = self.mock_barbican.secrets.list
7475
7576 self.key_mgr._barbican_client = self.mock_barbican
7677 self.key_mgr._current_context = self.ctxt
347348 order_ref_url)
348349
349350 self.assertEqual(1, self.mock_barbican.orders.get.call_count)
351
352 def test_list_null_context(self):
353 self.key_mgr._barbican_client = None
354 self.assertRaises(exception.Forbidden,
355 self.key_mgr.list, None)
356
357 def test_list(self):
358 original_secret_metadata = mock.Mock()
359 original_secret_metadata.algorithm = mock.sentinel.alg
360 original_secret_metadata.bit_length = mock.sentinel.bit
361 original_secret_metadata.secret_type = 'symmetric'
362
363 created = timeutils.parse_isotime('2015-10-20 18:51:17+00:00')
364 original_secret_metadata.created = created
365 created_formatted = timeutils.parse_isotime(str(created))
366 created_posix = calendar.timegm(created_formatted.timetuple())
367
368 key_name = 'my key'
369 original_secret_metadata.name = key_name
370
371 original_secret_data = b'test key'
372 original_secret_metadata.payload = original_secret_data
373
374 self.mock_barbican.secrets.list.return_value = (
375 [original_secret_metadata])
376
377 # check metadata_only = False
378 key_list = self.key_mgr.list(self.ctxt)
379 self.assertEqual(1, len(key_list))
380 key = key_list[0]
381
382 self.list.assert_called_once()
383 self.assertEqual(key_name, key.name)
384 self.assertEqual(original_secret_data, key.get_encoded())
385 self.assertEqual(created_posix, key.created)
386
387 self.list.reset_mock()
388
389 # check metadata_only = True
390 key_list = self.key_mgr.list(self.ctxt, metadata_only=True)
391 self.assertEqual(1, len(key_list))
392 key = key_list[0]
393
394 self.list.assert_called_once()
395 self.assertEqual(key_name, key.name)
396 self.assertIsNone(key.get_encoded())
397 self.assertEqual(created_posix, key.created)
398
399 def test_list_with_error(self):
400 self.mock_barbican.secrets.list = mock.Mock(
401 side_effect=barbican_exceptions.HTTPClientError('test error'))
402 self.assertRaises(exception.KeyManagerError,
403 self.key_mgr.list, self.ctxt)
404
405 def test_list_with_invalid_object_type(self):
406 self.assertRaises(exception.KeyManagerError,
407 self.key_mgr.list, self.ctxt, "invalid_type")
5353
5454 self.context = context.RequestContext('fake', 'fake')
5555
56 def cleanUp(self):
57 super(MockKeyManagerTestCase, self).cleanUp()
58
59 self.key_mgr.keys = {}
60
5661 def test_create_key(self):
5762 key_id_1 = self.key_mgr.create_key(self.context)
5863 key_id_2 = self.key_mgr.create_key(self.context)
200205 def test_delete_unknown_key(self):
201206 self.assertRaises(KeyError, self.key_mgr.delete, self.context,
202207 None)
208
209 def test_list_null_context(self):
210 self.assertRaises(exception.Forbidden, self.key_mgr.list, None)
211
212 def test_list_keys(self):
213 key1 = sym_key.SymmetricKey('AES', 64 * 8, bytes(b'0' * 64))
214 self.key_mgr.store(self.context, key1)
215 key2 = sym_key.SymmetricKey('AES', 32 * 8, bytes(b'0' * 32))
216 self.key_mgr.store(self.context, key2)
217
218 keys = self.key_mgr.list(self.context)
219 self.assertEqual(2, len(keys))
220 self.assertTrue(key1 in keys)
221 self.assertTrue(key2 in keys)
222
223 def test_list_keys_metadata_only(self):
224 key1 = sym_key.SymmetricKey('AES', 64 * 8, bytes(b'0' * 64))
225 self.key_mgr.store(self.context, key1)
226 key2 = sym_key.SymmetricKey('AES', 32 * 8, bytes(b'0' * 32))
227 self.key_mgr.store(self.context, key2)
228
229 keys = self.key_mgr.list(self.context, metadata_only=True)
230 self.assertEqual(2, len(keys))
231 bit_length_list = [key1.bit_length, key2.bit_length]
232 for key in keys:
233 self.assertTrue(key.is_metadata_only())
234 self.assertTrue(key.bit_length in bit_length_list)
4545 self.assertRaises(NotImplementedError,
4646 self.key_mgr.get, None, None)
4747
48 def test_list(self):
49 self.assertRaises(NotImplementedError,
50 self.key_mgr.list, None)
51
4852 def test_delete(self):
4953 self.assertRaises(NotImplementedError,
5054 self.key_mgr.delete, None, None)