86 changes: 76 additions & 10 deletions socs/agents/suprsync/agent.py
@@ -2,7 +2,9 @@
import os
import subprocess
import time
import traceback

import sqlalchemy
import txaio
from ocs import ocs_agent, site_config

@@ -111,11 +113,14 @@ def run(self, session, params=None):
"iterations": 1,
"copies": 0,
"errors_timeout": 0,
"errors_nonzero": 0
"errors_nonzero": 0,
"errors_sqlite": 0
},
}
"""

# DB: These two objects don't establish a DB session on creation, just
# within their respective methods.
srfm = SupRsyncFilesManager(
self.db_path, create_all=True, echo=self.db_echo,
pool_size=self.db_pool_size, max_overflow=self.db_pool_max_overflow
@@ -136,6 +141,7 @@ def run(self, session, params=None):
'copies': 0,
'errors_timeout': 0,
'errors_nonzero': 0,
'errors_sqlite': 0,
}

session.data = {
@@ -155,11 +161,14 @@ def run(self, session, params=None):

op = {'start_time': time.time()}

# Copy files to remote, check remote md5sums.
try:
session.data['activity'] = 'copying'
# DB: Begins a session, queries, and writes when done.
op['files'] = handler.copy_files(max_copy_attempts=self.max_copy_attempts,
num_files=self.files_per_batch)
counters['copies'] += len(op['files'])
session.degraded = False
except subprocess.TimeoutExpired as e:
self.log.error("Timeout when copying files! {e}", e=e)
op['error'] = 'timed out'
@@ -168,19 +177,53 @@ def run(self, session, params=None):
self.log.error("rsync returned non-zero exit code! {e}", e=e)
op['error'] = 'nonzero exit'
counters['errors_nonzero'] += 1
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue

now = time.time()

# Record 5-digit timecode dirs in DB. (Doesn't actually create any
# directories on disk.)
if now - last_tcdir_update > tcdir_update_interval:
# add timecode-dirs for all files from the last week
self.log.info("Creating timecode dirs for recent files.....")
srfm.create_all_timecode_dirs(
self.archive_name, min_ctime=now - (7 * 24 * 3600)
)
self.log.info("Finished creating tcdirs")
last_tcdir_update = now
try:
# DB: Queries w/occasional writes when a new dir is found.
srfm.create_all_timecode_dirs(
self.archive_name, min_ctime=now - (7 * 24 * 3600)
)
self.log.info("Finished creating tcdirs")
last_tcdir_update = now
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue

# Compute archive statistics.
try:
# DB: Query only.
archive_stats = srfm.get_archive_stats(self.archive_name)
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue

archive_stats = srfm.get_archive_stats(self.archive_name)
if archive_stats is not None:
self.agent.publish_to_feed('archive_stats', {
'block_name': self.archive_name,
@@ -200,13 +243,36 @@ def run(self, session, params=None):
'data': counters})
next_feed_update = now + 10 * 60

# Delete transferred files from disk after specified time.
if self.delete_after is not None:
session.data['activity'] = 'deleting'
handler.delete_files(self.delete_after)
try:
# DB: Begins a session, queries, and writes.
handler.delete_files(self.delete_after)
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue

# After handling files, update the timecode dirs
srfm.update_all_timecode_dirs(
self.archive_name, self.suprsync_file_root, self.instance_id)
try:
# DB: Mostly queries, w/occasional writes.
srfm.update_all_timecode_dirs(
self.archive_name, self.suprsync_file_root, self.instance_id)
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue

session.data['activity'] = 'idle'
time.sleep(self.sleep_time)
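The diff above repeats one handling pattern around every database touchpoint in the run loop: catch `sqlalchemy.exc.OperationalError` (raised when the SQLite file is locked), mark the session degraded, bump the `errors_sqlite` counter, wait a few seconds, and skip to the next iteration. A minimal standalone sketch of that pattern is below; the `run_with_lock_retry` helper, the `do_db_work` callable, and the 5-second backoff are illustrative stand-ins, not code from this PR.

```python
import time
import traceback

import sqlalchemy


def run_with_lock_retry(do_db_work, counters, session, log, backoff=5):
    """Sketch of the agent's locked-database handling (hypothetical helper).

    Returns (ok, result). On OperationalError the session is flagged
    degraded and the caller is expected to skip the rest of the loop
    iteration; on success the degraded flag is cleared.
    """
    try:
        result = do_db_work()  # e.g. lambda: srfm.get_archive_stats(archive_name)
        session.degraded = False
        return True, result
    except sqlalchemy.exc.OperationalError as e:  # database is locked
        session.degraded = True
        counters['errors_sqlite'] += 1
        log.warn(f"Operational error: {e}")
        log.debug("{e}", e=traceback.format_exc())
        log.info(f"Waiting {backoff} seconds before continuing...")
        time.sleep(backoff)
        return False, None
```

The PR inlines this handler at each call site rather than factoring it out, which keeps the loop's control flow (the `continue` after each failure) explicit inside `run()`.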
1 change: 1 addition & 0 deletions socs/db/suprsync.py
@@ -669,6 +669,7 @@ def copy_files(self, max_copy_attempts=None, num_files=None):

subprocess.run(cmd, check=True, timeout=self.copy_timeout)

# Mark all files as 'copied' to remote.
for file in files:
file.copied = time.time()

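On the `socs/db/suprsync.py` side, the added comment documents the contract of `copy_files`: the transfer command runs with `check=True` and a timeout, so a non-zero exit raises `subprocess.CalledProcessError` and a hang raises `subprocess.TimeoutExpired` (the two exceptions counted in the agent loop above), and only after the command returns cleanly are the file records stamped as copied. A rough sketch of that flow, using a placeholder command rather than the actual rsync invocation the handler builds:

```python
import subprocess
import time


def copy_and_mark(files, cmd, copy_timeout):
    """Illustrative sketch, not the library implementation."""
    # check=True  -> CalledProcessError on non-zero exit
    # timeout=... -> TimeoutExpired if the transfer stalls
    subprocess.run(cmd, check=True, timeout=copy_timeout)

    # Only reached on success: mark all files as 'copied' to remote.
    now = time.time()
    for f in files:
        f.copied = now
```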