Add keystone oidc tests
This adds tests to test getting a token (scoped and unscoped) when
keystone is configured to use oidc for authentication. The oidc
provider is keycloak. This is based in very large part on Kristi's
work in [1] and [2].
[1] https://github.com/knikolla/devstack-plugin-oidc
[2] https://github.com/CCI-MOC/onboarding-tools
Co-Authored-By: David Wilde <dwilde@redhat.com>
Change-Id: I1772b65f1cc3830ac293a800a79d044a6ab69d65
Ade Lee
1 year, 2 months ago
46 | 46 | help='Password used to login in the Identity Provider'), |
47 | 47 | cfg.StrOpt('idp_ecp_url', |
48 | 48 | help='Identity Provider SAML2/ECP URL'), |
49 | cfg.StrOpt('idp_oidc_url', | |
50 | help='Identity Provider OIDC URL'), | |
51 | ||
52 | # client id (oidc) | |
53 | cfg.StrOpt('idp_client_id', | |
54 | help='Identity Provider Client ID'), | |
55 | cfg.StrOpt('idp_client_secret', | |
56 | help='Identity Provider Client Secret'), | |
49 | 57 | |
50 | 58 | # Mapping rules |
51 | 59 | cfg.StrOpt('mapping_remote_type', |
71 | 79 | # Protocol |
72 | 80 | cfg.StrOpt('protocol_id', |
73 | 81 | default='mapped', |
74 | help='The Protocol ID') | |
82 | help='The Protocol ID'), | |
83 | ||
75 | 84 | ] |
0 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
1 | # you may not use this file except in compliance with the License. | |
2 | # You may obtain a copy of the License at | |
3 | # | |
4 | # http://www.apache.org/licenses/LICENSE-2.0 | |
5 | # | |
6 | # Unless required by applicable law or agreed to in writing, software | |
7 | # distributed under the License is distributed on an "AS IS" BASIS, | |
8 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
9 | # See the License for the specific language governing permissions and | |
10 | # limitations under the License. | |
11 | ||
12 | import requests | |
13 | ||
14 | ||
15 | class KeycloakClient(object): | |
16 | def __init__(self, keycloak_url, keycloak_username, keycloak_password, | |
17 | realm='master', ca_certs_file=False): | |
18 | self.keycloak_url = keycloak_url | |
19 | self.keycloak_username = keycloak_username | |
20 | self.keycloak_password = keycloak_password | |
21 | self.session = requests.session() | |
22 | self.realm = realm | |
23 | self.ca_certs_file = ca_certs_file | |
24 | self._admin_auth() | |
25 | ||
26 | @property | |
27 | def url_base(self): | |
28 | return self.keycloak_url + f'/admin/realms' | |
29 | ||
30 | @property | |
31 | def token_endpoint(self): | |
32 | return self.keycloak_url + \ | |
33 | f'/realms/{self.realm}/protocol/openid-connect/token' | |
34 | ||
35 | @property | |
36 | def discovery_endpoint(self): | |
37 | return self.keycloak_url + \ | |
38 | f'/realms/{self.realm}/.well-known/openid-configuration' | |
39 | ||
40 | def _construct_url(self, path): | |
41 | return self.url_base + f'/{self.realm}/{path}' | |
42 | ||
43 | def _admin_auth(self): | |
44 | params = { | |
45 | 'grant_type': 'password', | |
46 | 'client_id': 'admin-cli', | |
47 | 'username': self.keycloak_username, | |
48 | 'password': self.keycloak_password, | |
49 | 'scope': 'openid', | |
50 | } | |
51 | r = requests.post( | |
52 | self.token_endpoint, | |
53 | data=params, | |
54 | verify=self.ca_certs_file).json() | |
55 | ||
56 | headers = { | |
57 | 'Authorization': ("Bearer %s" % r['access_token']), | |
58 | 'Content-Type': 'application/json' | |
59 | } | |
60 | self.session.headers.update(headers) | |
61 | return r | |
62 | ||
63 | def create_user(self, email, first_name, last_name): | |
64 | self._admin_auth() | |
65 | data = { | |
66 | 'username': email, | |
67 | 'email': email, | |
68 | 'firstName': first_name, | |
69 | 'lastName': last_name, | |
70 | 'enabled': True, | |
71 | 'emailVerified': True, | |
72 | 'credentials': [{ | |
73 | 'value': 'secret', | |
74 | 'type': 'password', | |
75 | }], | |
76 | 'requiredActions': [] | |
77 | } | |
78 | return self.session.post( | |
79 | self._construct_url('users'), | |
80 | json=data, verify=self.ca_certs_file) | |
81 | ||
82 | def delete_user(self, username): | |
83 | self._admin_auth() | |
84 | data = { | |
85 | 'id': username, | |
86 | } | |
87 | return self.session.delete( | |
88 | self._construct_url('users'), | |
89 | json=data, verify=self.ca_certs_file) |
205 | 205 | "Federated Identity feature not enabled") |
206 | 206 | @testtools.skipUnless(CONF.identity_feature_enabled.external_idp, |
207 | 207 | "External identity provider is not available") |
208 | @testtools.skipUnless(CONF.fed_scenario.protocol_id == 'mapped', | |
209 | "Protocol not mapped") | |
208 | 210 | def test_request_unscoped_token(self): |
209 | 211 | self._test_request_unscoped_token() |
210 | 212 | |
212 | 214 | "Federated Identity feature not enabled") |
213 | 215 | @testtools.skipUnless(CONF.identity_feature_enabled.external_idp, |
214 | 216 | "External identity provider is not available") |
217 | @testtools.skipUnless(CONF.fed_scenario.protocol_id == 'mapped', | |
218 | "Protocol not mapped") | |
215 | 219 | def test_request_scoped_token(self): |
216 | 220 | self._test_request_scoped_token() |
217 | 221 | |
327 | 331 | |
328 | 332 | @testtools.skipUnless(CONF.identity_feature_enabled.federation, |
329 | 333 | "Federated Identity feature not enabled") |
334 | @testtools.skipUnless(CONF.fed_scenario.protocol_id == 'mapped', | |
335 | "Protocol not mapped") | |
330 | 336 | def test_request_unscoped_token(self): |
331 | 337 | self._test_request_unscoped_token() |
332 | 338 | |
333 | 339 | @testtools.skipUnless(CONF.identity_feature_enabled.federation, |
334 | 340 | "Federated Identity feature not enabled") |
341 | @testtools.skipUnless(CONF.fed_scenario.protocol_id == 'mapped', | |
342 | "Protocol not mapped") | |
335 | 343 | def test_request_scoped_token(self): |
336 | 344 | self._test_request_scoped_token() |
0 | # Copyright 2022 Red Hat, Inc. | |
1 | # | |
2 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | |
3 | # not use this file except in compliance with the License. You may obtain | |
4 | # a copy of the License at | |
5 | # | |
6 | # http://www.apache.org/licenses/LICENSE-2.0 | |
7 | # | |
8 | # Unless required by applicable law or agreed to in writing, software | |
9 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
10 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
11 | # License for the specific language governing permissions and limitations | |
12 | # under the License. | |
13 | ||
14 | import uuid | |
15 | ||
16 | from keystoneauth1 import identity | |
17 | from keystoneauth1 import session as ks_session | |
18 | from tempest import config | |
19 | from tempest.lib.common.utils import data_utils | |
20 | import testtools | |
21 | ||
22 | from .keycloak import KeycloakClient | |
23 | from keystone_tempest_plugin.tests import base | |
24 | ||
25 | CONF = config.CONF | |
26 | ||
27 | ||
28 | class TestOidcFederatedAuthentication(base.BaseIdentityTest): | |
29 | ||
30 | def _setup_settings(self): | |
31 | # Keycloak Settings | |
32 | self.idp_id = CONF.fed_scenario.idp_id | |
33 | self.idp_remote_ids = CONF.fed_scenario.idp_remote_ids | |
34 | self.idp_url = CONF.fed_scenario.idp_oidc_url | |
35 | self.idp_client_id = CONF.fed_scenario.idp_client_id | |
36 | self.idp_client_secret = CONF.fed_scenario.idp_client_secret | |
37 | self.idp_password = CONF.fed_scenario.idp_password | |
38 | self.idp_username = CONF.fed_scenario.idp_username | |
39 | ||
40 | self.protocol_id = CONF.fed_scenario.protocol_id | |
41 | self.keystone_v3_endpoint = CONF.identity.uri_v3 | |
42 | ||
43 | # mapping settings | |
44 | self.mapping_remote_type = CONF.fed_scenario.mapping_remote_type | |
45 | self.mapping_user_name = CONF.fed_scenario.mapping_user_name | |
46 | self.mapping_group_name = CONF.fed_scenario.mapping_group_name | |
47 | self.mapping_group_domain_name = \ | |
48 | CONF.fed_scenario.mapping_group_domain_name | |
49 | ||
50 | # custom CA certificate settings | |
51 | self.ca_certificates_file = CONF.identity.ca_certificates_file | |
52 | ||
53 | def _setup_mapping(self): | |
54 | self.mapping_id = data_utils.rand_uuid_hex() | |
55 | rules = [{ | |
56 | 'local': [ | |
57 | { | |
58 | 'user': {'name': self.mapping_user_name} | |
59 | }, | |
60 | { | |
61 | 'group': { | |
62 | 'domain': {'name': self.mapping_group_domain_name}, | |
63 | 'name': self.mapping_group_name | |
64 | } | |
65 | } | |
66 | ], | |
67 | 'remote': [ | |
68 | { | |
69 | 'type': self.mapping_remote_type | |
70 | } | |
71 | ] | |
72 | }] | |
73 | mapping_ref = {'rules': rules} | |
74 | self.mappings_client.create_mapping_rule(self.mapping_id, mapping_ref) | |
75 | self.addCleanup( | |
76 | self.mappings_client.delete_mapping_rule, self.mapping_id) | |
77 | ||
78 | def _setup_protocol(self): | |
79 | self.idps_client.add_protocol_and_mapping( | |
80 | self.idp_id, self.protocol_id, self.mapping_id) | |
81 | self.addCleanup( | |
82 | self.idps_client.delete_protocol_and_mapping, | |
83 | self.idp_id, | |
84 | self.protocol_id) | |
85 | ||
86 | def setUp(self): | |
87 | super(TestOidcFederatedAuthentication, self).setUp() | |
88 | self._setup_settings() | |
89 | ||
90 | # Setup mapping and protocol | |
91 | self._setup_mapping() | |
92 | self._setup_protocol() | |
93 | self.keycloak = KeycloakClient( | |
94 | keycloak_url=self.idp_url, | |
95 | keycloak_username=self.idp_username, | |
96 | keycloak_password=self.idp_password, | |
97 | ca_certs_file=self.ca_certificates_file, | |
98 | ) | |
99 | ||
100 | def _setup_user(self, email=None): | |
101 | email = email if email else f'test-{uuid.uuid4().hex}@example.com' | |
102 | self.keycloak.create_user(email, 'Test', 'User') | |
103 | return email | |
104 | ||
105 | def _request_unscoped_token(self, user): | |
106 | auth = identity.v3.OidcPassword( | |
107 | auth_url=self.keystone_v3_endpoint, | |
108 | identity_provider=self.idp_id, | |
109 | protocol=self.protocol_id, | |
110 | client_id=self.idp_client_id, | |
111 | client_secret=self.idp_client_secret, | |
112 | access_token_endpoint=self.keycloak.token_endpoint, | |
113 | discovery_endpoint=self.keycloak.discovery_endpoint, | |
114 | username=user, | |
115 | password='secret' | |
116 | ) | |
117 | s = ks_session.Session(auth, verify=self.ca_certificates_file) | |
118 | return s.get_auth_headers() | |
119 | ||
120 | @testtools.skipUnless(CONF.identity_feature_enabled.federation, | |
121 | "Federated Identity feature not enabled") | |
122 | @testtools.skipUnless(CONF.identity_feature_enabled.external_idp, | |
123 | "External identity provider is not available") | |
124 | @testtools.skipUnless(CONF.fed_scenario.protocol_id == 'openid', | |
125 | "Protocol not openid") | |
126 | def test_request_unscoped_token(self): | |
127 | user = self._setup_user() | |
128 | token = self._request_unscoped_token(user) | |
129 | self.assertNotEmpty(token) | |
130 | self.keycloak.delete_user(user) | |
131 | ||
132 | @testtools.skipUnless(CONF.identity_feature_enabled.federation, | |
133 | "Federated Identity feature not enabled") | |
134 | @testtools.skipUnless(CONF.identity_feature_enabled.external_idp, | |
135 | "External identity provider is not available") | |
136 | @testtools.skipUnless(CONF.fed_scenario.protocol_id == 'openid', | |
137 | "Protocol not openid") | |
138 | def test_request_scoped_token(self): | |
139 | user = self._setup_user() | |
140 | token = self._request_unscoped_token(user) | |
141 | token_id = token['X-Auth-Token'] | |
142 | ||
143 | projects = self.auth_client.get_available_projects_scopes( | |
144 | self.keystone_v3_endpoint, token_id)['projects'] | |
145 | self.assertNotEmpty(projects) | |
146 | ||
147 | # Get a scoped token to one of the listed projects | |
148 | self.tokens_client.auth( | |
149 | project_id=projects[0]['id'], token=token_id) | |
150 | self.keycloak.delete_user(user) |