From 6dbe02c67c16becdfab812a2aa461f1cc60b64e3 Mon Sep 17 00:00:00 2001 From: Nestor Rodriguez Date: Wed, 1 Apr 2026 11:40:12 +0200 Subject: [PATCH] feat: [document-compression-updater]- Improvements: Auto-remove dummy compression field, drop tracker collection and more - Add connection retry logic and startup validation - Fix UnboundLocalError on empty collection in setup() - Replace bare print() calls with printLog() throughout - Add per-batch progress bar, elapsed time, rate, and ETA logging - Auto-remove dummy compression field after each batch via $unset - Add --skip-cleanup and --append-log flags - Drop tracker collection on successful completion - Remove dead multiprocessing queue code and unused imports --- .../document-compression-updater/README.md | 16 +- .../update_apply_compression.py | 402 +++++++++++------- 2 files changed, 260 insertions(+), 158 deletions(-) diff --git a/operations/document-compression-updater/README.md b/operations/document-compression-updater/README.md index 9601d5e..ab926d5 100644 --- a/operations/document-compression-updater/README.md +++ b/operations/document-compression-updater/README.md @@ -1,13 +1,15 @@ -# Python Updater tool -This sample applications compresses pre-existing documents in an existing collection after compression is turned on that existing collection. +# Python Updater tool +This sample application compresses pre-existing documents in an existing collection after compression is turned on that existing collection. Single threaded application - issues **5000** (controlled by argument --batch-size) updates serially in a _round_, and sleeps for **60** (controlled by argument --wait-period) seconds before starting next _round_. -Status of the updates are maintained in database **tracker_db** - for each collection there is a tracker collection named **<< collection >>__tracker_col**. +After each batch, the temporary dummy field used to trigger compression is automatically removed from all updated documents. Use `--skip-cleanup` to disable this behaviour. -The application can be restarted if it crashes and it will pick up from last successful _round_ based on data in **<< collection >>__tracker_col**. +Status of the updates are maintained in database **tracker_db** - for each collection there is a tracker collection named **<< collection >>__tracker_col**. Each tracker entry includes a `cleanupComplete` flag indicating whether the dummy field was removed for that batch. -The update statements use field **6nh63** (controlled by argument --update-field), for triggering compression on existing records. +The application can be restarted if it crashes and it will pick up from last successful _round_ based on data in **<< collection >>__tracker_col**. On successful completion the tracker collection is automatically dropped, as it is no longer needed. + +The update statements use field **6nh63** (controlled by argument --update-field), for triggering compression on existing records. This field is removed from each document after compression is applied unless `--skip-cleanup` is set. The application uses **_id** field for tracking and updating existing documents. If you are using a custom value _id, the value should be sort-able. @@ -24,7 +26,7 @@ cd amazon-documentdb-tools/operations/document-compression-updater ## Usage/Examples ``` - python3 update_apply_compression.py --uri "<>" --database <> --collection <> --update-field << field_name >> --wait-period << int >>> --batch-size << int >> + python3 update_apply_compression.py --uri "<>" --database <> --collection <> --update-field << field_name >> --wait-period << int >> --batch-size << int >> ``` The application has the following arguments: @@ -40,4 +42,6 @@ Optional parameters --update-field Field used for updating an existing document. This should not conflict with any fieldname you are already using --wait-period Number of seconds to wait between each batch --batch-size Number of documents to update in a single batch + --append-log Append to existing log file instead of overwriting it on startup + --skip-cleanup Skip removing the dummy field after each batch (leaves update field permanently on documents) ``` diff --git a/operations/document-compression-updater/update_apply_compression.py b/operations/document-compression-updater/update_apply_compression.py index cefd1c4..86cdbe4 100644 --- a/operations/document-compression-updater/update_apply_compression.py +++ b/operations/document-compression-updater/update_apply_compression.py @@ -1,231 +1,329 @@ import datetime import sys -import random import json import pymongo import time -import threading import os import multiprocessing as mp import argparse -import string import math +MAX_RETRIES = 3 +RETRY_DELAY = 5 # seconds between retry attempts + def deleteLog(appConfig): if os.path.exists(appConfig['logFileName']): os.remove(appConfig['logFileName']) -def printLog(thisMessage,appConfig): +def printLog(thisMessage, appConfig): print("{}".format(thisMessage)) with open(appConfig['logFileName'], 'a') as fp: fp.write("{}\n".format(thisMessage)) +def get_mongo_client(uri, appConfig=None): + """Create a MongoClient, retrying up to MAX_RETRIES times on failure.""" + def log(msg): + if appConfig: + printLog(msg, appConfig) + else: + print(msg) + + for attempt in range(1, MAX_RETRIES + 1): + try: + client = pymongo.MongoClient(host=uri, appname='compupd', serverSelectionTimeoutMS=5000) + client.admin.command('ping') + return client + except pymongo.errors.PyMongoError as e: + if attempt < MAX_RETRIES: + log("Connection attempt {} failed: {}. Retrying in {} seconds...".format(attempt, e, RETRY_DELAY)) + time.sleep(RETRY_DELAY) + else: + raise + +def validate_connection(appConfig): + """Validate that the URI is reachable and that the target database/collection exist.""" + try: + client = get_mongo_client(appConfig['uri']) + except pymongo.errors.PyMongoError as e: + sys.exit("Error: Unable to connect to DocumentDB: {}".format(e)) + + try: + db_names = client.list_database_names() + if appConfig['databaseName'] not in db_names: + sys.exit("Error: Database '{}' does not exist.".format(appConfig['databaseName'])) + + col_names = client[appConfig['databaseName']].list_collection_names() + if appConfig['collectionName'] not in col_names: + sys.exit("Error: Collection '{}' does not exist in database '{}'.".format( + appConfig['collectionName'], appConfig['databaseName'])) + finally: + client.close() + def setup(appConfig): - if sys.version_info < (3,7): + if sys.version_info < (3, 7): sys.exit('Sorry, Python < 3.7 is not supported') databaseName = appConfig['databaseName'] collectionName = appConfig['collectionName'] - client = pymongo.MongoClient(host=appConfig['uri'],appname='compupd') - - # database and collection for compression + try: + client = get_mongo_client(appConfig['uri']) + except pymongo.errors.PyMongoError as e: + sys.exit("Error connecting during setup: {}".format(e)) db = client[databaseName] - adminDb = client['admin'] col = db[collectionName] - - # database and collection for tracking - - tracker_db=client['tracker_db'] - trackerCollectionName = databaseName+'_'+collectionName+'_tracker_col' - tracker_col=tracker_db[trackerCollectionName] - - list_of_collections = tracker_db.list_collection_names() # Return a list of collections in 'tracker_db' - print("list_of_collections {}".format(list_of_collections)) - - if trackerCollectionName in list_of_collections : - - # tracker db already has entry for collection - - result = tracker_col.find({}).sort({ "_id" : -1}).limit(1) - - for lastEntry in result : - numExistingDocuments = lastEntry["numExistingDocuments"] - maxObjectIdToTouch = lastEntry["maxObjectIdToTouch"] - lastScannedObjectId = lastEntry["lastScannedObjectId"] - numDocumentsUpdated = lastEntry["numDocumentsUpdated"] - print("Found existing record: {}".format(str(lastEntry))) - - else : - - # create first entry in tracker db for collection - result = col.find({},{ "_id" :1}).sort({ "_id" :-1}).limit(1) - - for id in result : - print("result {}".format(result)) - maxObjectIdToTouch = id["_id"] - - lastScannedObjectId = 0 - numDocumentsUpdated = 0 - numExistingDocuments = col.estimated_document_count() - - first_entry = { - "collection_name": appConfig['collectionName'], - "lastScannedObjectId" : lastScannedObjectId, - "ts": datetime.datetime.now(tz=datetime.timezone.utc), - "maxObjectIdToTouch" : maxObjectIdToTouch, - "numExistingDocuments" : numExistingDocuments, - "numDocumentsUpdated" : numDocumentsUpdated - # scan fields in future, for now we use _id - } - tracker_col.insert_one(first_entry) - - printLog("create first entry in tracker db for collection {}".format(first_entry),appConfig) - - client.close() - returnData = {} - returnData["numExistingDocuments"] = numExistingDocuments - returnData["maxObjectIdToTouch"] = maxObjectIdToTouch - returnData["lastScannedObjectId"] = lastScannedObjectId - returnData["numDocumentsUpdated"] = numDocumentsUpdated - - return returnData + tracker_db = client['tracker_db'] + trackerCollectionName = databaseName + '_' + collectionName + '_tracker_col' + tracker_col = tracker_db[trackerCollectionName] + + list_of_collections = tracker_db.list_collection_names() + printLog("list_of_collections {}".format(list_of_collections), appConfig) + + try: + if trackerCollectionName in list_of_collections: + result = tracker_col.find({}).sort({"_id": -1}).limit(1) + for lastEntry in result: + numExistingDocuments = lastEntry["numExistingDocuments"] + maxObjectIdToTouch = lastEntry["maxObjectIdToTouch"] + lastScannedObjectId = lastEntry["lastScannedObjectId"] + numDocumentsUpdated = lastEntry["numDocumentsUpdated"] + printLog("Found existing record: {}".format(str(lastEntry)), appConfig) + + else: + result = col.find({}, {"_id": 1}).sort({"_id": -1}).limit(1) + + maxObjectIdToTouch = None + for doc in result: + maxObjectIdToTouch = doc["_id"] + + if maxObjectIdToTouch is None: + sys.exit("Error: Collection '{}' is empty, nothing to compress.".format(collectionName)) + + lastScannedObjectId = 0 + numDocumentsUpdated = 0 + numExistingDocuments = col.estimated_document_count() + + first_entry = { + "collection_name": appConfig['collectionName'], + "lastScannedObjectId": lastScannedObjectId, + "ts": datetime.datetime.now(tz=datetime.timezone.utc), + "maxObjectIdToTouch": maxObjectIdToTouch, + "numExistingDocuments": numExistingDocuments, + "numDocumentsUpdated": numDocumentsUpdated + } + tracker_col.insert_one(first_entry) + printLog("create first entry in tracker db for collection {}".format(first_entry), appConfig) -def task_worker(threadNum,perfQ,appConfig): - maxObjectIdToTouch = appConfig['maxObjectIdToTouch'] - lastScannedObjectId = appConfig['lastScannedObjectId'] - numInsertProcesses = appConfig['numInsertProcesses'] + except pymongo.errors.PyMongoError as e: + sys.exit("Error during setup: {}".format(e)) + finally: + client.close() + return { + "numExistingDocuments": numExistingDocuments, + "maxObjectIdToTouch": maxObjectIdToTouch, + "lastScannedObjectId": lastScannedObjectId, + "numDocumentsUpdated": numDocumentsUpdated + } + +def task_worker(threadNum, appConfig): numExistingDocuments = appConfig["numExistingDocuments"] maxObjectIdToTouch = appConfig["maxObjectIdToTouch"] lastScannedObjectId = appConfig["lastScannedObjectId"] numDocumentsUpdated = appConfig["numDocumentsUpdated"] - client = pymongo.MongoClient(appConfig['uri']) - myDatabaseName = appConfig['databaseName'] - db = client[myDatabaseName] myCollectionName = appConfig['collectionName'] + trackerCollectionName = myDatabaseName + '_' + myCollectionName + '_tracker_col' + + try: + client = get_mongo_client(appConfig['uri']) + except pymongo.errors.PyMongoError as e: + printLog("Fatal: could not connect in worker: {}".format(e), appConfig) + return + + db = client[myDatabaseName] col = db[myCollectionName] - tracker_db=client['tracker_db'] - trackerCollectionName = myDatabaseName+'_'+myCollectionName+'_tracker_col' - tracker_col=tracker_db[trackerCollectionName] - + tracker_db = client['tracker_db'] + tracker_col = tracker_db[trackerCollectionName] + allDone = False + completedSuccessfully = False tempLastScannedObjectId = lastScannedObjectId - + overall_start_time = time.time() + while not allDone: + try: + batch_start_time = time.time() + + if lastScannedObjectId != 0: + batch = col.find({"_id": {"$gt": lastScannedObjectId}}, {"_id": 1}).sort({"_id": 1}).limit(appConfig['batchSize']) + else: + batch = col.find({}, {"_id": 1}).sort({"_id": 1}).limit(appConfig['batchSize']) + + batch_count = 0 + updateList = [] + + for doc in batch: + if doc["_id"] <= maxObjectIdToTouch: + updateList.append(pymongo.UpdateOne({"_id": doc["_id"]}, {"$set": {appConfig['updateField']: 1}})) + tempLastScannedObjectId = doc["_id"] + batch_count += 1 + else: + allDone = True + completedSuccessfully = True + printLog("found id {} higher than maxObjectIdToTouch {}. all done. stopping".format( + str(doc["_id"]), str(maxObjectIdToTouch)), appConfig) + break + + if batch_count > 0: + col.bulk_write(updateList) + numDocumentsUpdated += batch_count - #start and go through all the docs using _id - - if lastScannedObjectId != 0 : - batch = col.find({"_id" : { "$gt" : lastScannedObjectId }},{ "_id" :1}).sort({"_id" :1}).limit(appConfig['batchSize']) - else : - batch = col.find({},{ "_id" :1}).sort({ "_id" :1}).limit(appConfig['batchSize']) - - batch_count = 0 - updateList = [] - - for id in batch : - if id["_id"]<=maxObjectIdToTouch: - # print("found id {} lesser than maxObjectIdToTouch {}.".format(str(id["_id"]),str(maxObjectIdToTouch))) - updateList.append(pymongo.UpdateOne({ "_id" : id["_id"] } , { "$set": { appConfig['updateField']: 1 } } )) - tempLastScannedObjectId = id["_id"] - batch_count = batch_count + 1 + if not appConfig['skipCleanup']: + cleanupList = [pymongo.UpdateOne({"_id": op._filter["_id"]}, {"$unset": {appConfig['updateField']: ""}}) for op in updateList] + col.bulk_write(cleanupList) + printLog("cleanup: removed dummy field from {:,} docs".format(batch_count), appConfig) + + batch_elapsed = time.time() - batch_start_time + overall_elapsed = time.time() - overall_start_time + progress_pct = (numDocumentsUpdated / numExistingDocuments * 100) if numExistingDocuments > 0 else 0 + docs_remaining = numExistingDocuments - numDocumentsUpdated + rate = numDocumentsUpdated / overall_elapsed if overall_elapsed > 0 else 0 + eta_seconds = int(docs_remaining / rate) if rate > 0 else 0 + eta_str = str(datetime.timedelta(seconds=eta_seconds)) + + tracker_entry = { + "collection_name": appConfig['collectionName'], + "lastScannedObjectId": tempLastScannedObjectId, + "date": datetime.datetime.now(tz=datetime.timezone.utc), + "maxObjectIdToTouch": maxObjectIdToTouch, + "numExistingDocuments": numExistingDocuments, + "numDocumentsUpdated": numDocumentsUpdated, + "cleanupComplete": not appConfig['skipCleanup'] + } + tracker_col.insert_one(tracker_entry) + + bar_width = 20 + filled = int(bar_width * progress_pct / 100) + bar = chr(9608) * filled + chr(9617) * (bar_width - filled) + printLog( + "[{}] {:.1f}% | {:,}/{:,} | batch: {:,} docs in {:.1f}s | rate: {:.0f} docs/s | ETA: {}".format( + bar, progress_pct, + numDocumentsUpdated, numExistingDocuments, + batch_count, batch_elapsed, + rate, eta_str), + appConfig) + + lastScannedObjectId = tempLastScannedObjectId + + printLog("sleeping for {} seconds".format(appConfig['waitPeriod']), appConfig) + time.sleep(appConfig['waitPeriod']) else: + printLog("No updates in batch", appConfig) allDone = True - print("found id {} higher than maxObjectIdToTouch {}. all done .stopping)".format(str(id["_id"]),str(maxObjectIdToTouch))) + completedSuccessfully = True break - - if batch_count > 0 : - result = col.bulk_write(updateList) - numDocumentsUpdated = numDocumentsUpdated + batch_count - - tracker_entry = { - "collection_name": appConfig['collectionName'], - "lastScannedObjectId" : tempLastScannedObjectId, - "date": datetime.datetime.now(tz=datetime.timezone.utc), - "maxObjectIdToTouch" : maxObjectIdToTouch, - "numExistingDocuments" : numExistingDocuments, - "numDocumentsUpdated" : numDocumentsUpdated - # scan fields in future, for now we use _id - } - tracker_col.insert_one(tracker_entry) - - printLog( " Last updates applied : {}".format(str(tracker_entry)),appConfig) - - lastScannedObjectId = tempLastScannedObjectId - - printLog("sleeping for {} seconds".format(appConfig['waitPeriod']),appConfig) - time.sleep(appConfig['waitPeriod']) - else : - print("No updates in batch") - allDone = True - break - + + except pymongo.errors.PyMongoError as e: + printLog("MongoDB error: {}. Retrying in {} seconds...".format(e, RETRY_DELAY), appConfig) + time.sleep(RETRY_DELAY) + try: + client.close() + except Exception: + pass + try: + client = get_mongo_client(appConfig['uri'], appConfig) + db = client[myDatabaseName] + col = db[myCollectionName] + tracker_db = client['tracker_db'] + tracker_col = tracker_db[trackerCollectionName] + except pymongo.errors.PyMongoError as reconnect_err: + printLog("Fatal: could not reconnect after error: {}".format(reconnect_err), appConfig) + break + + if completedSuccessfully: + overall_elapsed = time.time() - overall_start_time + printLog("completed | totalDocumentsUpdated: {:,} | elapsed: {} | cleanupComplete: {}".format( + numDocumentsUpdated, + str(datetime.timedelta(seconds=int(overall_elapsed))), + not appConfig['skipCleanup']), + appConfig) + try: + tracker_col.drop() + printLog("tracker collection '{}' dropped".format(trackerCollectionName), appConfig) + except pymongo.errors.PyMongoError as e: + printLog("Warning: could not drop tracker collection: {}".format(e), appConfig) + client.close() def main(): parser = argparse.ArgumentParser(description='Update and Apply Compression') - parser.add_argument('--uri',required=True,type=str,help='URI (connection string)') - parser.add_argument('--database',required=True,type=str,help='Database') - parser.add_argument('--collection',required=True,type=str,help='Collection') - parser.add_argument('--file-name',required=False,type=str,default='compressor',help='Starting name of the created log files') - parser.add_argument('--update-field',required=False,type=str,default='6nh63',help='Field used for updating an existing document. This should not conflict with any fieldname you are already using ') - parser.add_argument('--wait-period',required=False,type=int,default=60,help='Number of seconds to wait between each batch') - parser.add_argument('--batch-size',required=False,type=int,default=5000,help='Number of documents to update in a single batch') + parser.add_argument('--uri', required=True, type=str, help='URI (connection string)') + parser.add_argument('--database', required=True, type=str, help='Database') + parser.add_argument('--collection', required=True, type=str, help='Collection') + parser.add_argument('--file-name', required=False, type=str, default='compressor', help='Starting name of the created log files') + parser.add_argument('--update-field', required=False, type=str, default='6nh63', help='Field used for updating an existing document. This should not conflict with any fieldname you are already using') + parser.add_argument('--wait-period', required=False, type=int, default=60, help='Number of seconds to wait between each batch') + parser.add_argument('--batch-size', required=False, type=int, default=5000, help='Number of documents to update in a single batch') + parser.add_argument('--append-log', required=False, action='store_true', default=False, help='Append to existing log file instead of overwriting it on startup') + parser.add_argument('--skip-cleanup', required=False, action='store_true', default=False, help='Skip removing the dummy field after each batch (leaves update field permanently on documents)') args = parser.parse_args() - + appConfig = {} appConfig['uri'] = args.uri - appConfig['numInsertProcesses'] = 1 #int(args.processes) + appConfig['numInsertProcesses'] = 1 appConfig['databaseName'] = args.database appConfig['collectionName'] = args.collection appConfig['updateField'] = args.update_field appConfig['batchSize'] = int(args.batch_size) appConfig['waitPeriod'] = int(args.wait_period) appConfig['logFileName'] = "{}.log".format(args.file_name) + appConfig['appendLog'] = args.append_log + appConfig['skipCleanup'] = args.skip_cleanup + + validate_connection(appConfig) setUpdata = setup(appConfig) - - appConfig['numExistingDocuments'] = setUpdata["numExistingDocuments"] - appConfig['maxObjectIdToTouch'] = setUpdata["maxObjectIdToTouch"] - appConfig['lastScannedObjectId'] = setUpdata["lastScannedObjectId"] - appConfig['numDocumentsUpdated'] = setUpdata["numDocumentsUpdated"] - - deleteLog(appConfig) - - printLog('---------------------------------------------------------------------------------------',appConfig) + + appConfig['numExistingDocuments'] = setUpdata["numExistingDocuments"] + appConfig['maxObjectIdToTouch'] = setUpdata["maxObjectIdToTouch"] + appConfig['lastScannedObjectId'] = setUpdata["lastScannedObjectId"] + appConfig['numDocumentsUpdated'] = setUpdata["numDocumentsUpdated"] + + if not appConfig['appendLog']: + deleteLog(appConfig) + + printLog('---------------------------------------------------------------------------------------', appConfig) for thisKey in sorted(appConfig): - if (thisKey == 'uri'): + if thisKey == 'uri': thisUri = appConfig[thisKey] thisParsedUri = pymongo.uri_parser.parse_uri(thisUri) thisUsername = thisParsedUri['username'] thisPassword = thisParsedUri['password'] - thisUri = thisUri.replace(thisUsername,'') - thisUri = thisUri.replace(thisPassword,'') - printLog(" config | {} | {}".format(thisKey,thisUri),appConfig) + thisUri = thisUri.replace(thisUsername, '') + thisUri = thisUri.replace(thisPassword, '') + printLog(" config | {} | {}".format(thisKey, thisUri), appConfig) else: - printLog(" config | {} | {}".format(thisKey,appConfig[thisKey]),appConfig) - printLog('---------------------------------------------------------------------------------------',appConfig) - + printLog(" config | {} | {}".format(thisKey, appConfig[thisKey]), appConfig) + printLog('---------------------------------------------------------------------------------------', appConfig) + mp.set_start_method('spawn') - q = mp.Manager().Queue() processList = [] for loop in range(appConfig['numInsertProcesses']): - p = mp.Process(target=task_worker,args=(loop,q,appConfig)) + p = mp.Process(target=task_worker, args=(loop, appConfig)) processList.append(p) for process in processList: process.start() - + for process in processList: process.join() - - printLog("Created {} with results".format(appConfig['logFileName']),appConfig) + + printLog("Created {} with results".format(appConfig['logFileName']), appConfig) if __name__ == "__main__": main()