From 33a115d10c1c62e828e64288e1e7309cb7f6b6ef Mon Sep 17 00:00:00 2001 From: haojinIntel Date: Tue, 31 Jan 2023 12:00:28 +0800 Subject: [PATCH] Support workspace for BCE --- python/cloudtik/core/_private/providers.py | 6 + .../providers/_private/baiduyun/config.py | 350 ++++++++++++++++++ .../providers/_private/baiduyun/utils.py | 0 .../_private/baiduyun/workspace_provider.py | 78 ++++ .../cloudtik/providers/baiduyun/__init__.py | 0 .../cloudtik/providers/baiduyun/commands.yaml | 2 + .../baiduyun/workspace-defaults.yaml | 13 + 7 files changed, 449 insertions(+) create mode 100644 python/cloudtik/providers/_private/baiduyun/config.py create mode 100644 python/cloudtik/providers/_private/baiduyun/utils.py create mode 100644 python/cloudtik/providers/_private/baiduyun/workspace_provider.py create mode 100644 python/cloudtik/providers/baiduyun/__init__.py create mode 100644 python/cloudtik/providers/baiduyun/commands.yaml create mode 100644 python/cloudtik/providers/baiduyun/workspace-defaults.yaml diff --git a/python/cloudtik/core/_private/providers.py b/python/cloudtik/core/_private/providers.py index a7a81899c..a2259be6b 100644 --- a/python/cloudtik/core/_private/providers.py +++ b/python/cloudtik/core/_private/providers.py @@ -154,6 +154,11 @@ def _import_azure_workspace(provider_config): return AzureWorkspaceProvider +def _import_baiduyun_workspace(provider_config): + from cloudtik.providers._private.baiduyun.workspace_provider import BaiduyunWorkspaceProvider + return BaiduyunWorkspaceProvider + + def _import_local_workspace(provider_config): from cloudtik.providers._private.local.workspace_provider import \ LocalWorkspaceProvider @@ -171,6 +176,7 @@ def _import_kubernetes_workspace(provider_config): "aws": _import_aws_workspace, "gcp": _import_gcp_workspace, "azure": _import_azure_workspace, + "baiduyun": _import_baiduyun_workspace, "kubernetes": _import_kubernetes_workspace, "external": _import_external # Import an external module } diff --git a/python/cloudtik/providers/_private/baiduyun/config.py b/python/cloudtik/providers/_private/baiduyun/config.py new file mode 100644 index 000000000..e2594b06b --- /dev/null +++ b/python/cloudtik/providers/_private/baiduyun/config.py @@ -0,0 +1,350 @@ +import copy +import json +import logging +import time +import uuid +import subprocess +from pathlib import Path +import random + +from typing import Any, Dict, Optional + +from baidubce.auth.bce_credentials import BceCredentials +from baidubce.bce_client_configuration import BceClientConfiguration +from baidubce.services.subnet import subnet_client +from baidubce.services.vpc import vpc_client + + +from cloudtik.core.tags import CLOUDTIK_TAG_NODE_KIND, NODE_KIND_HEAD, CLOUDTIK_TAG_CLUSTER_NAME +from cloudtik.core._private.cli_logger import cli_logger, cf +from cloudtik.core._private.utils import check_cidr_conflict, is_use_internal_ip, _is_use_working_vpc, is_use_working_vpc, is_use_peering_vpc, \ + is_managed_cloud_storage, is_use_managed_cloud_storage, _is_use_managed_cloud_storage, update_nested_dict +from cloudtik.core.workspace_provider import Existence, CLOUDTIK_MANAGED_CLOUD_STORAGE, \ + CLOUDTIK_MANAGED_CLOUD_STORAGE_URI + +BCE_RESOURCE_NAME_PREFIX = "cloudtik" + +BCE_WORKSPACE_SUBNET_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-{}-subnet" +BCE_WORKSPACE_VNET_PEERING_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-virtual-network-peering" +BCE_WORKSPACE_STORAGE_ACCOUNT_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-storage-account" +BCE_WORKSPACE_STORAGE_CONTAINER_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}" +BCE_WORKSPACE_NETWORK_SECURITY_GROUP_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-network-security-group" +BCE_WORKSPACE_PUBLIC_IP_ADDRESS_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-public-ip-address" +BCE_WORKSPACE_NAT_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-nat" +BCE_WORKSPACE_SECURITY_RULE_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-security-rule-{}" +BCE_WORKSPACE_WORKER_USI_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-worker-user-assigned-identity" +BCE_WORKSPACE_HEAD_USI_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-user-assigned-identity" + +BCE_WORKSPACE_VERSION_TAG_NAME = "cloudtik-workspace-version" +BCE_WORKSPACE_VERSION_CURRENT = "1" + +BCE_WORKSPACE_NUM_CREATION_STEPS = 9 +BCE_WORKSPACE_NUM_DELETION_STEPS = 9 +BCE_WORKSPACE_TARGET_RESOURCES = 12 + +BCE_WORKSPACE_VPC_NAME = BCE_RESOURCE_NAME_PREFIX + "-{}-vpc" + +BCE_VPC_SUBNETS_COUNT = 2 + +logger = logging.getLogger(__name__) + + +def create_baiduyun_workspace(config): + return config + + +def delete_baiduyun_workspace(config, delete_managed_storage: bool = False): + pass + + +def check_baiduyun_workspace_integrity(config): + # existence = check_azure_workspace_existence(config) + # return True if existence == Existence.COMPLETED else False + pass + + +def update_baiduyun_workspace_firewalls(config): + pass + + +def get_workspace_head_nodes(provider_config, workspace_name): + pass + + +def list_baiduyun_clusters(config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + pass + + +def bootstrap_baiduyun_workspace(config): + pass + + +def check_baiduyun_workspace_existence(config): + pass + + +def get_baiduyun_workspace_info(config): + pass + + +def _get_workspace_vpc_name(workspace_name): + return BCE_WORKSPACE_VPC_NAME.format(workspace_name) + + +def _create_vpc(config, vpc_cli): + workspace_name = config["workspace_name"] + vpc_name = _get_workspace_vpc_name(workspace_name) + cli_logger.print("Creating workspace VPC: {}...", vpc_name) + # create vpc + cidr_block = '10.0.0.0/16' + if is_use_peering_vpc(config): + # TODO + return + # current_vpc = get_current_vpc(config) + # cidr_block = _configure_peering_vpc_cidr_block(current_vpc) + + try: + response = vpc_cli.create_vpc(vpc_name, cidr_block) + cli_logger.print("Successfully created workspace VPC: {}.", vpc_name) + return response.vpc_id + except Exception as e: + cli_logger.error("Failed to create workspace VPC. {}", str(e)) + raise e + + +def _delete_vpc(config, vpc_cli): + use_working_vpc = is_use_working_vpc(config) + if use_working_vpc: + cli_logger.print("Will not delete the current working VPC.") + return + + vpc_id = get_workspace_vpc_id(config, vpc_cli) + vpc_name = _get_workspace_vpc_name(config["workspace_name"]) + + if vpc_id is None: + cli_logger.print("The VPC: {} doesn't exist.".format(vpc_name)) + return + + """ Delete the VPC """ + cli_logger.print("Deleting the VPC: {}...".format(vpc_name)) + + try: + vpc_cli.delete_vpc(vpc_id) + cli_logger.print("Successfully deleted the VPC: {}.".format(vpc_name)) + except Exception as e: + cli_logger.error("Failed to delete the VPC: {}. {}", vpc_name, str(e)) + raise e + + +def get_workspace_vpc_id(config, vpc_cli): + return _get_workspace_vpc_id(config["workspace_name"], vpc_cli) + + +def _get_workspace_vpc_id(workspace_name, vpc_cli): + vpc_name = _get_workspace_vpc_name(workspace_name) + cli_logger.verbose("Getting the VPC Id for workspace: {}...".format(vpc_name)) + vpc_ids = [vpc.vpc_id for vpc in vpc_cli.list_vpcs().vpcs if vpc.name == vpc_name] + if len(vpc_ids) == 0: + cli_logger.verbose("The VPC for workspace is not found: {}.".format(vpc_name)) + return None + else: + cli_logger.verbose_error("Successfully get the VPC Id of {} for workspace.".format(vpc_name)) + return vpc_ids[0] + + +def get_vpc(vpc_cli, vpc_id): + return vpc_cli.get_vpc(vpc_id).vpc + + +def _create_and_configure_subnets(config, vpc_cli, subnet_cli): + workspace_name = config["workspace_name"] + vpc_id = _get_workspace_vpc_id(workspace_name, vpc_cli) + vpc = get_vpc(vpc_cli, vpc_id) + + subnets = [] + cidr_list = _configure_subnets_cidr(vpc) + cidr_len = len(cidr_list) + + availability_zones = set(_get_availability_zones(subnet_cli)) + used_availability_zones = set() + default_availability_zone = list(availability_zones)[0] + last_availability_zone = None + + for i in range(0, cidr_len): + cidr_block = cidr_list[i] + subnet_type = "public" if i == 0 else "private" + with cli_logger.group( + "Creating {} subnet", subnet_type, + _numbered=("()", i + 1, cidr_len)): + try: + if i == 0: + subnet = _create_subnet(subnet_cli, default_availability_zone, workspace_name, vpc_id, cidr_block, isPrivate=False) + else: + if last_availability_zone is None: + last_availability_zone = default_availability_zone + + subnet = _create_subnet(subnet_cli, last_availability_zone, workspace_name, vpc_id, cidr_block) + + last_availability_zone = _next_availability_zone( + availability_zones, used_availability_zones, last_availability_zone) + + except Exception as e: + cli_logger.error("Failed to create {} subnet. {}", subnet_type, str(e)) + raise e + subnets.append(subnet) + + assert len(subnets) == BCE_VPC_SUBNETS_COUNT, "We must create {} subnets for VPC: {}!".format( + BCE_VPC_SUBNETS_COUNT, vpc_id) + return subnets + + +def _delete_private_subnets(workspace_name, vpc_id, subnet_cli): + _delete_subnets(workspace_name, vpc_id, subnet_cli, isPrivate=True) + + +def _delete_public_subnets(workspace_name, vpc_id, subnet_cli): + _delete_subnets(workspace_name, vpc_id, subnet_cli, isPrivate=False) + + +def _delete_subnets(workspace_name, vpc_id, subnet_cli, isPrivate=True): + subnetsType = "private" if isPrivate else "public" + """ Delete custom subnets """ + subnets = get_workspace_private_subnets(workspace_name, vpc_id, subnet_cli) \ + if isPrivate else get_workspace_public_subnets(workspace_name, vpc_id, subnet_cli) + + if len(subnets) == 0: + cli_logger.print("No subnets for workspace were found under this VPC: {}...".format(vpc_id)) + return + try: + for subnet in subnets: + cli_logger.print("Deleting {} subnet: {}...".format(subnetsType, subnet.subnet_id)) + subnet.delete() + cli_logger.print("Successfully deleted {} subnet: {}.".format(subnetsType, subnet.subnet_id)) + except Exception as e: + cli_logger.error("Failed to delete {} subnet. {}".format(subnetsType, str(e))) + raise e + + +def get_workspace_private_subnets(workspace_name, vpc_id, subnet_cli): + return _get_workspace_subnets(workspace_name, vpc_id, subnet_cli, "cloudtik-{}-private-subnet") + + +def get_workspace_public_subnets(workspace_name, vpc_id, subnet_cli): + return _get_workspace_subnets(workspace_name, vpc_id, subnet_cli, "cloudtik-{}-public-subnet") + + +def _get_workspace_subnets(workspace_name, vpc_id, subnet_cli, name_pattern): + subnets = [subnet for subnet in subnet_cli.list_subnets(vpc_id=vpc_id).subnets + if subnet.name.startswith(name_pattern.format(workspace_name))] + return subnets + + +def _next_availability_zone(availability_zones: set, used: set, last_availability_zone): + used.add(last_availability_zone) + unused = availability_zones.difference(used) + if len(unused) > 0: + return unused.pop() + + # Used all, restart + used.clear() + if len(availability_zones) > 0: + return next(iter(availability_zones)) + + return None + + +def _create_public_subnet(subnet_cli, zone_name, workspace_name, vpc_id, cidr_block): + cli_logger.print("Creating public subnet for VPC: {} with CIDR: {}...".format(vpc_id, cidr_block)) + subnet_name = 'cloudtik-{}-public-subnet'.format(workspace_name) + + response = subnet_cli.create_subnet(name=subnet_name, zone_name=zone_name, cidr=cidr_block, vpc_id=vpc_id) + cli_logger.print("Successfully created public subnet: {}.".format(subnet_name)) + + return response.subnet + + +def _create_subnet(subnet_cli, zone_name, workspace_name, vpc_id, cidr_block, isPrivate=True): + subnetType = "private" if isPrivate else "public" + cli_logger.print("Creating {} subnet for VPC: {} with CIDR: {}...".format(subnetType, vpc_id, cidr_block)) + subnet_name = 'cloudtik-{}-{}-subnet'.format(workspace_name, subnetType) + response = subnet_cli.create_subnet(name=subnet_name, zone_name=zone_name, cidr=cidr_block, vpc_id=vpc_id) + cli_logger.print("Successfully created {} subnet: {}.".format(subnetType, subnet_name)) + + return response.subnet + + +def _get_availability_zones(vpc_cli, subnet_cli): + default_vpc = vpc_cli.list_vpcs(isDefault=True).vpcs[0] + availability_zones = [subnet.zone_name for subnet in subnet_cli.list_subnets(vpc_id=default_vpc.vpc_id).subnets] + return availability_zones + + +def _configure_subnets_cidr(vpc): + cidr_list = [] + subnets = vpc.subnets + vpc_cidr = vpc.cidr + ip = vpc_cidr.split("/")[0].split(".") + + if len(subnets) == 0: + for i in range(0, BCE_VPC_SUBNETS_COUNT): + cidr_list.append(ip[0] + "." + ip[1] + "." + str(i) + ".0/24") + else: + cidr_blocks = [subnet.cidr for subnet in subnets] + for i in range(0, 256): + tmp_cidr_block = ip[0] + "." + ip[1] + "." + str(i) + ".0/24" + + if check_cidr_conflict(tmp_cidr_block, cidr_blocks): + cidr_list.append(tmp_cidr_block) + + if len(cidr_list) == BCE_VPC_SUBNETS_COUNT: + break + + return cidr_list + + +def _get_bce_credentials(provider_config): + access_key = provider_config.get("access_key") + access_key_secret = provider_config.get("access_key_secret") + return BceCredentials(access_key, access_key_secret) + + +def _create_vpc_client(bce_credentials, endpoint): + config = BceClientConfiguration(credentials=bce_credentials, endpoint=endpoint) + vpc_cli = vpc_client.VpcClient(config) + return vpc_cli + + +def _create_subnet_client(bce_credentials, endpoint): + config = BceClientConfiguration(credentials=bce_credentials, endpoint=endpoint) + subnet_cli = subnet_client.SubnetClient(config) + return subnet_cli + + +def construct_vpc_client(provider_config): + credentials = _get_bce_credentials(provider_config) + endpoint = _get_vpc_endpoint(provider_config["region"]) + return _create_vpc_client(credentials, endpoint) + + +def construct_subnet_client(provider_config): + credentials = _get_bce_credentials(provider_config) + endpoint = _get_vpc_endpoint(provider_config["region"]) + return _create_subnet_client(credentials, endpoint) + + +def check_bce_region(region): + bce_available_regions = ["bj", "bd", "su", "gz", "hkg", "sin", "fwh", "fsh"] + if region not in bce_available_regions: + cli_logger.abort( + "Unknown region " + cf.bold("{}") + "\n" + "Available regions are: {}", region, cli_logger.render_list(bce_available_regions)) + + +def _get_bos_endpoint(region): + check_bce_region(region) + return f"https://{region}.bcebos.com" + + +def _get_vpc_endpoint(region): + check_bce_region(region) + return f"https://bcc.{region}.baidubce.com" diff --git a/python/cloudtik/providers/_private/baiduyun/utils.py b/python/cloudtik/providers/_private/baiduyun/utils.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/cloudtik/providers/_private/baiduyun/workspace_provider.py b/python/cloudtik/providers/_private/baiduyun/workspace_provider.py new file mode 100644 index 000000000..54875fe3a --- /dev/null +++ b/python/cloudtik/providers/_private/baiduyun/workspace_provider.py @@ -0,0 +1,78 @@ +import logging +from typing import Any, Dict, Optional + +from cloudtik.core._private.utils import get_running_head_node, check_workspace_name_format +from cloudtik.providers._private.baiduyun.config import create_baiduyun_workspace, \ + delete_baiduyun_workspace, check_baiduyun_workspace_integrity, update_baiduyun_workspace_firewalls, \ + get_workspace_head_nodes, list_baiduyun_clusters, bootstrap_baiduyun_workspace, check_baiduyun_workspace_existence, \ + get_baiduyun_workspace_info +from cloudtik.core._private.providers import _get_node_provider +from cloudtik.core.tags import CLOUDTIK_GLOBAL_VARIABLE_KEY_PREFIX, CLOUDTIK_GLOBAL_VARIABLE_KEY +from cloudtik.core.workspace_provider import WorkspaceProvider + +AZURE_WORKSPACE_NAME_MAX_LEN = 55 + +logger = logging.getLogger(__name__) + + +class BaiduyunWorkspaceProvider(WorkspaceProvider): + def __init__(self, provider_config, workspace_name): + WorkspaceProvider.__init__(self, provider_config, workspace_name) + + def create_workspace(self, config): + create_baiduyun_workspace(config) + + def delete_workspace(self, config, + delete_managed_storage: bool = False): + delete_baiduyun_workspace(config, delete_managed_storage) + + def update_workspace_firewalls(self, config): + update_baiduyun_workspace_firewalls(config) + + def check_workspace_existence(self, config: Dict[str, Any]): + return check_baiduyun_workspace_existence(config) + + def check_workspace_integrity(self, config): + return check_baiduyun_workspace_integrity(config) + + def list_clusters(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + return list_baiduyun_clusters(config) + + def publish_global_variables(self, cluster_config: Dict[str, Any], + global_variables: Dict[str, Any]): + # Add prefix to the variables + global_variables_prefixed = {} + for name in global_variables: + prefixed_name = CLOUDTIK_GLOBAL_VARIABLE_KEY.format(name) + global_variables_prefixed[prefixed_name] = global_variables[name] + + provider = _get_node_provider(cluster_config["provider"], cluster_config["cluster_name"]) + head_node_id = get_running_head_node(cluster_config, provider) + provider.set_node_tags(head_node_id, global_variables_prefixed) + + def subscribe_global_variables(self, cluster_config: Dict[str, Any]): + global_variables = {} + head_nodes = get_workspace_head_nodes( + self.provider_config, self.workspace_name) + for head in head_nodes: + for key, value in head.tags.items(): + if key.startswith(CLOUDTIK_GLOBAL_VARIABLE_KEY_PREFIX): + global_variable_name = key[len(CLOUDTIK_GLOBAL_VARIABLE_KEY_PREFIX):] + global_variables[global_variable_name] = value + + return global_variables + + def validate_config(self, provider_config: Dict[str, Any]): + if len(self.workspace_name) > AZURE_WORKSPACE_NAME_MAX_LEN or\ + not check_workspace_name_format(self.workspace_name): + raise RuntimeError("{} workspace name is between 1 and {} characters, " + "and can only contain lowercase alphanumeric " + "characters and dashes".format(provider_config["type"], AZURE_WORKSPACE_NAME_MAX_LEN)) + + def get_workspace_info(self, config: Dict[str, Any]): + return get_baiduyun_workspace_info(config) + + @staticmethod + def bootstrap_workspace_config(config): + return bootstrap_baiduyun_workspace(config) + diff --git a/python/cloudtik/providers/baiduyun/__init__.py b/python/cloudtik/providers/baiduyun/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/cloudtik/providers/baiduyun/commands.yaml b/python/cloudtik/providers/baiduyun/commands.yaml new file mode 100644 index 000000000..48e79d53f --- /dev/null +++ b/python/cloudtik/providers/baiduyun/commands.yaml @@ -0,0 +1,2 @@ +# Include the common built-in commands +from: commands diff --git a/python/cloudtik/providers/baiduyun/workspace-defaults.yaml b/python/cloudtik/providers/baiduyun/workspace-defaults.yaml new file mode 100644 index 000000000..f9434e935 --- /dev/null +++ b/python/cloudtik/providers/baiduyun/workspace-defaults.yaml @@ -0,0 +1,13 @@ +# Include the common workspace defaults +from: workspace-defaults + +# Cloud-provider specific configuration. +provider: + type: baiduyun + # https://azure.microsoft.com/en-us/global-infrastructure/locations + location: bj + # Decide whether to require public IP for head node. + # When setting to False, Head node will require a public IP. Default to False + use_internal_ips: False + # Whether to create managed cloud storage of workspace. + managed_cloud_storage: True