"""Mappings between Cumulus API (camelCase) field names and Postgres
(snake_case) column names, plus helpers to translate WHERE clauses.

Column tuples reflect the Cumulus RDS schema; commented entries are
internal FK columns deliberately excluded from user-facing output.
"""

async_operations_db_columns = (
    'async_operation_id',
    'cumulus_id',
    'id',
    'description',
    'operation_type',
    'output',
    'status',
    'task_arn',
    'created_at',
    'updated_at'
)

collections_db_columns = (
    'collection_id',
    'name',
    'version',
    'sample_file_name',
    'granule_id_validation_regex',
    'granule_id_extraction_regex',
    'files',
    'process',
    'url_path',
    'duplicate_handling',
    'report_to_ems',
    'ignore_files_config_for_discovery',
    'meta',
    'tags',
    'created_at',
    'updated_at'
)

executions_db_columns = (
    'execution',
    'cumulus_id',
    'arn',
    'async_operation_cumulus_id',
    'collection_cumulus_id',
    'parent_cumulus_id',
    'cumulus_version',
    'url',
    'status',
    'tasks',
    'error',
    'workflow_name',
    'duration',
    'original_payload'
)

files_db_columns = (
    'files',
    'cumulus_id',
    'granule_cumulus_id',
    'created_at',
    'updated_at',
    'file_size',
    'bucket',
    'checksum_type',
    'checksum_value',
    'file_name',
    'key',
    'path',
    'source',
    'type'
)

granules_db_columns = (
    # 'granules.cumulus_id',
    'granule_id',
    'status',
    'files',
    # 'granules.collection_cumulus_id',
    'created_at',
    'updated_at',
    'published',
    'duration',
    'time_to_archive',
    'time_to_process',
    'product_volume',
    'error',
    'cmr_link',
    # 'granules.pdr_cumulus_id',
    # 'granules.provider_cumulus_id',
    'beginning_date_time',
    'ending_date_time',
    'last_update_date_time',
    'processing_end_date_time',
    'processing_start_date_time',
    'collection_id'
)

pdrs_db_columns = (
    # 'cumulus_id',
    # 'collection_cumulus_id',
    # 'provider_cumulus_id',
    # 'execution_cumulus_id',
    'status',
    'name',
    'progress',
    'pan_sent',
    'pan_message',
    'stats',
    'address',
    'original_url',
    'duration',
    'timestamp',
    'created_at',
    'updated_at'
)

providers_db_columns = (
    # 'cumulus_id',
    'id',
    'name',
    'protocol',
    'host',
    'port',
    'username',
    'password',
    'global_connection_limit',
    'private_key',
    'cm_key_id',
    'certificate_uri',
    'created_at',
    'updated_at',
    'allowed_redirects',
    'max_download_time'
)

rules_db_columns = (
    # 'cumulus_id',
    'name',
    'workflow',
    # 'collection_cumulus_id',
    # 'provider_cumulus_id',
    'type',
    'enabled',
    'value',
    'arn',
    'log_event_arn',
    'execution_name_prefix',
    'payload',
    'meta',
    'tags',
    'queue_url'
)

# Every camelCase field the granule API model exposes; used to decide
# whether a name-conversion result is a real API field.
granule_model_fields = (
    'beginningDateTime',
    'cmrLink',
    'collectionId',
    'createdAt',
    'duration',
    'endingDateTime',
    'error',
    'execution',
    'files',
    'granuleId',
    'lastUpdateDateTime',
    'pdrName',
    'processingEndDateTime',
    'processingStartDateTime',
    'productVolume',
    'productionDateTime',
    'provider',
    'published',
    'queryFields',
    'status',
    'timeToArchive',
    'timeToPreprocess',
    'timestamp',
    'updatedAt'
)


def api_field_names_to_db_column_names(field_names):
    """Map each API (camelCase) field name to its DB column name.

    Unknown field names map to '' (see api_field_name_to_db_column).
    """
    return [api_field_name_to_db_column(field_name) for field_name in field_names]


def api_field_name_to_db_column(field_name):
    """Convert a known granule API field name to snake_case.

    Returns '' when field_name is not in granule_model_fields, so callers
    can use truthiness to mean "this was a real API field".
    """
    db_column_name = ''
    if field_name in granule_model_fields:
        for character in field_name:
            if character.isupper():
                db_column_name = f'{db_column_name}_{character.lower()}'
            else:
                db_column_name = f'{db_column_name}{character}'
    return db_column_name


def db_column_names_to_api_keys(column_names):
    """Map each DB column name to its API (camelCase) field name.

    Fixed: this previously recursed into itself instead of calling
    db_column_name_to_api_field_name, crashing on any non-empty input.
    """
    return [db_column_name_to_api_field_name(name) for name in column_names]


def db_column_name_to_api_field_name(db_column_name):
    """Convert a snake_case column name to camelCase.

    Returns '' when the converted name is not a known granule API field.
    """
    new_str = ''
    capitalize_next = False
    for character in db_column_name:
        if character == '_':
            capitalize_next = True
            continue
        new_str = f'{new_str}{character.upper() if capitalize_next else character}'
        capitalize_next = False

    return new_str if new_str in granule_model_fields else ''


def test_column_api():
    """Ad-hoc round-trip check of the name-conversion helpers."""
    keys = ['beginningDateTime', 'fieldTwo', 'fieldThree', 'something_else']
    print(keys)
    column_names = api_field_names_to_db_column_names(keys)
    print(column_names)
    keys = db_column_names_to_api_keys(column_names)
    print(keys)


def parse_where_clause(query_str):
    """Rewrite API field names inside a WHERE clause to DB column names.

    Alphanumeric runs are collected as terms; each term that is a known
    granule API field is replaced by its column name, everything else
    (keywords, quotes, operators, values) passes through unchanged.
    Fixed: the trailing term is now converted too (previously a clause
    ending in a field name, e.g. 'ORDER BY granuleId', was left untouched).
    """
    parsed_query = ''
    query_term = ''
    for character in query_str:
        if character.isalnum():
            query_term = f'{query_term}{character}'
        else:
            db_column_name = api_field_name_to_db_column(query_term)
            if db_column_name:
                query_term = db_column_name
            parsed_query = f'{parsed_query}{query_term}{character}'
            query_term = ''

    db_column_name = api_field_name_to_db_column(query_term)
    if db_column_name:
        query_term = db_column_name

    return f'{parsed_query}{query_term}'


def test_parse_where_clause():
    """Ad-hoc check of parse_where_clause."""
    query = 'SELECT * where granules.granuleId LIKE someValue AND collectionId=anotherValue'
    parsed_query = parse_where_clause(query)
    print(parsed_query)


if __name__ == '__main__':
    pass
'collection_cumulus_id', 'created_at', 'updated_at', 'published', 'duration', - 'time_to_archive', 'time_to_process', 'product_volume', 'error', 'cmr_link', 'pdr_cumulus_id', - 'provider_cumulus_id', 'beginning_date_time', 'ending_date_time', 'last_update_date_time', - 'processing_end_date_time', 'processing_start_date_time', 'production_date_time', 'query_fields', 'timestamp', - 'cumulus_id', 'name', 'version', 'sample_file_name', 'granule_id_validation_regex', 'granule_id_extraction_regex', - 'files', 'process', 'url_path', 'duplicate_handling', 'report_to_ems', 'ignore_files_config_for_discovery', 'meta', - 'tags', 'created_at', 'updated_at' -} - - -def build_query(records, where=None, columns=None, limit=100, **kwargs): - keywords = {'and', 'or', 'not', 'in', 'like'} - - query_args = [] - if where: - res = re.findall(r'[\w+\%]+', where) - for word in res: - lower_word = word.lower() - if lower_word not in CUMULUS_DB_COLUMNS and lower_word not in keywords: - where = where.replace(word, '%s', 1) - query_args.append(word) - sql_where = sql.SQL('WHERE {}').format(sql.SQL(where)) - else: - sql_where = sql.SQL('') - - # TODO: Remove once we are sure the user input doesn't need to be escaped when a literal % is present. 
- # As of 08/31/2023 no issue has been discovered - # if '%' in where: - # where = where + ' ESCAPE \'\'' - - query = sql.SQL( - 'SELECT {} FROM {} ' - 'JOIN collections ON granules.collection_cumulus_id=collections.cumulus_id ' - '{} ' - '{}' - ).format( - sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'), - sql.Identifier(records), - sql_where, - sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit else sql.SQL('') - ) - - return {'query': query, 'args': query_args} +# CUMULUS_DB_COLUMNS = ( +# 'cumulus_id', 'granule_id', 'status', 'collection_cumulus_id', 'created_at', 'updated_at', 'published', 'duration', +# 'time_to_archive', 'time_to_process', 'product_volume', 'error', 'cmr_link', 'pdr_cumulus_id', +# 'provider_cumulus_id', 'beginning_date_time', 'ending_date_time', 'last_update_date_time', +# 'processing_end_date_time', 'processing_start_date_time', 'production_date_time', 'query_fields', 'timestamp', +# 'cumulus_id', 'name', 'version', 'sample_file_name', 'granule_id_validation_regex', 'granule_id_extraction_regex', +# 'files', 'process', 'url_path', 'duplicate_handling', 'report_to_ems', 'ignore_files_config_for_discovery', 'meta', +# 'tags', 'created_at', 'updated_at', 'collection_id' +# ) +# CUMULUS_DB_TABLES = ( +# 'granules', 'collections', 'rules', 'files', 'granules_executions', 'executions', 'async_operations', 'providers', 'pdrs' +# ) def get_db_params(): sm = boto3.client('secretsmanager') @@ -79,17 +46,6 @@ def handle_row(self, row, selected_columns): def complete_upload(self): raise NotImplementedError - @staticmethod - def convert_tuple_to_json(row, selected_columns): - record_dict = {} - for value, index in zip(row, range(len(row))): - if isinstance(value, datetime.datetime): - value = str(value) - elif isinstance(value, bool): - value = json.dumps(value) - record_dict.update({selected_columns[index]: value}) - return json.dumps(record_dict) - class MPUHandler(UploadHandlerBase): def 
def convert_tuple_to_json(row, selected_columns):
    """Serialize one DB result row to a JSON object string.

    row: tuple of column values from the cursor.
    selected_columns: the cursor description; each entry's ``name``
    attribute is used as the JSON key (psycopg2 ``Column`` objects).

    datetimes are stringified (not JSON-serializable as-is).
    NOTE(review): booleans end up as the *strings* 'true'/'false' in the
    output because json.dumps is applied twice — confirm downstream
    consumers expect that before changing it.
    """
    record_dict = {}
    for index, value in enumerate(row):
        if isinstance(value, datetime.datetime):
            value = str(value)
        elif isinstance(value, bool):
            value = json.dumps(value)
        record_dict[selected_columns[index].name] = value
    return json.dumps(record_dict)
def join_check(selected_columns, where, table_columns):
    """Return True when a JOIN against table_columns is needed.

    That is the case when the select list is '*', when any requested
    column belongs to table_columns, or when the WHERE clause mentions
    one of those columns.

    Fixed: ``where`` may be None (the query builders default it to None),
    which previously raised TypeError on the ``column in where`` test.
    """
    where = where or ''
    if selected_columns == '*':
        return True

    requested = selected_columns.replace(' ', '').split(',')
    if any(column in table_columns for column in requested):
        return True

    # NOTE(review): substring match — e.g. 'name' in the WHERE text also
    # matches 'file_name'; kept for compatibility, may over-join.
    return any(column in where for column in table_columns)
DISTINCT ON (granule_cumulus_id) granule_cumulus_id, url AS execution + FROM executions + JOIN granules_executions ON executions.cumulus_id=granules_executions.execution_cumulus_id + ORDER BY granule_cumulus_id, executions.timestamp + ) AS execution_arns ON execution_arns.granule_cumulus_id={} + ''' + ).format(sql.Identifier(right_table, 'cumulus_id')) + + return executions_join + +def get_files_array_join(columns, where, right_table): + files_join = sql.SQL('') + if join_check(columns, where, files_db_columns): + files_join = sql.SQL( + ''' + LEFT JOIN ( + SELECT granule_cumulus_id, json_agg(files) AS files + FROM files + GROUP BY granule_cumulus_id + ) AS granule_files on granule_files.granule_cumulus_id={} + ''' + ).format(sql.Identifier(right_table, 'cumulus_id')) + + return files_join + +def get_providers_join(columns, where, right_table): + providers_join = sql.SQL('') + if join_check(columns, where, providers_db_columns): + providers_join = sql.SQL( + ''' + LEFT JOIN ( + SELECT name AS provider, providers.cumulus_id + FROM providers + ) AS provider_names ON provider_names.cumulus_id={} + ''' + ).format(sql.Identifier(right_table, 'provider_cumulus_id')) + + return providers_join + +def build_where(where=''): + sql_where = sql.SQL('') + if where: + sql_where = sql.SQL('WHERE {}').format(sql.SQL(where)) + + return sql_where + +def build_granules_query(records, columns, where='', limit=-1): + joins = [] + for get_join in [get_collection_id_join, get_executions_join, get_files_array_join, get_providers_join]: + joins.append(get_join(columns, where, records)) + + query = sql.SQL( + ''' + SELECT {} + FROM {} + {} + {} + {} + ''' + ).format( + sql.SQL(columns if columns else '*'), + sql.Identifier(records), + sql.SQL(' ').join(joins), + build_where(where), + sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') + ) + + return query + + +def build_rules_query(records, columns=None, where=None, limit=-1): + joins = [] + for get_join in 
[get_collection_json_join, get_providers_join]: + joins.append(get_join(columns, where, records)) + + query = sql.SQL( + ''' + SELECT {} + FROM {} + {} + {} + {} + ''' + ).format( + sql.SQL(columns if columns else '*'), + sql.Identifier(records), + sql.SQL(' ').join(joins), + build_where(where), + sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') + ) + + return query + +def build_collections_query(records, columns=None, where=None, limit=-1): + query = sql.SQL( + ''' + SELECT {} + FROM {} + {} + {} + ''' + ).format( + sql.SQL(columns if columns else '*'), + sql.Identifier(records), + build_where(where), + sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') + ) + + return query + +def build_executions_query(records, columns=None, where=None, limit=-1): + joins = [] + for get_join in [get_async_join, get_collection_id_join, get_executions_join]: + joins.append(get_join(columns, where, records)) + + query = sql.SQL( + ''' + SELECT {} + FROM {} + {} + {} + {} + ''' + ).format( + # sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'), + sql.SQL(columns if columns else '*'), + sql.Identifier(records), + sql.SQL(' ').join(joins), + build_where(where), + sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') + ) + + return query + +def build_providers_query(table, columns=None, where=None, limit=-1): + query = sql.SQL( + ''' + SELECT {} + FROM {} + {} + {} + ''' + ).format( + # sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'), + sql.SQL(columns if columns else '*'), + sql.Identifier(table), + build_where(where), + sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') + ) - selected_columns = ([desc[0] for desc in curs.description]) - upload_handler = get_upload_handler(len(selected_columns) * curs.rowcount, handler_args) - handler_args.update({'count': curs.rowcount}) + return 
query - size = event.get('size', 10000) - for _ in range(math.ceil(curs.rowcount / size)): - for row in curs.fetchmany(size=size): - upload_handler.handle_row(row, selected_columns) +def build_pdrs_query(records, columns=None, where=None, limit=-1): + joins = [] + for get_join in [get_collection_id_join, get_providers_join, get_executions_join]: + joins.append(get_join(columns, where, records)) + + query = sql.SQL( + ''' + SELECT {} + FROM {} + {} + {} + {} + ''' + ).format( + # sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'), + sql.SQL(columns if columns else '*'), + sql.Identifier(records), + sql.SQL(' ').join(joins), + build_where(where), + sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') + ) - upload_handler.complete_upload() - curs.close() + return query - finally: - if curs and not curs.closed: - curs.close() - db_conn.close() +def build_async_query(table, columns=None, where=None, limit=-1): + query = sql.SQL( + ''' + SELECT {} + FROM {} + {} + {} + ''' + ).format( + # sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'), + sql.SQL(columns if columns else '*'), + sql.Identifier(table), + build_where(where), + sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') + ) + + return query + +def build_query_new(records, columns=None, where=None, limit=0): + if not columns: + columns = '*' + + switch = { + 'granules': build_granules_query, + 'rules': build_rules_query, + 'collections': build_collections_query, + 'executions': build_executions_query, + 'providers': build_providers_query, + 'pdrs': build_pdrs_query, + 'async_operations': build_async_query + } + + return switch.get(records)(records, columns, where, limit) + +def main(event, context): + rds_config = event.get('rds_config') + handler_args = { + 'bucket': os.getenv('BUCKET_NAME'), + 'key': f'{os.getenv("S3_KEY_PREFIX")}query_results_{time.time_ns()}.json' + } + 
+ client = boto3.client('s3') + client.put_object(Bucket=handler_args['bucket'], Key=handler_args['key'], Body=b'[]') + + query = build_query_new(**rds_config) + with psycopg2.connect(**get_db_params()) as db_conn: + with db_conn.cursor(name='rds-cursor') as curs: + curs.itersize = event.get('size', 10000) + print(query.as_string(curs)) # Uncomment when troubleshooting queries + # print(curs.mogrify(query, vars)) + curs.execute(query=query) + + upload_handler = MPUHandler(**handler_args) + rowcount = 0 + for row in curs: + upload_handler.handle_row(row, curs.description) + rowcount += 1 + handler_args.update({'count': rowcount}) + + upload_handler.complete_upload() return handler_args From a0abfde70734dd54f2bc25595fb3093d563fb99f Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Thu, 6 Mar 2025 15:25:26 -0600 Subject: [PATCH 2/4] - Full record retrieval and smart joins now implemented. --- task/api_model.py | 3 + task/main.py | 162 +++++++--------------------------------------- 2 files changed, 28 insertions(+), 137 deletions(-) diff --git a/task/api_model.py b/task/api_model.py index 6645935..6b7ccd5 100644 --- a/task/api_model.py +++ b/task/api_model.py @@ -1,5 +1,6 @@ async_operations_db_columns = ( + 'async_operation_id', 'cumulus_id', 'id', 'description', @@ -31,6 +32,7 @@ ) executions_db_columns = ( + 'execution', 'cumulus_id', 'arn', 'async_operation_cumulus_id', @@ -110,6 +112,7 @@ providers_db_columns = ( # 'cumulus_id', + 'id', 'name', 'protocol', 'host', diff --git a/task/main.py b/task/main.py index 951fb70..9befdc6 100644 --- a/task/main.py +++ b/task/main.py @@ -9,19 +9,6 @@ import psycopg2 from psycopg2 import sql -# CUMULUS_DB_COLUMNS = ( -# 'cumulus_id', 'granule_id', 'status', 'collection_cumulus_id', 'created_at', 'updated_at', 'published', 'duration', -# 'time_to_archive', 'time_to_process', 'product_volume', 'error', 'cmr_link', 'pdr_cumulus_id', -# 'provider_cumulus_id', 'beginning_date_time', 'ending_date_time', 'last_update_date_time', -# 
'processing_end_date_time', 'processing_start_date_time', 'production_date_time', 'query_fields', 'timestamp', -# 'cumulus_id', 'name', 'version', 'sample_file_name', 'granule_id_validation_regex', 'granule_id_extraction_regex', -# 'files', 'process', 'url_path', 'duplicate_handling', 'report_to_ems', 'ignore_files_config_for_discovery', 'meta', -# 'tags', 'created_at', 'updated_at', 'collection_id' -# ) - -# CUMULUS_DB_TABLES = ( -# 'granules', 'collections', 'rules', 'files', 'granules_executions', 'executions', 'async_operations', 'providers', 'pdrs' -# ) def get_db_params(): sm = boto3.client('secretsmanager') @@ -141,7 +128,7 @@ def get_async_join(columns, where, right_table): collections_join = sql.SQL( ''' LEFT JOIN ( - SELECT cumulus_id, id + SELECT cumulus_id, id AS async_operation_id FROM async_operations ) AS async_operations ON async_operations.cumulus_id={} ''' @@ -163,8 +150,6 @@ def get_collection_json_join(columns, where, right_table): return collections_join - - def get_collection_id_join(columns, where, right_table): collections_join = sql.SQL('') if join_check(columns, where, collections_db_columns): @@ -236,23 +221,9 @@ def build_granules_query(records, columns, where='', limit=-1): for get_join in [get_collection_id_join, get_executions_join, get_files_array_join, get_providers_join]: joins.append(get_join(columns, where, records)) - query = sql.SQL( - ''' - SELECT {} - FROM {} - {} - {} - {} - ''' - ).format( - sql.SQL(columns if columns else '*'), - sql.Identifier(records), - sql.SQL(' ').join(joins), - build_where(where), - sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') - ) + joins = sql.SQL(' ').join(joins) - return query + return joins def build_rules_query(records, columns=None, where=None, limit=-1): @@ -260,140 +231,57 @@ def build_rules_query(records, columns=None, where=None, limit=-1): for get_join in [get_collection_json_join, get_providers_join]: joins.append(get_join(columns, where, records)) - 
query = sql.SQL( - ''' - SELECT {} - FROM {} - {} - {} - {} - ''' - ).format( - sql.SQL(columns if columns else '*'), - sql.Identifier(records), - sql.SQL(' ').join(joins), - build_where(where), - sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') - ) - - return query - -def build_collections_query(records, columns=None, where=None, limit=-1): - query = sql.SQL( - ''' - SELECT {} - FROM {} - {} - {} - ''' - ).format( - sql.SQL(columns if columns else '*'), - sql.Identifier(records), - build_where(where), - sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') - ) + joins = sql.SQL(' ').join(joins) - return query + return joins def build_executions_query(records, columns=None, where=None, limit=-1): joins = [] for get_join in [get_async_join, get_collection_id_join, get_executions_join]: joins.append(get_join(columns, where, records)) - query = sql.SQL( - ''' - SELECT {} - FROM {} - {} - {} - {} - ''' - ).format( - # sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'), - sql.SQL(columns if columns else '*'), - sql.Identifier(records), - sql.SQL(' ').join(joins), - build_where(where), - sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') - ) - - return query + joins = sql.SQL(' ').join(joins) -def build_providers_query(table, columns=None, where=None, limit=-1): - query = sql.SQL( - ''' - SELECT {} - FROM {} - {} - {} - ''' - ).format( - # sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'), - sql.SQL(columns if columns else '*'), - sql.Identifier(table), - build_where(where), - sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('') - ) - - return query + return joins def build_pdrs_query(records, columns=None, where=None, limit=-1): joins = [] for get_join in [get_collection_id_join, get_providers_join, get_executions_join]: joins.append(get_join(columns, where, records)) - 
def build_query_new(records, columns=None, where=None, limit=100):
    """Build a SELECT for a Cumulus table with any joins it needs.

    records: table name ('granules', 'rules', 'collections', ...).
    columns: comma-separated column string; falsy -> '*'.
    where: raw SQL WHERE body (without the 'WHERE' keyword); may be None.
    limit: row cap; a negative value means no LIMIT. Defaults to 100 as
    documented in the readme (the previous default of 0 emitted
    'LIMIT 0' and returned no rows).

    NOTE(review): ``columns`` and ``where`` are interpolated as raw SQL
    via sql.SQL, so this must only ever receive trusted operator input —
    it is not safe against injection from untrusted callers.
    """
    if not columns:
        columns = '*'

    joins_switch = {
        'granules': build_granules_query,
        'rules': build_rules_query,
        'executions': build_executions_query,
        'pdrs': build_pdrs_query
    }

    # Tables without a join builder (collections, providers,
    # async_operations) get no joins. Fixed: the previous default of
    # sql.SQL('') was itself *called* like a builder, raising
    # "TypeError: 'SQL' object is not callable" for those tables.
    join_builder = joins_switch.get(records)
    joins = join_builder(records, columns, where) if join_builder else sql.SQL('')

    query = sql.SQL(
        '''
        SELECT {}
        FROM {}
        {}
        {}
        {}
        '''
    ).format(
        sql.SQL(columns),
        sql.Identifier(records),
        joins,
        build_where(where),
        sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
    )

    return query
Uncomment when troubleshooting queries + # print(query.as_string(curs)) # Uncomment when troubleshooting queries # print(curs.mogrify(query, vars)) curs.execute(query=query) From 84f73c4a17f241b8598f1e0fe917497a485d0b34 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Tue, 11 Mar 2025 12:48:22 -0500 Subject: [PATCH 3/4] - Fixing issue with missing comma --- task/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/task/main.py b/task/main.py index 9befdc6..f7c0ab7 100644 --- a/task/main.py +++ b/task/main.py @@ -55,6 +55,8 @@ def handle_row(self, row, column_description): def upload_part(self, body_string): if not self.s3_parts: body_string = f'[{body_string}' + else: + body_string = f', {body_string}' part_number_dict = {'PartNumber': len(self.s3_parts) + 1} mpu_upload_dict = {**part_number_dict, **self.s3_mpu_dict} mpu_upload_dict.update({'Body': body_string.encode()}) From 4db41efa550654d5c16527b9e2a9aead7eff1680 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Tue, 11 Mar 2025 13:15:23 -0500 Subject: [PATCH 4/4] - Updaing readme --- readme.md | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/readme.md b/readme.md index 6e58a67..0802500 100644 --- a/readme.md +++ b/readme.md @@ -48,17 +48,19 @@ Below is an example AWS lambda test event that shows the format of the event tha "is_test": true, "rds_config": { "records": "", - "columns": [], + "columns": "", "where": "", - "limit": 0 + "limit": -1 } } ``` - `rds_config`: Block required to contain the query items. - - `records`: The Cumulus database table name to get records for. - - `columns`: The columns to request from the database. This will default to `*` if nothing is provided. - - `where`: A Postgresql compliant where clause. - - `limit`: The number of records to return. 0 means return all records that match the query and will default to 100 if not provided. 
+ - `records`: The Cumulus database table name to get records for (providers, collections, rules, granules, executions, async_operations, pdrs). + - `columns`: The columns to request from the database `"column_1, column_2"`. This will default to `*` if nothing is provided. + - `where`: A Postgresql compliant where clause: + - `"granule_id LIKE '%value' AND collection_id = 'value'"` + - `"collection_id='rssmif17d3d___7' and status IN('failed', 'queued', 'running') and published = true"`. + - `limit`: The number of records to return. `-1` means return all records that match the query and will default to 100 if not provided. - `is_test`: If true, the code will not be run as a `cumulus_task` and the input event will not go through the CMA. The `columns`, `where`, and `limit` keys are optional.