diff --git a/README.md b/README.md
index bc13f0f..9ef3c79 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,42 @@
 # ckanext-cloudstorage
 
-Implements support for using S3, Azure, or any of 15 different storage
-providers supported by [libcloud][] to [CKAN][].
+`ckanext-cloudstorage` is a plugin for CKAN that enhances its capabilities by
+enabling the use of various cloud storage services. It supports integration
+with over 15 different storage providers, including Amazon S3, Google Cloud
+Storage, and Azure, via [libcloud][]. This flexibility allows [CKAN][] to
+leverage the robustness and scalability of these cloud storage solutions.
 
-# Setup
+## Features
+- **Google Storage bucket integration**: upload files to Google Cloud Platform (GCP) Bucket Storage and download them from GCP Workspace Storage.
+- **GCP group management**: administer groups in GCP Workspace, including creating and deleting groups, as well as adding and removing members from these groups.
+- **Manage IAM permissions**: set IAM permissions for GCP Storage Buckets and configure group permissions, allowing effective access control for these storage resources.
+
+Most libcloud-based providers should work out of the box, but only those listed
+below have been tested:
+
+| Provider | Uploads | Downloads | Secure URLs (private resources) |
+| --- | --- | --- | --- |
+| Google Bucket | YES | YES | YES (if `google-auth` and `six>=1.5` are installed) |
+
+## Prerequisites
+
+- Python 2.7
+- Google Workspace domain with admin access
+- Service account with domain-wide delegation and the necessary permissions
+
+## Installation
+
+Fork the [repository](https://github.com/ccancellieri/ckanext-cloudstorage), clone it to your local machine, and switch to the `google-cloud-support` branch.
+
+## Setup
 
 After installing `ckanext-cloudstorage`, add it to your list of plugins in
 your `.ini`:
 
+```bash
     ckan.plugins = stats cloudstorage
+```
 
 If you haven't already, setup [CKAN file storage][ckanstorage] or the file
 upload button will not appear.
@@ -17,8 +44,10 @@ Every driver takes two options, regardless of which one you
 use. Both the name of the driver and the name of the container/bucket
 are case-sensitive:
 
-    ckanext.cloudstorage.driver = AZURE_BLOBS
+```bash
+    ckanext.cloudstorage.driver = GOOGLE_STORAGE
     ckanext.cloudstorage.container_name = demo
+```
 
 You can find a list of driver names [here][storage] (see the `Provider
 Constant` column.)
@@ -27,20 +56,11 @@ Each driver takes its own setup options. See the [libcloud][] documentation.
 These options are passed in using `driver_options`, which is a Python dict.
 For most drivers, this is all you need:
 
+```bash
     ckanext.cloudstorage.driver_options = {"key": "", "secret": ""}
+```
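+
+For Google Cloud Storage specifically, a minimal sketch (placeholder values;
+libcloud's `GOOGLE_STORAGE` driver accepts a few credential variants, so check
+its documentation for the one matching your account type) might look like:
+
+```bash
+    ckanext.cloudstorage.driver_options = {"key": "svc@my-project.iam.gserviceaccount.com", "secret": "/path/to/service_account.json"}
+```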
 
-# Support
-
-Most libcloud-based providers should work out of the box, but only those listed
-below have been tested:
-
-| Provider | Uploads | Downloads | Secure URLs (private resources) |
-| --- | --- | --- | --- |
-| Azure | YES | YES | YES (if `azure-storage` is installed) |
-| AWS S3 | YES | YES | YES (if `boto` is installed) |
-| Rackspace | YES | YES | No |
-
-# What are "Secure URLs"?
+### What are "Secure URLs"?
 
 "Secure URLs" are a method of preventing access to private resources. By
 default, anyone that figures out the URL to your resource on your storage
 provider can download it. Secure URLs allow you to disable public access and
@@ -49,9 +69,9 @@ instead let ckanext-cloudstorage generate temporary, one-use URLs to download
 the resource.
 This means that the normal CKAN-provided access restrictions can apply
 to resources with no further effort on your part, but still get all the
 benefits of your CDN/blob storage.
 
-
-    ckanext.cloudstorage.use_secure_urls = 1
-
+```bash
+    ckanext.cloudstorage.use_secure_urls = True
+```
 
 This option also enables multipart uploads, but you need to create database
 tables first. Run next command from extension folder:
 `paster cloudstorage initdb -c /etc/ckan/default/production.ini `
@@ -60,37 +80,83 @@
 With that feature you can use `cloudstorage_clean_multipart` action, which is available
 only for sysadmins. After executing, all unfinished multipart uploads, older
 than 7 days, will be aborted. You can configure this lifetime, example:
 
+```bash
     ckanext.cloudstorage.max_multipart_lifetime = 7
+```
+
+## Install the required dependencies
 
-# Migrating From FileStorage
+From the `ckanext-cloudstorage` folder, activate your virtual environment and
+run the command below:
+
+```bash
+pip install -r requirements.txt
+```
+
+## Migrating From FileStorage
 
 If you already have resources that have been uploaded and saved using CKAN's
 built-in FileStorage, cloudstorage provides an easy migration command.
 Simply setup cloudstorage as explained above, enable the plugin, and
 run the migrate command. Provide the path to your resources on-disk (the
-`ckan.storage_path` setting in your CKAN `.ini` + `/resources`), and
+`ckan.storage_path` setting in your CKAN `.ini`), and
 cloudstorage will take care of the rest. Ex:
 
-    paster cloudstorage migrate -c ../ckan/development.ini
+Before running the ETL script, make sure you have set these config values:
+
+```bash
+ckanext.cloudstorage.service_account_key_path = {PATH_TO_SECRET_KEY_FILE}
+ckanext.cloudstorage.gcp_base_url = {GCP_BASE_URL}
+ckan.site_url = {SITE_URL}
+ckan.root_path = {ROOT_PATH}
+ckan.storage_path = {STORAGE_PATH}
+ckanext.cloudstorage.prefix = {PREFIX}
+ckanext.cloudstorage.domain = {DOMAIN}
+```
+
+From the `ckanext-cloudstorage` folder, change into the ETL directory:
+
+```bash
+cd ckanext/cloudstorage/etl
+```
+
+Then, from the `etl` folder, run the command below (an example invocation
+follows this list):
+
+```bash
+python etl_run.py organization_name ckan_api_key configuration_file
+```
+- Replace `organization_name` with the actual name of the organization you want to process.
+- Replace `ckan_api_key` with a sysadmin API key of your CKAN instance.
+- Replace `configuration_file` with the path to your `production.ini` file.
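+
+For example, with a hypothetical organization `test_ckan_1`, a sysadmin API
+key, and a default config path (all placeholder values, mirroring the sample
+arguments in `debug.sh` below), the invocation might look like:
+
+```bash
+python etl_run.py test_ckan_1 32597cc5-8918-496c-a3e9-604de0d4fc0e /etc/ckan/production.ini
+```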
+
-# Notes
+## Notes
 
 1. You should disable public listing on the cloud service provider you're
    using, if supported.
 2. Currently, only resources are supported. This means that things like group
    and organization images still use CKAN's local file storage.
+3. Make sure the VM instance has the correct scopes. If not, use the command
+   below to set the right scopes:
+
+    ```bash
+    gcloud beta compute instances set-scopes [INSTANCE_NAME] --scopes=https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/devstorage.full_control [--zone=[ZONE]]
+    ```
+
+   Then restart the VM instance for the changes to take effect.
+
+4. Check that the scopes have been applied correctly with the command below:
+
+    ```bash
+    gcloud compute instances describe [INSTANCE_NAME] --format='get(serviceAccounts[].scopes[])'
+    ```
 
-# FAQ
-
-- *DataViews aren't showing my data!* - did you setup CORS rules properly on
-  your hosting service? ckanext-cloudstorage can try to fix them for you automatically,
-  run:
-
-    paster cloudstorage fix-cors -c=
-
-- *Help! I can't seem to get it working!* - send me a mail!
-  tk@tkte.ch
+
+## License
+This project is licensed under the MIT License - see the LICENSE file for details.
 
-[libcloud]: https://libcloud.apache.org/
-[ckan]: http://ckan.org/
-[storage]: https://libcloud.readthedocs.io/en/latest/storage/supported_providers.html
-[ckanstorage]: http://docs.ckan.org/en/latest/maintaining/filestore.html#setup-file-uploads
+## Acknowledgements
+- [Google APIs Client Library for Python](https://github.com/googleapis/google-api-python-client)
+- [libcloud](https://libcloud.apache.org/)
+- [ckan](http://ckan.org/)
+- [storage](https://libcloud.readthedocs.io/en/latest/storage/supported_providers.html)
+- [ckanstorage](http://docs.ckan.org/en/latest/maintaining/filestore.html#setup-file-uploads)
diff --git a/ckanext/cloudstorage/authorization.py b/ckanext/cloudstorage/authorization.py
new file mode 100644
index 0000000..dd3523a
--- /dev/null
+++ b/ckanext/cloudstorage/authorization.py
@@ -0,0 +1,37 @@
+from google.oauth2 import service_account
+from google.auth.transport.requests import AuthorizedSession
+
+
+class AuthorizedSessionError(Exception):
+    """Custom exception for authorized-session creation failures."""
+    pass
+
+
+def create_id_token_and_auth_session(service_account_json_file, target_audience="https://groups.fao.org"):
+    """
+    Generates ID token credentials from a GCP service account and returns an
+    authorized session.
+
+    This function creates an ID token using Google Cloud Platform service account credentials,
+    and returns an authorized session for making HTTP requests, particularly POST requests.
+
+    :param service_account_json_file: Path to the service account key file in JSON format.
+                                      It contains credentials for the service account.
+    :param target_audience: The intended audience (URL) for the ID token. This specifies
+                            the target service or API that the token is intended for.
+
+    :return: An instance of `AuthorizedSession` with ID token credentials. This session
+             can be used for authenticated HTTP requests to the specified target audience.
+    """
+    # Load the service account credentials and create an ID token
+    try:
+        credentials = service_account.IDTokenCredentials.from_service_account_file(
+            service_account_json_file,
+            target_audience=target_audience
+        )
+
+        # Create an authorized session using the credentials
+        auth_session = AuthorizedSession(credentials)
+
+        return auth_session
+    except Exception as e:
+        raise AuthorizedSessionError("Error creating authorized session: {}".format(e))
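A minimal usage sketch of the helper above (the key path and the group-service
endpoint are placeholders, not values shipped with this patch):

```python
# Illustrative sketch only: path and audience below are placeholders.
from ckanext.cloudstorage.authorization import create_id_token_and_auth_session

session = create_id_token_and_auth_session(
    "/etc/ckan/keys/service_account.json",         # hypothetical key file path
    target_audience="https://groups.example.org",  # your group-service base URL
)
# The session is then used like a requests session:
resp = session.get("https://groups.example.org/groups/demo@example.org/members")
```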
diff --git a/ckanext/cloudstorage/bucket.py b/ckanext/cloudstorage/bucket.py
new file mode 100644
index 0000000..971d238
--- /dev/null
+++ b/ckanext/cloudstorage/bucket.py
@@ -0,0 +1,141 @@
+import logging
+import os
+
+from google.cloud import storage
+from google.cloud.exceptions import NotFound, GoogleCloudError
+
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+
+
+class UploadError(Exception):
+    """Custom exception for upload failures."""
+    pass
+
+
+class BucketError(Exception):
+    """Custom exception for bucket creation failures."""
+    pass
+
+
+def create_bucket(bucket_name, cloud_storage=None):
+    """
+    Create a Google Cloud Storage bucket and optionally update a CloudStorage instance.
+
+    Args:
+        bucket_name (str): The name of the bucket to be created.
+        cloud_storage (CloudStorage, optional): Instance to update with the new bucket name.
+
+    Raises:
+        BucketError: If the bucket cannot be created.
+    """
+    try:
+        storage_client = storage.Client()
+        bucket = storage_client.create_bucket(bucket_name)
+        log.info("Bucket {} created".format(bucket.name))
+
+        if cloud_storage:
+            from ckanext.cloudstorage.storage import CloudStorage
+            if isinstance(cloud_storage, CloudStorage):
+                cloud_storage.container_name = bucket_name
+
+    except Exception as e:
+        log.error("Error creating bucket: {}".format(e))
+        raise BucketError("Error creating bucket: {}".format(e))
+
+
+def check_err_response_from_gcp(response, err_msg):
+    if "error" in response:
+        log.error("{}: {}".format(err_msg, response))
+        raise Exception(response["error"])
+    return response
+
+
+def add_group_iam_permissions(bucket_name, group_email):
+    """
+    Grant read and list permissions to a group for a specific Google Cloud Storage bucket.
+
+    Args:
+        bucket_name (str): Name of the Google Cloud Storage bucket.
+        group_email (str): Email address of the group to grant permissions to.
+
+    Raises:
+        RuntimeError: If the bucket cannot be retrieved.
+        Exception: If the IAM policy cannot be read or updated.
+    """
+    storage_client = storage.Client()
+    try:
+        # Attempt to get the bucket
+        bucket = storage_client.get_bucket(bucket_name)
+    except NotFound:
+        # This block will execute if the bucket is not found
+        raise RuntimeError("Bucket '{}' not found.".format(bucket_name))
+    except Exception as e:
+        # This block will execute for any other exceptions
+        raise RuntimeError(
+            "An error occurred getting bucket info: {}".format(e))
+
+    policy = bucket.get_iam_policy()
+    response = check_err_response_from_gcp(policy, "Error getting IAM policy")
+    log.info("IAM policy: {}".format(response))
+
+    viewer_role = "roles/storage.objectViewer"
+    policy[viewer_role].add("group:" + group_email)
+    response = bucket.set_iam_policy(policy)
+    response = check_err_response_from_gcp(
+        response, "Error modifying bucket IAM policy")
+    log.info("Read and list permissions granted to group {} on bucket {}: IAM policy is now:\n{}"
+             .format(group_email, bucket_name, response))
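Taken together, the two helpers above can provision a bucket and grant a
Workspace group read access. A minimal sketch, assuming application-default
credentials are configured and using placeholder names:

```python
# Illustrative sketch only: bucket and group names are placeholders.
from ckanext.cloudstorage.bucket import create_bucket, add_group_iam_permissions

bucket_name = "demo-ckan-bucket"
group_email = "demo-ckan@example.org"

create_bucket(bucket_name)  # raises BucketError on failure
add_group_iam_permissions(bucket_name, group_email)  # grants roles/storage.objectViewer
```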
+
+
+def upload_to_gcp_bucket(
+    bucket_name,
+    destination_blob_name,
+    source_file_name,
+    cloud_storage=None,
+    group_email=None
+):
+    """
+    Uploads a file to the bucket.
+
+    :param bucket_name: Name of your bucket.
+    :param destination_blob_name: Blob name to use for the uploaded file.
+    :param source_file_name: File to upload.
+    :param cloud_storage: Optional CloudStorage instance, updated if the bucket is created.
+    :param group_email: Optional group e-mail granted read access if the bucket is created.
+    """
+    storage_client = storage.Client()
+
+    try:
+        # Ensure the source file exists
+        if not os.path.exists(source_file_name):
+            raise IOError(
+                "The source file {} does not exist.".format(source_file_name))
+
+        # Try to get the bucket, create it if it does not exist
+        try:
+            bucket = storage_client.get_bucket(bucket_name)
+        except NotFound:
+            log.warning("Bucket {} does not exist, creating it.".format(bucket_name))
+            create_bucket(bucket_name, cloud_storage)
+            bucket = storage_client.get_bucket(bucket_name)
+            add_group_iam_permissions(bucket_name, group_email)
+
+        # Create a blob object
+        blob = bucket.blob(destination_blob_name)
+
+        # Attempt to upload the file
+        blob.upload_from_filename(source_file_name)
+    except IOError as e:
+        if e.errno == 2:
+            # Handle the file-not-found error specifically
+            log.error("File not found: {}".format(e))
+        else:
+            log.error("I/O error during upload: {}".format(e))
+        # Re-raise so the caller can handle the failure
+        raise
+    except GoogleCloudError as e:
+        # Handle Google Cloud specific exceptions
+        log.error("An error occurred with Google Cloud Storage: {}".format(e))
+        raise UploadError("Failed to upload {} to {}/{}".format(
+            source_file_name, bucket_name, destination_blob_name))
+    except Exception as e:
+        # Handle any other exceptions
+        log.error("An unexpected error occurred: {}".format(e))
+        raise UploadError("Unexpected error during upload: {}".format(e))
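A sketch of a direct call (all values are placeholders); the blob name mirrors
the `packages/<package>/resources/<resource>/<file>` layout used by the ETL
code later in this patch:

```python
# Illustrative sketch only: every value below is a placeholder.
from ckanext.cloudstorage.bucket import upload_to_gcp_bucket

upload_to_gcp_bucket(
    bucket_name="demo-ckan-bucket",
    destination_blob_name="packages/pkg-id/resources/res-id/data.csv",
    source_file_name="/tmp/data.csv",
    group_email="demo-ckan@example.org",  # only used if the bucket has to be created
)
```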
diff --git a/ckanext/cloudstorage/controller.py b/ckanext/cloudstorage/controller.py
index 992b0c1..4cc7a7d 100644
--- a/ckanext/cloudstorage/controller.py
+++ b/ckanext/cloudstorage/controller.py
@@ -5,10 +5,14 @@
 from pylons import c
 from pylons.i18n import _
 
+from ckan.common import request, response
 from ckan import logic, model
 from ckan.lib import base, uploader
 import ckan.lib.helpers as h
 
+import ckanext.cloudstorage.storage as _storage
+storage = _storage.CloudStorage
+is_proxy_download = storage.proxy_download.fget(storage)
 
 class StorageController(base.BaseController):
     def resource_download(self, id, resource_id, filename=None):
@@ -48,12 +52,28 @@ def resource_download(self, id, resource_id, filename=None):
         # if the client requests with a Content-Type header (e.g. Text preview)
         # we have to add the header to the signature
         try:
-            content_type = getattr(c.pylons.request, "content_type", None)
+            content_type = getattr(request, "content_type", None)
         except AttributeError:
             content_type = None
 
-        uploaded_url = upload.get_url_from_filename(resource['id'], filename,
-                                                    content_type=content_type)
+
+        # If the storage is private you may want to use the CKAN account to
+        # proxy protected contents:
+        #   ckanext.cloudstorage.proxy_download = [False|True]
+        # Default: False
+        if is_proxy_download:
+            # remote object
+            obj = upload.get_object(resource['id'], filename)
+            # metadata
+            extra = obj.extra
+            if extra:
+                # let's leverage the external mimetype if present
+                response.headers['Content-Type'] = extra.get('content_type', content_type)
+            # return stream back
+            return upload.get_object_as_stream(obj)
+
+        uploaded_url = upload.get_url_from_filename(resource['id'], filename,
+                                                    content_type=content_type)
+
         # The uploaded file is missing for some reason, such as the
         # provider being down.
         if uploaded_url is None:
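The proxy branch above is controlled by the config flag named in the code
comment; to enable it, something like the following would go in your `.ini`
(assuming the `proxy_download` property wired up in `storage.py`):

```bash
    ckanext.cloudstorage.proxy_download = True
```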
diff --git a/ckanext/cloudstorage/etl/__init__.py b/ckanext/cloudstorage/etl/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ckanext/cloudstorage/etl/ckan_manager.py b/ckanext/cloudstorage/etl/ckan_manager.py
new file mode 100644
index 0000000..a7f40e2
--- /dev/null
+++ b/ckanext/cloudstorage/etl/ckan_manager.py
@@ -0,0 +1,425 @@
+import requests
+import logging
+import json
+import os
+
+
+# Logging Configuration
+log = logging.getLogger('CKAN_Manager')
+
+class CKANManager:
+    """
+    A class to manage interactions with a CKAN instance.
+
+    Attributes:
+        ckan_url (str): URL of the CKAN instance.
+        api_key (str): API key for authentication (optional).
+    """
+    ORGANIZATION_DETAIL_ENDPOINT = '/api/3/action/organization_show?id='
+    ORGANIZATION_USER_MEMBERS_LIST = '/api/3/action/organization_show?id={}&include_users=true'
+    ORGANIZATION_LIST_ENDPOINT = '/api/3/action/organization_list'
+    ORGANIZATION_DESC_ENDPOINT = '/api/3/action/organization_list?all_fields=true'
+    ORGANIZATION_SHOW_ENDPOINT = '/api/3/action/organization_show?id={}&include_datasets=true'
+    PACKAGE_SHOW_ENDPOINT = '/api/3/action/package_show?id={}'
+    USER_LIST_ENDPOINT = '/api/3/action/user_list?all_fields=true'
+
+    def __init__(self, ckan_url, ckan_storage_dir, api_key=None, prefix="", domain=None):
+        """
+        Initialize the CKANManager with the CKAN instance URL and API key.
+
+        Args:
+            ckan_url (str): URL of the CKAN instance.
+            ckan_storage_dir (str): Path to CKAN's local storage directory.
+            api_key (str): API key for authentication (optional).
+            prefix (str): Prefix used when deriving bucket and group names.
+            domain (str): Workspace domain used when deriving group e-mails.
+        """
+        self.ckan_url = ckan_url
+        self.api_key = api_key
+        self.ckan_storage_dir = ckan_storage_dir
+        self.prefix = prefix
+        self.domain = domain
+
+    def get_request(self, url):
+        """
+        Perform a GET request to a specified URL.
+
+        Args:
+            url (str): The URL to make the GET request to.
+
+        Returns:
+            dict: JSON response data or None in case of an error.
+        """
+        headers = {'Authorization': self.api_key} if self.api_key else {}
+        try:
+            response = requests.get(url, headers=headers)
+            response.raise_for_status()
+            return response.json()
+        except requests.RequestException as e:
+            log.error("Request error: {0}".format(e))
+            return None
+
+    def download_file(self, url, save_path):
+        """
+        Download a file from a given URL and save it to a specified path.
+
+        Args:
+            url (str): URL of the file to download.
+            save_path (str): Path where the file will be saved.
+        """
+        try:
+            response = requests.get(url, stream=True)
+            response.raise_for_status()
+
+            with open(save_path, 'wb') as file:
+                for chunk in response.iter_content(chunk_size=8192):
+                    file.write(chunk)
+            return True
+        except requests.RequestException as e:
+            log.error("Error downloading file: {}".format(e))
+            return False
+
+    def get_active_users(self):
+        """
+        Retrieves all active users of the CKAN instance with their usernames and emails.
+
+        Returns:
+            dict: A dictionary mapping usernames to their emails.
+        """
+        endpoint = self.ckan_url.rstrip('/') + self.USER_LIST_ENDPOINT
+        data = self.get_request(endpoint)
+
+        if data and data.get('success'):
+            log.info("Successfully retrieved all users.")
+            active_users = {user['name']: user['email'] for user in data['result'] if user['state'] == 'active'}
+            return active_users
+        else:
+            log.error("Failed to retrieve users.")
+            return None
+
+    def get_organization_description(self, organization_name):
+        """
+        Retrieves the name and description of a specific organization.
+
+        Args:
+            organization_name (str): The name of the organization to retrieve.
+
+        Returns:
+            dict or None: A dictionary with the organization name as key and its description as value,
+                          or None if the organization is not found or an error occurs.
+        """
+        endpoint = self.ckan_url.rstrip('/') + self.ORGANIZATION_DETAIL_ENDPOINT + organization_name
+        data = self.get_request(endpoint)
+
+        if data and data.get('success'):
+            org = data['result']
+            log.info("Successfully retrieved organization: {}.".format(organization_name))
+            return {org['name']: org['description']}
+        else:
+            log.error("Failed to retrieve organization: {}.".format(organization_name))
+            return None
+
+    def get_all_organizations(self):
+        """
+        Retrieve all organizations from the CKAN instance.
+
+        Returns:
+            list: List of organizations or None if an error occurs.
+        """
+        endpoint = self.ckan_url.rstrip('/') + self.ORGANIZATION_LIST_ENDPOINT
+        data = self.get_request(endpoint)
+
+        if data and data.get('success'):
+            log.info("Successfully retrieved organizations.")
+            return data['result']
+        else:
+            log.error("Failed to retrieve organizations.")
+            return None
+
+    def get_organizations_with_descriptions(self):
+        """
+        Retrieves all organizations with their names and descriptions.
+
+        Returns:
+            dict: A dictionary with organization names as keys and their descriptions as values.
+        """
+        endpoint = self.ckan_url.rstrip('/') + self.ORGANIZATION_DESC_ENDPOINT
+        data = self.get_request(endpoint)
+
+        if data and data.get('success'):
+            log.info("Successfully retrieved all organizations.")
+            return {org['name']: org['description'] for org in data['result']}
+        else:
+            log.error("Failed to retrieve organizations.")
+            return None
+
+    def get_organization_members(self, organization_name):
+        """
+        Retrieves all user members of a specific CKAN organization with their roles.
+        """
+        endpoint = self.ckan_url.rstrip('/') + self.ORGANIZATION_USER_MEMBERS_LIST.format(organization_name)
+        data = self.get_request(endpoint)
+
+        if data and data.get('success'):
+            log.info("Successfully retrieved user members of {}.".format(organization_name))
+            users = data['result']['users']
+            return {user['name']: user['capacity'] for user in users}
+        else:
+            log.error("Failed to retrieve organization members for {}.".format(organization_name))
+            return None
+
+    def get_all_organization_members(self):
+        """
+        Retrieves all user members with their roles for each CKAN organization.
+
+        Returns:
+            dict: A dictionary with organization names as keys and dicts of user members
+                  and their roles as values.
+        """
+        organizations = self.get_all_organizations()
+        all_members = {}
+
+        if not organizations:
+            log.error("No organizations found or failed to retrieve organizations.")
+            return None
+
+        for org_name in organizations:
+            if org_name:
+                members = self.get_organization_members(org_name)
+                if members:
+                    all_members[org_name] = members
+
+        return all_members
+
+    def get_members_for_single_org(self, organization):
+        """
+        Retrieves all user members with their roles for a given organization.
+
+        Returns:
+            dict: A dictionary with the organization name as key and a dict of user
+                  members and their roles as value.
+        """
+        all_members = {}
+
+        members = self.get_organization_members(organization)
+        if members:
+            all_members[organization] = members
+
+        return all_members
+
+    def get_packages_for_organization(self, organization):
+        """
+        Retrieve all packages for a given organization.
+
+        Args:
+            organization (str): Name of the organization.
+
+        Returns:
+            list: List of packages or None if an error occurs.
+        """
+        endpoint = self.ckan_url.rstrip('/') + self.ORGANIZATION_SHOW_ENDPOINT.format(organization)
+        data = self.get_request(endpoint)
+
+        if data and data.get('success'):
+            log.info("Successfully retrieved packages for {0}.".format(organization))
+            return [dataset['name'] for dataset in data['result']['packages']]
+        else:
+            log.error("Failed to retrieve packages for {0}.".format(organization))
+            return None
+
+    def get_resources_for_package(self, package_id):
+        """
+        Retrieve all resources for a given package.
+
+        Args:
+            package_id (str): ID of the package.
+
+        Returns:
+            list: List of resources or None if an error occurs.
+        """
+        endpoint = self.ckan_url.rstrip('/') + self.PACKAGE_SHOW_ENDPOINT.format(package_id)
+        data = self.get_request(endpoint)
+
+        if data and data.get('success'):
+            log.info("Successfully retrieved resources for package {0}.".format(package_id))
+            return data['result']['resources']
+        else:
+            log.error("Failed to retrieve resources for package {0}.".format(package_id))
+            return None
+
+    def get_all_resources_for_packages(self, packages):
+        """
+        Retrieve all resources for a list of packages.
+
+        Args:
+            packages (list): List of package IDs.
+
+        Returns:
+            dict: Dictionary mapping package IDs to their resources.
+        """
+        all_resources = {}
+        for package in packages:
+            resources = self.get_resources_for_package(package)
+            all_resources[package] = resources or []
+        return all_resources
+
+    def get_all_data(self):
+        """
+        Retrieve all organizations, their packages, and resources.
+
+        Returns:
+            dict: Data containing organizations, packages, and resources.
+        """
+        organizations_data = {}
+        organizations = self.get_all_organizations()
+
+        if not organizations:
+            return None
+
+        for organization in organizations:
+            packages = self.get_packages_for_organization(organization)
+            if packages:
+                resources = self.get_all_resources_for_packages(packages)
+                organizations_data[organization] = resources
+
+        return organizations_data
+
+    def get_data_for_single_org(self, organization):
+        """
+        Retrieve all packages and resources for a single organization.
+
+        Returns:
+            dict: Data containing the organization, its packages, and resources.
+        """
+        organizations_data = {}
+
+        if not organization:
+            return None
+
+        packages = self.get_packages_for_organization(organization)
+        if packages:
+            resources = self.get_all_resources_for_packages(packages)
+            organizations_data[organization] = resources
+
+        return organizations_data
+
+    def save_to_json(self, data, filename):
+        """
+        Save data to a JSON file.
+
+        Args:
+            data (dict): The data to be saved.
+            filename (str): Name of the file to save the data in.
+        """
+        with open(filename, 'w') as file:
+            json.dump(data, file, indent=4)
+        log.info("Data saved to {0}.".format(filename))
+
+    def ensure_dir(self, file_path):
+        directory = os.path.dirname(file_path)
+        if not os.path.exists(directory):
+            os.makedirs(directory)
+
+    def check_resource_directories(self, base_path, resource_id):
+        """
+        Check if specific subdirectories based on a resource ID exist in the base path.
+
+        :param base_path: The base directory path.
+        :param resource_id: The resource ID used to build the subdirectory paths.
+        """
+        if len(resource_id) < 6:
+            log.error("Resource ID is too short.")
+            return False
+
+        first_dir = resource_id[:3]
+        second_dir = resource_id[3:6]
+
+        first_path = os.path.join(base_path, first_dir)
+        second_path = os.path.join(first_path, second_dir)
+
+        if not os.path.isdir(first_path):
+            log.error("Directory does not exist: {}".format(first_path))
+        elif not os.path.isdir(second_path):
+            log.error("Directory does not exist: {}".format(second_path))
+        else:
+            log.info("Both directories exist: {}, {}".format(first_path, second_path))
+            return True
+        return False
+
+    def delete_file(self, file_path):
+        """
+        Deletes a file from the given file path.
+
+        Args:
+            file_path (str): The path of the file to be deleted.
+        """
+        try:
+            if os.path.isfile(file_path):
+                os.remove(file_path)
+                log.info("File '{}' successfully deleted.".format(file_path))
+            else:
+                log.warning("File '{}' not found.".format(file_path))
+        except Exception as e:
+            log.error("Error occurred while deleting file '{}': {}".format(file_path, e))
+
+    def process_resources(self, data, upload_to_gcp_bucket):
+        """
+        Processes resources in the provided data, downloads files, uploads them
+        to GCP, and deletes the local copies.
+
+        Args:
+            data (dict): A dictionary containing organization, package, and resource details.
+            upload_to_gcp_bucket (function): Function to upload a file to GCP.
+        """
+        for organization, packages in data.items():
+            for package_id, resources in packages.items():
+                for resource in resources:
+                    if resource:
+                        resource_id = resource.get("id", "")
+                        url = resource.get("url", "")
+                        if url:
+                            file_name = url.split('/')[-1]
+                            # Get the current working directory
+                            current_directory = os.getcwd()
+
+                            # Get the parent directory
+                            parent_directory = os.path.dirname(current_directory)
+                            download_dir = os.path.join(parent_directory, "")
+                            log.info("download_dir: {}".format(download_dir))
+                            self.ensure_dir(download_dir)
+
+                            file_path = os.path.join(download_dir, file_name)
+                            log.info("file_path: {}".format(file_path))
+
+                            if url.startswith(self.ckan_url):
+                                base_resource_dir = "{}/resources/".format(self.ckan_storage_dir)
+                                bucket_name = self.prefix + organization
+                                group_email = bucket_name + "@" + self.domain
+                                destination_blob_name = os.path.join(
+                                    'packages',
+                                    package_id,
+                                    'resources',
+                                    resource_id,
+                                    file_name
+                                )
+                                if self.check_resource_directories(base_resource_dir, resource_id) is False:
+                                    success = self.download_file(url, file_path)
+                                    if success:
+                                        log.info("Downloaded {} to {}".format(file_name, file_path))
+                                        # upload file to bucket
+                                        upload_to_gcp_bucket(bucket_name, destination_blob_name, file_path, group_email=group_email)
+                                        self.delete_file(file_path)
+                                    else:
+                                        log.error("Failed to download {}".format(url))
+                                else:
+                                    # build the full resource path on the file system
+                                    first_dir = resource_id[:3]
+                                    second_dir = resource_id[3:6]
+                                    resource_suffix = resource_id[6:]
+                                    first_path = os.path.join(base_resource_dir, first_dir)
+                                    second_path = os.path.join(first_path, second_dir)
+                                    full_resource_path = os.path.join(second_path, resource_suffix)
+                                    log.info("Full resource path on the file system: {}".format(full_resource_path))
+                                    # upload file to bucket
+                                    upload_to_gcp_bucket(bucket_name, destination_blob_name, full_resource_path, group_email=group_email)
+
+                            else:
+                                log.warning('Skipping external URL: {}'.format(url))
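End to end, `CKANManager` is driven the same way the ETL entry point (shown
further below) does it; a condensed sketch with placeholder values:

```python
# Illustrative sketch only: URL, key, and paths are placeholders.
from ckanext.cloudstorage.etl.ckan_manager import CKANManager
from ckanext.cloudstorage.bucket import upload_to_gcp_bucket

manager = CKANManager(
    "https://ckan.example.org",   # CKAN base URL
    "/var/lib/ckan/default",      # ckan.storage_path
    api_key="sysadmin-api-key",
    prefix="demo-",
    domain="example.org",
)
data = manager.get_data_for_single_org("my-org")
if data:
    manager.process_resources(data, upload_to_gcp_bucket)
```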
diff --git a/ckanext/cloudstorage/etl/constants.py b/ckanext/cloudstorage/etl/constants.py
new file mode 100644
index 0000000..d006c96
--- /dev/null
+++ b/ckanext/cloudstorage/etl/constants.py
@@ -0,0 +1,77 @@
+import logging
+import configparser
+
+# Configure Logging
+log = logging.getLogger('Config')
+
+
+class ConfigNotFound(Exception):
+    """Exception raised for missing configuration.
+
+    Attributes:
+        message -- explanation of the error
+    """
+    def __init__(self, message):
+        self.message = message
+        super(ConfigNotFound, self).__init__(self.message)
+
+
+class ConfigurationManager(object):
+    """
+    Manages loading and accessing configuration settings from a file.
+
+    This class uses a class method to load configuration settings from a specified
+    file and provides a method to retrieve individual configuration values.
+    """
+
+    _config = None
+    _loaded = False
+
+    @classmethod
+    def load_config(cls, config_file):
+        """
+        Load the configuration from a file.
+
+        Args:
+            config_file (str): The path to the configuration file.
+
+        Raises:
+            ConfigNotFound: If the file is not found.
+            Exception: For any other issues encountered during file loading.
+        """
+        if not cls._loaded:
+            cls._config = configparser.ConfigParser()
+            try:
+                with open(config_file) as f:
+                    cls._config.readfp(f)
+                cls._loaded = True
+                log.info("Configuration loaded successfully from {}".format(config_file))
+            except IOError:
+                log.error("Unable to find '{}'".format(config_file))
+                raise Exception("Unable to find '{}'".format(config_file))
+            except Exception as e:
+                log.error("Unexpected error while attempting to load '{}': {}".format(config_file, e))
+                raise Exception("Unexpected error while attempting to load '{}': {}".format(config_file, e))
+
+    @classmethod
+    def get_config_value(cls, section, option, error_msg):
+        """
+        Retrieve a configuration value from the loaded configuration.
+
+        Args:
+            section (str): The section in the configuration file.
+            option (str): The option key to retrieve.
+            error_msg (str): Error message to display if the option is not found.
+
+        Returns:
+            str: The value of the configuration option.
+
+        Raises:
+            Exception: If the configuration is not loaded or the option is not found.
+        """
+        if not cls._loaded:
+            raise Exception("Configuration not loaded. Call 'load_config' first.")
+        try:
+            return cls._config.get(section, option)
+        except configparser.NoOptionError:
+            log.error(error_msg)
+            raise ConfigNotFound(message=error_msg)
diff --git a/ckanext/cloudstorage/etl/debug.sh b/ckanext/cloudstorage/etl/debug.sh
new file mode 100644
index 0000000..ee13544
--- /dev/null
+++ b/ckanext/cloudstorage/etl/debug.sh
@@ -0,0 +1,4 @@
+# Run this script inside the etl folder for debugging purposes.
+# Don't forget to append these 3 arguments: organization, api key, and configuration file, e.g.:
+#   test_ckan_1 32597cc5-8918-496c-a3e9-604de0d4fc0e /etc/ckan/production.ini
+python -m debugpy --log-to-stderr --wait-for-client --listen 0.0.0.0:5675 etl_run.py
diff --git a/ckanext/cloudstorage/etl/etl_run.py b/ckanext/cloudstorage/etl/etl_run.py
new file mode 100644
index 0000000..24f2f00
--- /dev/null
+++ b/ckanext/cloudstorage/etl/etl_run.py
@@ -0,0 +1,97 @@
+import logging
+import argparse
+from constants import ConfigurationManager
+import os
+
+from ckanext.cloudstorage.authorization import create_id_token_and_auth_session
+from ckanext.cloudstorage.etl.org_group_manager import OrganizationGroupManager
+from ckanext.cloudstorage.etl.ckan_manager import CKANManager
+from ckanext.cloudstorage.bucket import upload_to_gcp_bucket
+
+
+# Logging Configuration
+log = logging.getLogger('ETL')
+log.setLevel(logging.DEBUG)
+ch = logging.StreamHandler()
+ch.setLevel(logging.DEBUG)
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ch.setFormatter(formatter)
+log.addHandler(ch)
+log.propagate = False
+
+def run():
+    parser = argparse.ArgumentParser(description='Run ETL process for a specific organization.')
+    parser.add_argument('organization', help='Name of the organization')
+    parser.add_argument('ckan_api_key', help='CKAN API key value')
+    parser.add_argument('config_file', help='Configuration ini file')
+    args = parser.parse_args()
+
+    # Load configuration
+    ConfigurationManager.load_config(args.config_file)
+
+    log.info("="*100)
+    CKAN_ROOT_PATH = ConfigurationManager.get_config_value('app:main', 'ckan.root_path',
+                                                           "CKAN ROOT PATH not defined in production.ini")
+    log.info("CKAN_ROOT_PATH = {}".format(CKAN_ROOT_PATH))
+
+    CKAN_BASE_URL = ConfigurationManager.get_config_value('app:main', 'ckan.site_url',
+                                                          "CKAN site URL not defined in production.ini") + CKAN_ROOT_PATH
+    log.info("CKAN_BASE_URL = {}".format(CKAN_BASE_URL))
+
+    SERVICE_ACCOUNT_KEY_PATH = ConfigurationManager.get_config_value('app:main', 'ckanext.cloudstorage.service_account_key_path',
+                                                                     "CKAN cloudstorage service account path not defined in production.ini")
+    log.info("SERVICE_ACCOUNT_KEY_PATH = {}".format(SERVICE_ACCOUNT_KEY_PATH))
+
+    GCP_BASE_URL = ConfigurationManager.get_config_value('app:main', 'ckanext.cloudstorage.gcp_base_url',
+                                                         "CKAN cloudstorage GCP base URL not defined in production.ini")
+    log.info("GCP_BASE_URL = {}".format(GCP_BASE_URL))
+
+    STORAGE_DIR = ConfigurationManager.get_config_value('app:main', 'ckan.storage_path',
+                                                        "CKAN storage path not defined in production.ini")
+    log.info("STORAGE_DIR = {}".format(STORAGE_DIR))
+
+    PREFIX = ConfigurationManager.get_config_value('app:main', 'ckanext.cloudstorage.prefix',
+                                                   "CKAN cloudstorage prefix not defined in production.ini")
+    log.info("PREFIX = {}".format(PREFIX))
+
+    DOMAIN = ConfigurationManager.get_config_value('app:main', 'ckanext.cloudstorage.domain',
+                                                   "CKAN cloudstorage domain not defined in production.ini")
+    log.info("DOMAIN = {}".format(DOMAIN))
+    log.info("="*100)
+
+    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SERVICE_ACCOUNT_KEY_PATH
+
+    ckan_manager = CKANManager(CKAN_BASE_URL, STORAGE_DIR, args.ckan_api_key, PREFIX, DOMAIN)
+
+    org_members = ckan_manager.get_members_for_single_org(args.organization)
+    orgs_with_desc = ckan_manager.get_organization_description(args.organization)
+    active_users = ckan_manager.get_active_users()
+    log.info("="*100)
+    log.info("org_members: {}".format(org_members))
+    log.info("orgs_with_desc: {}".format(orgs_with_desc))
+    log.info("="*100)
+
+    service_account_key_path = SERVICE_ACCOUNT_KEY_PATH
+    auth_session = create_id_token_and_auth_session(service_account_key_path)
+
+    manager = OrganizationGroupManager(auth_session, GCP_BASE_URL, DOMAIN, PREFIX)
+    log.info("Processing {} to create groups and add members".format(args.organization))
+    log.info("="*100)
+    if orgs_with_desc and org_members and active_users:
+        manager.process_organizations(orgs_with_desc, org_members, active_users)
+        log.info("Successfully processed organization {}.".format(args.organization))
+        log.info("="*100)
+    else:
+        log.warning("Not all data is available for organization {}. Skipping...".format(args.organization))
+        exit(-1)
+
+    log.info("Retrieving packages and resources for {}".format(args.organization))
+    log.info("="*100)
+    data = ckan_manager.get_data_for_single_org(args.organization)
+    if data:
+        ckan_manager.process_resources(data, upload_to_gcp_bucket)
+    else:
+        log.error("Failed to retrieve data.")
+
+
+if __name__ == '__main__':
+    run()
diff --git a/ckanext/cloudstorage/etl/org_group_manager.py b/ckanext/cloudstorage/etl/org_group_manager.py
new file mode 100644
index 0000000..3c8efe3
--- /dev/null
+++ b/ckanext/cloudstorage/etl/org_group_manager.py
@@ -0,0 +1,119 @@
+import logging
+
+from ckanext.cloudstorage.group_service import CreateGroupsCommand
+from ckanext.cloudstorage.group_service import GetGroupMembersCommand
+from ckanext.cloudstorage.group_service import AddMemberGroupCommand
+
+# Configure Logging
+log = logging.getLogger('OrganizationGroupManager')
+
+
+class ProcessOrganizationError(Exception):
+    pass
+
+
+class AddingMembersError(Exception):
+    pass
+
+
+class OrganizationGroupManager:
+    """
+    Manages group creation and member addition for each CKAN organization.
+    """
+
+    def __init__(self, auth_session, base_url, domain, prefix=""):
+        """
+        Initialize the manager with an authenticated session and base URL.
+
+        Args:
+            auth_session: Authenticated session for making API calls.
+            base_url: Base URL for the group service API.
+            domain: Workspace domain used to build group e-mail addresses.
+            prefix: Optional prefix prepended to group names.
+        """
+        self.auth_session = auth_session
+        self.base_url = base_url
+        self.domain = domain
+        self.prefix = prefix
+
+    def process_organizations(self, orgs_with_desc, org_members, active_users):
+        """
+        Process each organization to create groups and add members.
+
+        Args:
+            orgs_with_desc (dict): Organization names mapped to their descriptions.
+            org_members (dict): Organization names mapped to {username: role} dicts.
+            active_users (dict): Usernames mapped to e-mail addresses.
+        """
+        responses = []
+        organization = None
+        try:
+            for organization, description in orgs_with_desc.items():
+                log.info("organization: {}".format(organization))
+                group_name = self.prefix + organization
+                group_email = self.prefix + organization + "@" + self.domain
+                log.info("group_email: {}".format(group_email))
+                payload = {
+                    "name": group_name,
+                    "email": group_email,
+                    "description": description
+                }
+                get_group_response = self.get_group_members(group_email)
+                if get_group_response["success"]:
+                    if get_group_response[u"response"][u"status"] == 200:
+                        log.warning("Group <{}> already exists.".format(group_name))
+                    elif get_group_response[u"response"][u"status"] == 404:
+                        log.info("Group <{}> does not exist yet.".format(group_name))
+                        self.create_group(payload)
+                    else:
+                        log.error("Group <{}> has not been created.".format(group_name))
+
+                members_response = self.add_members(organization, group_email, org_members, active_users)
+                responses.append(members_response)
+            return {"success": True, "response": responses}
+        except Exception as e:
+            log.error("Error processing organization {0}: {1}".format(organization, e))
+            raise ProcessOrganizationError("Error processing organization {0}: {1}".format(organization, e))
+
+    def create_group(self, payload):
+        """
+        Create a group for an organization.
+
+        Args:
+            payload (dict): Group name, e-mail, and description.
+        """
+        create_group_url = self.base_url + '/groups'
+        create_group_command = CreateGroupsCommand(self.auth_session, create_group_url, payload)
+        return create_group_command.execute()
+
+    def get_group_members(self, group_email):
+        """
+        Retrieve the members of an organization's group.
+
+        Args:
+            group_email (str): E-mail address of the group.
+        """
+        get_group_url = self.base_url + "/groups/{}/members".format(group_email)
+        get_group_command = GetGroupMembersCommand(self.auth_session, get_group_url)
+        return get_group_command.execute()
+
+    def add_members(self, organization, group_email, org_members, active_users):
+        """
+        Add members to the organization's group.
+
+        Args:
+            organization (str): Name of the organization.
+            group_email (str): E-mail address of the group.
+            org_members (dict): Organization names mapped to {username: role} dicts.
+            active_users (dict): Usernames mapped to e-mail addresses.
+        """
+        add_member_url = self.base_url + '/groups/{}/members'.format(group_email)
+        member_responses = []
+        try:
+            for org, users_roles in org_members.items():
+                if org == organization:
+                    for username, role in users_roles.items():
+                        member_email = active_users.get(username, '')
+                        log.info("Adding {} to group {}.".format(member_email, group_email))
+                        # Mapping of internal roles to GCP group roles
+                        map_role = {"admin": "MANAGER", "editor": "MEMBER", "member": "MEMBER", "sysadmin": "OWNER"}
+                        payload = {"email": member_email, "role": map_role[role], "deliverySettings": "NONE"}
+                        add_member_command = AddMemberGroupCommand(self.auth_session, add_member_url, payload)
+                        member_response = add_member_command.execute()
+                        member_responses.append(member_response)
+            return member_responses
+        except Exception as e:
+            raise AddingMembersError("Error adding member to group: {}".format(e))
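A sketch of how the manager is driven (the same flow `etl_run.py` uses; all
values are placeholders):

```python
# Illustrative sketch only: paths, URLs, and names are placeholders.
from ckanext.cloudstorage.authorization import create_id_token_and_auth_session
from ckanext.cloudstorage.etl.org_group_manager import OrganizationGroupManager

session = create_id_token_and_auth_session("/etc/ckan/keys/service_account.json")
manager = OrganizationGroupManager(session, "https://groups.example.org", "example.org", prefix="demo-")

orgs_with_desc = {"my-org": "Demo organization"}
org_members = {"my-org": {"alice": "admin"}}
active_users = {"alice": "alice@example.org"}
manager.process_organizations(orgs_with_desc, org_members, active_users)
```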
diff --git a/ckanext/cloudstorage/exception.py b/ckanext/cloudstorage/exception.py
new file mode 100644
index 0000000..2787a98
--- /dev/null
+++ b/ckanext/cloudstorage/exception.py
@@ -0,0 +1,101 @@
+class GCPGroupDeletionError(Exception):
+    """Exception raised for errors in the GCP group deletion process.
+
+    Attributes:
+        message -- explanation of the error
+    """
+
+    def __init__(self, message="Error occurred during GCP group deletion"):
+        self.message = message
+        super(GCPGroupDeletionError, self).__init__(self.message)
+
+
+class GCPGroupCreationError(Exception):
+    """Exception raised for errors in the GCP group creation process.
+
+    Attributes:
+        message -- explanation of the error
+    """
+
+    def __init__(self, message="Error occurred during GCP group creation"):
+        self.message = message
+        super(GCPGroupCreationError, self).__init__(self.message)
+
+
+class GCPGroupMemberAdditionError(Exception):
+    """
+    Exception raised for errors in adding a member to a GCP group.
+
+    Attributes:
+        message (str): Explanation of the error
+        member_email (str): Email of the member being added
+        group_name (str): Name of the GCP group
+    """
+
+    def __init__(self, member_email, group_name, message=None):
+        self.member_email = member_email
+        self.group_name = group_name
+        if message is None:
+            message = "Failed to add member {} to GCP group {}.".format(member_email, group_name)
+        super(GCPGroupMemberAdditionError, self).__init__(message)
+
+
+class GCPGroupMemberRemovalError(Exception):
+    """
+    Exception raised for errors in removing a member from a GCP group.
+
+    Attributes:
+        member_email (str): Email of the member being removed.
+        group_name (str): Name of the GCP group.
+        message (str): Explanation of the error.
+    """
+
+    def __init__(self, member_email, group_name, message=None):
+        self.member_email = member_email
+        self.group_name = group_name
+        if message is None:
+            message = "Failed to remove member {} from GCP group {}.".format(member_email, group_name)
+        super(GCPGroupMemberRemovalError, self).__init__(message)
+
+
+class GetMemberGroupCommandError(Exception):
+    """Exception raised for errors retrieving member info from group.
+
+    Attributes:
+        message -- explanation of the error
+    """
+
+    def __init__(self, message="Error occurred retrieving member info from group"):
+        self.message = message
+        super(GetMemberGroupCommandError, self).__init__(self.message)
+
+
+class GetGroupMembersCommandError(Exception):
+    """Exception raised for errors retrieving group info.
+
+    Attributes:
+        message -- explanation of the error
+    """
+
+    def __init__(self, message="Error occurred retrieving group info"):
+        self.message = message
+        super(GetGroupMembersCommandError, self).__init__(self.message)
+
+
+class GCPGroupMemberUpdateError(Exception):
+    """
+    Exception raised for errors in updating a member's information in a GCP group.
+
+    Attributes:
+        member_email (str): Email of the member being updated.
+        group_name (str): Name of the GCP group.
+        message (str): Explanation of the error.
+ """ + + def __init__(self, member_email, group_name, message=None): + self.member_email = member_email + self.group_name = group_name + if message is None: + message = "Failed to update member {} in GCP group {}.".format(member_email, group_name) + super(GCPGroupMemberUpdateError, self).__init__(message) diff --git a/ckanext/cloudstorage/google_storage.py b/ckanext/cloudstorage/google_storage.py new file mode 100644 index 0000000..ef56ddd --- /dev/null +++ b/ckanext/cloudstorage/google_storage.py @@ -0,0 +1,249 @@ +import binascii +import collections +import datetime +import hashlib +from six.moves.urllib.parse import quote + +from oauth2client.service_account import ServiceAccountCredentials +# Required for Google Cloud authentication +from google.oauth2 import service_account + +# Required for handling Python 2 and 3 compatibility issues +import six + +def generate_signed_url( + service_account_file, # Path to the service account file + bucket_name, # Name of the GCS bucket + object_name, # Name of the object in the bucket + subresource=None, # Optional subresource of the object + expiration=604800, # Expiration time of the URL in seconds (default 7 days) + http_method="GET", # HTTP method for the access (default GET) + query_parameters=None, # Additional query parameters for the URL + headers=None # HTTP headers for the request +): + """ + Generate a signed URL to access a Google Cloud Storage object. + + This function creates a signed URL that provides temporary access to a specific object + in a Google Cloud Storage bucket. The URL will be valid for the specified duration. + + Args: + service_account_file (str): Path to the service account JSON file. + bucket_name (str): Name of the Google Cloud Storage bucket. + object_name (str): Name of the object in the bucket. + subresource (str, optional): Subresource of the object, if any. Defaults to None. + expiration (int, optional): Time in seconds until the URL expires. Defaults to 604800 (7 days). + http_method (str, optional): HTTP method for the access. Defaults to "GET". + query_parameters (dict, optional): Additional query parameters. Defaults to None. + headers (dict, optional): HTTP headers for the request. Defaults to None. + + Returns: + str: A signed URL for accessing the specified storage object. + + Raises: + ValueError: If the expiration time exceeds 7 days (604800 seconds). 
+ """ + + # Validate expiration time + if expiration > 604800: + raise ValueError("Expiration time can't be longer than 604800 seconds (7 days).") + + # Encode the object name for URL + escaped_object_name = quote(six.ensure_binary(object_name), safe=b"/~") + canonical_uri = "/{}".format(escaped_object_name) + + # Get the current time in UTC + datetime_now = datetime.datetime.utcnow() + request_timestamp = datetime_now.strftime('%Y%m%dT%H%M%SZ') + datestamp = datetime_now.strftime('%Y%m%d') + + # Load Google credentials + google_credentials = service_account.Credentials.from_service_account_file( + service_account_file + ) + client_email = google_credentials.service_account_email + credential_scope = "{}/auto/storage/goog4_request".format(datestamp) + credential = "{}/{}".format(client_email, credential_scope) + + # Set default headers if not provided + if headers is None: + headers = dict() + host = "{}.storage.googleapis.com".format(bucket_name) + headers["host"] = host + + # Create canonical headers string + canonical_headers = "" + ordered_headers = collections.OrderedDict(sorted(headers.items())) + for k, v in ordered_headers.items(): + lower_k = str(k).lower() + strip_v = str(v).strip() + canonical_headers += "{}:{}\n".format(lower_k, strip_v) + + # Create signed headers string + signed_headers = ";".join(ordered_headers.keys()).lower() + + # Set default query parameters if not provided + if query_parameters is None: + query_parameters = dict() + query_parameters.update({ + "X-Goog-Algorithm": "GOOG4-RSA-SHA256", + "X-Goog-Credential": credential, + "X-Goog-Date": request_timestamp, + "X-Goog-Expires": str(expiration), + "X-Goog-SignedHeaders": signed_headers + }) + if subresource: + query_parameters[subresource] = "" + + # Create canonical query string + canonical_query_string = "&".join( + "{}={}".format(quote(str(k), safe=''), quote(str(v), safe='')) + for k, v in sorted(query_parameters.items()) + ).rstrip('&') + + # Create canonical request + canonical_request = "\n".join([ + http_method, + canonical_uri, + canonical_query_string, + canonical_headers, + signed_headers, + "UNSIGNED-PAYLOAD", + ]) + + # Hash the canonical request + canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest() + + # Create the string to sign + string_to_sign = "\n".join([ + "GOOG4-RSA-SHA256", + request_timestamp, + credential_scope, + canonical_request_hash, + ]) + + # Sign the string to sign using the service account's private key + signature = binascii.hexlify( + google_credentials.signer.sign(string_to_sign.encode()) + ).decode() + + # Construct the final signed URL + scheme_and_host = "https://{}".format(host) + signed_url = "{}{}?{}&x-goog-signature={}".format( + scheme_and_host, canonical_uri, canonical_query_string, signature + ) + + return signed_url + +def generate_signed_url_with_impersonated_user( + service_account_file, # Path to the service account file + bucket_name, # Name of the GCS bucket + object_name, # Name of the object in the bucket + impersonate_user, # Email of the user to impersonate + subresource=None, # Optional subresource of the object + expiration=604800, # Expiration time of the URL in seconds (default 7 days) + http_method="GET", # HTTP method for the access (default GET) + query_parameters=None, # Additional query parameters for the URL + headers=None # HTTP headers for the request +): + """ + Generate a signed URL to access a Google Cloud Storage object with impersonation. 
+ + This function creates a signed URL that provides temporary access to a specific object + in a Google Cloud Storage bucket. The URL will be valid for the specified duration. + The access is performed by impersonating a specific user via the service account. + """ + + # Validate expiration time + if expiration > 604800: + raise ValueError("Expiration time can't be longer than 604800 seconds (7 days).") + + # Encode the object name for URL + escaped_object_name = quote(six.ensure_binary(object_name), safe=b"/~") + canonical_uri = "/{}".format(escaped_object_name) + + # Get the current time in UTC + datetime_now = datetime.datetime.utcnow() + request_timestamp = datetime_now.strftime('%Y%m%dT%H%M%SZ') + datestamp = datetime_now.strftime('%Y%m%d') + + # Load Google credentials and enable domain-wide delegation for impersonation + scopes = ['https://www.googleapis.com/auth/devstorage.read_write'] + credentials = ServiceAccountCredentials.from_json_keyfile_name( + service_account_file, scopes=scopes + ) + credentials = credentials.create_delegated(impersonate_user) + + client_email = credentials.service_account_email + credential_scope = "{}/auto/storage/goog4_request".format(datestamp) + credential = "{}/{}".format(client_email, credential_scope) + + # Set default headers if not provided + if headers is None: + headers = dict() + host = "{}.storage.googleapis.com".format(bucket_name) + headers["host"] = host + + # Create canonical headers string + canonical_headers = "" + ordered_headers = collections.OrderedDict(sorted(headers.items())) + for k, v in ordered_headers.items(): + lower_k = str(k).lower() + strip_v = str(v).strip() + canonical_headers += "{}:{}\n".format(lower_k, strip_v) + + # Create signed headers string + signed_headers = ";".join(ordered_headers.keys()).lower() + + # Set default query parameters if not provided + if query_parameters is None: + query_parameters = dict() + query_parameters.update({ + "X-Goog-Algorithm": "GOOG4-RSA-SHA256", + "X-Goog-Credential": credential, + "X-Goog-Date": request_timestamp, + "X-Goog-Expires": str(expiration), + "X-Goog-SignedHeaders": signed_headers + }) + if subresource: + query_parameters[subresource] = "" + + # Create canonical query string + canonical_query_string = "&".join( + "{}={}".format(quote(str(k), safe=''), quote(str(v), safe='')) + for k, v in sorted(query_parameters.items()) + ).rstrip('&') + + # Create canonical request + canonical_request = "\n".join([ + http_method, + canonical_uri, + canonical_query_string, + canonical_headers, + signed_headers, + "UNSIGNED-PAYLOAD", + ]) + + # Hash the canonical request + canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest() + + # Create the string to sign + string_to_sign = "\n".join([ + "GOOG4-RSA-SHA256", + request_timestamp, + credential_scope, + canonical_request_hash, + ]) + + # Sign the string to sign using the service account's private key + signature = binascii.hexlify( + credentials.sign_blob(string_to_sign)[1] + ).decode() + + # Construct the final signed URL + scheme_and_host = "https://{}".format(host) + signed_url = "{}{}?{}&x-goog-signature={}".format( + scheme_and_host, canonical_uri, canonical_query_string, signature + ) + + return signed_url diff --git a/ckanext/cloudstorage/group_service.py b/ckanext/cloudstorage/group_service.py new file mode 100644 index 0000000..524b1b4 --- /dev/null +++ b/ckanext/cloudstorage/group_service.py @@ -0,0 +1,193 @@ +from time import sleep +import logging +from abc import ABCMeta, abstractmethod + +from 
ckanext.cloudstorage.bucket import create_bucket
+from ckanext.cloudstorage.bucket import add_group_iam_permissions
+from ckanext.cloudstorage.exception import GCPGroupCreationError
+from ckanext.cloudstorage.exception import GCPGroupMemberAdditionError
+from ckanext.cloudstorage.exception import GCPGroupDeletionError
+from ckanext.cloudstorage.exception import GetMemberGroupCommandError
+from ckanext.cloudstorage.exception import GCPGroupMemberUpdateError
+from ckanext.cloudstorage.exception import GCPGroupMemberRemovalError
+from ckanext.cloudstorage.exception import GetGroupMembersCommandError
+
+log = logging.getLogger(__name__)
+
+class Command:
+    """
+    Abstract base class for all commands, requiring an execute method.
+    """
+
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def execute(self):
+        pass
+
+
+class CreateGroupsCommand(Command):
+    """
+    Concrete command to create groups using a POST request.
+    """
+    def __init__(self, auth_session, url, payload, cloud_storage=None):
+        self.auth_session = auth_session
+        self.url = url
+        self.payload = payload
+        self.cloud_storage = cloud_storage
+
+    def execute(self):
+        response = None
+        try:
+            response = self.auth_session.post(url=self.url, json=self.payload)
+            response.raise_for_status()  # Raises an exception for 4XX/5XX responses
+
+            create_bucket(bucket_name=self.payload["name"], cloud_storage=self.cloud_storage)
+            # Wait 20s (required for Group OWNER registration)
+            sleep(20)
+
+            add_group_iam_permissions(
+                bucket_name=self.payload["name"],
+                group_email=self.payload["email"]
+            )
+
+        except Exception as e:
+            # Guard against `response` being unset when the request itself failed
+            if response is not None and response.json().get(u"status") == 409:
+                log.warning("Group {} already exists".format(self.payload["name"]))
+                return
+            log.error("Error in group creation or bucket setup: {}".format(e))
+            raise GCPGroupCreationError("Error in group creation or bucket setup: {}".format(e))
+
+class GetGroupMembersCommand(Command):
+    """
+    Concrete command to retrieve group members using a GET request.
+    """
+    def __init__(self, auth_session, url):
+        self.auth_session = auth_session
+        self.url = url
+
+    def execute(self):
+        response = None
+        try:
+            response = self.auth_session.get(url=self.url)
+            response.raise_for_status()  # Raises an HTTPError for bad responses
+            return {"success": True, "response": response.json()}
+        except Exception as e:
+            if response is not None and response.json().get(u"status") == 404:
+                log.warning("Group members not found: {}".format(e))
+                return {"success": True, "response": response.json()}
+            log.error("Unexpected error retrieving group members: {}".format(e))
+            raise GetGroupMembersCommandError("Unexpected error retrieving group members: {}".format(e))
+
+
+class AddMemberGroupCommand(Command):
+    """
+    Concrete command to add a member to a group using a POST request.
+    """
+    def __init__(self, auth_session, url, payload):
+        self.auth_session = auth_session
+        self.url = url
+        self.payload = payload
+
+    def execute(self):
+        response = None
+        try:
+            response = self.auth_session.post(url=self.url, json=self.payload)
+            response.raise_for_status()  # Raises an HTTPError for bad responses
+            return {"success": True, "response": response.json()}
+        except Exception as e:
+            if response is not None and response.json().get(u"status") == 409:
+                log.warning("Member {} already exists".format(self.payload["email"]))
+                return
+            log.error("Unexpected error when adding member to a group: {}".format(e))
+            # GCPGroupMemberAdditionError expects (member_email, group_name, message)
+            raise GCPGroupMemberAdditionError(self.payload.get("email", ""), self.url,
+                                              "Unexpected error when adding member to a group: {}".format(e))
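The commands can be executed directly or queued through the `APIInvoker`
defined at the end of this module; a sketch (the `auth_session` and `base_url`
values are assumed to come from `create_id_token_and_auth_session` and your
config, and the group/member payloads are placeholders):

```python
# Illustrative sketch only: auth_session and base_url are assumed to exist.
from ckanext.cloudstorage.group_service import (
    APIInvoker, AddMemberGroupCommand, CreateGroupsCommand)

group = {"name": "demo-my-org", "email": "demo-my-org@example.org", "description": "Demo"}
member = {"email": "alice@example.org", "role": "MANAGER", "deliverySettings": "NONE"}

invoker = APIInvoker()
invoker.add_command(CreateGroupsCommand(auth_session, base_url + "/groups", group))
invoker.add_command(AddMemberGroupCommand(auth_session, base_url + "/groups/demo-my-org@example.org/members", member))
responses = invoker.run()
```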
+    """
+    def __init__(self, auth_session, url):
+        self.auth_session = auth_session
+        self.url = url
+
+    def execute(self):
+        response = None
+        try:
+            response = self.auth_session.get(url=self.url)
+            response.raise_for_status()  # Raises an HTTPError for bad responses
+            return {"success": True, "response": response.json()}
+        except Exception as e:
+            # Guard against `response` being unbound when the GET itself
+            # failed before a response was received.
+            if response is not None and response.json()[u"status"] == 404:
+                log.warning("Group member not found: {}".format(e))
+                return {"success": True, "response": response.json()}
+            log.error("Unexpected error when retrieving a member from a group: {}".format(e))
+            raise GetMemberGroupCommandError("Unexpected error when retrieving a member from a group: {}".format(e))
+
+
+class UpdateMemberGroupCommand(Command):
+    """
+    Concrete command to update a member that belongs to a group using a PUT request.
+    """
+    def __init__(self, auth_session, url, payload):
+        self.auth_session = auth_session
+        self.url = url
+        self.payload = payload
+
+    def execute(self):
+        try:
+            response = self.auth_session.put(url=self.url, json=self.payload)
+            response.raise_for_status()  # Raises an HTTPError for bad responses
+            return True
+        except Exception as e:
+            log.error("Unexpected error when updating member info to group: {}".format(e))
+            raise GCPGroupMemberUpdateError("Unexpected error when updating member info to group: {}".format(e))
+
+
+class DeleteMemberGroupCommand(Command):
+    """
+    Concrete command to delete a member from a group using a DELETE request.
+    """
+    def __init__(self, auth_session, url):
+        self.auth_session = auth_session
+        self.url = url
+
+    def execute(self):
+        try:
+            response = self.auth_session.delete(url=self.url)
+            response.raise_for_status()  # Raises an HTTPError for bad responses
+            return True
+        except Exception as e:
+            log.error("Unexpected error when deleting member from a group: {}".format(e))
+            raise GCPGroupMemberRemovalError("Unexpected error when deleting member from a group: {}".format(e))
+
+
+class DeleteGroupsCommand(Command):
+    """
+    Concrete command to delete a group using a DELETE request.
+    """
+    def __init__(self, auth_session, url):
+        self.auth_session = auth_session
+        self.url = url
+
+    def execute(self):
+        try:
+            response = self.auth_session.delete(url=self.url)
+            response.raise_for_status()  # Raises an HTTPError for bad responses
+            return True
+        except Exception as e:
+            log.error("Unexpected error when deleting group: {}".format(e))
+            raise GCPGroupDeletionError("Unexpected error when deleting group: {}".format(e))
+
+
+class APIInvoker:
+    """
+    Invoker class to execute a series of commands.
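+
+    Example (illustrative):
+
+        invoker = APIInvoker()
+        invoker.add_command(GetGroupMembersCommand(auth_session, url))
+        responses = invoker.run()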
+ """ + def __init__(self): + self.commands = [] + + def add_command(self, command): + self.commands.append(command) + + def run(self): + responses = [] + for command in self.commands: + responses.append(command.execute()) + return responses diff --git a/ckanext/cloudstorage/logic/action/action.py b/ckanext/cloudstorage/logic/action/action.py new file mode 100644 index 0000000..eb06475 --- /dev/null +++ b/ckanext/cloudstorage/logic/action/action.py @@ -0,0 +1,359 @@ +# encoding: utf-8 +import logging +import os +import ckan.plugins as plugins +import ckan.logic as logic +from pylons import config + +from ckanext.cloudstorage.logic.auth.auth import can_create_gcp_group +from ckanext.cloudstorage.logic.auth.auth import can_delete_gcp_group +from ckanext.cloudstorage.logic.auth.auth import can_delete_member_from_gcp_group +from ckanext.cloudstorage.logic.auth.auth import can_create_member_from_gcp_group +from ckanext.cloudstorage.group_service import CreateGroupsCommand +from ckanext.cloudstorage.group_service import DeleteGroupsCommand +from ckanext.cloudstorage.group_service import AddMemberGroupCommand +from ckanext.cloudstorage.group_service import UpdateMemberGroupCommand +from ckanext.cloudstorage.group_service import GetMemberGroupCommand +from ckanext.cloudstorage.group_service import DeleteMemberGroupCommand +from ckanext.cloudstorage.storage import CloudStorage +from ckanext.cloudstorage.authorization import create_id_token_and_auth_session +from ckan.common import _, request + + +log = logging.getLogger(__name__) + + +# Define some shortcuts +# Ensure they are module-private so that they don't get loaded as available +# actions in the action API. +NotFound = logic.NotFound +NotAuthorized = logic.NotAuthorized +ValidationError = logic.ValidationError + +PREFIX = config['ckanext.cloudstorage.prefix'] +DOMAIN = config['ckanext.cloudstorage.domain'] +SERVICE_ACCOUNT_KEYPATH = config['ckanext.cloudstorage.service_account_key_path'] + + +@plugins.toolkit.chained_action +def organization_create(next_auth, context, data_dict): + """ + Extends CKAN organization creation with optional GCP group workspace creation. + + This function facilitates the creation of a Google Cloud Platform (GCP) group + workspace alongside a CKAN organization. It checks for the 'gcp_group_create' flag + in 'data_dict'. If true and the user is authorized, a GCP group workspace is + created using the organization's name and description. + + The function validates mandatory fields 'name' and 'description', checks user + authorization, and uses service account credentials for GCP group workspace creation. + It handles GCP API responses and errors, adjusting 'gcp_group_create' flag and + raising exceptions as needed. + + Args: + next_auth (function): The next authorization function in the chain. + context (dict): Context including environmental settings and user info. + data_dict (dict): Data for organization creation, including 'name', 'description', + and 'gcp_group_create'. + + Returns: + dict: Modified 'data_dict' after processing, passed to the next function in chain. 
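+
+        Example data_dict (values are illustrative):
+            {"name": "my-org",
+             "description": "Demo organization",
+             "gcp_group_create": True}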
+    """
+    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SERVICE_ACCOUNT_KEYPATH
+    log.info("Starting organization creation process with GCP group workspace option.")
+
+    url = "https://gcp.fao.org/groups-service/api/v1/groups"
+
+    # Validate mandatory fields: 'name' and 'description'. Check for None
+    # *before* the str()/encode conversion; str(None) would otherwise turn a
+    # missing value into the literal string "None" and the check would never fire.
+    name = data_dict.get('name')
+    if name is None:
+        log.error("Missing 'name' in data_dict")
+        raise ValidationError({'name': _('Missing value')})
+    name = str(name).encode('ascii', 'ignore')
+
+    description = data_dict.get('description')
+    if description is None:
+        log.error("Missing 'description' in data_dict")
+        raise ValidationError({'description': _('Missing value')})
+    description = str(description).encode('ascii', 'ignore')
+
+    log.info("Validated input data: name=%s, description=%s", name, description)
+
+    # Authorization check for GCP group workspace creation
+    if not can_create_gcp_group(context['auth_user_obj']):
+        log.warning("User %s unauthorized to create GCP group workspace", context['auth_user_obj'])
+        raise NotAuthorized(_('User not authorized to create an organization or a gcp group workspace.'))
+
+    username = str(context['user']).encode('ascii', 'ignore')
+    model = context['model']
+    user = model.User.get(username)
+    member_email = str(user.email).encode('ascii', 'ignore')
+    role = "sysadmin"
+    group_email = PREFIX + name + "@" + DOMAIN
+    group_name = PREFIX + name
+
+    log.info("Preparing to create GCP group workspace: %s", group_email)
+
+    # Create GCP group workspace if authorized
+    auth_session = create_id_token_and_auth_session(SERVICE_ACCOUNT_KEYPATH)
+    payload = {
+        "name": group_name,
+        "email": group_email,
+        "description": description
+    }
+    cloud_storage = CloudStorage()
+    create_group = CreateGroupsCommand(auth_session, url, payload, cloud_storage)
+    create_group.execute()
+    log.info("GCP group %s created successfully.", group_email)
+
+    # Add sysadmin user as owner of gcp group
+    map_role = {"sysadmin": "OWNER"}
+    url = "https://gcp.fao.org/groups-service/api/v1/groups/{}/members".format(group_email)
+    payload = {"email": member_email, "role": map_role[role], "deliverySettings": "NONE"}
+    add_member_to_gcp = AddMemberGroupCommand(auth_session, url, payload)
+    add_member_to_gcp.execute()
+    log.info("Added member %s to GCP group %s as %s.", member_email, group_email, map_role[role])
+    log.info("Organization creation with GCP group workspace completed successfully.")
+    # Proceed to the next action in the chain
+    return next_auth(context, data_dict)
+
+
+@plugins.toolkit.chained_action
+def organization_update(next_auth, context, data_dict):
+    """
+    Updates an organization's details after performing authorization checks.
+
+    This method is a chained action for updating organization details. It first
+    checks if the organization exists and if the user has the authorization to
+    edit it. If the organization does not exist or the user is not authorized,
+    it raises a ValidationError. Otherwise, it passes the call to the next
+    authorization function in the chain.
+
+    :param next_auth: The next authorization function in the chain.
+    :param context: The context dictionary containing relevant information.
+    :param data_dict: The data dictionary containing organization details.
+    :return: The result of the next authorization function.
+    :raises ValidationError: If the organization does not exist or the user
+        is not authorized to edit it.
+    """
+    model = context['model']
+    name = data_dict.get('name')
+
+    # Renaming is effectively blocked: if the submitted name does not match
+    # an existing organization, the update is rejected.
+    org = model.Group.get(name)
+    if org is None:
+        import ckan.lib.helpers as h
+        h.flash_error("You are not authorized to edit or update organization name")
+        raise ValidationError("You are not authorized to edit or update organization name")
+
+    return next_auth(context, data_dict)
+
+
+@plugins.toolkit.chained_action
+def organization_delete(next_auth, context, data_dict):
+    """
+    Deletes an organization and its corresponding GCP group workspace.
+
+    This function handles the deletion of a CKAN organization and its associated Google
+    Cloud Platform (GCP) group workspace. It checks if the user is authorized and
+    proceeds with the deletion process.
+
+    Args:
+        next_auth (function): Next authorization function in the chain.
+        context (dict): Context dictionary with environment and user information.
+        data_dict (dict): Data dictionary containing organization ID.
+
+    Raises:
+        ValidationError: If the organization ID is missing.
+        NotAuthorized: If user lacks permission to delete the organization or GCP group.
+        GCPGroupDeletionError: For errors during GCP group deletion.
+
+    Returns:
+        dict: Result from the next authorization function in the chain.
+    """
+    log.info("Starting organization deletion process with corresponding GCP group workspace.")
+
+    # Extract the organization ID from the provided data; check for None
+    # before converting, since str(None) would never compare equal to None.
+    group_id = data_dict.get('id')
+    if group_id is None:
+        log.error("Missing 'id' in data_dict")
+        raise ValidationError({'id': _('Missing value')})
+    group_id = str(group_id).encode('ascii', 'ignore')
+
+    # Retrieve the organization object using the model from the context
+    model = context['model']
+    org = model.Group.get(group_id)
+
+    # Encode the organization name to ASCII, ignoring non-ASCII characters
+    group_name = str(org.name).encode('ascii', 'ignore')
+
+    # Construct the email for the GCP group using the group name
+    group_email = PREFIX + group_name + "@" + DOMAIN
+    url = "https://gcp.fao.org/groups-service/api/v1/groups/{}".format(group_email)
+
+    log.info("Prepared to delete GCP group workspace: %s", group_email)
+
+    # Check if the user is authorized to delete the GCP group workspace
+    if not can_delete_gcp_group(context['auth_user_obj']):
+        log.warning("User %s unauthorized to delete GCP group workspace", context['auth_user_obj'])
+        raise NotAuthorized(_('User not authorized to delete an organization or a gcp group workspace.'))
+
+    # Create an authenticated session
+    auth_session = create_id_token_and_auth_session(SERVICE_ACCOUNT_KEYPATH)
+    # Create a command to delete the group and execute it
+    delete_group = DeleteGroupsCommand(auth_session, url)
+    delete_group.execute()
+    log.info("GCP group %s deleted successfully.", group_email)
+    log.info("Organization deletion with corresponding GCP group workspace completed successfully.")
+
+    # Proceed to the next authorization function
+    return next_auth(context, data_dict)
+
+
+@plugins.toolkit.chained_action
+def organization_member_create(next_auth, context, data_dict):
+    """
+    Creates or updates a member in a GCP group workspace based on their role.
+
+    This function handles the creation or update of a member in a Google Cloud Platform (GCP)
+    group workspace. It sets the member's role and adds them to the group if they do not exist.
+
+    Args:
+        next_auth: The next authorization function in the chain.
+        context: The context dictionary containing environment and user information.
+        data_dict: A dictionary containing the member's information.
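+            Expected keys (validated below): "role" and "id" (the target
+            organization); the acting user is taken from the context.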
+ + Raises: + NotAuthorized: If the user is not authorized to create/update a member in GCP group. + GCPGroupMemberUpdateError: If there is an error updating the member in GCP group. + GCPGroupMemberAdditionError: If there is an error adding the member to GCP group. + + Returns: + The result of the next authorization function. + """ + + log.info("Starting process to create or update a member in a GCP group workspace.") + + # Extracting member information from the provided data + role = data_dict.get("role") + if role is None: + log.error("Missing 'role' in data_dict") + raise ValidationError({'role': _('Missing value')}) + role = str(data_dict.get("role")).encode('ascii', 'ignore') + + org_id = data_dict.get("id") + if org_id is None: + log.error("Missing 'id' in data_dict") + raise ValidationError({'id': _('Missing value')}) + org_id = str(data_dict.get("id")).encode('ascii', 'ignore') + + username = str(context['user']).encode('ascii', 'ignore') + + log.info("Member data extracted: role=%s, username=%s, org_id=%s", role, username, org_id) + + # Retrieve the user object using the model from the context + model = context['model'] + user = model.User.get(username) + + if not user: + log.error("User not found in the model: %s", username) + raise NotFound(_('User not found.')) + + # Mapping of internal roles to GCP group roles + map_role = {"admin": "MANAGER", "editor": "MEMBER", "member": "MEMBER", "sysadmin": "OWNER"} + + # Construct the group email using the group name from context + group_name = str(context["group"].name).encode('ascii', 'ignore') + group_email = PREFIX + group_name + "@" + DOMAIN + member_email = str(user.email).encode('ascii', 'ignore') + + log.info("GCP group email constructed: %s", group_email) + + # Authorization check for creating/updating member in GCP group + if not can_create_member_from_gcp_group(context['auth_user_obj'], username, org_id): + log.warning("User %s unauthorized to create or update a member in GCP group workspace", context['auth_user_obj']) + raise NotAuthorized(_('User not authorized to create or update a member within gcp group workspace.')) + + # Construct the URL for GCP group service and check if the member exists + url = "https://gcp.fao.org/groups-service/api/v1/groups/{}/members/{}".format(group_email, member_email) + auth_session = create_id_token_and_auth_session(SERVICE_ACCOUNT_KEYPATH) + get_member_from_gcp = GetMemberGroupCommand(auth_session, url) + response = get_member_from_gcp.execute() + log.info("Response received for member existence check: %s", response) + + status = response["response"][u"status"] + if status == 200: + # If member exists, update their role + payload = {"email": member_email, "role": map_role[role], "deliverySettings": "NONE"} + update_member = UpdateMemberGroupCommand(auth_session, url, payload) + update_member.execute() + log.info("Member %s updated in GCP group %s with role %s.", member_email, group_email, map_role[role]) + else: + # If member does not exist, add them to the group + url = "https://gcp.fao.org/groups-service/api/v1/groups/{}/members".format(group_email) + payload = {"email": member_email, "role": map_role[role], "deliverySettings": "NONE"} + add_member_to_gcp = AddMemberGroupCommand(auth_session, url, payload) + response = add_member_to_gcp.execute() + log.info("Member %s added to GCP group %s with role %s.", member_email, group_email, map_role[role]) + + log.info("Process to create or update a member in a GCP group workspace completed successfully.") + + return next_auth(context, data_dict) 
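+
+
+# Illustrative sketch (not part of the original flow): the commands used by
+# the actions above can also be queued and executed in one pass through
+# APIInvoker from ckanext.cloudstorage.group_service. `member_url` and
+# `payload` below are placeholders for the values built in the actions.
+#
+#     from ckanext.cloudstorage.group_service import APIInvoker
+#
+#     invoker = APIInvoker()
+#     invoker.add_command(GetMemberGroupCommand(auth_session, member_url))
+#     invoker.add_command(UpdateMemberGroupCommand(auth_session, member_url, payload))
+#     responses = invoker.run()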
+ + +@plugins.toolkit.chained_action +def organization_member_delete(next_auth, context, data_dict): + """ + Deletes a member from a GCP group workspace based on organization and user ID. + + This function is responsible for removing a member from a GCP group workspace. It + utilizes the member's user ID and the organization ID for this operation. Proper + authorization is required to execute this function. + + Args: + next_auth (function): Next authorization function in the chain. + context (dict): Context containing environment and user information. + data_dict (dict): Dictionary with the member's user ID and organization ID. + + Raises: + ValidationError: If the 'id' field is missing in data_dict. + NotAuthorized: If user lacks permission to delete the member from GCP group. + GCPGroupMemberRemovalError: For errors during removal of the member from GCP group. + + Returns: + dict: Result from the next authorization function in the chain. + """ + + log.info("Starting process to delete a member from a GCP group workspace.") + + # Extracting member and organization information + username = str(context['user']).encode('ascii', 'ignore') + + group_id = data_dict.get("id") + if group_id is None: + log.error("Missing 'id' in data_dict") + raise ValidationError({'id': _('Missing value')}) + group_id = str(group_id).encode('ascii', 'ignore') + + log.info("Extracted member and organization information: username=%s, group_id=%s", username, group_id) + + # Retrieve user and group objects using the model from the context + model = context['model'] + user = model.User.get(username) + group = model.Group.get(group_id) + group_name = str(group.name).encode('ascii', 'ignore') + group_email = PREFIX + group_name + "@" + DOMAIN + member_email = str(user.email).encode('ascii', 'ignore') + + log.info("Prepared member and group emails for deletion: group_email=%s, member_email=%s", group_email, member_email) + + # Authorization check for deleting member from GCP group + if not can_delete_member_from_gcp_group(context['auth_user_obj'], username, group_id): + log.warning("User %s unauthorized to delete a member of GCP group workspace", context['auth_user_obj']) + raise NotAuthorized(_('User not authorized to delete a member of a gcp group workspace.')) + + # Construct the URL for GCP group service and execute the delete command + url = "https://gcp.fao.org/groups-service/api/v1/groups/{}/members/{}".format(group_email, member_email) + auth_session = create_id_token_and_auth_session(SERVICE_ACCOUNT_KEYPATH) + delete_member_from_gcp = DeleteMemberGroupCommand(auth_session, url) + delete_member_from_gcp.execute() + log.info("Member %s deleted from GCP group %s successfully.", member_email, group_email) + log.info("Member deletion from GCP group workspace completed successfully.") + + return next_auth(context, data_dict) diff --git a/ckanext/cloudstorage/logic/auth/auth.py b/ckanext/cloudstorage/logic/auth/auth.py new file mode 100644 index 0000000..435bdd2 --- /dev/null +++ b/ckanext/cloudstorage/logic/auth/auth.py @@ -0,0 +1,50 @@ +import ckan.logic as logic +import ckan.authz as authz +import ckan.plugins.toolkit as t +_ = t._ +c = t.c +import ckan.model as model +import ckan.logic as logic + +ValidationError = logic.ValidationError +NotFound = logic.NotFound + + +def users_role_for_group_or_org(user_name, org_id): + ''' Returns the user's role for the group. (Ignores privileges that cascade + in a group hierarchy.) 
+
+    '''
+
+    user_id = authz.get_user_id_for_username(user_name, allow_none=True)
+    if not user_id:
+        return None
+    # get any roles the user has for the group
+    q = model.Session.query(model.Member) \
+        .filter(model.Member.table_name == 'user') \
+        .filter(model.Member.group_id == org_id) \
+        .filter(model.Member.state == 'active') \
+        .filter(model.Member.table_id == user_id)
+    # return the first role we find
+    for row in q.all():
+        return row.capacity
+    return None
+
+def is_sysadmin(username):
+    return authz.is_sysadmin(username)
+
+def is_admin_of_org(username, org_id):
+    ''' Returns True if username is an admin of the organization '''
+    return users_role_for_group_or_org(username, org_id) == 'admin'
+
+def can_create_gcp_group(user):
+    return user.sysadmin
+
+def can_delete_gcp_group(user):
+    return user.sysadmin
+
+def can_create_member_from_gcp_group(user, username, org_id):
+    return user.sysadmin or is_admin_of_org(username, org_id)
+
+def can_delete_member_from_gcp_group(user, username, org_id):
+    return user.sysadmin or is_admin_of_org(username, org_id)
diff --git a/ckanext/cloudstorage/plugin.py b/ckanext/cloudstorage/plugin.py
index 5d7a939..94ccfe1 100644
--- a/ckanext/cloudstorage/plugin.py
+++ b/ckanext/cloudstorage/plugin.py
@@ -6,7 +6,9 @@
 from ckanext.cloudstorage import storage
 from ckanext.cloudstorage import helpers
 import ckanext.cloudstorage.logic.action.multipart as m_action
+import ckanext.cloudstorage.logic.action.action as org_action
 import ckanext.cloudstorage.logic.auth.multipart as m_auth
+import ckanext.cloudstorage.logic.auth.auth as org_auth
 
 
 class CloudStoragePlugin(plugins.SingletonPlugin):
@@ -89,6 +91,11 @@ def get_actions(self):
             'cloudstorage_abort_multipart': m_action.abort_multipart,
             'cloudstorage_check_multipart': m_action.check_multipart,
             'cloudstorage_clean_multipart': m_action.clean_multipart,
+            'organization_create': org_action.organization_create,
+            'organization_update': org_action.organization_update,
+            'organization_delete': org_action.organization_delete,
+            'organization_member_create': org_action.organization_member_create,
+            'organization_member_delete': org_action.organization_member_delete,
         }
 
     # IAuthFunctions
diff --git a/ckanext/cloudstorage/storage.py b/ckanext/cloudstorage/storage.py
index 903390b..0bf3c24 100644
--- a/ckanext/cloudstorage/storage.py
+++ b/ckanext/cloudstorage/storage.py
@@ -7,19 +7,27 @@
 from ast import literal_eval
 from datetime import datetime, timedelta
 from tempfile import SpooledTemporaryFile
+import logging
 
 from pylons import config
 from ckan import model
 from ckan.lib import munge
 import ckan.plugins as p
+import ckan.model as model
+from ckan.plugins import toolkit
+import ckan.authz as authz
+import ckan.logic as logic
 
 from libcloud.storage.types import Provider, ObjectDoesNotExistError
 from libcloud.storage.providers import get_driver
-
 from werkzeug.datastructures import FileStorage as FlaskFileStorage
 
+
 ALLOWED_UPLOAD_TYPES = (cgi.FieldStorage, FlaskFileStorage)
+NotAuthorized = logic.NotAuthorized
+
+log = logging.getLogger(__name__)
 
 def _get_underlying_file(wrapper):
     if isinstance(wrapper, FlaskFileStorage):
@@ -29,16 +37,20 @@
 class CloudStorage(object):
     def __init__(self):
-        self.driver = get_driver(
-            getattr(
-                Provider,
-                self.driver_name
-            )
-        )(**self.driver_options)
-        self._container = None
+        """
+        Initialize the CloudStorage with a specific storage driver.
+        """
+        try:
+            # Dynamically get the driver class from the Provider.
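+            # getattr(Provider, ...) maps the configured name (for example
+            # "GOOGLE_STORAGE") to the libcloud Provider constant, and
+            # get_driver() returns the matching driver class, which is then
+            # instantiated with `ckanext.cloudstorage.driver_options`.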
+ driver_class = get_driver(getattr(Provider, self.driver_name)) + # Initialize the driver with the provided options. + self.driver = driver_class(**self.driver_options) + except AttributeError: + raise ValueError("Invalid driver name: {}".format(self.driver_name)) + except Exception as e: + raise ConnectionError("Failed to initialize driver: {}".format(e)) - def path_from_filename(self, rid, filename): - raise NotImplemented + self._container = None @property def container(self): @@ -67,12 +79,27 @@ def driver_name(self): is configured to use. - .. note:: + .. note:: This value is used to lookup the apache-libcloud driver to use based on the Provider enum. """ return config['ckanext.cloudstorage.driver'] + + @property + def prefix(self): + """ + The prefix of container or group name + """ + return config['ckanext.cloudstorage.prefix'] + + + @property + def domain(self): + """ + gcp domain + """ + return config['ckanext.cloudstorage.domain'] @property def container_name(self): @@ -82,6 +109,16 @@ def container_name(self): """ return config['ckanext.cloudstorage.container_name'] + @container_name.setter + def container_name(self, value): + """ + Set the name of the container. + """ + # Optional: Add validation or processing here + self._container_name = value + # Optional: Reset or update the container if necessary + self._container = None + @property def use_secure_urls(self): """ @@ -103,6 +140,55 @@ def leave_files(self): config.get('ckanext.cloudstorage.leave_files', False) ) + + @property + def guess_mimetype(self): + """ + `True` if ckanext-cloudstorage is configured to guess mime types, + `False` otherwise. + """ + return p.toolkit.asbool( + config.get('ckanext.cloudstorage.guess_mimetype', False) + ) + + @property + def proxy_download(self): + """ + If the ckan may stream the object (will use service account to download + from private storages) + """ + return p.toolkit.asbool( + config.get('ckanext.cloudstorage.proxy_download', False) + ) + + @property + def can_use_advanced_google(self): + """ + `True` if the `google-auth` module is installed and + ckanext-cloudstorage has been configured to use Google, otherwise + `False`. + """ + # Are we even using GOOGLE? + if self.driver_name == 'GOOGLE_STORAGE': + try: + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.driver_options["secret"] + # Yes? Is the 'google-auth' package available? + from google.auth import crypt + assert crypt + # check six >=1.5 + import six + assert six.ensure_binary + return True + except ImportError: + # fail fast + # if we configure a google storage and we have secure_urls, + # we may want to be sure to have it installed at runtime + if self.use_secure_urls: + raise + + return False + + @property def can_use_advanced_azure(self): """ @@ -142,60 +228,97 @@ def can_use_advanced_aws(self): return False - @property - def guess_mimetype(self): - """ - `True` if ckanext-cloudstorage is configured to guess mime types, - `False` otherwise. - """ - return p.toolkit.asbool( - config.get('ckanext.cloudstorage.guess_mimetype', False) - ) - class ResourceCloudStorage(CloudStorage): def __init__(self, resource): - """ - Support for uploading resources to any storage provider - implemented by the apache-libcloud library. + """ + Support for uploading resources to any storage provider + implemented by the apache-libcloud library. - :param resource: The resource dict. - """ - super(ResourceCloudStorage, self).__init__() + :param resource: The resource dict. 
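+
+        Initialization order matters: storage settings (user role, container
+        name, group e-mail) are resolved first, then any pending file upload
+        is processed, and finally a `clear_upload` request is handled.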
+ """ + log.info("Initializing ResourceCloudStorage with resource: %s", resource) + super(ResourceCloudStorage, self).__init__() - self.filename = None - self.old_filename = None - self.file = None - self.resource = resource + self.resource = resource + self.filename = None + self.old_filename = None + self.file_upload = None + self._initialize_storage_settings() + self._handle_file_upload(resource) + self._handle_clear_upload(resource) + + def _initialize_storage_settings(self): + """ + Initialize storage settings from the resource. + """ + self.role = str(self.get_user_role_in_organization()).encode('ascii', 'ignore') + self.container_name = self.get_container_name_of_current_org() + self.group_email = self.container_name + "@" + self.domain + + def _handle_file_upload(self, resource): + """ + Handle the file upload process. + """ + log.info("Handling file upload for resource: %s", resource) upload_field_storage = resource.pop('upload', None) - self._clear = resource.pop('clear_upload', None) multipart_name = resource.pop('multipart_name', None) - # Check to see if a file has been provided if isinstance(upload_field_storage, (ALLOWED_UPLOAD_TYPES)): - self.filename = munge.munge_filename(upload_field_storage.filename) - self.file_upload = _get_underlying_file(upload_field_storage) - resource['url'] = self.filename - resource['url_type'] = 'upload' + self._process_file_upload(upload_field_storage, resource) elif multipart_name and self.can_use_advanced_aws: - # This means that file was successfully uploaded and stored - # at cloud. - # Currently implemented just AWS version - resource['url'] = munge.munge_filename(multipart_name) - resource['url_type'] = 'upload' - elif self._clear and resource.get('id'): - # Apparently, this is a created-but-not-commited resource whose - # file upload has been canceled. We're copying the behaviour of - # ckaenxt-s3filestore here. - old_resource = model.Session.query( - model.Resource - ).get( - resource['id'] - ) + self._process_multipart_upload(multipart_name, resource) - self.old_filename = old_resource.url - resource['url_type'] = '' + log.info("File upload handled successfully for resource: %s", resource) + + def _process_file_upload(self, upload_field_storage, resource): + """ + Process a standard file upload. + """ + log.info("Processing file upload: %s", upload_field_storage.filename) + self.filename = munge.munge_filename(upload_field_storage.filename) + self.file_upload = _get_underlying_file(upload_field_storage) + resource['url'] = self.filename + resource['url_type'] = 'upload' + log.info("File uploaded successfully: %s", self.filename) + + def _process_multipart_upload(self, multipart_name, resource): + """ + Process a multipart upload, specifically for AWS. + """ + resource['url'] = munge.munge_filename(multipart_name) + resource['url_type'] = 'upload' + + def _handle_clear_upload(self, resource): + """ + Handle clearing of an upload. + """ + self._clear = resource.pop('clear_upload', None) + if self._clear and resource.get('id'): + self._clear_old_upload(resource) + + def _clear_old_upload(self, resource): + """ + Clear an old upload when a new file is uploaded. + """ + old_resource = model.Session.query(model.Resource).get(resource['id']) + self.old_filename = old_resource.url + resource['url_type'] = '' + + @property + def container_name(self): + """ + Overridden container_name property. + """ + return self._container_name + + @container_name.setter + def container_name(self, value): + """ + Overridden setter for container_name. 
+ """ + self._container_name = value def path_from_filename(self, rid, filename): """ @@ -205,10 +328,58 @@ def path_from_filename(self, rid, filename): :param filename: The unmunged resource filename. """ return os.path.join( + 'packages', + self.package.id, 'resources', rid, munge.munge_filename(filename) ) + + def get_container_name_of_current_org(self): + """ + Generates the container name for the current organization. + + It retrieves the organization from the database using the package's + owner organization ID and constructs a container name using a predefined + prefix and the organization's name. + + :return: A string representing the container name. + """ + log.info("Retrieving container name for current organization") + owner_org = str(self.package.owner_org).encode('ascii', 'ignore') + org = model.Session.query(model.Group) \ + .filter(model.Group.id == owner_org).first() + + name = self.prefix + str(org.name).encode('ascii', 'ignore') + log.info("Container name retrieved: %s", name) + return name + + def get_user_role_in_organization(self): + """ + Determines the user's role in the current organization. + + This method retrieves the role of the currently logged-in user in the + organization that owns the package. It checks the user's membership in + the organization and returns their role if found. + + :return: A string representing the user's role in the organization, or + None if the user has no role or is not found. + """ + org_id = str(self.package.owner_org).encode('ascii', 'ignore') + user_name = toolkit.c.user + user_id = authz.get_user_id_for_username(user_name, allow_none=True) + if not user_id: + return None + # get any roles the user has for the group + q = model.Session.query(model.Member) \ + .filter(model.Member.table_name == 'user') \ + .filter(model.Member.group_id == org_id) \ + .filter(model.Member.state == 'active') \ + .filter(model.Member.table_id == user_id) + # return the first role we find + for row in q.all(): + return row.capacity + return None def upload(self, id, max_size=10): """ @@ -218,142 +389,334 @@ def upload(self, id, max_size=10): :param max_size: Ignored. """ if self.filename: - if self.can_use_advanced_azure: - from azure.storage import blob as azure_blob - from azure.storage.blob.models import ContentSettings + self._upload_file(id) + elif self._clear and self.old_filename and not self.leave_files: + self._delete_old_file(id) - blob_service = azure_blob.BlockBlobService( - self.driver_options['key'], - self.driver_options['secret'] - ) - content_settings = None - if self.guess_mimetype: - content_type, _ = mimetypes.guess_type(self.filename) - if content_type: - content_settings = ContentSettings( - content_type=content_type - ) - - return blob_service.create_blob_from_stream( - container_name=self.container_name, - blob_name=self.path_from_filename( - id, - self.filename - ), - stream=self.file_upload, - content_settings=content_settings - ) - else: + def _upload_file(self, id): + """ + Handles the file uploading process. + + :param id: The resource_id. + """ + if self.can_use_advanced_azure: + self._upload_to_azure(id) + else: + self._upload_to_libcloud(id) - # TODO: This might not be needed once libcloud is upgraded - if isinstance(self.file_upload, SpooledTemporaryFile): - self.file_upload.next = self.file_upload.next() + def _upload_to_azure(self, id): + """ + Uploads a file to Azure blob storage. 
- self.container.upload_object_via_stream( - self.file_upload, - object_name=self.path_from_filename( - id, - self.filename - ) - ) + This method uploads the file associated with the given resource ID + to Azure Blob Storage, using the configured container and filename. + It handles content settings for the file based on its MIME type. - elif self._clear and self.old_filename and not self.leave_files: - # This is only set when a previously-uploaded file is replace - # by a link. We want to delete the previously-uploaded file. - try: - self.container.delete_object( - self.container.get_object( - self.path_from_filename( - id, - self.old_filename - ) - ) - ) - except ObjectDoesNotExistError: - # It's possible for the object to have already been deleted, or - # for it to not yet exist in a committed state due to an - # outstanding lease. - return + :param id: The resource_id associated with the file to be uploaded. + :return: The response from the Azure blob service after upload. + """ + from azure.storage import blob as azure_blob - def get_url_from_filename(self, rid, filename, content_type=None): + blob_service = azure_blob.BlockBlobService(**self.driver_options) + content_settings = self._get_content_settings_for_azure() + + return blob_service.create_blob_from_stream( + container_name=self.container_name, + blob_name=self.path_from_filename(id, self.filename), + stream=self.file_upload, + content_settings=content_settings + ) + + def _get_content_settings_for_azure(self): """ - Retrieve a publically accessible URL for the given resource_id - and filename. + Determines the content settings for Azure based on the file's mimetype. + """ + from azure.storage.blob.models import ContentSettings - .. note:: + content_settings = None + if self.guess_mimetype: + content_type, _ = mimetypes.guess_type(self.filename) + if content_type: + content_settings = ContentSettings(content_type=content_type) + return content_settings - Works for Azure and any libcloud driver that implements - support for get_object_cdn_url (ex: AWS S3). + def _upload_to_libcloud(self, id): + """ + Uploads a file using the libcloud driver to the configured storage. - :param rid: The resource ID. - :param filename: The resource filename. - :param content_type: Optionally a Content-Type header. + This method handles the file upload process for various cloud storage + services using the libcloud driver. It supports 'SpooledTemporaryFile' + for handling file uploads and streams the file to the designated + container and object name based on the resource ID and filename. - :returns: Externally accessible URL or None. + :param id: The resource_id associated with the file to be uploaded. """ - # Find the key the file *should* be stored at. - path = self.path_from_filename(rid, filename) + # Specific handling for SpooledTemporaryFile + if isinstance(self.file_upload, SpooledTemporaryFile): + self.file_upload.next = self.file_upload.next() - # If advanced azure features are enabled, generate a temporary - # shared access link instead of simply redirecting to the file. - if self.can_use_advanced_azure and self.use_secure_urls: - from azure.storage import blob as azure_blob + self.container.upload_object_via_stream( + self.file_upload, + object_name=self.path_from_filename(id, self.filename) + ) - blob_service = azure_blob.BlockBlobService( - self.driver_options['key'], - self.driver_options['secret'] + def _delete_old_file(self, id): + """ + Deletes an old file when a new file is uploaded. 
+ + This method is invoked when a previously uploaded file is replaced + by a new file or a link. It attempts to delete the old file from + the storage container. If the file does not exist or has already + been deleted, the method will silently complete without errors. + + :param id: The resource_id associated with the file to be deleted. + """ + # This is only set when a previously-uploaded file is replace + # by a link. We want to delete the previously-uploaded file. + log.info("Deleting old file: %s", self.old_filename) + try: + self.container.delete_object( + self.container.get_object( + self.path_from_filename(id, self.old_filename) + ) ) + log.info("Old file deleted: %s", self.old_filename) + except ObjectDoesNotExistError: + # It's possible for the object to have already been deleted, or + # for it to not yet exist in a committed state due to an + # outstanding lease. + return + + def _generate_azure_url(self, path): + """ + Generates a signed URL for an Azure Blob Storage object. + + This method creates a URL with a Shared Access Signature (SAS) for + secure access to a blob in Azure Blob Storage. The SAS is set to expire + in 1 hour and grants read-only access to the blob. - return blob_service.make_blob_url( + :param path: The path to the blob within the Azure container. + :return: A string representing the SAS URL to the Azure blob. + """ + from azure.storage import blob as azure_blob + + blob_service = azure_blob.BlockBlobService( + self.driver_options['key'], + self.driver_options['secret'] + ) + + return blob_service.make_blob_url( + container_name=self.container_name, + blob_name=path, + sas_token=blob_service.generate_blob_shared_access_signature( container_name=self.container_name, blob_name=path, - sas_token=blob_service.generate_blob_shared_access_signature( - container_name=self.container_name, - blob_name=path, - expiry=datetime.utcnow() + timedelta(hours=1), - permission=azure_blob.BlobPermissions.READ + expiry=datetime.utcnow() + timedelta(hours=1), + permission=azure_blob.BlobPermissions.READ + ) + ) + + def _generate_aws_url(self, path, content_type): + """ + Generates a signed URL for an AWS S3 object. + + This method creates a presigned URL for an object stored in Amazon S3, + allowing secure, temporary access. The URL expires in 1 hour and is + configured for HTTP GET requests. If a content type is provided, it's + included in the URL's headers. + + :param path: The path to the object in the S3 bucket. + :param content_type: Optional. The MIME type of the object. Used to set + the 'Content-Type' header in the generated URL. + :return: A string representing the presigned URL to the S3 object. + """ + from boto.s3.connection import S3Connection + s3_connection = S3Connection(**self.driver_options) + generate_url_params = {"expires_in": 3600, "method": "GET", + "bucket": self.container_name, "query_auth": True, + "key": path} + if content_type: + generate_url_params['headers'] = {"Content-Type": content_type} + + return s3_connection.generate_url(**generate_url_params) + + def _generate_public_google_url(self, obj, user_obj, user_email): + """ + Generates a signed URL for public Google Cloud Storage objects. + + For anonymous users, uses a service account to impersonate a group. + For authenticated users with admin or editor roles, grants direct access. + + :param obj: The GCS object for which to generate the URL. + :param user_obj: The user object of the currently logged-in user. + :param user_email: The email address of the user. + :return: A signed URL string. 
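+
+        All URLs expire after 3600 seconds. The impersonated address for
+        anonymous access is the organization's group e-mail
+        (``<prefix><org-name>@<domain>``, per the naming scheme above).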
+ """ + import ckanext.cloudstorage.google_storage as storage + + if user_obj is None: + # Use service account for anonymous users + return storage.generate_signed_url_with_impersonated_user( + self.driver_options['secret'], + self.container_name, + object_name=obj.name, + impersonate_user=self.group_email, + expiration=3600 + ) + else: + if self.role in ("admin", "editor"): + # Direct signed URL for admin and editor + return storage.generate_signed_url( + self.driver_options['secret'], + self.container_name, + object_name=obj.name, + expiration=3600 + ) + else: + # Impersonate a user for other roles + return storage.generate_signed_url_with_impersonated_user( + self.driver_options['secret'], + self.container_name, + object_name=obj.name, + impersonate_user=user_email, + expiration=3600 ) + + def _generate_private_google_url(self, obj, user_role, user_email): + """ + Generates a signed URL for private Google Cloud Storage objects. + + Access is based on the user's role. Admin and editor roles are given + direct access, while member roles are handled through impersonation. + + :param obj: The GCS object for which to generate the URL. + :param user_role: The role of the user in the organization. + :param user_email: The email address of the user. + :return: A signed URL string. + """ + import ckanext.cloudstorage.google_storage as storage + + if user_role in ("admin", "editor"): + # Direct signed URL for admin and editor + return storage.generate_signed_url( + self.driver_options['secret'], + self.container_name, + object_name=obj.name, + expiration=3600 ) - elif self.can_use_advanced_aws and self.use_secure_urls: - from boto.s3.connection import S3Connection - s3_connection = S3Connection( - self.driver_options['key'], - self.driver_options['secret'] + elif user_role == "member": + # Impersonate a user for member role + return storage.generate_signed_url_with_impersonated_user( + self.driver_options['secret'], + self.container_name, + object_name=obj.name, + impersonate_user=user_email, + expiration=3600 ) + else: + raise NotAuthorized("User not authorized to read or download this file") - generate_url_params = {"expires_in": 60 * 60, - "method": "GET", - "bucket": self.container_name, - "query_auth": True, - "key": path} - if content_type: - generate_url_params['headers'] = {"Content-Type": content_type} + def _generate_google_url(self, path): + """ + Generates a signed URL for a Google Cloud Storage object. + + This method creates a signed URL for a GCS object, considering the + package's privacy status and the user's role. For public packages, it + either uses a service account for anonymous users or grants direct access + for admin/editor roles. For private packages, access is based on user roles. + The URL expires in 1 hour. + + :param path: The path to the object in the GCS bucket. + :return: A signed URL for the GCS object. Raises NotAuthorized for + unauthorized users. 
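+
+        Decision summary:
+            public package,  anonymous user   -> impersonate the group address
+            public package,  admin or editor  -> direct signed URL
+            public package,  any other user   -> impersonate the user's address
+            private package, admin or editor  -> direct signed URL
+            private package, member           -> impersonate the user's address
+            private package, anything else    -> NotAuthorized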
+ """ + import ckanext.cloudstorage.google_storage as storage - return s3_connection.generate_url(**generate_url_params) + obj=self.container.get_object(path) + user_name = toolkit.c.user + user_obj = toolkit.c.userobj + + is_private_package = self.package.is_private + user_role = self.role + user_email = str(user_obj.email).encode('ascii', 'ignore') if user_obj else None + + # For public packages + if not is_private_package: + return self._generate_public_google_url(obj,user_obj,user_email) + # For private packages + else: + return self._generate_private_google_url(obj,user_role, user_email) + + def _generate_default_url(self, path): + """ + Generate a default URL for storage providers that do not require special handling. + + :param path: The path of the object in the storage. + :returns: A URL for the object or None if not applicable. + """ # Find the object for the given key. obj = self.container.get_object(path) if obj is None: return - # Not supported by all providers! try: + # Attempt to use the provider's CDN URL generation method return self.driver.get_object_cdn_url(obj) except NotImplementedError: - if 'S3' in self.driver_name: - return urlparse.urljoin( - 'https://' + self.driver.connection.host, - '{container}/{path}'.format( - container=self.container_name, - path=path - ) + # Handle storage providers like S3 or Google Cloud using known URL patterns + if 'S3' in self.driver_name or 'GOOGLE_STORAGE' in self.driver_name: + return 'https://{host}/{container}/{path}'.format( + host=self.driver.connection.host, + container=self.container_name, + path=path ) - # This extra 'url' property isn't documented anywhere, sadly. - # See azure_blobs.py:_xml_to_object for more. + # For Azure and others, check for an 'url' property in the object's extra attributes elif 'url' in obj.extra: return obj.extra['url'] - raise + # If none of the above, return None or raise an appropriate exception + else: + return None # or raise an appropriate exception + + def get_url_from_filename(self, rid, filename, content_type=None): + """ + Retrieve a publically accessible URL for the given resource_id + and filename. + + .. note:: + + Works for Azure and any libcloud driver that implements + support for get_object_cdn_url (ex: AWS S3). + + :param rid: The resource ID. + :param filename: The resource filename. + :param content_type: Optionally a Content-Type header. + + :returns: Externally accessible URL or None. + """ + # Find the key the file *should* be stored at. + path = self.path_from_filename(rid, filename) + + # If advanced azure features are enabled, generate a temporary + # shared access link instead of simply redirecting to the file. + if self.can_use_advanced_azure and self.use_secure_urls: + return self._generate_azure_url(path) + elif self.can_use_advanced_aws and self.use_secure_urls: + return self._generate_aws_url(path, content_type) + elif self.can_use_advanced_google and self.use_secure_urls: + return self._generate_google_url(path) + else: + return self._generate_default_url(path) + + def get_object(self, rid, filename): + # Find the key the file *should* be stored at. + path = self.path_from_filename(rid, filename) + # Find the object for the given key. 
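+        # path_from_filename() mirrors the layout used at upload time
+        # (packages/<package_id>/resources/<rid>/<munged filename>), so this
+        # resolves the same object that upload() created.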
+ return self.container.get_object(path) + def get_object_as_stream(self, obj): + return self.driver.download_object_as_stream(obj) + @property def package(self): return model.Package.get(self.resource['package_id']) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9f9d93d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +apache-libcloud==2.8.3 +ckanapi>=1.0,<5 + # only needed when safe url is required +six>=1.13.0 +google-auth>=2.18.1 +google-cloud-storage==1.13.1 +oauth2client==4.1.3 diff --git a/setup.py b/setup.py index 77f72af..280139e 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ include_package_data=True, zip_safe=False, install_requires=[ - 'apache-libcloud==1.5', + 'apache-libcloud==2.8.3', 'ckanapi>=1.0,<5' ], entry_points=(