diff --git a/.testr.conf b/.testr.conf index 6d83b3c..f624a4f 100644 --- a/.testr.conf +++ b/.testr.conf @@ -2,6 +2,6 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \ - ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION + ${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./castellan/tests} $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/castellan/tests/key_manager/__init__.py b/castellan/tests/key_manager/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/castellan/tests/key_manager/fake.py b/castellan/tests/key_manager/fake.py deleted file mode 100644 index a3e2782..0000000 --- a/castellan/tests/key_manager/fake.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2011 Justin Santa Barbara -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Implementation of a fake key manager.""" - - -from castellan.tests.key_manager import mock_key_manager - - -def fake_api(): - return mock_key_manager.MockKeyManager() diff --git a/castellan/tests/key_manager/mock_key_manager.py b/castellan/tests/key_manager/mock_key_manager.py deleted file mode 100644 index 8722f7d..0000000 --- a/castellan/tests/key_manager/mock_key_manager.py +++ /dev/null @@ -1,157 +0,0 @@ -# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -A mock implementation of a key manager that stores keys in a dictionary. - -This key manager implementation is primarily intended for testing. In -particular, it does not store keys persistently. Lack of a centralized key -store also makes this implementation unsuitable for use among different -services. - -Note: Instantiating this class multiple times will create separate key stores. -Keys created in one instance will not be accessible from other instances of -this class. -""" - -import array -import binascii -import random -import uuid - -from castellan.common import exception -from castellan.key_manager import key_manager -from castellan.key_manager import symmetric_key as sym_key - - -class MockKeyManager(key_manager.KeyManager): - - """Mocking manager for integration tests. - - This mock key manager implementation supports all the methods specified - by the key manager interface. This implementation stores keys within a - dictionary, and as a result, it is not acceptable for use across different - services. Side effects (e.g., raising exceptions) for each method are - handled as specified by the key manager interface. - - This key manager is not suitable for use in production deployments. - """ - - def __init__(self): - self.keys = {} - - def _generate_hex_key(self, **kwargs): - key_length = kwargs.get('key_length', 256) - # hex digit => 4 bits - length = int(key_length / 4) - hex_encoded = self._generate_password(length=length, - symbolgroups='0123456789ABCDEF') - return hex_encoded - - def _generate_key(self, **kwargs): - _hex = self._generate_hex_key(**kwargs) - return sym_key.SymmetricKey( - 'AES', - array.array('B', binascii.unhexlify(_hex)).tolist()) - - def create_key(self, context, **kwargs): - """Creates a key. - - This implementation returns a UUID for the created key. A - Forbidden exception is raised if the specified context is None. - """ - if context is None: - raise exception.Forbidden() - - key = self._generate_key(**kwargs) - return self.store_key(context, key) - - def _generate_key_id(self): - key_id = str(uuid.uuid4()) - while key_id in self.keys: - key_id = str(uuid.uuid4()) - - return key_id - - def store_key(self, context, key, **kwargs): - """Stores (i.e., registers) a key with the key manager.""" - if context is None: - raise exception.Forbidden() - - key_id = self._generate_key_id() - self.keys[key_id] = key - - return key_id - - def copy_key(self, context, key_id, **kwargs): - if context is None: - raise exception.Forbidden() - - copied_key_id = self._generate_key_id() - self.keys[copied_key_id] = self.keys[key_id] - - return copied_key_id - - def get_key(self, context, key_id, **kwargs): - """Retrieves the key identified by the specified id. - - This implementation returns the key that is associated with the - specified UUID. A Forbidden exception is raised if the specified - context is None; a KeyError is raised if the UUID is invalid. - """ - if context is None: - raise exception.Forbidden() - - return self.keys[key_id] - - def delete_key(self, context, key_id, **kwargs): - """Deletes the key identified by the specified id. - - A Forbidden exception is raised if the context is None and a - KeyError is raised if the UUID is invalid. - """ - if context is None: - raise exception.Forbidden() - - del self.keys[key_id] - - def _generate_password(self, length, symbolgroups): - """Generate a random password from the supplied symbol groups. - - At least one symbol from each group will be included. Unpredictable - results if length is less than the number of symbol groups. - - Believed to be reasonably secure (with a reasonable password length!) - """ - # NOTE(jerdfelt): Some password policies require at least one character - # from each group of symbols, so start off with one random character - # from each symbol group - password = [random.choice(s) for s in symbolgroups] - # If length < len(symbolgroups), the leading characters will only - # be from the first length groups. Try our best to not be predictable - # by shuffling and then truncating. - random.shuffle(password) - password = password[:length] - length -= len(password) - - # then fill with random characters from all symbol groups - symbols = ''.join(symbolgroups) - password.extend([random.choice(symbols) for _i in range(length)]) - - # finally shuffle to ensure first x characters aren't from a - # predictable group - random.shuffle(password) - - return ''.join(password) diff --git a/castellan/tests/key_manager/test_barbican_key_manager.py b/castellan/tests/key_manager/test_barbican_key_manager.py deleted file mode 100644 index 0098e8c..0000000 --- a/castellan/tests/key_manager/test_barbican_key_manager.py +++ /dev/null @@ -1,218 +0,0 @@ -# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Test cases for the barbican key manager. -""" - -import array - -import mock - -from castellan.common import exception -from castellan.key_manager import barbican_key_manager -from castellan.key_manager import symmetric_key as key_manager_key -from castellan.tests.key_manager import test_key_manager - - -class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): - - def _create_key_manager(self): - return barbican_key_manager.BarbicanKeyManager() - - def setUp(self): - super(BarbicanKeyManagerTestCase, self).setUp() - - # Create fake auth_token - self.ctxt = mock.Mock() - self.ctxt.auth_token = "fake_token" - - # Create mock barbican client - self._build_mock_barbican() - - # Create a key_id, secret_ref, pre_hex, and hex to use - self.key_id = "d152fa13-2b41-42ca-a934-6c21566c0f40" - self.secret_ref = ("http://host:9311/v1/secrets/" + self.key_id) - self.pre_hex = "AIDxQp2++uAbKaTVDMXFYIu8PIugJGqkK0JLqkU0rhY=" - self.hex = ("0080f1429dbefae01b29a4d50cc5c5608bbc3c8ba0246aa42b424baa4" - "534ae16") - self.key_mgr._base_url = "http://host:9311/v1/" - self.addCleanup(self._restore) - - def _restore(self): - try: - getattr(self, 'original_key') - key_manager_key.SymmetricKey = self.original_key - except AttributeError: - return None - - def _build_mock_barbican(self): - self.mock_barbican = mock.MagicMock(name='mock_barbican') - - # Set commonly used methods - self.get = self.mock_barbican.secrets.get - self.delete = self.mock_barbican.secrets.delete - self.store = self.mock_barbican.secrets.store - self.create = self.mock_barbican.secrets.create - - self.key_mgr._barbican_client = self.mock_barbican - self.key_mgr._current_context = self.ctxt - - def _build_mock_symKey(self): - self.mock_symKey = mock.Mock() - - def fake_sym_key(alg, key): - self.mock_symKey.get_encoded.return_value = key - self.mock_symKey.get_algorithm.return_value = alg - return self.mock_symKey - self.original_key = key_manager_key.SymmetricKey - key_manager_key.SymmetricKey = fake_sym_key - - def test_copy_key(self): - # Create metadata for original secret - original_secret_metadata = mock.Mock() - original_secret_metadata.algorithm = mock.sentinel.alg - original_secret_metadata.bit_length = mock.sentinel.bit - original_secret_metadata.name = mock.sentinel.name - original_secret_metadata.expiration = mock.sentinel.expiration - original_secret_metadata.mode = mock.sentinel.mode - content_types = {'default': 'fake_type'} - original_secret_metadata.content_types = content_types - original_secret_data = mock.Mock() - original_secret_metadata.payload = original_secret_data - - # Create href for copied secret - copied_secret = mock.Mock() - copied_secret.store.return_value = ( - 'http://http://host:9311/v1/secrets/uuid') - - # Set get and create return values - self.get.return_value = original_secret_metadata - self.create.return_value = copied_secret - - # Create the mock key - self._build_mock_symKey() - - # Copy the original - self.key_mgr.copy_key(self.ctxt, self.key_id) - - # Assert proper methods were called - self.get.assert_called_once_with(self.secret_ref) - self.create.assert_called_once_with( - payload=self.mock_symKey.get_encoded(), - algorithm=mock.sentinel.alg, - expiration=mock.sentinel.expiration) - copied_secret.store.assert_called_once_with() - - def test_copy_null_context(self): - self.key_mgr._barbican_client = None - self.assertRaises(exception.Forbidden, - self.key_mgr.copy_key, None, self.key_id) - - def test_create_key(self): - # Create order_ref_url and assign return value - order_ref_url = ("http://localhost:9311/v1/orders/" - "4fe939b7-72bc-49aa-bd1e-e979589858af") - key_order = mock.Mock() - self.mock_barbican.orders.create_key.return_value = key_order - key_order.submit.return_value = order_ref_url - - # Create order and assign return value - order = mock.Mock() - order.secret_ref = self.secret_ref - self.mock_barbican.orders.get.return_value = order - - # Create the key, get the UUID - returned_uuid = self.key_mgr.create_key(self.ctxt, - algorithm='AES', - length=256) - - self.mock_barbican.orders.get.assert_called_once_with(order_ref_url) - self.assertEqual(self.key_id, returned_uuid) - - def test_create_null_context(self): - self.key_mgr._barbican_client = None - self.assertRaises(exception.Forbidden, - self.key_mgr.create_key, None, 'AES', 256) - - def test_delete_null_context(self): - self.key_mgr._barbican_client = None - self.assertRaises(exception.Forbidden, - self.key_mgr.delete_key, None, self.key_id) - - def test_delete_key(self): - self.key_mgr.delete_key(self.ctxt, self.key_id) - self.delete.assert_called_once_with(self.secret_ref) - - def test_delete_unknown_key(self): - self.assertRaises(exception.KeyManagerError, - self.key_mgr.delete_key, self.ctxt, None) - - def test_get_key(self): - original_secret_metadata = mock.Mock() - original_secret_metadata.algorithm = mock.sentinel.alg - original_secret_metadata.bit_length = mock.sentinel.bit - original_secret_data = mock.Mock() - original_secret_metadata.payload = original_secret_data - - self.mock_barbican.secrets.get.return_value = original_secret_metadata - key = self.key_mgr.get_key(self.ctxt, self.key_id) - - self.get.assert_called_once_with(self.secret_ref) - self.assertEqual(key.get_encoded(), original_secret_data) - - def test_get_null_context(self): - self.key_mgr._barbican_client = None - self.assertRaises(exception.Forbidden, - self.key_mgr.get_key, None, self.key_id) - - def test_get_unknown_key(self): - self.assertRaises(exception.KeyManagerError, - self.key_mgr.get_key, self.ctxt, None) - - def test_store_key_base64(self): - # Create Key to store - secret_key = array.array('B', [0x01, 0x02, 0xA0, 0xB3]).tolist() - _key = key_manager_key.SymmetricKey('AES', secret_key) - - # Define the return values - secret = mock.Mock() - self.create.return_value = secret - secret.store.return_value = self.secret_ref - - # Store the Key - returned_uuid = self.key_mgr.store_key(self.ctxt, _key) - - self.create.assert_called_once_with(algorithm='AES', - payload=secret_key, - expiration=None) - self.assertEqual(self.key_id, returned_uuid) - - def test_store_key_plaintext(self): - # Create the plaintext key - secret_key_text = "This is a test text key." - _key = key_manager_key.SymmetricKey('AES', secret_key_text) - - # Store the Key - self.key_mgr.store_key(self.ctxt, _key) - self.create.assert_called_once_with(algorithm='AES', - payload=secret_key_text, - expiration=None) - self.assertEqual(0, self.store.call_count) - - def test_store_null_context(self): - self.key_mgr._barbican_client = None - self.assertRaises(exception.Forbidden, - self.key_mgr.store_key, None, None) diff --git a/castellan/tests/key_manager/test_key.py b/castellan/tests/key_manager/test_key.py deleted file mode 100644 index 20c1db5..0000000 --- a/castellan/tests/key_manager/test_key.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Test cases for the key classes. -""" - -import array -import binascii - -from castellan.key_manager import symmetric_key as sym_key -from castellan.tests import base - - -class KeyTestCase(base.TestCase): - - def _create_key(self): - raise NotImplementedError() - - def setUp(self): - super(KeyTestCase, self).setUp() - - self.key = self._create_key() - - -class SymmetricKeyTestCase(KeyTestCase): - - def _create_key(self): - return sym_key.SymmetricKey(self.algorithm, self.encoded) - - def setUp(self): - self.algorithm = 'AES' - self.encoded = array.array('B', binascii.unhexlify('0' * 64)).tolist() - - super(SymmetricKeyTestCase, self).setUp() - - def test_get_algorithm(self): - self.assertEqual(self.key.get_algorithm(), self.algorithm) - - def test_get_format(self): - self.assertEqual(self.key.get_format(), 'RAW') - - def test_get_encoded(self): - self.assertEqual(self.key.get_encoded(), self.encoded) - - def test___eq__(self): - self.assertTrue(self.key == self.key) - - self.assertFalse(self.key is None) - self.assertFalse(None == self.key) - - def test___ne__(self): - self.assertFalse(self.key != self.key) - - self.assertTrue(self.key is not None) - self.assertTrue(None != self.key) diff --git a/castellan/tests/key_manager/test_key_manager.py b/castellan/tests/key_manager/test_key_manager.py deleted file mode 100644 index 5b29b1d..0000000 --- a/castellan/tests/key_manager/test_key_manager.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Test cases for the key manager. -""" - -from castellan.tests import base - - -class KeyManagerTestCase(base.TestCase): - - def _create_key_manager(self): - raise NotImplementedError() - - def setUp(self): - super(KeyManagerTestCase, self).setUp() - - self.key_mgr = self._create_key_manager() diff --git a/castellan/tests/key_manager/test_mock_key_manager.py b/castellan/tests/key_manager/test_mock_key_manager.py deleted file mode 100644 index b8fb975..0000000 --- a/castellan/tests/key_manager/test_mock_key_manager.py +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Test cases for the mock key manager. -""" - -import array -import binascii - -from oslo_context import context - -from castellan.common import exception -from castellan.key_manager import symmetric_key as sym_key -from castellan.tests.key_manager import mock_key_manager as mock_key_mgr -from castellan.tests.key_manager import test_key_manager as test_key_mgr - - -class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase): - - def _create_key_manager(self): - return mock_key_mgr.MockKeyManager() - - def setUp(self): - super(MockKeyManagerTestCase, self).setUp() - - self.context = context.RequestContext('fake', 'fake') - - def test_create_key(self): - key_id_1 = self.key_mgr.create_key(self.context) - key_id_2 = self.key_mgr.create_key(self.context) - # ensure that the UUIDs are unique - self.assertNotEqual(key_id_1, key_id_2) - - def test_create_key_with_length(self): - for length in [64, 128, 256]: - key_id = self.key_mgr.create_key(self.context, key_length=length) - key = self.key_mgr.get_key(self.context, key_id) - self.assertEqual(length / 8, len(key.get_encoded())) - - def test_create_null_context(self): - self.assertRaises(exception.Forbidden, - self.key_mgr.create_key, None) - - def test_store_and_get_key(self): - secret_key = array.array('B', binascii.unhexlify('0' * 64)).tolist() - _key = sym_key.SymmetricKey('AES', secret_key) - key_id = self.key_mgr.store_key(self.context, _key) - - actual_key = self.key_mgr.get_key(self.context, key_id) - self.assertEqual(_key, actual_key) - - def test_store_null_context(self): - self.assertRaises(exception.Forbidden, - self.key_mgr.store_key, None, None) - - def test_copy_key(self): - key_id = self.key_mgr.create_key(self.context) - key = self.key_mgr.get_key(self.context, key_id) - - copied_key_id = self.key_mgr.copy_key(self.context, key_id) - copied_key = self.key_mgr.get_key(self.context, copied_key_id) - - self.assertNotEqual(key_id, copied_key_id) - self.assertEqual(key, copied_key) - - def test_copy_null_context(self): - self.assertRaises(exception.Forbidden, - self.key_mgr.copy_key, None, None) - - def test_get_null_context(self): - self.assertRaises(exception.Forbidden, - self.key_mgr.get_key, None, None) - - def test_get_unknown_key(self): - self.assertRaises(KeyError, self.key_mgr.get_key, self.context, None) - - def test_delete_key(self): - key_id = self.key_mgr.create_key(self.context) - self.key_mgr.delete_key(self.context, key_id) - - self.assertRaises(KeyError, self.key_mgr.get_key, self.context, - key_id) - - def test_delete_null_context(self): - self.assertRaises(exception.Forbidden, - self.key_mgr.delete_key, None, None) - - def test_delete_unknown_key(self): - self.assertRaises(KeyError, self.key_mgr.delete_key, self.context, - None) diff --git a/castellan/tests/key_manager/test_not_implemented_key_manager.py b/castellan/tests/key_manager/test_not_implemented_key_manager.py deleted file mode 100644 index b021d19..0000000 --- a/castellan/tests/key_manager/test_not_implemented_key_manager.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Test cases for the not implemented key manager. -""" - -from castellan.key_manager import not_implemented_key_manager -from castellan.tests.key_manager import test_key_manager - - -class NotImplementedKeyManagerTestCase(test_key_manager.KeyManagerTestCase): - - def _create_key_manager(self): - return not_implemented_key_manager.NotImplementedKeyManager() - - def test_create_key(self): - self.assertRaises(NotImplementedError, - self.key_mgr.create_key, None) - - def test_store_key(self): - self.assertRaises(NotImplementedError, - self.key_mgr.store_key, None, None) - - def test_copy_key(self): - self.assertRaises(NotImplementedError, - self.key_mgr.copy_key, None, None) - - def test_get_key(self): - self.assertRaises(NotImplementedError, - self.key_mgr.get_key, None, None) - - def test_delete_key(self): - self.assertRaises(NotImplementedError, - self.key_mgr.delete_key, None, None) diff --git a/castellan/tests/unit/__init__.py b/castellan/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/castellan/tests/unit/key_manager/__init__.py b/castellan/tests/unit/key_manager/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/castellan/tests/unit/key_manager/fake.py b/castellan/tests/unit/key_manager/fake.py new file mode 100644 index 0000000..f825c0a --- /dev/null +++ b/castellan/tests/unit/key_manager/fake.py @@ -0,0 +1,24 @@ +# Copyright 2011 Justin Santa Barbara +# Copyright 2012 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Implementation of a fake key manager.""" + + +from castellan.tests.unit.key_manager import mock_key_manager + + +def fake_api(): + return mock_key_manager.MockKeyManager() diff --git a/castellan/tests/unit/key_manager/mock_key_manager.py b/castellan/tests/unit/key_manager/mock_key_manager.py new file mode 100644 index 0000000..8722f7d --- /dev/null +++ b/castellan/tests/unit/key_manager/mock_key_manager.py @@ -0,0 +1,157 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A mock implementation of a key manager that stores keys in a dictionary. + +This key manager implementation is primarily intended for testing. In +particular, it does not store keys persistently. Lack of a centralized key +store also makes this implementation unsuitable for use among different +services. + +Note: Instantiating this class multiple times will create separate key stores. +Keys created in one instance will not be accessible from other instances of +this class. +""" + +import array +import binascii +import random +import uuid + +from castellan.common import exception +from castellan.key_manager import key_manager +from castellan.key_manager import symmetric_key as sym_key + + +class MockKeyManager(key_manager.KeyManager): + + """Mocking manager for integration tests. + + This mock key manager implementation supports all the methods specified + by the key manager interface. This implementation stores keys within a + dictionary, and as a result, it is not acceptable for use across different + services. Side effects (e.g., raising exceptions) for each method are + handled as specified by the key manager interface. + + This key manager is not suitable for use in production deployments. + """ + + def __init__(self): + self.keys = {} + + def _generate_hex_key(self, **kwargs): + key_length = kwargs.get('key_length', 256) + # hex digit => 4 bits + length = int(key_length / 4) + hex_encoded = self._generate_password(length=length, + symbolgroups='0123456789ABCDEF') + return hex_encoded + + def _generate_key(self, **kwargs): + _hex = self._generate_hex_key(**kwargs) + return sym_key.SymmetricKey( + 'AES', + array.array('B', binascii.unhexlify(_hex)).tolist()) + + def create_key(self, context, **kwargs): + """Creates a key. + + This implementation returns a UUID for the created key. A + Forbidden exception is raised if the specified context is None. + """ + if context is None: + raise exception.Forbidden() + + key = self._generate_key(**kwargs) + return self.store_key(context, key) + + def _generate_key_id(self): + key_id = str(uuid.uuid4()) + while key_id in self.keys: + key_id = str(uuid.uuid4()) + + return key_id + + def store_key(self, context, key, **kwargs): + """Stores (i.e., registers) a key with the key manager.""" + if context is None: + raise exception.Forbidden() + + key_id = self._generate_key_id() + self.keys[key_id] = key + + return key_id + + def copy_key(self, context, key_id, **kwargs): + if context is None: + raise exception.Forbidden() + + copied_key_id = self._generate_key_id() + self.keys[copied_key_id] = self.keys[key_id] + + return copied_key_id + + def get_key(self, context, key_id, **kwargs): + """Retrieves the key identified by the specified id. + + This implementation returns the key that is associated with the + specified UUID. A Forbidden exception is raised if the specified + context is None; a KeyError is raised if the UUID is invalid. + """ + if context is None: + raise exception.Forbidden() + + return self.keys[key_id] + + def delete_key(self, context, key_id, **kwargs): + """Deletes the key identified by the specified id. + + A Forbidden exception is raised if the context is None and a + KeyError is raised if the UUID is invalid. + """ + if context is None: + raise exception.Forbidden() + + del self.keys[key_id] + + def _generate_password(self, length, symbolgroups): + """Generate a random password from the supplied symbol groups. + + At least one symbol from each group will be included. Unpredictable + results if length is less than the number of symbol groups. + + Believed to be reasonably secure (with a reasonable password length!) + """ + # NOTE(jerdfelt): Some password policies require at least one character + # from each group of symbols, so start off with one random character + # from each symbol group + password = [random.choice(s) for s in symbolgroups] + # If length < len(symbolgroups), the leading characters will only + # be from the first length groups. Try our best to not be predictable + # by shuffling and then truncating. + random.shuffle(password) + password = password[:length] + length -= len(password) + + # then fill with random characters from all symbol groups + symbols = ''.join(symbolgroups) + password.extend([random.choice(symbols) for _i in range(length)]) + + # finally shuffle to ensure first x characters aren't from a + # predictable group + random.shuffle(password) + + return ''.join(password) diff --git a/castellan/tests/unit/key_manager/test_barbican_key_manager.py b/castellan/tests/unit/key_manager/test_barbican_key_manager.py new file mode 100644 index 0000000..0bf2187 --- /dev/null +++ b/castellan/tests/unit/key_manager/test_barbican_key_manager.py @@ -0,0 +1,218 @@ +# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test cases for the barbican key manager. +""" + +import array + +import mock + +from castellan.common import exception +from castellan.key_manager import barbican_key_manager +from castellan.key_manager import symmetric_key as key_manager_key +from castellan.tests.unit.key_manager import test_key_manager + + +class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase): + + def _create_key_manager(self): + return barbican_key_manager.BarbicanKeyManager() + + def setUp(self): + super(BarbicanKeyManagerTestCase, self).setUp() + + # Create fake auth_token + self.ctxt = mock.Mock() + self.ctxt.auth_token = "fake_token" + + # Create mock barbican client + self._build_mock_barbican() + + # Create a key_id, secret_ref, pre_hex, and hex to use + self.key_id = "d152fa13-2b41-42ca-a934-6c21566c0f40" + self.secret_ref = ("http://host:9311/v1/secrets/" + self.key_id) + self.pre_hex = "AIDxQp2++uAbKaTVDMXFYIu8PIugJGqkK0JLqkU0rhY=" + self.hex = ("0080f1429dbefae01b29a4d50cc5c5608bbc3c8ba0246aa42b424baa4" + "534ae16") + self.key_mgr._base_url = "http://host:9311/v1/" + self.addCleanup(self._restore) + + def _restore(self): + try: + getattr(self, 'original_key') + key_manager_key.SymmetricKey = self.original_key + except AttributeError: + return None + + def _build_mock_barbican(self): + self.mock_barbican = mock.MagicMock(name='mock_barbican') + + # Set commonly used methods + self.get = self.mock_barbican.secrets.get + self.delete = self.mock_barbican.secrets.delete + self.store = self.mock_barbican.secrets.store + self.create = self.mock_barbican.secrets.create + + self.key_mgr._barbican_client = self.mock_barbican + self.key_mgr._current_context = self.ctxt + + def _build_mock_symKey(self): + self.mock_symKey = mock.Mock() + + def fake_sym_key(alg, key): + self.mock_symKey.get_encoded.return_value = key + self.mock_symKey.get_algorithm.return_value = alg + return self.mock_symKey + self.original_key = key_manager_key.SymmetricKey + key_manager_key.SymmetricKey = fake_sym_key + + def test_copy_key(self): + # Create metadata for original secret + original_secret_metadata = mock.Mock() + original_secret_metadata.algorithm = mock.sentinel.alg + original_secret_metadata.bit_length = mock.sentinel.bit + original_secret_metadata.name = mock.sentinel.name + original_secret_metadata.expiration = mock.sentinel.expiration + original_secret_metadata.mode = mock.sentinel.mode + content_types = {'default': 'fake_type'} + original_secret_metadata.content_types = content_types + original_secret_data = mock.Mock() + original_secret_metadata.payload = original_secret_data + + # Create href for copied secret + copied_secret = mock.Mock() + copied_secret.store.return_value = ( + 'http://http://host:9311/v1/secrets/uuid') + + # Set get and create return values + self.get.return_value = original_secret_metadata + self.create.return_value = copied_secret + + # Create the mock key + self._build_mock_symKey() + + # Copy the original + self.key_mgr.copy_key(self.ctxt, self.key_id) + + # Assert proper methods were called + self.get.assert_called_once_with(self.secret_ref) + self.create.assert_called_once_with( + payload=self.mock_symKey.get_encoded(), + algorithm=mock.sentinel.alg, + expiration=mock.sentinel.expiration) + copied_secret.store.assert_called_once_with() + + def test_copy_null_context(self): + self.key_mgr._barbican_client = None + self.assertRaises(exception.Forbidden, + self.key_mgr.copy_key, None, self.key_id) + + def test_create_key(self): + # Create order_ref_url and assign return value + order_ref_url = ("http://localhost:9311/v1/orders/" + "4fe939b7-72bc-49aa-bd1e-e979589858af") + key_order = mock.Mock() + self.mock_barbican.orders.create_key.return_value = key_order + key_order.submit.return_value = order_ref_url + + # Create order and assign return value + order = mock.Mock() + order.secret_ref = self.secret_ref + self.mock_barbican.orders.get.return_value = order + + # Create the key, get the UUID + returned_uuid = self.key_mgr.create_key(self.ctxt, + algorithm='AES', + length=256) + + self.mock_barbican.orders.get.assert_called_once_with(order_ref_url) + self.assertEqual(self.key_id, returned_uuid) + + def test_create_null_context(self): + self.key_mgr._barbican_client = None + self.assertRaises(exception.Forbidden, + self.key_mgr.create_key, None, 'AES', 256) + + def test_delete_null_context(self): + self.key_mgr._barbican_client = None + self.assertRaises(exception.Forbidden, + self.key_mgr.delete_key, None, self.key_id) + + def test_delete_key(self): + self.key_mgr.delete_key(self.ctxt, self.key_id) + self.delete.assert_called_once_with(self.secret_ref) + + def test_delete_unknown_key(self): + self.assertRaises(exception.KeyManagerError, + self.key_mgr.delete_key, self.ctxt, None) + + def test_get_key(self): + original_secret_metadata = mock.Mock() + original_secret_metadata.algorithm = mock.sentinel.alg + original_secret_metadata.bit_length = mock.sentinel.bit + original_secret_data = mock.Mock() + original_secret_metadata.payload = original_secret_data + + self.mock_barbican.secrets.get.return_value = original_secret_metadata + key = self.key_mgr.get_key(self.ctxt, self.key_id) + + self.get.assert_called_once_with(self.secret_ref) + self.assertEqual(key.get_encoded(), original_secret_data) + + def test_get_null_context(self): + self.key_mgr._barbican_client = None + self.assertRaises(exception.Forbidden, + self.key_mgr.get_key, None, self.key_id) + + def test_get_unknown_key(self): + self.assertRaises(exception.KeyManagerError, + self.key_mgr.get_key, self.ctxt, None) + + def test_store_key_base64(self): + # Create Key to store + secret_key = array.array('B', [0x01, 0x02, 0xA0, 0xB3]).tolist() + _key = key_manager_key.SymmetricKey('AES', secret_key) + + # Define the return values + secret = mock.Mock() + self.create.return_value = secret + secret.store.return_value = self.secret_ref + + # Store the Key + returned_uuid = self.key_mgr.store_key(self.ctxt, _key) + + self.create.assert_called_once_with(algorithm='AES', + payload=secret_key, + expiration=None) + self.assertEqual(self.key_id, returned_uuid) + + def test_store_key_plaintext(self): + # Create the plaintext key + secret_key_text = "This is a test text key." + _key = key_manager_key.SymmetricKey('AES', secret_key_text) + + # Store the Key + self.key_mgr.store_key(self.ctxt, _key) + self.create.assert_called_once_with(algorithm='AES', + payload=secret_key_text, + expiration=None) + self.assertEqual(0, self.store.call_count) + + def test_store_null_context(self): + self.key_mgr._barbican_client = None + self.assertRaises(exception.Forbidden, + self.key_mgr.store_key, None, None) diff --git a/castellan/tests/unit/key_manager/test_key.py b/castellan/tests/unit/key_manager/test_key.py new file mode 100644 index 0000000..20c1db5 --- /dev/null +++ b/castellan/tests/unit/key_manager/test_key.py @@ -0,0 +1,68 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test cases for the key classes. +""" + +import array +import binascii + +from castellan.key_manager import symmetric_key as sym_key +from castellan.tests import base + + +class KeyTestCase(base.TestCase): + + def _create_key(self): + raise NotImplementedError() + + def setUp(self): + super(KeyTestCase, self).setUp() + + self.key = self._create_key() + + +class SymmetricKeyTestCase(KeyTestCase): + + def _create_key(self): + return sym_key.SymmetricKey(self.algorithm, self.encoded) + + def setUp(self): + self.algorithm = 'AES' + self.encoded = array.array('B', binascii.unhexlify('0' * 64)).tolist() + + super(SymmetricKeyTestCase, self).setUp() + + def test_get_algorithm(self): + self.assertEqual(self.key.get_algorithm(), self.algorithm) + + def test_get_format(self): + self.assertEqual(self.key.get_format(), 'RAW') + + def test_get_encoded(self): + self.assertEqual(self.key.get_encoded(), self.encoded) + + def test___eq__(self): + self.assertTrue(self.key == self.key) + + self.assertFalse(self.key is None) + self.assertFalse(None == self.key) + + def test___ne__(self): + self.assertFalse(self.key != self.key) + + self.assertTrue(self.key is not None) + self.assertTrue(None != self.key) diff --git a/castellan/tests/unit/key_manager/test_key_manager.py b/castellan/tests/unit/key_manager/test_key_manager.py new file mode 100644 index 0000000..5b29b1d --- /dev/null +++ b/castellan/tests/unit/key_manager/test_key_manager.py @@ -0,0 +1,31 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test cases for the key manager. +""" + +from castellan.tests import base + + +class KeyManagerTestCase(base.TestCase): + + def _create_key_manager(self): + raise NotImplementedError() + + def setUp(self): + super(KeyManagerTestCase, self).setUp() + + self.key_mgr = self._create_key_manager() diff --git a/castellan/tests/unit/key_manager/test_mock_key_manager.py b/castellan/tests/unit/key_manager/test_mock_key_manager.py new file mode 100644 index 0000000..ef8bf5b --- /dev/null +++ b/castellan/tests/unit/key_manager/test_mock_key_manager.py @@ -0,0 +1,103 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test cases for the mock key manager. +""" + +import array +import binascii + +from oslo_context import context + +from castellan.common import exception +from castellan.key_manager import symmetric_key as sym_key +from castellan.tests.unit.key_manager import mock_key_manager as mock_key_mgr +from castellan.tests.unit.key_manager import test_key_manager as test_key_mgr + + +class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase): + + def _create_key_manager(self): + return mock_key_mgr.MockKeyManager() + + def setUp(self): + super(MockKeyManagerTestCase, self).setUp() + + self.context = context.RequestContext('fake', 'fake') + + def test_create_key(self): + key_id_1 = self.key_mgr.create_key(self.context) + key_id_2 = self.key_mgr.create_key(self.context) + # ensure that the UUIDs are unique + self.assertNotEqual(key_id_1, key_id_2) + + def test_create_key_with_length(self): + for length in [64, 128, 256]: + key_id = self.key_mgr.create_key(self.context, key_length=length) + key = self.key_mgr.get_key(self.context, key_id) + self.assertEqual(length / 8, len(key.get_encoded())) + + def test_create_null_context(self): + self.assertRaises(exception.Forbidden, + self.key_mgr.create_key, None) + + def test_store_and_get_key(self): + secret_key = array.array('B', binascii.unhexlify('0' * 64)).tolist() + _key = sym_key.SymmetricKey('AES', secret_key) + key_id = self.key_mgr.store_key(self.context, _key) + + actual_key = self.key_mgr.get_key(self.context, key_id) + self.assertEqual(_key, actual_key) + + def test_store_null_context(self): + self.assertRaises(exception.Forbidden, + self.key_mgr.store_key, None, None) + + def test_copy_key(self): + key_id = self.key_mgr.create_key(self.context) + key = self.key_mgr.get_key(self.context, key_id) + + copied_key_id = self.key_mgr.copy_key(self.context, key_id) + copied_key = self.key_mgr.get_key(self.context, copied_key_id) + + self.assertNotEqual(key_id, copied_key_id) + self.assertEqual(key, copied_key) + + def test_copy_null_context(self): + self.assertRaises(exception.Forbidden, + self.key_mgr.copy_key, None, None) + + def test_get_null_context(self): + self.assertRaises(exception.Forbidden, + self.key_mgr.get_key, None, None) + + def test_get_unknown_key(self): + self.assertRaises(KeyError, self.key_mgr.get_key, self.context, None) + + def test_delete_key(self): + key_id = self.key_mgr.create_key(self.context) + self.key_mgr.delete_key(self.context, key_id) + + self.assertRaises(KeyError, self.key_mgr.get_key, self.context, + key_id) + + def test_delete_null_context(self): + self.assertRaises(exception.Forbidden, + self.key_mgr.delete_key, None, None) + + def test_delete_unknown_key(self): + self.assertRaises(KeyError, self.key_mgr.delete_key, self.context, + None) diff --git a/castellan/tests/unit/key_manager/test_not_implemented_key_manager.py b/castellan/tests/unit/key_manager/test_not_implemented_key_manager.py new file mode 100644 index 0000000..f53a244 --- /dev/null +++ b/castellan/tests/unit/key_manager/test_not_implemented_key_manager.py @@ -0,0 +1,47 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test cases for the not implemented key manager. +""" + +from castellan.key_manager import not_implemented_key_manager +from castellan.tests.unit.key_manager import test_key_manager + + +class NotImplementedKeyManagerTestCase(test_key_manager.KeyManagerTestCase): + + def _create_key_manager(self): + return not_implemented_key_manager.NotImplementedKeyManager() + + def test_create_key(self): + self.assertRaises(NotImplementedError, + self.key_mgr.create_key, None) + + def test_store_key(self): + self.assertRaises(NotImplementedError, + self.key_mgr.store_key, None, None) + + def test_copy_key(self): + self.assertRaises(NotImplementedError, + self.key_mgr.copy_key, None, None) + + def test_get_key(self): + self.assertRaises(NotImplementedError, + self.key_mgr.get_key, None, None) + + def test_delete_key(self): + self.assertRaises(NotImplementedError, + self.key_mgr.delete_key, None, None) diff --git a/tox.ini b/tox.ini index 0a5f6c9..79c975f 100644 --- a/tox.ini +++ b/tox.ini @@ -8,6 +8,7 @@ install_command = pip install -U {opts} {packages} setenv = VIRTUAL_ENV={envdir} + OS_TEST_PATH=./castellan/tests/unit deps = -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt commands = python setup.py testr --slowest --testr-args='{posargs}'