From 61022ba054c8bdeb11af57181b5224fbffdc192d Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Wed, 21 Jan 2026 15:45:57 -0500 Subject: [PATCH 1/2] Add comments describing database interactions --- socs/agents/suprsync/agent.py | 12 ++++++++++++ socs/db/suprsync.py | 1 + 2 files changed, 13 insertions(+) diff --git a/socs/agents/suprsync/agent.py b/socs/agents/suprsync/agent.py index 15fb96912..a583c7f39 100644 --- a/socs/agents/suprsync/agent.py +++ b/socs/agents/suprsync/agent.py @@ -116,6 +116,8 @@ def run(self, session, params=None): } """ + # DB: These two objects don't establish a DB session on creation, just + # within their respective methods. srfm = SupRsyncFilesManager( self.db_path, create_all=True, echo=self.db_echo, pool_size=self.db_pool_size, max_overflow=self.db_pool_max_overflow @@ -155,8 +157,10 @@ def run(self, session, params=None): op = {'start_time': time.time()} + # Copy files to remote, check remote md5sums. try: session.data['activity'] = 'copying' + # DB: Begins a session, queries, and writes when done. op['files'] = handler.copy_files(max_copy_attempts=self.max_copy_attempts, num_files=self.files_per_batch) counters['copies'] += len(op['files']) @@ -171,15 +175,20 @@ def run(self, session, params=None): now = time.time() + # Record 5-digit timecode dirs in DB. (Doesn't actually create any + # directories on disk.) if now - last_tcdir_update > tcdir_update_interval: # add timecode-dirs for all files from the last week self.log.info("Creating timecode dirs for recent files.....") + # DB: Queries w/occasional writes when a new dir is found. srfm.create_all_timecode_dirs( self.archive_name, min_ctime=now - (7 * 24 * 3600) ) self.log.info("Finished creating tcdirs") last_tcdir_update = now + # Compute archive statistics. + # DB: Query only. archive_stats = srfm.get_archive_stats(self.archive_name) if archive_stats is not None: self.agent.publish_to_feed('archive_stats', { @@ -200,11 +209,14 @@ def run(self, session, params=None): 'data': counters}) next_feed_update = now + 10 * 60 + # Delete transferred files from disk after specified time. if self.delete_after is not None: session.data['activity'] = 'deleting' + # DB: Begins a session, queries, and writes. handler.delete_files(self.delete_after) # After handling files, update the timecode dirs + # DB: Mostly queries, w/occasional writes. srfm.update_all_timecode_dirs( self.archive_name, self.suprsync_file_root, self.instance_id) diff --git a/socs/db/suprsync.py b/socs/db/suprsync.py index 2c99e7036..376f79b00 100644 --- a/socs/db/suprsync.py +++ b/socs/db/suprsync.py @@ -669,6 +669,7 @@ def copy_files(self, max_copy_attempts=None, num_files=None): subprocess.run(cmd, check=True, timeout=self.copy_timeout) + # Mark all files as 'copied' to remote. for file in files: file.copied = time.time() From 288549e9736714e0af1b5100307d9955208e1661 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Wed, 21 Jan 2026 16:13:24 -0500 Subject: [PATCH 2/2] Add error handling for locked DB to all transactions --- socs/agents/suprsync/agent.py | 82 +++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 14 deletions(-) diff --git a/socs/agents/suprsync/agent.py b/socs/agents/suprsync/agent.py index a583c7f39..62ebe0ebd 100644 --- a/socs/agents/suprsync/agent.py +++ b/socs/agents/suprsync/agent.py @@ -2,7 +2,9 @@ import os import subprocess import time +import traceback +import sqlalchemy import txaio from ocs import ocs_agent, site_config @@ -111,7 +113,8 @@ def run(self, session, params=None): "iterations": 1, "copies": 0, "errors_timeout": 0, - "errors_nonzero": 0 + "errors_nonzero": 0, + "errors_sqlite": 0 }, } """ @@ -138,6 +141,7 @@ def run(self, session, params=None): 'copies': 0, 'errors_timeout': 0, 'errors_nonzero': 0, + 'errors_sqlite': 0, } session.data = { @@ -164,6 +168,7 @@ def run(self, session, params=None): op['files'] = handler.copy_files(max_copy_attempts=self.max_copy_attempts, num_files=self.files_per_batch) counters['copies'] += len(op['files']) + session.degraded = False except subprocess.TimeoutExpired as e: self.log.error("Timeout when copying files! {e}", e=e) op['error'] = 'timed out' @@ -172,6 +177,14 @@ def run(self, session, params=None): self.log.error("rsync returned non-zero exit code! {e}", e=e) op['error'] = 'nonzero exit' counters['errors_nonzero'] += 1 + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue now = time.time() @@ -180,16 +193,37 @@ def run(self, session, params=None): if now - last_tcdir_update > tcdir_update_interval: # add timecode-dirs for all files from the last week self.log.info("Creating timecode dirs for recent files.....") - # DB: Queries w/occasional writes when a new dir is found. - srfm.create_all_timecode_dirs( - self.archive_name, min_ctime=now - (7 * 24 * 3600) - ) - self.log.info("Finished creating tcdirs") - last_tcdir_update = now + try: + # DB: Queries w/occasional writes when a new dir is found. + srfm.create_all_timecode_dirs( + self.archive_name, min_ctime=now - (7 * 24 * 3600) + ) + self.log.info("Finished creating tcdirs") + last_tcdir_update = now + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue # Compute archive statistics. - # DB: Query only. - archive_stats = srfm.get_archive_stats(self.archive_name) + try: + # DB: Query only. + archive_stats = srfm.get_archive_stats(self.archive_name) + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue + if archive_stats is not None: self.agent.publish_to_feed('archive_stats', { 'block_name': self.archive_name, @@ -212,13 +246,33 @@ def run(self, session, params=None): # Delete transferred files from disk after specified time. if self.delete_after is not None: session.data['activity'] = 'deleting' - # DB: Begins a session, queries, and writes. - handler.delete_files(self.delete_after) + try: + # DB: Begins a session, queries, and writes. + handler.delete_files(self.delete_after) + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue # After handling files, update the timecode dirs - # DB: Mostly queries, w/occasional writes. - srfm.update_all_timecode_dirs( - self.archive_name, self.suprsync_file_root, self.instance_id) + try: + # DB: Mostly queries, w/occasional writes. + srfm.update_all_timecode_dirs( + self.archive_name, self.suprsync_file_root, self.instance_id) + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue session.data['activity'] = 'idle' time.sleep(self.sleep_time)