diff --git a/socs/agents/suprsync/agent.py b/socs/agents/suprsync/agent.py index 15fb96912..62ebe0ebd 100644 --- a/socs/agents/suprsync/agent.py +++ b/socs/agents/suprsync/agent.py @@ -2,7 +2,9 @@ import os import subprocess import time +import traceback +import sqlalchemy import txaio from ocs import ocs_agent, site_config @@ -111,11 +113,14 @@ def run(self, session, params=None): "iterations": 1, "copies": 0, "errors_timeout": 0, - "errors_nonzero": 0 + "errors_nonzero": 0, + "errors_sqlite": 0 }, } """ + # DB: These two objects don't establish a DB session on creation, just + # within their respective methods. srfm = SupRsyncFilesManager( self.db_path, create_all=True, echo=self.db_echo, pool_size=self.db_pool_size, max_overflow=self.db_pool_max_overflow @@ -136,6 +141,7 @@ def run(self, session, params=None): 'copies': 0, 'errors_timeout': 0, 'errors_nonzero': 0, + 'errors_sqlite': 0, } session.data = { @@ -155,11 +161,14 @@ def run(self, session, params=None): op = {'start_time': time.time()} + # Copy files to remote, check remote md5sums. try: session.data['activity'] = 'copying' + # DB: Begins a session, queries, and writes when done. op['files'] = handler.copy_files(max_copy_attempts=self.max_copy_attempts, num_files=self.files_per_batch) counters['copies'] += len(op['files']) + session.degraded = False except subprocess.TimeoutExpired as e: self.log.error("Timeout when copying files! {e}", e=e) op['error'] = 'timed out' @@ -168,19 +177,53 @@ def run(self, session, params=None): self.log.error("rsync returned non-zero exit code! {e}", e=e) op['error'] = 'nonzero exit' counters['errors_nonzero'] += 1 + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue now = time.time() + # Record 5-digit timecode dirs in DB. (Doesn't actually create any + # directories on disk.) if now - last_tcdir_update > tcdir_update_interval: # add timecode-dirs for all files from the last week self.log.info("Creating timecode dirs for recent files.....") - srfm.create_all_timecode_dirs( - self.archive_name, min_ctime=now - (7 * 24 * 3600) - ) - self.log.info("Finished creating tcdirs") - last_tcdir_update = now + try: + # DB: Queries w/occasional writes when a new dir is found. + srfm.create_all_timecode_dirs( + self.archive_name, min_ctime=now - (7 * 24 * 3600) + ) + self.log.info("Finished creating tcdirs") + last_tcdir_update = now + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue + + # Compute archive statistics. + try: + # DB: Query only. + archive_stats = srfm.get_archive_stats(self.archive_name) + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue - archive_stats = srfm.get_archive_stats(self.archive_name) if archive_stats is not None: self.agent.publish_to_feed('archive_stats', { 'block_name': self.archive_name, @@ -200,13 +243,36 @@ def run(self, session, params=None): 'data': counters}) next_feed_update = now + 10 * 60 + # Delete transferred files from disk after specified time. if self.delete_after is not None: session.data['activity'] = 'deleting' - handler.delete_files(self.delete_after) + try: + # DB: Begins a session, queries, and writes. + handler.delete_files(self.delete_after) + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue # After handling files, update the timecode dirs - srfm.update_all_timecode_dirs( - self.archive_name, self.suprsync_file_root, self.instance_id) + try: + # DB: Mostly queries, w/occasional writes. + srfm.update_all_timecode_dirs( + self.archive_name, self.suprsync_file_root, self.instance_id) + session.degraded = False + except sqlalchemy.exc.OperationalError as e: # database is locked + session.degraded = True + counters['errors_sqlite'] += 1 + self.log.warn(f"Operational error: {e}") + self.log.debug("{e}", e=traceback.format_exc()) + self.log.info("Waiting 5 seconds before continuing...") + time.sleep(5) + continue session.data['activity'] = 'idle' time.sleep(self.sleep_time) diff --git a/socs/db/suprsync.py b/socs/db/suprsync.py index 2c99e7036..376f79b00 100644 --- a/socs/db/suprsync.py +++ b/socs/db/suprsync.py @@ -669,6 +669,7 @@ def copy_files(self, max_copy_attempts=None, num_files=None): subprocess.run(cmd, check=True, timeout=self.copy_timeout) + # Mark all files as 'copied' to remote. for file in files: file.copied = time.time()