Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions keystone/api/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@
ENFORCER = rbac_enforcer.RBACEnforcer


def _check_unrestricted_application_credential(token):
if 'application_credential' in token.methods:
if not token.application_credential['unrestricted']:
action = _(
"Using method 'application_credential' is not "
"allowed for managing additional credentials."
)
raise exception.ForbiddenAction(action=action)


def _build_target_enforcement():
target = {}
try:
Expand Down Expand Up @@ -156,11 +166,12 @@ def post(self):
action='identity:create_credential', target_attr=target
)
validation.lazy_validate(schema.credential_create, credential)
token = self.auth_context['token']
if credential.get('type', '').lower() == 'ec2':
_check_unrestricted_application_credential(token)
trust_id = getattr(self.oslo_context, 'trust_id', None)
app_cred_id = getattr(
self.auth_context['token'], 'application_credential_id', None)
access_token_id = getattr(
self.auth_context['token'], 'access_token_id', None)
app_cred_id = getattr(token, 'application_credential_id', None)
access_token_id = getattr(token, 'access_token_id', None)
ref = self._assign_unique_id(
self._normalize_dict(credential),
trust_id=trust_id, app_cred_id=app_cred_id,
Expand Down
86 changes: 85 additions & 1 deletion keystone/tests/unit/test_v3_credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,9 +688,11 @@ def test_app_cred_ec2_credential(self):

Call ``POST /credentials``.
"""
# Create the app cred
# Create an unrestricted app cred (restricted app creds are
# blocked from creating EC2 credentials)
ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
del ref['id']
ref['unrestricted'] = True
r = self.post('/users/%s/application_credentials' % self.user_id,
body={'application_credential': ref})
app_cred = r.result['application_credential']
Expand Down Expand Up @@ -743,6 +745,41 @@ def test_app_cred_ec2_credential(self):
token=token_id,
expected_status=http.client.CONFLICT)

def _get_app_cred_token(self, unrestricted=False):
"""Create an application credential and return its token."""
ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
del ref['id']
if unrestricted:
ref['unrestricted'] = True
r = self.post(
f'/users/{self.user_id}/application_credentials',
body={'application_credential': ref},
)
app_cred = r.result['application_credential']
auth_data = self.build_authentication_request(
app_cred_id=app_cred['id'], secret=app_cred['secret']
)
r = self.v3_create_token(auth_data)
return r.headers.get('X-Subject-Token')

def test_restricted_app_cred_cannot_create_ec2_credential(self):
"""Test that a restricted app cred cannot create EC2 credentials.

A restricted application credential must not be allowed to
create EC2 credentials via POST /credentials either, as this
would bypass the guard on the OS-EC2 endpoint.
"""
token_id = self._get_app_cred_token(unrestricted=False)
blob, ref = unit.new_ec2_credential(
user_id=self.user_id, project_id=self.project_id
)
self.post(
'/credentials',
body={'credential': ref},
token=token_id,
expected_status=http.client.FORBIDDEN,
)


class TestCredentialAccessToken(CredentialBaseTestCase):
"""Test credential with access token."""
Expand Down Expand Up @@ -994,3 +1031,50 @@ def test_ec2_delete_credential(self):
self.assertRaises(exception.CredentialNotFound,
PROVIDERS.credential_api.get_credential,
cred_from_credential_api[0]['id'])

def _get_app_cred_token(self, unrestricted=False):
"""Create an application credential and return a token for it."""
ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
del ref['id']
if unrestricted:
ref['unrestricted'] = True
r = self.post(
f'/users/{self.user_id}/application_credentials',
body={'application_credential': ref},
)
app_cred = r.result['application_credential']
auth_data = self.build_authentication_request(
app_cred_id=app_cred['id'], secret=app_cred['secret']
)
r = self.v3_create_token(auth_data)
return r.headers.get('X-Subject-Token')

def test_ec2_create_credential_with_restricted_app_cred(self):
"""Test that a restricted app cred cannot create EC2 credentials.

A restricted application credential must not be allowed to create
EC2 credentials, as this would bypass the role restriction and
grant full user access to S3.
"""
token_id = self._get_app_cred_token(unrestricted=False)
uri = self._get_ec2_cred_uri()
self.post(
uri,
body={'tenant_id': self.project_id},
token=token_id,
expected_status=http.client.FORBIDDEN,
)

def test_ec2_create_credential_with_unrestricted_app_cred(self):
"""Test that an unrestricted app cred can create EC2 credentials."""
token_id = self._get_app_cred_token(unrestricted=True)
uri = self._get_ec2_cred_uri()
r = self.post(
uri,
body={'tenant_id': self.project_id},
token=token_id,
expected_status=http.client.CREATED,
)
ec2_cred = r.result['credential']
self.assertEqual(self.user_id, ec2_cred['user_id'])
self.assertEqual(self.project_id, ec2_cred['tenant_id'])