Skip to content

Commit a0abfde

Browse files
committed
- Full record retrieval and smart joins now implemented.
1 parent 69cbfac commit a0abfde

2 files changed

Lines changed: 28 additions & 137 deletions

File tree

task/api_model.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11

22
async_operations_db_columns = (
3+
'async_operation_id',
34
'cumulus_id',
45
'id',
56
'description',
@@ -31,6 +32,7 @@
3132
)
3233

3334
executions_db_columns = (
35+
'execution',
3436
'cumulus_id',
3537
'arn',
3638
'async_operation_cumulus_id',
@@ -110,6 +112,7 @@
110112

111113
providers_db_columns = (
112114
# 'cumulus_id',
115+
'id',
113116
'name',
114117
'protocol',
115118
'host',

task/main.py

Lines changed: 25 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,6 @@
99
import psycopg2
1010
from psycopg2 import sql
1111

12-
# CUMULUS_DB_COLUMNS = (
13-
# 'cumulus_id', 'granule_id', 'status', 'collection_cumulus_id', 'created_at', 'updated_at', 'published', 'duration',
14-
# 'time_to_archive', 'time_to_process', 'product_volume', 'error', 'cmr_link', 'pdr_cumulus_id',
15-
# 'provider_cumulus_id', 'beginning_date_time', 'ending_date_time', 'last_update_date_time',
16-
# 'processing_end_date_time', 'processing_start_date_time', 'production_date_time', 'query_fields', 'timestamp',
17-
# 'cumulus_id', 'name', 'version', 'sample_file_name', 'granule_id_validation_regex', 'granule_id_extraction_regex',
18-
# 'files', 'process', 'url_path', 'duplicate_handling', 'report_to_ems', 'ignore_files_config_for_discovery', 'meta',
19-
# 'tags', 'created_at', 'updated_at', 'collection_id'
20-
# )
21-
22-
# CUMULUS_DB_TABLES = (
23-
# 'granules', 'collections', 'rules', 'files', 'granules_executions', 'executions', 'async_operations', 'providers', 'pdrs'
24-
# )
2512

2613
def get_db_params():
2714
sm = boto3.client('secretsmanager')
@@ -141,7 +128,7 @@ def get_async_join(columns, where, right_table):
141128
collections_join = sql.SQL(
142129
'''
143130
LEFT JOIN (
144-
SELECT cumulus_id, id
131+
SELECT cumulus_id, id AS async_operation_id
145132
FROM async_operations
146133
) AS async_operations ON async_operations.cumulus_id={}
147134
'''
@@ -163,8 +150,6 @@ def get_collection_json_join(columns, where, right_table):
163150

164151
return collections_join
165152

166-
167-
168153
def get_collection_id_join(columns, where, right_table):
169154
collections_join = sql.SQL('')
170155
if join_check(columns, where, collections_db_columns):
@@ -236,164 +221,67 @@ def build_granules_query(records, columns, where='', limit=-1):
236221
for get_join in [get_collection_id_join, get_executions_join, get_files_array_join, get_providers_join]:
237222
joins.append(get_join(columns, where, records))
238223

239-
query = sql.SQL(
240-
'''
241-
SELECT {}
242-
FROM {}
243-
{}
244-
{}
245-
{}
246-
'''
247-
).format(
248-
sql.SQL(columns if columns else '*'),
249-
sql.Identifier(records),
250-
sql.SQL(' ').join(joins),
251-
build_where(where),
252-
sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
253-
)
224+
joins = sql.SQL(' ').join(joins)
254225

255-
return query
226+
return joins
256227

257228

258229
def build_rules_query(records, columns=None, where=None, limit=-1):
259230
joins = []
260231
for get_join in [get_collection_json_join, get_providers_join]:
261232
joins.append(get_join(columns, where, records))
262233

263-
query = sql.SQL(
264-
'''
265-
SELECT {}
266-
FROM {}
267-
{}
268-
{}
269-
{}
270-
'''
271-
).format(
272-
sql.SQL(columns if columns else '*'),
273-
sql.Identifier(records),
274-
sql.SQL(' ').join(joins),
275-
build_where(where),
276-
sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
277-
)
278-
279-
return query
280-
281-
def build_collections_query(records, columns=None, where=None, limit=-1):
282-
query = sql.SQL(
283-
'''
284-
SELECT {}
285-
FROM {}
286-
{}
287-
{}
288-
'''
289-
).format(
290-
sql.SQL(columns if columns else '*'),
291-
sql.Identifier(records),
292-
build_where(where),
293-
sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
294-
)
234+
joins = sql.SQL(' ').join(joins)
295235

296-
return query
236+
return joins
297237

298238
def build_executions_query(records, columns=None, where=None, limit=-1):
299239
joins = []
300240
for get_join in [get_async_join, get_collection_id_join, get_executions_join]:
301241
joins.append(get_join(columns, where, records))
302242

303-
query = sql.SQL(
304-
'''
305-
SELECT {}
306-
FROM {}
307-
{}
308-
{}
309-
{}
310-
'''
311-
).format(
312-
# sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'),
313-
sql.SQL(columns if columns else '*'),
314-
sql.Identifier(records),
315-
sql.SQL(' ').join(joins),
316-
build_where(where),
317-
sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
318-
)
319-
320-
return query
243+
joins = sql.SQL(' ').join(joins)
321244

322-
def build_providers_query(table, columns=None, where=None, limit=-1):
323-
query = sql.SQL(
324-
'''
325-
SELECT {}
326-
FROM {}
327-
{}
328-
{}
329-
'''
330-
).format(
331-
# sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'),
332-
sql.SQL(columns if columns else '*'),
333-
sql.Identifier(table),
334-
build_where(where),
335-
sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
336-
)
337-
338-
return query
245+
return joins
339246

340247
def build_pdrs_query(records, columns=None, where=None, limit=-1):
341248
joins = []
342249
for get_join in [get_collection_id_join, get_providers_join, get_executions_join]:
343250
joins.append(get_join(columns, where, records))
344251

345-
query = sql.SQL(
346-
'''
347-
SELECT {}
348-
FROM {}
349-
{}
350-
{}
351-
{}
352-
'''
353-
).format(
354-
# sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'),
355-
sql.SQL(columns if columns else '*'),
356-
sql.Identifier(records),
357-
sql.SQL(' ').join(joins),
358-
build_where(where),
359-
sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
360-
)
252+
joins = sql.SQL(' ').join(joins)
361253

362-
return query
254+
return joins
255+
256+
def build_query_new(records, columns=None, where=None, limit=0):
257+
if not columns:
258+
columns = '*'
259+
260+
joins_switch = {
261+
'granules': build_granules_query,
262+
'rules': build_rules_query,
263+
'executions': build_executions_query,
264+
'pdrs': build_pdrs_query
265+
}
363266

364-
def build_async_query(table, columns=None, where=None, limit=-1):
365267
query = sql.SQL(
366268
'''
367269
SELECT {}
368270
FROM {}
369271
{}
370272
{}
273+
{}
371274
'''
372275
).format(
373-
# sql.SQL(', ').join([sql.Identifier(column) for column in columns]) if columns else sql.SQL('*'),
374276
sql.SQL(columns if columns else '*'),
375-
sql.Identifier(table),
277+
sql.Identifier(records),
278+
joins_switch.get(records, sql.SQL(''))(records, columns, where),
376279
build_where(where),
377280
sql.SQL('LIMIT {}').format(sql.SQL(str(limit))) if limit >= 0 else sql.SQL('')
378281
)
379282

380283
return query
381-
382-
def build_query_new(records, columns=None, where=None, limit=0):
383-
if not columns:
384-
columns = '*'
385-
386-
switch = {
387-
'granules': build_granules_query,
388-
'rules': build_rules_query,
389-
'collections': build_collections_query,
390-
'executions': build_executions_query,
391-
'providers': build_providers_query,
392-
'pdrs': build_pdrs_query,
393-
'async_operations': build_async_query
394-
}
395-
396-
return switch.get(records)(records, columns, where, limit)
284+
# return switch.get(records)(records, columns, where, limit)
397285

398286
def main(event, context):
399287
rds_config = event.get('rds_config')
@@ -409,7 +297,7 @@ def main(event, context):
409297
with psycopg2.connect(**get_db_params()) as db_conn:
410298
with db_conn.cursor(name='rds-cursor') as curs:
411299
curs.itersize = event.get('size', 10000)
412-
print(query.as_string(curs)) # Uncomment when troubleshooting queries
300+
# print(query.as_string(curs)) # Uncomment when troubleshooting queries
413301
# print(curs.mogrify(query, vars))
414302
curs.execute(query=query)
415303

0 commit comments

Comments
 (0)