diff --git a/keystone/api/credentials.py b/keystone/api/credentials.py index 90b53dd686..b960e2deaa 100644 --- a/keystone/api/credentials.py +++ b/keystone/api/credentials.py @@ -32,6 +32,16 @@ ENFORCER = rbac_enforcer.RBACEnforcer +def _check_unrestricted_application_credential(token): + if 'application_credential' in token.methods: + if not token.application_credential['unrestricted']: + action = _( + "Using method 'application_credential' is not " + "allowed for managing additional credentials." + ) + raise exception.ForbiddenAction(action=action) + + def _build_target_enforcement(): target = {} try: @@ -156,11 +166,12 @@ def post(self): action='identity:create_credential', target_attr=target ) validation.lazy_validate(schema.credential_create, credential) + token = self.auth_context['token'] + if credential.get('type', '').lower() == 'ec2': + _check_unrestricted_application_credential(token) trust_id = getattr(self.oslo_context, 'trust_id', None) - app_cred_id = getattr( - self.auth_context['token'], 'application_credential_id', None) - access_token_id = getattr( - self.auth_context['token'], 'access_token_id', None) + app_cred_id = getattr(token, 'application_credential_id', None) + access_token_id = getattr(token, 'access_token_id', None) ref = self._assign_unique_id( self._normalize_dict(credential), trust_id=trust_id, app_cred_id=app_cred_id, diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index b0d0e7fcc6..17f08afe4d 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -688,9 +688,11 @@ def test_app_cred_ec2_credential(self): Call ``POST /credentials``. """ - # Create the app cred + # Create an unrestricted app cred (restricted app creds are + # blocked from creating EC2 credentials) ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) del ref['id'] + ref['unrestricted'] = True r = self.post('/users/%s/application_credentials' % self.user_id, body={'application_credential': ref}) app_cred = r.result['application_credential'] @@ -743,6 +745,41 @@ def test_app_cred_ec2_credential(self): token=token_id, expected_status=http.client.CONFLICT) + def _get_app_cred_token(self, unrestricted=False): + """Create an application credential and return its token.""" + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + if unrestricted: + ref['unrestricted'] = True + r = self.post( + f'/users/{self.user_id}/application_credentials', + body={'application_credential': ref}, + ) + app_cred = r.result['application_credential'] + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_restricted_app_cred_cannot_create_ec2_credential(self): + """Test that a restricted app cred cannot create EC2 credentials. + + A restricted application credential must not be allowed to + create EC2 credentials via POST /credentials either, as this + would bypass the guard on the OS-EC2 endpoint. + """ + token_id = self._get_app_cred_token(unrestricted=False) + blob, ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=self.project_id + ) + self.post( + '/credentials', + body={'credential': ref}, + token=token_id, + expected_status=http.client.FORBIDDEN, + ) + class TestCredentialAccessToken(CredentialBaseTestCase): """Test credential with access token.""" @@ -994,3 +1031,50 @@ def test_ec2_delete_credential(self): self.assertRaises(exception.CredentialNotFound, PROVIDERS.credential_api.get_credential, cred_from_credential_api[0]['id']) + + def _get_app_cred_token(self, unrestricted=False): + """Create an application credential and return a token for it.""" + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + if unrestricted: + ref['unrestricted'] = True + r = self.post( + f'/users/{self.user_id}/application_credentials', + body={'application_credential': ref}, + ) + app_cred = r.result['application_credential'] + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_ec2_create_credential_with_restricted_app_cred(self): + """Test that a restricted app cred cannot create EC2 credentials. + + A restricted application credential must not be allowed to create + EC2 credentials, as this would bypass the role restriction and + grant full user access to S3. + """ + token_id = self._get_app_cred_token(unrestricted=False) + uri = self._get_ec2_cred_uri() + self.post( + uri, + body={'tenant_id': self.project_id}, + token=token_id, + expected_status=http.client.FORBIDDEN, + ) + + def test_ec2_create_credential_with_unrestricted_app_cred(self): + """Test that an unrestricted app cred can create EC2 credentials.""" + token_id = self._get_app_cred_token(unrestricted=True) + uri = self._get_ec2_cred_uri() + r = self.post( + uri, + body={'tenant_id': self.project_id}, + token=token_id, + expected_status=http.client.CREATED, + ) + ec2_cred = r.result['credential'] + self.assertEqual(self.user_id, ec2_cred['user_id']) + self.assertEqual(self.project_id, ec2_cred['tenant_id'])