From c7a3f111ceb401c872f42338bd32af0862ce956a Mon Sep 17 00:00:00 2001
From: daeyeon ko
Date: Mon, 28 Jul 2025 12:48:13 +0900
Subject: [PATCH 1/2] refactor: handle enforce condition when disabling MFA
Signed-off-by: daeyeon ko
---
.../identity/service/user_profile_service.py | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
diff --git a/src/spaceone/identity/service/user_profile_service.py b/src/spaceone/identity/service/user_profile_service.py
index cdd0dfe9..6ebd0f70 100644
--- a/src/spaceone/identity/service/user_profile_service.py
+++ b/src/spaceone/identity/service/user_profile_service.py
@@ -348,7 +348,7 @@ def confirm_mfa(
 user_vo = self.user_mgr.get_user(user_id, domain_id)
 user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {}
 mfa_state = user_mfa.get("state", "DISABLED")
- mfa_enforce = user_mfa.get("options", {}).get("enforce") == True
+ mfa_enforce = user_mfa.get("options", {}).get("enforce", False)
 if mfa_state == "DISABLED":
 user_mfa = MFAManager.get_mfa_info(credentials)["user_mfa"]
@@ -359,25 +359,22 @@ def confirm_mfa(
 raise ERROR_MFA_NOT_ENABLED(user_id=user_id)
 mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type)
- update_require_actions = list(user_vo.required_actions)
+ update_require_actions = set(user_vo.required_actions)
 if mfa_manager.confirm_mfa(credentials, verify_code):
 user_mfa = mfa_manager.set_mfa_options(user_mfa, credentials)
- if mfa_state == "ENABLED" or (mfa_state == "DISABLED" and mfa_enforce):
- update_require_actions = [ action for action in update_require_actions if action != "ENFORCE_MFA" ]
 if mfa_state == "ENABLED":
+ if mfa_enforce:
+ update_require_actions.add("ENFORCE_MFA")
+
 user_mfa = {
 "state": "DISABLED",
 **({"mfa_type": mfa_type, "options": {"enforce": mfa_enforce}} if mfa_enforce else {}),
 }
 elif mfa_state == "DISABLED":
+ update_require_actions.discard("ENFORCE_MFA")
 user_mfa["state"] = "ENABLED"
 user_vo = self.user_mgr.update_user_by_vo(
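Review bot: The new `mfa_enforce` line drops the `== True` normalization, so whatever is stored under `options.enforce` now flows unchanged into the truthiness checks below and is written back into `options` when MFA is disabled; a stored non-bool (a string such as "false", for instance) would be treated as enforced. Coerce at the read site: `mfa_enforce = bool((user_mfa.get("options") or {}).get("enforce", False))`, which also survives `options` being present as `None`, should `to_dict()` ever emit that — worth confirming against the model. confirm_mfa starts before this hunk.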
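Review bot: `update_require_actions` is now a set, but nothing in the hunk converts it back before it reaches the `update_user_by_vo(` call on the last line; the call's arguments fall outside the hunk, so if `required_actions` is persisted as a list field it should be passed as `list(update_require_actions)`, the way the companion user_service.py change does. Re-adding `ENFORCE_MFA` when an enforced user disables MFA is the intended behaviour change and reads correctly, but it deserves a why-comment above the `if mfa_enforce:` line, e.g. `# disabling enforced MFA sends the user back through the enforced-setup flow`. confirm_mfa starts before and continues past this hunk.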
From 98761c0ae4b8f82c504290741497a6e5da7cea20 Mon Sep 17 00:00:00 2001
From: daeyeon ko
Date: Mon, 28 Jul 2025 12:48:39 +0900
Subject: [PATCH 2/2] refactor: improve safe access to MFA fields during user creation
Signed-off-by: daeyeon ko
---
src/spaceone/identity/service/user_service.py | 23 +++++++------------
1 file changed, 8 insertions(+), 15 deletions(-)
diff --git a/src/spaceone/identity/service/user_service.py b/src/spaceone/identity/service/user_service.py
index d7eb9720..207e7d3a 100644
--- a/src/spaceone/identity/service/user_service.py
+++ b/src/spaceone/identity/service/user_service.py
@@ -75,8 +75,8 @@ def create_user(self, params: dict) -> User:
 language = self._get_domain_default_language(domain_id, params.get("language"))
 params["language"] = language
 params["timezone"] = params.get("timezone", "UTC")
- mfa_enforce = params["enforce_mfa_state"]
- mfa_enforce_type = params["enforce_mfa_type"]
+ mfa_enforce = params.get("enforce_mfa_state")
+ mfa_enforce_type = params.get("enforce_mfa_type")
 if mfa_enforce:
 if mfa_enforce_type is None:
@@ -88,7 +88,7 @@ def create_user(self, params: dict) -> User:
 "options": {"enforce": mfa_enforce},
 }
 params["required_actions"] = ["ENFORCE_MFA"]
- else:
+ elif mfa_enforce is not None:
 if mfa_enforce_type is not None:
 raise ERROR_INVALID_PARAMETER(
 key="mfa.mfa_type",
@@ -195,7 +195,7 @@ def update(self, params: UserUpdateRequest) -> Union[UserResponse, dict]:
 domain_id = params.domain_id
 update_user_vo = {}
- update_require_actions = list(user_vo.required_actions)
+ update_require_actions = set(user_vo.required_actions)
 if params.reset_password:
 domain_name = self._get_domain_name(domain_id)
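Review bot: With `elif mfa_enforce is not None:` the type-without-state validation is skipped whenever `enforce_mfa_state` is omitted entirely, so a create request carrying only `enforce_mfa_type` is now accepted silently instead of raising ERROR_INVALID_PARAMETER as it did under the old `else:`. If the intent is only to skip the rest of the branch when neither field is sent, widen the guard: `elif mfa_enforce is not None or mfa_enforce_type is not None:`. The branch body continues past the hunk, so I cannot see what else the old `else:` did for the no-MFA case; create_user starts before the hunk.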
@@ -216,8 +216,7 @@ def update(self, params: UserUpdateRequest) -> Union[UserResponse, dict]:
 temp_password = self._generate_temporary_password()
 update_user_vo["password"] = temp_password
- if "UPDATE_PASSWORD" not in update_require_actions:
- update_require_actions.append("UPDATE_PASSWORD")
+ update_require_actions.add("UPDATE_PASSWORD")
 if reset_password_type == "ACCESS_TOKEN":
 token = self._issue_temporary_token(user_id, domain_id)
@@ -262,7 +261,7 @@ def update(self, params: UserUpdateRequest) -> Union[UserResponse, dict]:
 update_mfa["options"].clear()
 update_mfa["options"].update({"enforce": mfa_enforce})
- update_require_actions.append("ENFORCE_MFA")
+ update_require_actions.add("ENFORCE_MFA")
 else:
 if mfa_type is not None:
@@ -271,11 +270,7 @@ def update(self, params: UserUpdateRequest) -> Union[UserResponse, dict]:
 reason="Type can only be set when mfa enforce is True.",
 )
 update_mfa["options"].pop("enforce", None)
- update_require_actions = [ actions for actions in update_require_actions if actions != "ENFORCE_MFA" ]
+ update_require_actions.discard("ENFORCE_MFA")
 update_user_vo["mfa"] = update_mfa
@@ -283,9 +278,7 @@ def update(self, params: UserUpdateRequest) -> Union[UserResponse, dict]:
 exclude_unset=True, exclude={"reset_password", "mfa"}
 )
 update_user_vo.update(general_params)
- update_user_vo["required_actions"] = list( set(update_require_actions) )
+ update_user_vo["required_actions"] = list(update_require_actions)
 user_vo = self.user_mgr.update_user_by_vo(update_user_vo, user_vo)