diff --git a/castellan/tests/unit/key_manager/mock_key_manager.py b/castellan/tests/unit/key_manager/mock_key_manager.py index c9789bd..00eb919 100644 --- a/castellan/tests/unit/key_manager/mock_key_manager.py +++ b/castellan/tests/unit/key_manager/mock_key_manager.py @@ -30,7 +30,13 @@ import random import uuid +from cryptography.hazmat import backends +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import serialization + from castellan.common import exception +from castellan.common.objects import private_key as pri_key +from castellan.common.objects import public_key as pub_key from castellan.common.objects import symmetric_key as sym_key from castellan.key_manager import key_manager @@ -68,8 +74,9 @@ def create_key(self, context, **kwargs): """Creates a symmetric key. - This implementation returns a UUID for the created key. A - Forbidden exception is raised if the specified context is None. + This implementation returns a UUID for the created key. The algorithm + for the key will always be AES. A Forbidden exception is raised if the + specified context is None. """ if context is None: raise exception.Forbidden() @@ -77,8 +84,62 @@ key = self._generate_key(**kwargs) return self.store(context, key) + def _generate_public_and_private_key(self, length): + crypto_private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=length, + backend=backends.default_backend()) + + private_der = crypto_private_key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption()) + + crypto_public_key = crypto_private_key.public_key() + + public_der = crypto_public_key.public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo) + + private_key = pri_key.PrivateKey( + algorithm='RSA', + bit_length=length, + key=bytearray(private_der)) + + public_key = pub_key.PublicKey( + algorithm='RSA', + bit_length=length, + key=bytearray(public_der)) + + return private_key, public_key + def create_key_pair(self, context, algorithm, length, expiration=None): - raise NotImplementedError() + """Creates an asymmetric key pair. + + This implementation returns UUIDs for the created keys in the order: + (private, public) + Forbidden is raised if the context is None. + """ + if context is None: + raise exception.Forbidden() + + if algorithm.lower() != 'rsa': + msg = 'Invalid algorithm: {}, only RSA supported'.format(algorithm) + raise ValueError(msg) + + valid_lengths = [2048, 3072, 4096] + + if length not in valid_lengths: + msg = 'Invalid bit length: {}, only {} supported'.format( + length, valid_lengths) + raise ValueError(msg) + + private_key, public_key = self._generate_public_and_private_key(length) + + private_key_uuid = self.store(context, private_key) + public_key_uuid = self.store(context, public_key) + + return private_key_uuid, public_key_uuid def _generate_key_id(self): key_id = str(uuid.uuid4()) diff --git a/castellan/tests/unit/key_manager/test_mock_key_manager.py b/castellan/tests/unit/key_manager/test_mock_key_manager.py index 6e6e609..ea91012 100644 --- a/castellan/tests/unit/key_manager/test_mock_key_manager.py +++ b/castellan/tests/unit/key_manager/test_mock_key_manager.py @@ -17,12 +17,31 @@ Test cases for the mock key manager. """ +from cryptography.hazmat import backends +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization from oslo_context import context from castellan.common import exception from castellan.common.objects import symmetric_key as sym_key from castellan.tests.unit.key_manager import mock_key_manager as mock_key_mgr from castellan.tests.unit.key_manager import test_key_manager as test_key_mgr + + +def get_cryptography_private_key(private_key): + crypto_private_key = serialization.load_der_private_key( + bytes(private_key.get_encoded()), + password=None, + backend=backends.default_backend()) + return crypto_private_key + + +def get_cryptography_public_key(public_key): + crypto_public_key = serialization.load_der_public_key( + bytes(public_key.get_encoded()), + backend=backends.default_backend()) + return crypto_public_key class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase): @@ -47,9 +66,63 @@ key = self.key_mgr.get(self.context, key_id) self.assertEqual(length / 8, len(key.get_encoded())) - def test_create_null_context(self): + def test_create_key_null_context(self): self.assertRaises(exception.Forbidden, self.key_mgr.create_key, None) + + def test_create_key_pair(self): + for length in [2048, 3072, 4096]: + private_key_uuid, public_key_uuid = self.key_mgr.create_key_pair( + self.context, 'RSA', length) + + private_key = self.key_mgr.get(self.context, private_key_uuid) + public_key = self.key_mgr.get(self.context, public_key_uuid) + + crypto_private_key = get_cryptography_private_key(private_key) + crypto_public_key = get_cryptography_public_key(public_key) + + self.assertEqual(length, crypto_private_key.key_size) + self.assertEqual(length, crypto_public_key.key_size) + + def test_create_key_pair_encryption(self): + private_key_uuid, public_key_uuid = self.key_mgr.create_key_pair( + self.context, 'RSA', 2048) + + private_key = self.key_mgr.get(self.context, private_key_uuid) + public_key = self.key_mgr.get(self.context, public_key_uuid) + + crypto_private_key = get_cryptography_private_key(private_key) + crypto_public_key = get_cryptography_public_key(public_key) + + message = b'secret plaintext' + ciphertext = crypto_public_key.encrypt( + message, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA1()), + algorithm=hashes.SHA1(), + label=None)) + plaintext = crypto_private_key.decrypt( + ciphertext, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA1()), + algorithm=hashes.SHA1(), + label=None)) + + self.assertEqual(message, plaintext) + + def test_create_key_pair_null_context(self): + self.assertRaises(exception.Forbidden, + self.key_mgr.create_key_pair, None, 'RSA', 2048) + + def test_create_key_pair_invalid_algorithm(self): + self.assertRaises(ValueError, + self.key_mgr.create_key_pair, + self.context, 'DSA', 2048) + + def test_create_key_pair_invalid_length(self): + self.assertRaises(ValueError, + self.key_mgr.create_key_pair, + self.context, 'RSA', 10) def test_store_and_get_key(self): secret_key = bytes(b'0' * 64) diff --git a/requirements.txt b/requirements.txt index 3e77b7a..b60045e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ pbr>=0.6,!=0.7,<1.0 Babel>=1.3 +cryptography>=0.9.1 # Apache-2.0 oslo.config>=1.9.3,<1.10.0 # Apache-2.0 oslo.context>=0.2.0 # Apache-2.0 oslo.log>=1.0.0,<1.1.0 # Apache-2.0