Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 0 additions & 38 deletions src/spaceone/identity/manager/role_binding_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from spaceone.core.manager import BaseManager
from spaceone.identity.manager.user_group_manager import UserGroupManager
from spaceone.identity.manager.workspace_manager import WorkspaceManager
from spaceone.identity.model.role_binding.database import RoleBinding

_LOGGER = logging.getLogger(__name__)
Expand All @@ -24,11 +23,6 @@ def _rollback(vo: RoleBinding):
role_binding_vo = self.role_binding_model.create(params)
self.transaction.add_rollback(_rollback, role_binding_vo)

if role_binding_vo.workspace_id and role_binding_vo.workspace_id != "*":
self._update_workspace_user_count(
role_binding_vo.workspace_id, role_binding_vo.domain_id
)

return role_binding_vo

def update_role_binding_by_vo(
Expand Down Expand Up @@ -67,11 +61,6 @@ def delete_role_binding_by_vo(
{"users": users}, user_group_vo=user_group_vo
)

if role_binding_vo.workspace_id and role_binding_vo.workspace_id != "*":
self._update_workspace_user_count(
role_binding_vo.workspace_id, role_binding_vo.domain_id
)

def get_role_binding(
self, role_binding_id: str, domain_id: str, workspace_id: str = None
) -> RoleBinding:
Expand All @@ -93,30 +82,3 @@ def list_role_bindings(self, query: dict) -> Tuple[QuerySet, int]:

def stat_role_bindings(self, query: dict) -> dict:
return self.role_binding_model.stat(**query)

def _update_workspace_user_count(self, workspace_id: str, domain_id: str) -> None:
workspace_mgr = WorkspaceManager()

workspace_vo = workspace_mgr.get_workspace(workspace_id, domain_id)

if workspace_vo and workspace_vo.workspace_id != "*":
user_rb_total_count = self._get_workspace_user_count(
workspace_id, domain_id
)

workspace_mgr.update_workspace_by_vo(
{"user_count": user_rb_total_count}, workspace_vo
)

def _get_workspace_user_count(self, workspace_id: str, domain_id: str) -> int:
user_rb_ids = self.stat_role_bindings(
query={
"distinct": "user_id",
"filter": [
{"k": "workspace_id", "v": workspace_id, "o": "eq"},
{"k": "domain_id", "v": domain_id, "o": "eq"},
],
}
).get("results", [])

return len(user_rb_ids)
68 changes: 7 additions & 61 deletions src/spaceone/identity/manager/user_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@

import pytz
from mongoengine import QuerySet
from spaceone.core.manager import BaseManager

from spaceone.core.manager import BaseManager
from spaceone.identity.error.error_user import *
from spaceone.identity.lib.cipher import PasswordCipher
from spaceone.identity.manager.project_manager import ProjectManager
from spaceone.identity.manager.role_binding_manager import RoleBindingManager
from spaceone.identity.manager.user_group_manager import UserGroupManager
from spaceone.identity.manager.workspace_group_manager import WorkspaceGroupManager
from spaceone.identity.model.user.database import User

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,7 +57,7 @@ def _rollback(vo: User):
def update_user_by_vo(self, params: dict, user_vo: User) -> User:
def _rollback(old_data):
_LOGGER.info(
f'[update_user_by_vo._rollback] Revert Data: {old_data["user_id"]}'
f"[update_user_by_vo._rollback] Revert Data: {old_data['user_id']}"
)
user_vo.update(old_data)

Expand All @@ -71,7 +67,9 @@ def _rollback(old_data):
if email := params.get("email"):
self._check_email_format(email)

required_actions = list(params.get("required_actions", user_vo.required_actions or []))
required_actions = list(
params.get("required_actions", user_vo.required_actions or [])
)
is_change_required_actions = False

if new_password := params.get("password"):
Expand All @@ -96,7 +94,7 @@ def _rollback(old_data):
def update_user_password_by_vo(self, user_vo: User, params: dict) -> User:
def _rollback(old_data):
_LOGGER.info(
f'[update_user_by_vo._rollback] Revert Data: {old_data["user_id"]}'
f"[update_user_by_vo._rollback] Revert Data: {old_data['user_id']}"
)
user_vo.update(old_data)

Expand All @@ -120,59 +118,7 @@ def _rollback(old_data):

return user_vo.update(update_params)

@staticmethod
def delete_user_by_vo(user_vo: User) -> None:
rb_mgr = RoleBindingManager()
user_group_mgr = UserGroupManager()
project_mgr = ProjectManager()
workspace_group_mgr = WorkspaceGroupManager()

# Delete role bindings
rb_vos = rb_mgr.filter_role_bindings(
user_id=user_vo.user_id, domain_id=user_vo.domain_id
)
for rb_vo in rb_vos:
rb_mgr.delete_role_binding_by_vo(rb_vo)

# Delete user from user groups
user_group_vos = user_group_mgr.filter_user_groups(
users=user_vo.user_id, domain_id=user_vo.domain_id
)
for user_group_vo in user_group_vos:
users = user_group_vo.users
users.remove(user_vo.user_id)
user_group_mgr.update_user_group_by_vo(
{"users": users}, user_group_vo=user_group_vo
)

# Delete projects
project_vos = project_mgr.filter_projects(
users=user_vo.user_id, domain_id=user_vo.domain_id
)
for project_vo in project_vos:
users = project_vo.users
users.remove(user_vo.user_id)
project_mgr.update_project_by_vo({"users": users}, project_vo=project_vo)

# Delete workspace groups
workspace_group_vos = workspace_group_mgr.filter_workspace_groups(
users__user_id=user_vo.user_id, domain_id=user_vo.domain_id
)

for workspace_group_vo in workspace_group_vos:
workspace_group_dict = workspace_group_vo.to_mongo().to_dict()
users = workspace_group_dict.get("users", [])

if users:
updated_users = [
user for user in users if user.get("user_id") != user_vo.user_id
]

if len(updated_users) != len(users):
workspace_group_mgr.update_workspace_group_by_vo(
{"users": updated_users}, workspace_group_vo=workspace_group_vo
)

def delete_user(self, user_vo: User) -> None:
user_vo.delete()

def get_user(self, user_id: str, domain_id: str) -> User:
Expand Down
47 changes: 38 additions & 9 deletions src/spaceone/identity/service/domain_service.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import logging
from typing import Union

from spaceone.core.service import *
from spaceone.core.service.utils import *
from spaceone.core import utils
from spaceone.core.auth.jwt import JWTAuthenticator

from spaceone.identity.manager.external_auth_manager import ExternalAuthManager
from spaceone.core.service import *
from spaceone.core.service.utils import *
from spaceone.identity.error.error_domain import *
from spaceone.identity.manager.config_manager import ConfigManager
from spaceone.identity.manager.domain_manager import DomainManager
from spaceone.identity.manager.domain_secret_manager import DomainSecretManager
from spaceone.identity.manager.role_manager import RoleManager
from spaceone.identity.manager.external_auth_manager import ExternalAuthManager
from spaceone.identity.manager.role_binding_manager import RoleBindingManager
from spaceone.identity.manager.user_manager import UserManager
from spaceone.identity.manager.config_manager import ConfigManager
from spaceone.identity.manager.role_manager import RoleManager
from spaceone.identity.manager.system_manager import SystemManager
from spaceone.identity.manager.user_manager import UserManager
from spaceone.identity.manager.workspace_manager import WorkspaceManager
from spaceone.identity.model.domain.request import *
from spaceone.identity.model.domain.response import *
from spaceone.identity.error.error_domain import *
from spaceone.identity.service.user_service import UserService

_LOGGER = logging.getLogger(__name__)
Expand All @@ -35,6 +35,8 @@ def __init__(self, *args, **kwargs):
self.domain_secret_mgr = DomainSecretManager()
self.user_mgr = UserManager()
self.role_manager = RoleManager()
self.rb_mgr = RoleBindingManager()
self.workspace_mgr = WorkspaceManager()

@transaction(permission="identity:Domain.write", role_types=["SYSTEM_ADMIN"])
@convert_model
Expand Down Expand Up @@ -87,6 +89,7 @@ def create(self, params: DomainCreateRequest) -> Union[DomainResponse, dict]:
"domain_id": user_vo.domain_id,
"role_type": role_vos[0].role_type,
}

role_binding_mgr.create_role_binding(params_rb)

return DomainResponse(**domain_vo.to_dict())
Expand Down Expand Up @@ -228,7 +231,7 @@ def get_public_key(
root_domain_id = SystemManager.get_root_domain_id()
root_pub_jwk = self.domain_secret_mgr.get_domain_public_key(root_domain_id)
JWTAuthenticator(root_pub_jwk).validate(token)
except Exception as e:
except Exception:
raise ERROR_UNKNOWN(message="Invalid System Token")

# Get Public Key from Domain
Expand Down Expand Up @@ -278,3 +281,29 @@ def stat(self, params: DomainStatQueryRequest) -> dict:

query = params.query or {}
return self.domain_mgr.stat_domains(query)

def _get_workspace_user_count(self, workspace_id: str, domain_id: str) -> int:
user_rb_ids = self.rb_mgr.stat_role_bindings(
query={
"distinct": "user_id",
"filter": [
{"k": "workspace_id", "v": workspace_id, "o": "eq"},
{"k": "domain_id", "v": domain_id, "o": "eq"},
],
}
).get("results", [])
return len(user_rb_ids)

def _update_workspace_user_count(self, workspace_id: str, domain_id: str) -> None:
if not workspace_id and not domain_id:
return

workspace_vo = self.workspace_mgr.get_workspace(workspace_id, domain_id)

if workspace_vo and workspace_vo.workspace_id != "*":
user_rb_total_count = self._get_workspace_user_count(
workspace_id, domain_id
)
self.workspace_mgr.update_workspace_by_vo(
{"user_count": user_rb_total_count}, workspace_vo
)
71 changes: 39 additions & 32 deletions src/spaceone/identity/service/role_binding_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def create_role_binding(self, params: dict):
# Create role binding
rb_vo = self.role_binding_manager.create_role_binding(params)

self._update_workspace_user_count(rb_vo.workspace_id, rb_vo.domain_id)

return rb_vo

@transaction(
Expand Down Expand Up @@ -163,12 +165,6 @@ def update_role(
request_role_type=new_role_vo.role_type,
supported_role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"],
)
self.check_last_workspace_owner_role_binding(
rb_vo.user_id,
new_role_vo.role_type,
rb_vo.workspace_id,
rb_vo.domain_id,
)
elif rb_vo.role_type == new_role_vo.role_type:
self.check_last_domain_admin_role_binding(
rb_vo.user_id,
Expand Down Expand Up @@ -249,12 +245,12 @@ def delete(self, params: RoleBindingDeleteRequest) -> None:
self.check_last_domain_admin_role_binding(
rb_vo.user_id, None, rb_vo.domain_id
)
elif rb_vo.role_type == "WORKSPACE_OWNER":
self.check_last_workspace_owner_role_binding(
rb_vo.user_id, None, rb_vo.workspace_id, rb_vo.domain_id
)

# Update user role type
# Update the user's representative role_type.
# A user's role_type is a single, domain-level field that summarizes their highest-priority role.
# Since deleting a role binding (even a workspace-specific one) might change this representative role,
# we must re-evaluate all of the user's remaining role bindings across the entire domain
# to determine their new, correct role_type.
remain_rb_vos = self.role_binding_manager.filter_role_bindings(
domain_id=params.domain_id, user_id=rb_vo.user_id
)
Expand All @@ -277,6 +273,7 @@ def delete(self, params: RoleBindingDeleteRequest) -> None:
self.user_mgr.update_user_by_vo(user_role_info, user_vo)

self.role_binding_manager.delete_role_binding_by_vo(rb_vo)
self._update_workspace_user_count(rb_vo.workspace_id, rb_vo.domain_id)

@transaction(
permission="identity:RoleBinding.read",
Expand Down Expand Up @@ -417,27 +414,6 @@ def check_last_domain_admin_role_binding(
if rb_vos.count() == 1 and new_role_type != "DOMAIN_ADMIN":
raise ERROR_LAST_DOMAIN_ADMIN_CANNOT_DELETE()

def check_last_workspace_owner_role_binding(
self,
user_id: str,
new_role_type: Union[str, None],
workspace_id: str,
domain_id: str,
) -> None:
user_ids = self._get_enabled_user_ids(domain_id)
rb_vos = self.role_binding_manager.filter_role_bindings(
domain_id=domain_id,
workspace_id=workspace_id,
user_id=user_ids,
role_type="WORKSPACE_OWNER",
)

if not rb_vos.filter(user_id=user_id):
return None

if rb_vos.count() == 1 and new_role_type != "WORKSPACE_OWNER":
raise ERROR_LAST_WORKSPACE_OWNER_CANNOT_DELETE()

def _get_enabled_user_ids(self, domain_id: str) -> list:
user_vos = self.user_mgr.filter_users(
domain_id=domain_id,
Expand All @@ -453,6 +429,11 @@ def check_self_update_and_delete(requested_user_id: str, user_id: str) -> None:

@staticmethod
def _get_latest_role_type(before: str, after: str) -> str:
# Determines the user's representative role by comparing priorities (lower number is higher).
# Policy: Workspace-level roles (OWNER, MEMBER) grant permissions within a workspace
# but do not elevate the user's fundamental role at the domain level.
# Therefore, if a user's highest role is a workspace role, their representative
# role_type defaults to 'USER'. Only 'DOMAIN_ADMIN' elevates this status.
priority = {
"DOMAIN_ADMIN": 1,
"WORKSPACE_OWNER": 2,
Expand All @@ -470,3 +451,29 @@ def _get_latest_role_type(before: str, after: str) -> str:
return "USER"

return after

def _get_workspace_user_count(self, workspace_id: str, domain_id: str) -> int:
user_rb_ids = self.role_binding_manager.stat_role_bindings(
query={
"distinct": "user_id",
"filter": [
{"k": "workspace_id", "v": workspace_id, "o": "eq"},
{"k": "domain_id", "v": domain_id, "o": "eq"},
],
}
).get("results", [])
return len(user_rb_ids)

def _update_workspace_user_count(self, workspace_id: str, domain_id: str) -> None:
if not workspace_id and not domain_id:
return

workspace_vo = self.workspace_mgr.get_workspace(workspace_id, domain_id)

if workspace_vo and workspace_vo.workspace_id != "*":
user_rb_total_count = self._get_workspace_user_count(
workspace_id, domain_id
)
self.workspace_mgr.update_workspace_by_vo(
{"user_count": user_rb_total_count}, workspace_vo
)
8 changes: 8 additions & 0 deletions src/spaceone/identity/service/role_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ def disable(self, params: RoleDisableRequest) -> Union[RoleResponse, dict]:
"""

role_vo = self.role_mgr.get_role(params.role_id, params.domain_id)

rb_vos = self.rb_mgr.filter_role_bindings(
role_id=role_vo.role_id, domain_id=role_vo.domain_id
)

if rb_vos.count() > 0:
raise ERROR_ROLE_IN_USED_AT_ROLE_BINDING(role_id=role_vo.role_id)

role_vo = self.role_mgr.disable_role_by_vo(role_vo)

return RoleResponse(**role_vo.to_dict())
Expand Down
Loading
Loading