Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/spaceone/identity/error/error_mfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ class ERROR_MFA_REQUIRED(ERROR_UNKNOWN):
class ERROR_MFA_NOT_ENABLED(ERROR_UNKNOWN):
_message = "MFA is not enabled. (user_id = {user_id})"


class ERROR_CANNOT_DISABLE_ENFORCED_MFA(ERROR_UNKNOWN):
_message = "If mfa enforce is enabled, it cannot be disabled. (user_id = {user_id})"
_message = "If mfa enforce is enabled, it cannot be disabled. (user_id = {user_id})"


class ERROR_MFA_NOT_ACTIVATED(ERROR_UNKNOWN):
_message = "MFA is not activated. (user_id = {user_id}), (mfa_type = {mfa_type}), (mfa_state = {mfa_state}), (access_token = {access_token})"
24 changes: 17 additions & 7 deletions src/spaceone/identity/manager/token_manager/mfa_token_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging

from spaceone.identity.error.error_authentication import *
from spaceone.identity.error.error_user import *
from spaceone.identity.error.error_mfa import *
from spaceone.identity.manager.external_auth_manager import ExternalAuthManager
from spaceone.identity.error.error_user import *
from spaceone.identity.manager import SecretManager
from spaceone.identity.manager.domain_manager import DomainManager
from spaceone.identity.manager.user_manager import UserManager
from spaceone.identity.manager.external_auth_manager import ExternalAuthManager
from spaceone.identity.manager.mfa_manager.base import MFAManager
from spaceone.identity.manager.token_manager.base import TokenManager
from spaceone.identity.manager import SecretManager
from spaceone.identity.manager.user_manager import UserManager
from spaceone.identity.model.domain.database import Domain
from spaceone.identity.model.user.database import User

Expand Down Expand Up @@ -44,6 +44,8 @@ def authenticate(self, domain_id: str, **kwargs):
if verify_code := kwargs.get("verify_code"):
if mfa_manager.check_mfa_verify_code(credentials, verify_code):
self.is_authenticated = True
if self.user.state == "PENDING":
self.user_mgr.update_user_by_vo({"state": "ENABLED"}, self.user)
else:
raise ERROR_INVALID_CREDENTIALS()

Expand All @@ -52,14 +54,22 @@ def authenticate(self, domain_id: str, **kwargs):
mfa_email = user_mfa["options"].get("email")

mfa_manager.send_mfa_authentication_email(
self.user.user_id, domain_id, mfa_email, self.user.language, credentials
self.user.user_id,
domain_id,
mfa_email,
self.user.language,
credentials,
)
elif mfa_type == "OTP":
secret_manager: SecretManager = self.locator.get_manager(SecretManager)
user_secret_id = user_mfa["options"].get("user_secret_id")
otp_secret_key = secret_manager.get_user_otp_secret_key(user_secret_id, domain_id)
otp_secret_key = secret_manager.get_user_otp_secret_key(
user_secret_id, domain_id
)

mfa_manager.set_cache_otp_mfa_secret_key(otp_secret_key, self.user.user_id, domain_id, credentials)
mfa_manager.set_cache_otp_mfa_secret_key(
otp_secret_key, self.user.user_id, domain_id, credentials
)

def _check_user_state(self) -> None:
if self.user.state not in ["ENABLED", "PENDING"]:
Expand Down
30 changes: 23 additions & 7 deletions src/spaceone/identity/service/role_binding_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from spaceone.core.service import *
from spaceone.core.service.utils import *

from spaceone.identity.error import (
ERROR_NOT_ALLOWED_TO_DELETE_ROLE_BINDING,
ERROR_SERVICE_ACCOUNT_MANAGER_REGISTERED,
Expand Down Expand Up @@ -173,11 +172,16 @@ def update_role(
supported_role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"],
)
self.check_last_workspace_owner_role_binding(
new_role_vo.role_type, rb_vo.workspace_id, rb_vo.domain_id
rb_vo.user_id,
new_role_vo.role_type,
rb_vo.workspace_id,
rb_vo.domain_id,
)
elif rb_vo.role_type == new_role_vo.role_type:
self.check_last_domain_admin_role_binding(
new_role_vo.role_type, rb_vo.domain_id
rb_vo.user_id,
new_role_vo.role_type,
rb_vo.domain_id,
)
else:
raise ERROR_NOT_ALLOWED_ROLE_TYPE(
Expand Down Expand Up @@ -250,10 +254,12 @@ def delete(self, params: RoleBindingDeleteRequest) -> None:
self.check_self_update_and_delete(request_user_id, rb_vo.user_id)

if rb_vo.role_type == "DOMAIN_ADMIN":
self.check_last_domain_admin_role_binding(None, rb_vo.domain_id)
self.check_last_domain_admin_role_binding(
rb_vo.user_id, None, rb_vo.domain_id
)
elif rb_vo.role_type == "WORKSPACE_OWNER":
self.check_last_workspace_owner_role_binding(
None, rb_vo.workspace_id, rb_vo.domain_id
rb_vo.user_id, None, rb_vo.workspace_id, rb_vo.domain_id
)

# Update user role type
Expand Down Expand Up @@ -407,7 +413,7 @@ def check_duplicate_workspace_role(
)

def check_last_domain_admin_role_binding(
self, new_role_type: Union[str, None], domain_id: str
self, user_id: str, new_role_type: Union[str, None], domain_id: str
) -> None:
user_ids = self._get_enabled_user_ids(domain_id)
rb_vos = self.role_binding_manager.filter_role_bindings(
Expand All @@ -416,11 +422,18 @@ def check_last_domain_admin_role_binding(
user_id=user_ids,
)

if not rb_vos.filter(user_id=user_id):
return None

if rb_vos.count() == 1 and new_role_type != "DOMAIN_ADMIN":
raise ERROR_LAST_DOMAIN_ADMIN_CANNOT_DELETE()

def check_last_workspace_owner_role_binding(
self, new_role_type: Union[str, None], workspace_id: str, domain_id: str
self,
user_id: str,
new_role_type: Union[str, None],
workspace_id: str,
domain_id: str,
) -> None:
user_ids = self._get_enabled_user_ids(domain_id)
rb_vos = self.role_binding_manager.filter_role_bindings(
Expand All @@ -430,6 +443,9 @@ def check_last_workspace_owner_role_binding(
role_type="WORKSPACE_OWNER",
)

if not rb_vos.filter(user_id=user_id):
return None

if rb_vos.count() == 1 and new_role_type != "WORKSPACE_OWNER":
raise ERROR_LAST_WORKSPACE_OWNER_CANNOT_DELETE()

Expand Down
16 changes: 13 additions & 3 deletions src/spaceone/identity/service/token_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,17 @@ def issue(self, params: TokenIssueRequest) -> Union[TokenResponse, dict]:
raise e

user_vo = token_mgr.user
user_id = user_vo.user_id
user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {}
mfa_type = user_mfa.get("mfa_type")
mfa_state = user_mfa.get("state", "DISABLED")

permissions = self._get_permissions_from_required_actions(user_vo)

mfa_user_id = user_vo.user_id

if self._check_login_protocol_with_user_auth_type(auth_type, domain_id):
if user_mfa.get("state", "DISABLED") == "ENABLED" and auth_type != "MFA":
if mfa_state == "ENABLED" and auth_type != "MFA":
mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type)
if mfa_type == "EMAIL":
mfa_email = user_mfa["options"].get("email")
Expand Down Expand Up @@ -133,6 +136,14 @@ def issue(self, params: TokenIssueRequest) -> Union[TokenResponse, dict]:
permissions=permissions,
)

if "ENFORCE_MFA" in user_vo.required_actions:
raise ERROR_MFA_NOT_ACTIVATED(
user_id=user_id,
mfa_type=mfa_type,
mfa_state=mfa_state,
access_token=token_info["access_token"],
)

self._clear_issue_attempts(domain_id, credentials, auth_type)

return TokenResponse(**token_info)
Expand Down Expand Up @@ -305,7 +316,7 @@ def _get_permissions_from_required_actions(user_vo: User) -> Union[List[str], No
return None

permissions = {"identity:UserProfile"}

if "ENFORCE_MFA" in actions:
permissions.add("secret:UserSecret.write")

Expand Down Expand Up @@ -465,7 +476,6 @@ def _load_conf(self):
def _increment_issue_attempts(
self, domain_id: str, credentials: dict, auth_type: str
) -> None:

if cache.is_set():
copied_credentials = self._get_credentials_without_password(
credentials, auth_type
Expand Down
33 changes: 17 additions & 16 deletions src/spaceone/identity/service/user_profile_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def reset_password(self, params: UserProfileResetPasswordRequest) -> None:
auth_type = user_vo.auth_type
email = user_vo.email
language = user_vo.language
required_actions = set(user_vo.required_actions)

self._check_reset_password_eligibility(user_id, auth_type, email)

Expand All @@ -225,10 +226,11 @@ def reset_password(self, params: UserProfileResetPasswordRequest) -> None:
)

elif reset_password_type == "PASSWORD":
required_actions.add("UPDATE_PASSWORD")
temp_password = self._generate_temporary_password()
self.user_mgr.update_user_by_vo({"password": temp_password}, user_vo)
self.user_mgr.update_user_by_vo(
{"required_actions": ["UPDATE_PASSWORD"]}, user_vo
{"required_actions": list(required_actions)}, user_vo
)
console_link = self._get_console_url(domain_id)
email_manager.send_temporary_password_email(
Expand Down Expand Up @@ -261,16 +263,15 @@ def enable_mfa(
user_vo = self.user_mgr.get_user(user_id, domain_id)
user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {}
user_mfa_type = user_mfa.get("mfa_type", None)
user_vo_mfa_enforce = user_mfa.get("options", {}).get("enforce", False)
user_mfa_enforce = user_mfa.get("options", {}).get("enforce", False)

if (
user_vo_mfa_enforce
and user_mfa_type is not None
and mfa_type != user_mfa_type
):
if user_vo.auth_type == "EXTERNAL":
raise ERROR_NOT_ALLOWED_ACTIONS(action="MFA")

if user_mfa_enforce and user_mfa_type is not None and mfa_type != user_mfa_type:
raise ERROR_INVALID_PARAMETER(
key="mfa.mfa_type",
reason="Can only request with the enforced MFA type.",
key="mfa_type",
reason="Only requests using the MFA type enforced by admin are allowed.",
)

self._check_mfa_options(options, mfa_type)
Expand Down Expand Up @@ -367,18 +368,18 @@ def confirm_mfa(
if mfa_state == "ENABLED":
if mfa_enforce:
update_require_actions.add("ENFORCE_MFA")

user_mfa = {
"state": "DISABLED",
**({"mfa_type": mfa_type, "options": {"enforce": mfa_enforce}} if mfa_enforce else {}),
}
user_mfa["options"] = {"enforce": mfa_enforce}
user_mfa["mfa_type"] = mfa_type
user_mfa["state"] = "DISABLED"

elif mfa_state == "DISABLED":
update_require_actions.discard("ENFORCE_MFA")
user_mfa["state"] = "ENABLED"
if not mfa_enforce:
user_mfa.pop("mfa_type", None)

user_vo = self.user_mgr.update_user_by_vo(
{"mfa": user_mfa, "required_actions": update_require_actions},
{"mfa": user_mfa, "required_actions": list(update_require_actions)},
user_vo,
)

Expand Down Expand Up @@ -633,7 +634,7 @@ def _get_role_bindings_info(

@staticmethod
def _extract_user_ids(
workspace_group_infos: Union[QuerySet, List[Dict[str, str]]]
workspace_group_infos: Union[QuerySet, List[Dict[str, str]]],
) -> List[str]:
workspace_group_user_ids = []
for workspace_group_info in workspace_group_infos:
Expand Down
Loading
Loading