From 6a4443c68fae2d487cb450fcea47b462894cc3bd Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Thu, 7 Mar 2024 12:12:23 +0100 Subject: [PATCH 01/19] Improve the execute() function allowing it to take in stdin data. --- svnpubsub/util.py | 48 ++++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/svnpubsub/util.py b/svnpubsub/util.py index 9e05b44..530647b 100644 --- a/svnpubsub/util.py +++ b/svnpubsub/util.py @@ -14,53 +14,59 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import io import os import sys import logging -import subprocess as __subprocess +import subprocess +from subprocess import CalledProcessError # check_output() is only available in Python 2.7. Allow us to run with # earlier versions try: - __check_output = __subprocess.check_output + __check_output = subprocess.check_output def check_output(args, env=None, universal_newlines=False): return __check_output(args, shell=False, env=env, universal_newlines=universal_newlines) except AttributeError: def check_output(args, env=None, universal_newlines=False): # note: we only use these three args - pipe = __subprocess.Popen(args, shell=False, env=env, - stdout=__subprocess.PIPE, - universal_newlines=universal_newlines) + pipe = subprocess.Popen(args, shell=False, env=env, + stdout=subprocess.PIPE, + universal_newlines=universal_newlines) output, _ = pipe.communicate() if pipe.returncode: - raise __subprocess.CalledProcessError(pipe.returncode, args) + raise subprocess.CalledProcessError(pipe.returncode, args) return output -def execute(*args, text=True): - stdout = [] - stderr = [] +def execute(*args, text=True, env=None, stdin=None, throw=True): process = None arguments = [*args] + stdout = [] if text else bytes() + stderr = [] if text else bytes() logging.debug("Running: %s", " ".join(arguments)) + if hasattr(stdin, 'fileno'): + stdin = stdin.read() + try: - process = __subprocess.Popen(arguments, text=text, universal_newlines=text, - stdout=__subprocess.PIPE, stderr=__subprocess.PIPE) + process = subprocess.Popen(arguments, text=text, universal_newlines=text, env=env, + stdin=subprocess.PIPE if stdin is not None else None, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) if text: - for line in process.stdout.readlines(): + stdout_data, stderr_data = process.communicate(input=stdin) + for line in stdout_data.splitlines(): stdout.append(line.rstrip()) - for line in process.stderr.readlines(): - stderr.append(line.rstrip()) logging.debug(os.linesep.join(stdout)) - if process.returncode: - raise __subprocess.CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) + for line in stderr_data.splitlines(): + stderr.append(line.rstrip()) + else: + stdout, stderr = process.communicate(input=stdin) + if process.returncode and throw: + raise CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) except Exception: _, value, traceback = sys.exc_info() - if not text: - stderr.extend(line.decode('utf-8').rstrip() for line in process.stderr.readlines()) - raise RuntimeError(os.linesep.join(stderr)).with_traceback(traceback) - return process, os.linesep.join(stdout) if text else process.stdout.read(), os.linesep.join(stderr) if text else process.stderr.read() + raise RuntimeError(os.linesep.join(stderr) if text else stderr.decode('utf-8')).with_traceback(traceback) + return process, os.linesep.join(stdout) if text else stdout, os.linesep.join(stderr) if text else stderr From 7dbc867205ac0a524cbd2b2381b60f960e2142ee Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Thu, 7 Mar 2024 12:16:40 +0100 Subject: [PATCH 02/19] Use execute() to run commands and refactored the file fixing some PEP 8 warnings. --- svndumpsub.py | 162 +++++++++++++++++--------------------------------- 1 file changed, 56 insertions(+), 106 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 7ddc8c0..a06940e 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -16,7 +16,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - # # SvnDumpSub - Subscribe to a SvnPubSub stream, dumps commits, zips and uploads them to AWS S3. # @@ -26,17 +25,12 @@ # On startup SvnDumpSub starts listening to commits in all repositories. # -from io import BytesIO -import subprocess import threading import sys import stat import os -import tempfile -import re -import json -import socket import boto3 +import re import logging.handlers try: import queue @@ -47,34 +41,18 @@ import daemonize import svnpubsub.client import svnpubsub.util +from svnpubsub.util import execute +from io import BytesIO HOST = "127.0.0.1" PORT = 2069 -#Will not handle commits if repo starts with any name icluded in REPO_EXCLUDES +# Will not handle commits if repo starts with any name included in REPO_EXCLUDES REPO_EXCLUDES = ['demo', 'repo'] +# Start logging warnings if the work backlog reaches this many items +BACKLOG_TOO_HIGH = 500 s3client = boto3.client('s3') -assert hasattr(subprocess, 'check_call') - -def check_call(*args, **kwds): - """Wrapper around subprocess.check_call() that logs stderr upon failure, - with an optional list of exit codes to consider non-failure.""" - assert 'stderr' not in kwds - if '__okayexits' in kwds: - __okayexits = kwds['__okayexits'] - del kwds['__okayexits'] - else: - __okayexits = set([0]) # EXIT_SUCCESS - kwds.update(stderr=subprocess.PIPE) - pipe = subprocess.Popen(*args, **kwds) - output, errput = pipe.communicate() - if pipe.returncode not in __okayexits: - cmd = args[0] if len(args) else kwds.get('args', '(no command)') - logging.error('Command failed: returncode=%d command=%r stderr=%r', - pipe.returncode, cmd, errput) - raise subprocess.CalledProcessError(pipe.returncode, args) - return pipe.returncode # is EXIT_OK class Job(object): @@ -86,14 +64,14 @@ def __init__(self, repo, rev, head): self.shard_size = 'shard0' def get_key(self, rev): - #/v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz + # /v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz return '%s/%s' % (self._get_s3_base(rev = rev), self.get_name(rev)) def get_name(self, rev): revStr = str(rev) revStr = revStr.zfill(10) name = self.repo + '-' + revStr + '.svndump.gz' - #reponame-0000001000.svndump.gz + # reponame-0000001000.svndump.gz return name def _get_s3_base(self, rev): @@ -113,10 +91,9 @@ def _get_svn_dump_args(self, from_rev, to_rev): #svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 return [SVNADMIN, 'dump', '--incremental', '--deltas', path, dump_rev] - - #def _get_aws_cp_args(self, rev): - # # aws s3 cp - s3://cms-review-jandersson/v1/jandersson/demo1/shard0/0000000000/demo1-0000000363.svndump.gz - # return [AWS, 's3', 'cp', '-', 's3://%s/%s' % (BUCKET, self.get_key(rev))] + # def _get_aws_cp_args(self, rev): + # # aws s3 cp - s3://cms-review-jandersson/v1/jandersson/demo1/shard0/0000000000/demo1-0000000363.svndump.gz + # return [AWS, 's3', 'cp', '-', 's3://%s/%s' % (BUCKET, self.get_key(rev))] def _validate_shard(self, rev): key = self.get_key(rev) @@ -126,15 +103,14 @@ def _validate_shard(self, rev): if (not response["ContentLength"] > 0): logging.warning('Dump file empty: %s' % key) return False - #logging.info(response) + # logging.info(response) return True except Exception as err: logging.debug("S3 exception: {0}".format(err)) logging.info('Shard key does not exist: s3://%s/%s' % (BUCKET, key)) return False - - #Will recursively check a bucket if (rev - 1) exists until it finds a rev dump. + # Will recursively check a bucket if (rev - 1) exists until it finds a rev dump. def validate_rev(self, rev): validate_to_rev = self._get_validate_to_rev() @@ -146,7 +122,6 @@ def validate_rev(self, rev): return self._validate_shard(rev_to_validate) - def _get_validate_to_rev(self): rev_round_down = int((self.head - 1) / 1000) return rev_round_down * 1000 @@ -155,30 +130,23 @@ def _backup_commit(self): logging.info('Dumping and uploading rev: %s from repo: %s' % (self.rev, self.repo)) self.dump_zip_upload(self._get_svn_dump_args(self.rev, self.rev), self.rev) - def dump_zip_upload(self, dump_args, rev): shard_key = self.get_key(rev) + gz_args = ['/bin/gzip'] + + # svnadmin dump + dump, dump_stdout, _ = execute(*dump_args, text=False, env=self.env) + # gzip stdout + gz, gz_stdout, _ = execute(*gz_args, text=False, env=self.env, stdin=dump_stdout) + # Upload gzip.stdout to S3 + s3client.upload_fileobj(gz_stdout, BUCKET, shard_key) - gz = '/bin/gzip' - gz_args = [gz] - - # Svn admin dump - p1 = subprocess.Popen((dump_args), stdout=subprocess.PIPE, env=self.env) - # Zip stdout - p2 = subprocess.Popen((gz_args), stdin=p1.stdout, stdout=subprocess.PIPE) - p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. - # Upload zip.stdout to s3 - s3client.upload_fileobj(p2.stdout, BUCKET, shard_key) - #TODO: Do we need to close stuff? - p2.communicate()[0] - p1.poll() - - if p1.returncode != 0: - logging.error('Dumping shard failed (rc=%s): %s', p1.returncode, shard_key) + if dump.returncode != 0: + logging.error('Dumping shard failed (rc=%s): %s', dump.returncode, shard_key) raise Exception('Dumping shard failed') - if p2.returncode != 0: - logging.error('Compressing shard failed (rc=%s): %s', p2.returncode, shard_key) + if gz.returncode != 0: + logging.error('Compressing shard failed (rc=%s): %s', gz.returncode, shard_key) raise Exception('Compressing shard failed') @@ -227,19 +195,16 @@ def _get_head(self, repo): raise Exception('Repository does not exist') # Considered using svn to enable rdump in the future. - #fqdn = socket.getfqdn() - #url = 'http://%s/svn/%s' % (fqdn, repo) + # fqdn = socket.getfqdn() + # url = 'http://%s/svn/%s' % (fqdn, repo) - #args = [SVN, 'info', url] - #grep_args = ['/bin/grep', 'Revision:'] + # args = [SVN, 'info', url] + # grep_args = ['/bin/grep', 'Revision:'] - args = [SVNLOOK, 'youngest', path] - grep_args = ['/bin/grep', '^[0-9]\+'] - - p1 = subprocess.Popen((args), stdout=subprocess.PIPE) - output = subprocess.check_output((grep_args), stdin=p1.stdout) - - rev = int(''.join(filter(str.isdigit, output.decode("utf-8")))) + svnlook_args = [SVNLOOK, 'youngest', path] + svnlook, svnlook_stdout, _ = execute(*svnlook_args, text=True, env=self.env) + match = re.match(r'^(\d+)', svnlook_stdout) + rev = int(match.group(1)) if match else None logging.info('Repository %s youngest: %s' % (repo, rev)) return rev @@ -253,7 +218,6 @@ def _get_shards(self, head): # Upper limit must be +1 before division (both shard3 and shard0). return list(range(self.rev_min, int((head + 1) / self.shard_div) * self.shard_div, self.shard_div)) - def _backup_shard(self, shard): logging.info('Dumping and uploading shard: %s from repo: %s' % (shard, self.repo)) start_rev = str(shard) @@ -283,7 +247,6 @@ def __init__(self, repo): shards = self._get_shards(self.rev_min + 1000 * self.shard_div) self._run(shards) - # Refresh head after potentially loading large dumps. self.head = self._get_head(self.repo) if self.head == 0: @@ -314,60 +277,47 @@ def _run(self, shards): # Restart or raise the maximum number of shards. raise Exception('Maximum number of shards processed') - def _load_shard(self, shard): logging.info('Loading shard: %s from repo: %s' % (shard, self.repo)) start_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) logging.info('Loading shard %s' % shard) - self.load_zip(start_rev) - + self._load_zip(start_rev) - def load_zip(self, rev): + def _load_zip(self, rev): shard_key = self.get_key(rev) - gz = '/bin/gunzip' - gz_args = [gz, '-c'] + gz_args = ['/bin/gunzip', '-c'] path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] - # Temporary file - # TODO: Use specific tmp dir. - prefix = '%s_%s_' % (self.repo, rev) - fp = tempfile.NamedTemporaryFile(prefix=prefix, suffix='.svndump.gz', delete=True) - logging.debug('Downloading shard to temporary file: %s', fp.name) + shard_buffer = BytesIO() + logging.debug('Downloading shard to memory...') # Download from s3 - s3client.download_fileobj(BUCKET, shard_key, fp) - fp.seek(0) + s3client.download_fileobj(BUCKET, shard_key, shard_buffer) # gunzip - p1 = subprocess.Popen((gz_args), stdin=fp, stdout=subprocess.PIPE, env=self.env) + gz, gz_stdout, _ = execute(*gz_args, text=False, env=self.env, stdin=shard_buffer.getvalue()) # svnadmin load - p2 = subprocess.Popen((load_args), stdin=p1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) - p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. - #TODO: Do we need to close stuff? - p2.communicate()[0] - logging.debug('Load return code: %s', p2.returncode) - p1.poll() - logging.debug('Gunzip return code: %s', p1.returncode) - - # Closing tmp file should delete it. - fp.close() - - if p1.returncode != 0: - logging.error('Decompressing shard failed (rc=%s): %s', p1.returncode, shard_key) + load, _, _ = execute(*load_args, text=False, env=self.env, stdin=gz_stdout) + logging.debug('Load return code: %s', load.returncode) + logging.debug('Gunzip return code: %s', gz.returncode) + + if gz.returncode != 0: + logging.error('Decompressing shard failed (rc=%s): %s', gz.returncode, shard_key) raise Exception('Decompressing shard failed') - if p2.returncode != 0: - logging.error('Loading shard failed (rc=%s): %s', p2.returncode, shard_key) + if load.returncode != 0: + logging.error('Loading shard failed (rc=%s): %s', load.returncode, shard_key) raise Exception('Loading shard failed') # TODO Analyze output, should conclude with (ensure revision is correct for shard): # ------- Committed revision 4999 >>> + class BigDoEverythingClasss(object): - #removed the config object from __init__. + # removed the config object from __init__. def __init__(self): self.streams = ["http://%s:%d/commits" %(HOST, PORT)] @@ -397,15 +347,13 @@ def commit(self, url, commit): job = Job(commit.repositoryname, commit.id, commit.id) self.worker.add_job(job) -# Start logging warnings if the work backlog reaches this many items -BACKLOG_TOO_HIGH = 500 class BackgroundWorker(threading.Thread): def __init__(self, svnbin, hook): threading.Thread.__init__(self) # The main thread/process should not wait for this thread to exit. - ### compat with Python 2.5 + # compat with Python 2.5 self.setDaemon(True) self.svnbin = svnbin @@ -454,6 +402,7 @@ def _validate(self, job, boot=False): logging.info("Starting validation of rev: %s in repo: %s" % (job.rev, job.repo)) return job.validate_rev(job.rev) + class Daemon(daemonize.Daemon): def __init__(self, logfile, pidfile, umask, bdec): daemonize.Daemon.__init__(self, logfile, pidfile) @@ -494,7 +443,7 @@ def _event(self, url, event_name, event_arg): def prepare_logging(logfile): - "Log to the specified file, or to stdout if None." + """Log to the specified file, or to stdout if None.""" if logfile: # Rotate logs daily, keeping 7 days worth. handler = logging.handlers.TimedRotatingFileHandler( @@ -512,9 +461,10 @@ def prepare_logging(logfile): root = logging.getLogger() root.addHandler(handler) - ### use logging.INFO for now. switch to cmdline option or a config? + # use logging.INFO for now. switch to cmdline option or a config? root.setLevel(logging.INFO) + def handle_options(options): if not options.aws: @@ -560,7 +510,6 @@ def handle_options(options): # Set up the logging, then process the rest of the options. - # In daemon mode, we let the daemonize module handle the pidfile. # Otherwise, we should write this (foreground) PID into the file. if options.pidfile and not options.daemon: @@ -596,6 +545,7 @@ def handle_options(options): prepare_logging(options.logfile) + def main(args): parser = optparse.OptionParser( description='An SvnPubSub client to keep working copies synchronized ' From b32e96b7230657ac9472dea38d8138ea5def5f79 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 12 Mar 2024 11:21:36 +0100 Subject: [PATCH 03/19] Some more refactoring and PEP 8 fixes. --- svndumpsub.py | 70 ++++++++++++++++++++++++--------------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index a06940e..f90bc44 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -32,10 +32,11 @@ import boto3 import re import logging.handlers + try: - import queue + import queue except ImportError: - import queue as Queue + import queue as Queue import optparse import daemonize @@ -65,7 +66,7 @@ def __init__(self, repo, rev, head): def get_key(self, rev): # /v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz - return '%s/%s' % (self._get_s3_base(rev = rev), self.get_name(rev)) + return '%s/%s' % (self._get_s3_base(rev=rev), self.get_name(rev)) def get_name(self, rev): revStr = str(rev) @@ -88,7 +89,7 @@ def _get_s3_base(self, rev): def _get_svn_dump_args(self, from_rev, to_rev): path = '%s/%s' % (SVNROOT, self.repo) dump_rev = '-r%s:%s' % (from_rev, to_rev) - #svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 + # svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 return [SVNADMIN, 'dump', '--incremental', '--deltas', path, dump_rev] # def _get_aws_cp_args(self, rev): @@ -100,7 +101,7 @@ def _validate_shard(self, rev): try: response = s3client.head_object(Bucket=BUCKET, Key=key) logging.debug('Shard key exists: %s' % key) - if (not response["ContentLength"] > 0): + if not response["ContentLength"] > 0: logging.warning('Dump file empty: %s' % key) return False # logging.info(response) @@ -126,7 +127,7 @@ def _get_validate_to_rev(self): rev_round_down = int((self.head - 1) / 1000) return rev_round_down * 1000 - def _backup_commit(self): + def backup_commit(self): logging.info('Dumping and uploading rev: %s from repo: %s' % (self.rev, self.repo)) self.dump_zip_upload(self._get_svn_dump_args(self.rev, self.rev), self.rev) @@ -164,7 +165,7 @@ def __init__(self, repo, shard_size): self.rev_min = 0 elif shard_size == 'shard0': self.shard_div = 1 - self.shard_div_next = 1000 # next larger shard + self.shard_div_next = 1000 # next larger shard self.rev_min = int(self.head / self.shard_div_next) * self.shard_div_next # shard0 revisions might be skipped when using only --history without svnpubsub # revision will be dumped in shard3 but can be a problem for slave servers loading single revisions @@ -319,12 +320,12 @@ def _load_zip(self, rev): class BigDoEverythingClasss(object): # removed the config object from __init__. def __init__(self): - self.streams = ["http://%s:%d/commits" %(HOST, PORT)] + self.streams = ["http://%s:%d/commits" % (HOST, PORT)] self.hook = None self.svnbin = SVNADMIN self.worker = BackgroundWorker(self.svnbin, self.hook) - self.watch = [ ] + self.watch = [] def start(self): logging.info('start') @@ -371,14 +372,14 @@ def run(self): # Warn if the queue is too long. # (Note: the other thread might have added entries to self.q # after the .get() and before the .qsize().) - qsize = self.q.qsize()+1 + qsize = self.q.qsize() + 1 if qsize > BACKLOG_TOO_HIGH: logging.warn('worker backlog is at %d', qsize) try: prev_exists = self._validate(job) if prev_exists: - job._backup_commit() + job.backup_commit() else: logging.info('Rev - 1 has not been dumped, adding it to the queue') self.add_job(job) @@ -398,7 +399,7 @@ def add_job(self, job): self.q.put((job.rev, job)) def _validate(self, job, boot=False): - "Validate the specific job." + """Validate the specific job.""" logging.info("Starting validation of rev: %s in repo: %s" % (job.rev, job.repo)) return job.validate_rev(job.rev) @@ -446,15 +447,12 @@ def prepare_logging(logfile): """Log to the specified file, or to stdout if None.""" if logfile: # Rotate logs daily, keeping 7 days worth. - handler = logging.handlers.TimedRotatingFileHandler( - logfile, when='midnight', backupCount=7, - ) + handler = logging.handlers.TimedRotatingFileHandler(logfile, when='midnight', backupCount=7) else: handler = logging.StreamHandler(sys.stdout) # Add a timestamp to the log records - formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', - '%Y-%m-%d %H:%M:%S') + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S') handler.setFormatter(formatter) # Apply the handler to the root logger @@ -466,7 +464,6 @@ def prepare_logging(logfile): def handle_options(options): - if not options.aws: raise ValueError('A valid --aws has to be provided (path to aws executable)') else: @@ -551,39 +548,39 @@ def main(args): description='An SvnPubSub client to keep working copies synchronized ' 'with a repository.', usage='Usage: %prog [options] CONFIG_FILE', - ) + ) parser.add_option('--logfile', - help='filename for logging') + help='filename for logging') parser.add_option('--pidfile', - help="the process' PID will be written to this file") + help="the process' PID will be written to this file") parser.add_option('--uid', - help='switch to this UID before running') + help='switch to this UID before running') parser.add_option('--gid', - help='switch to this GID before running') + help='switch to this GID before running') parser.add_option('--daemon', action='store_true', - help='run as a background daemon') + help='run as a background daemon') parser.add_option('--umask', - help='set this (octal) umask before running') + help='set this (octal) umask before running') parser.add_option('--history', - help='Will dump and backup repository in shard3 ranges (even thousands), e.g --history reponame') + help='Will dump and backup repository in shard3 ranges (even thousands), e.g --history reponame') parser.add_option('--shardsize', default='shard3', - help='Shard size used by --history. Assumes that shard3 is executed before shard0.') + help='Shard size used by --history. Assumes that shard3 is executed before shard0.') parser.add_option('--load', - help='Will load repository from shards in size order (shard3 then shard0), e.g --load reponame') + help='Will load repository from shards in size order (shard3 then shard0), e.g --load reponame') parser.add_option('--aws', - help='path to aws executable e.g /usr/bin/aws') + help='path to aws executable e.g /usr/bin/aws') parser.add_option('--svnadmin', - help='path to svnadmin executable e.g /usr/bin/svnadmin') + help='path to svnadmin executable e.g /usr/bin/svnadmin') parser.add_option('--svnlook', - help='path to svnlook executable e.g /usr/bin/svnlook') + help='path to svnlook executable e.g /usr/bin/svnlook') parser.add_option('--svnroot', - help='path to repository locations /srv/cms/svn') + help='path to repository locations /srv/cms/svn') parser.add_option('--svn', - help='path to svn executable only required when combined with --history e.g /usr/bin/svn') + help='path to svn executable only required when combined with --history e.g /usr/bin/svn') parser.add_option('--bucket', - help='name of S3 bucket where dumps will be stored') + help='name of S3 bucket where dumps will be stored') parser.add_option('--cloudid', - help='AWS cloud-id') + help='AWS cloud-id') options, extra = parser.parse_args(args) @@ -604,8 +601,7 @@ def main(args): # We manage the logfile ourselves (along with possible rotation). The # daemon process can just drop stdout/stderr into /dev/null. - d = Daemon('/dev/null', os.path.abspath(options.pidfile), - options.umask, bdec) + d = Daemon('/dev/null', os.path.abspath(options.pidfile), options.umask, bdec) if options.daemon: # Daemonize the process and call sys.exit() with appropriate code d.daemonize_exit() From fdbdc4cab8a3826a0beeec97bfe5967b1d4d8f89 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 12 Mar 2024 12:53:55 +0100 Subject: [PATCH 04/19] Added a --log-level argument and suppressed the unnecessary boto3 debug logs. --- svndumpsub.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index f90bc44..c054398 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -279,11 +279,10 @@ def _run(self, shards): raise Exception('Maximum number of shards processed') def _load_shard(self, shard): - logging.info('Loading shard: %s from repo: %s' % (shard, self.repo)) start_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) - logging.info('Loading shard %s' % shard) + logging.info('Loading shard %s from repo: %s' % (shard, self.repo)) self._load_zip(start_rev) def _load_zip(self, rev): @@ -295,23 +294,15 @@ def _load_zip(self, rev): load_args = [SVNADMIN, 'load', path] shard_buffer = BytesIO() - logging.debug('Downloading shard to memory...') + logging.debug('Downloading shard from: %s' % shard_key) # Download from s3 s3client.download_fileobj(BUCKET, shard_key, shard_buffer) # gunzip + logging.debug('Decompressing downloaded shard from: %s' % shard_key) gz, gz_stdout, _ = execute(*gz_args, text=False, env=self.env, stdin=shard_buffer.getvalue()) # svnadmin load + logging.debug('Loading downloaded and decompressed shard from: %s' % shard_key) load, _, _ = execute(*load_args, text=False, env=self.env, stdin=gz_stdout) - logging.debug('Load return code: %s', load.returncode) - logging.debug('Gunzip return code: %s', gz.returncode) - - if gz.returncode != 0: - logging.error('Decompressing shard failed (rc=%s): %s', gz.returncode, shard_key) - raise Exception('Decompressing shard failed') - - if load.returncode != 0: - logging.error('Loading shard failed (rc=%s): %s', load.returncode, shard_key) - raise Exception('Loading shard failed') # TODO Analyze output, should conclude with (ensure revision is correct for shard): # ------- Committed revision 4999 >>> @@ -443,24 +434,30 @@ def _event(self, url, event_name, event_arg): logging.info('"%s" from %s', event_name, url) -def prepare_logging(logfile): +def prepare_logging(logfile, level): """Log to the specified file, or to stdout if None.""" if logfile: # Rotate logs daily, keeping 7 days worth. handler = logging.handlers.TimedRotatingFileHandler(logfile, when='midnight', backupCount=7) else: - handler = logging.StreamHandler(sys.stdout) + handler = logging.getLogger().handlers[0] # Add a timestamp to the log records formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S') handler.setFormatter(formatter) - # Apply the handler to the root logger + # Remove all existing handlers and apply the new handler to the root logger root = logging.getLogger() + for handler in root.handlers: + root.removeHandler(handler) root.addHandler(handler) + root.setLevel(level) - # use logging.INFO for now. switch to cmdline option or a config? - root.setLevel(logging.INFO) + # Suppress the unnecessary boto3 logs + logging.getLogger('boto3').setLevel(logging.INFO) + logging.getLogger('botocore').setLevel(logging.INFO) + logging.getLogger('s3transfer').setLevel(logging.INFO) + logging.getLogger('urllib3').setLevel(logging.INFO) def handle_options(options): @@ -540,7 +537,7 @@ def handle_options(options): logging.info('setting uid %d', uid) os.setuid(uid) - prepare_logging(options.logfile) + prepare_logging(options.logfile, options.log_level) def main(args): @@ -551,6 +548,10 @@ def main(args): ) parser.add_option('--logfile', help='filename for logging') + parser.add_option('--log-level', type=int, metavar='level', default=logging.INFO, + help='debug logging level (DEBUG: %d | INFO: %d | WARNING: %d | ERROR: %d | CRITICAL: %d) ' + '(default: %d)' % (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, + logging.CRITICAL, logging.INFO)) parser.add_option('--pidfile', help="the process' PID will be written to this file") parser.add_option('--uid', From c87fa895abd01ef6eec6e5cc4b32ecdb2dd9c761 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 12 Mar 2024 13:05:21 +0100 Subject: [PATCH 05/19] Fixed a logical issue and revived the maximum number of shards error. --- svndumpsub.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index c054398..3986c5e 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -273,10 +273,9 @@ def _run(self, shards): continue else: logging.info('Shard does not exist, done for now - %s' % shard) - break - logging.warning('Maximum number of shards processed, terminating.') - # Restart or raise the maximum number of shards. - raise Exception('Maximum number of shards processed') + return + # Restart or raise the maximum number of shards. + raise Exception('Maximum number of shards processed') def _load_shard(self, shard): start_rev = str(shard) From 9c123468049c15ee93c9de615f771c760e237127 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Mon, 18 Mar 2024 10:43:55 +0100 Subject: [PATCH 06/19] Check the youngest revision and compare it against the last commit restored from the dump. --- svndumpsub.py | 56 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 3986c5e..f2668c7 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -175,7 +175,7 @@ def __init__(self, repo, shard_size): logging.error('Unsupported shard type: %s' % shard_size) raise Exception('Unsupported shard type') - logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) + logging.info('Processing repo: %s with head revision: %s' % (self.repo, self.head)) shards = self._get_shards(self.head) self._run(shards) @@ -206,7 +206,7 @@ def _get_head(self, repo): svnlook, svnlook_stdout, _ = execute(*svnlook_args, text=True, env=self.env) match = re.match(r'^(\d+)', svnlook_stdout) rev = int(match.group(1)) if match else None - logging.info('Repository %s youngest: %s' % (repo, rev)) + logging.info('Repository: %s youngest: %s' % (repo, rev)) return rev def _get_shards(self, head): @@ -240,7 +240,7 @@ def __init__(self, repo): else: self.rev_min = self.head + 1 - logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) + logging.info('Processing repo: %s with head revision: %s' % (self.repo, self.head)) # First process large shards if local head is divisible with shard size. if self.rev_min % 1000 == 0: self.shard_size = 'shard3' @@ -264,11 +264,11 @@ def __init__(self, repo): self._run(shards) def _run(self, shards): - logging.info('Shards length %s' % len(shards)) + logging.info('Shards length: %s' % len(shards)) for shard in shards: dump_exists = self._validate_shard(shard) if dump_exists: - logging.info('Shard exists, will load shard %s' % shard) + logging.info('Shard exists, will load shard: %s' % shard) self._load_shard(shard) continue else: @@ -278,17 +278,16 @@ def _run(self, shards): raise Exception('Maximum number of shards processed') def _load_shard(self, shard): - start_rev = str(shard) - to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) + start_rev = shard + end_rev = ((int(shard / self.shard_div) + 1) * self.shard_div) - 1 + logging.info('Loading shard: %s to repo: %s' % (shard, self.repo)) + return self._load_zip(start_rev, end_rev) - logging.info('Loading shard %s from repo: %s' % (shard, self.repo)) - self._load_zip(start_rev) - - def _load_zip(self, rev): - shard_key = self.get_key(rev) + def _load_zip(self, start_rev, end_rev) -> (int, int): + start = end = None + shard_key = self.get_key(str(start_rev)) gz_args = ['/bin/gunzip', '-c'] - path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] @@ -301,10 +300,33 @@ def _load_zip(self, rev): gz, gz_stdout, _ = execute(*gz_args, text=False, env=self.env, stdin=shard_buffer.getvalue()) # svnadmin load logging.debug('Loading downloaded and decompressed shard from: %s' % shard_key) - load, _, _ = execute(*load_args, text=False, env=self.env, stdin=gz_stdout) - - # TODO Analyze output, should conclude with (ensure revision is correct for shard): - # ------- Committed revision 4999 >>> + load, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=gz_stdout) + for line in load_stdout.decode("utf-8").splitlines(): + logging.debug(line.rstrip()) + # Parse the revision from the output lines similar to: ------- Committed revision 4999 >>> + match = re.search(r"-+ Committed revision (\d+) >+", line) + if match: + rev = int(match.group(1)) + if start is None: + start = rev + else: + start = min(start, rev) + if end is None: + end = rev + else: + end = max(end, rev) + if start is None or end is None: + logging.error('No shards were loaded') + return start, end + youngest = self._get_head(self.repo) + if youngest != end: + raise Exception('The last committed revision is not the youngest (%d != %d)' % (end, youngest)) + if start_rev == end_rev: + logging.info('Loaded shard: %d to repo: %s at rev: %d', start_rev, self.repo, end) + else: + logging.info('Loaded shard: %d to repo: %s at revs: (%d-%d)', + start_rev, self.repo, start, end) + return start, end class BigDoEverythingClasss(object): From e833c3078cb1c8658574aa5cf7522d87bce122bc Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Mon, 18 Mar 2024 12:45:39 +0100 Subject: [PATCH 07/19] Rewrote the _load_zip method avoiding buffers as the dumps can be quite large Even the previous approach using the communicate() function could potentially cause memory issues as the function buffers the entire stdout before returning. --- svndumpsub.py | 84 +++++++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index f2668c7..41eff68 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -24,13 +24,14 @@ # # On startup SvnDumpSub starts listening to commits in all repositories. # - -import threading import sys import stat import os -import boto3 import re +import gzip +import boto3 +import tempfile +import threading import logging.handlers try: @@ -43,7 +44,6 @@ import svnpubsub.client import svnpubsub.util from svnpubsub.util import execute -from io import BytesIO HOST = "127.0.0.1" PORT = 2069 @@ -287,45 +287,45 @@ def _load_zip(self, start_rev, end_rev) -> (int, int): start = end = None shard_key = self.get_key(str(start_rev)) - gz_args = ['/bin/gunzip', '-c'] path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] - shard_buffer = BytesIO() - logging.debug('Downloading shard from: %s' % shard_key) - # Download from s3 - s3client.download_fileobj(BUCKET, shard_key, shard_buffer) - # gunzip - logging.debug('Decompressing downloaded shard from: %s' % shard_key) - gz, gz_stdout, _ = execute(*gz_args, text=False, env=self.env, stdin=shard_buffer.getvalue()) - # svnadmin load - logging.debug('Loading downloaded and decompressed shard from: %s' % shard_key) - load, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=gz_stdout) - for line in load_stdout.decode("utf-8").splitlines(): - logging.debug(line.rstrip()) - # Parse the revision from the output lines similar to: ------- Committed revision 4999 >>> - match = re.search(r"-+ Committed revision (\d+) >+", line) - if match: - rev = int(match.group(1)) - if start is None: - start = rev + prefix = '%s_%s_' % (self.repo, start_rev) + with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: + logging.debug('Downloading shard from: %s to temporary file: %s', shard_key, gz.name) + # Download from s3 + s3client.download_fileobj(BUCKET, shard_key, gz) + gz.seek(0) + with gzip.open(gz.name, 'rb') as svndump: + # svnadmin load + logging.debug('Loading downloaded and decompressed shard from: %s' % shard_key) + load, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=svndump) + for line in load_stdout.decode("utf-8").splitlines(): + logging.debug(line.rstrip()) + # Parse the revision from the output lines similar to: ------- Committed revision 4999 >>> + match = re.search(r"-+ Committed revision (\d+) >+", line) + if match: + rev = int(match.group(1)) + if start is None: + start = rev + else: + start = min(start, rev) + if end is None: + end = rev + else: + end = max(end, rev) + if start is None or end is None: + logging.error('No shards were loaded') + return start, end + youngest = self._get_head(self.repo) + if youngest != end: + raise Exception('The last committed revision is not the youngest (%d != %d)' % (end, youngest)) + if start_rev == end_rev: + logging.info('Loaded rev: %d from shard: %d to repo: %s', end, start_rev, self.repo) else: - start = min(start, rev) - if end is None: - end = rev - else: - end = max(end, rev) - if start is None or end is None: - logging.error('No shards were loaded') - return start, end - youngest = self._get_head(self.repo) - if youngest != end: - raise Exception('The last committed revision is not the youngest (%d != %d)' % (end, youngest)) - if start_rev == end_rev: - logging.info('Loaded shard: %d to repo: %s at rev: %d', start_rev, self.repo, end) - else: - logging.info('Loaded shard: %d to repo: %s at revs: (%d-%d)', - start_rev, self.repo, start, end) + logging.info('Loaded revs: (%d-%d) from shard: %d to repo: %s', + start, end, start_rev, self.repo) + return start, end @@ -488,6 +488,10 @@ def handle_options(options): global AWS AWS = options.aws + if options.tempdir: + global TEMPDIR + TEMPDIR = options.tempdir + if not options.svnadmin: raise ValueError('A valid --svnadmin has to be provided (path to svnadmin executable)') else: @@ -575,6 +579,8 @@ def main(args): logging.CRITICAL, logging.INFO)) parser.add_option('--pidfile', help="the process' PID will be written to this file") + parser.add_option('--tempdir', + help="temporary directory for storing downloaded files", default=tempfile.gettempdir()) parser.add_option('--uid', help='switch to this UID before running') parser.add_option('--gid', From 2242c728a3d1326e58b29a61d148d279abf3e955 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Wed, 20 Mar 2024 18:08:50 +0100 Subject: [PATCH 08/19] Re-implemented the execute() function reading it chunk by chunk instead of entirely loading it to the memory. --- svndumpsub.py | 57 ++++++++++++++++------------- svnpubsub/util.py | 92 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 96 insertions(+), 53 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 41eff68..42fdea1 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -32,6 +32,7 @@ import boto3 import tempfile import threading +import subprocess import logging.handlers try: @@ -133,21 +134,27 @@ def backup_commit(self): def dump_zip_upload(self, dump_args, rev): shard_key = self.get_key(rev) - gz_args = ['/bin/gzip'] - # svnadmin dump - dump, dump_stdout, _ = execute(*dump_args, text=False, env=self.env) - # gzip stdout - gz, gz_stdout, _ = execute(*gz_args, text=False, env=self.env, stdin=dump_stdout) - # Upload gzip.stdout to S3 - s3client.upload_fileobj(gz_stdout, BUCKET, shard_key) - - if dump.returncode != 0: - logging.error('Dumping shard failed (rc=%s): %s', dump.returncode, shard_key) + gz = '/bin/gzip' + gz_args = [gz] + + # Svn admin dump + p1 = subprocess.Popen((dump_args), stdout=subprocess.PIPE, env=self.env) + # Zip stdout + p2 = subprocess.Popen((gz_args), stdin=p1.stdout, stdout=subprocess.PIPE) + p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. + # Upload zip.stdout to s3 + s3client.upload_fileobj(p2.stdout, BUCKET, shard_key) + #TODO: Do we need to close stuff? + p2.communicate()[0] + p1.poll() + + if p1.returncode != 0: + logging.error('Dumping shard failed (rc=%s): %s', p1.returncode, shard_key) raise Exception('Dumping shard failed') - if gz.returncode != 0: - logging.error('Compressing shard failed (rc=%s): %s', gz.returncode, shard_key) + if p2.returncode != 0: + logging.error('Compressing shard failed (rc=%s): %s', p2.returncode, shard_key) raise Exception('Compressing shard failed') @@ -221,11 +228,11 @@ def _get_shards(self, head): def _backup_shard(self, shard): logging.info('Dumping and uploading shard: %s from repo: %s' % (shard, self.repo)) - start_rev = str(shard) + from_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) - svn_args = self._get_svn_dump_args(start_rev, to_rev) - self.dump_zip_upload(svn_args, start_rev) + svn_args = self._get_svn_dump_args(from_rev, to_rev) + self.dump_zip_upload(svn_args, from_rev) class JobMultiLoad(JobMulti): @@ -272,25 +279,25 @@ def _run(self, shards): self._load_shard(shard) continue else: - logging.info('Shard does not exist, done for now - %s' % shard) + logging.info('Shard: %s does not exist, done for now' % shard) return # Restart or raise the maximum number of shards. raise Exception('Maximum number of shards processed') def _load_shard(self, shard): - start_rev = shard - end_rev = ((int(shard / self.shard_div) + 1) * self.shard_div) - 1 + from_rev = shard + to_rev = ((int(shard / self.shard_div) + 1) * self.shard_div) - 1 logging.info('Loading shard: %s to repo: %s' % (shard, self.repo)) - return self._load_zip(start_rev, end_rev) + return self._load_zip(from_rev, to_rev) - def _load_zip(self, start_rev, end_rev) -> (int, int): + def _load_zip(self, from_rev, to_rev) -> (int, int): start = end = None - shard_key = self.get_key(str(start_rev)) + shard_key = self.get_key(str(from_rev)) path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] - prefix = '%s_%s_' % (self.repo, start_rev) + prefix = '%s_%s_' % (self.repo, from_rev) with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: logging.debug('Downloading shard from: %s to temporary file: %s', shard_key, gz.name) # Download from s3 @@ -320,11 +327,11 @@ def _load_zip(self, start_rev, end_rev) -> (int, int): youngest = self._get_head(self.repo) if youngest != end: raise Exception('The last committed revision is not the youngest (%d != %d)' % (end, youngest)) - if start_rev == end_rev: - logging.info('Loaded rev: %d from shard: %d to repo: %s', end, start_rev, self.repo) + if from_rev == to_rev: + logging.info('Loaded rev: %d from shard: %d to repo: %s', end, from_rev, self.repo) else: logging.info('Loaded revs: (%d-%d) from shard: %d to repo: %s', - start, end, start_rev, self.repo) + start, end, from_rev, self.repo) return start, end diff --git a/svnpubsub/util.py b/svnpubsub/util.py index 530647b..9a35cc8 100644 --- a/svnpubsub/util.py +++ b/svnpubsub/util.py @@ -19,54 +19,90 @@ import sys import logging import subprocess -from subprocess import CalledProcessError +from select import select +from threading import Thread +from subprocess import Popen, PIPE, CalledProcessError # check_output() is only available in Python 2.7. Allow us to run with # earlier versions try: - __check_output = subprocess.check_output def check_output(args, env=None, universal_newlines=False): - return __check_output(args, shell=False, env=env, - universal_newlines=universal_newlines) + return subprocess.check_output(args, shell=False, env=env, universal_newlines=universal_newlines) except AttributeError: def check_output(args, env=None, universal_newlines=False): # note: we only use these three args - pipe = subprocess.Popen(args, shell=False, env=env, - stdout=subprocess.PIPE, - universal_newlines=universal_newlines) + pipe = Popen(args, shell=False, env=env, stdout=PIPE, universal_newlines=universal_newlines) output, _ = pipe.communicate() if pipe.returncode: - raise subprocess.CalledProcessError(pipe.returncode, args) + raise CalledProcessError(pipe.returncode, args) return output -def execute(*args, text=True, env=None, stdin=None, throw=True): +def is_file_like(variable) -> bool: + try: + return hasattr(variable, 'fileno') + except AttributeError: + return False + + +def execute(*args, text=True, env=None, throw=True, stdin=None): process = None arguments = [*args] - stdout = [] if text else bytes() - stderr = [] if text else bytes() + rlist = wlist = xlist = [] + chunk_size = 1 * 1024 * 1024 # 1 MB + stdout_buffer = [] if text else bytes() + stderr_buffer = [] if text else bytes() + stdin_writer = None logging.debug("Running: %s", " ".join(arguments)) - if hasattr(stdin, 'fileno'): - stdin = stdin.read() + def writer(source, destination): + """ + Copy the source file-like object to the destination file-like object and close + the destination file-like object. + @param source: A source file-like object typically the source of the stdin + @param destination: A destination file-like object typically the stdin stream of the process + """ + source.seek(0) + while True: + data = source.read(chunk_size) + if not data: + break + destination.write(data) + destination.flush() + destination.close() try: - process = subprocess.Popen(arguments, text=text, universal_newlines=text, env=env, - stdin=subprocess.PIPE if stdin is not None else None, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if text: - stdout_data, stderr_data = process.communicate(input=stdin) - for line in stdout_data.splitlines(): - stdout.append(line.rstrip()) - logging.debug(os.linesep.join(stdout)) - for line in stderr_data.splitlines(): - stderr.append(line.rstrip()) - else: - stdout, stderr = process.communicate(input=stdin) + process = Popen(arguments, text=text, universal_newlines=text, env=env, + stdin=PIPE if stdin is not None else None, stdout=PIPE, stderr=PIPE) + if stdin is not None and process.poll() is None: + # If stdin was supplied, copy its contents to the stdin stream of the process in a separate thread + stdin_writer = Thread(target=writer, args=[stdin, process.stdin]) + stdin_writer.start() + while process.poll() is None: + rlist += [process.stdout.fileno(), process.stderr.fileno()] + for fd in [item for sublist in select(rlist, wlist, xlist) for item in sublist]: + if fd == process.stdout.fileno(): + if text: + line = process.stdout.readline() + stdout_buffer.append(line.rstrip()) + else: + chunk = process.stdout.read(chunk_size) + if chunk: + stdout_buffer += chunk + if fd == process.stderr.fileno(): + if text: + line = process.stderr.readline() + stderr_buffer.append(line.rstrip()) + else: + chunk = process.stderr.read(chunk_size) + if chunk: + stdout_buffer += chunk if process.returncode and throw: - raise CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) + raise subprocess.CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) except Exception: _, value, traceback = sys.exc_info() - raise RuntimeError(os.linesep.join(stderr) if text else stderr.decode('utf-8')).with_traceback(traceback) - return process, os.linesep.join(stdout) if text else stdout, os.linesep.join(stderr) if text else stderr + raise RuntimeError(os.linesep.join(stderr_buffer) if text else stderr_buffer.decode('utf-8')).with_traceback(traceback) + if stdin_writer is not None and stdin_writer.is_alive(): + raise ChildProcessError("The child stdin writer process did not terminate successfully") + return process, os.linesep.join(stdout_buffer) if text else stdout_buffer, os.linesep.join(stderr_buffer) if text else stderr_buffer From c5740d0da3cf88176f50a925af837d1a83eab7a4 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Wed, 20 Mar 2024 21:32:53 +0100 Subject: [PATCH 09/19] Re-wrote the dump_zip_upload() method using the disk instead of the memory to keep the data. --- svndumpsub.py | 35 +++++++++++--------------------- svnpubsub/util.py | 51 ++++++++++++++++++++++++++++++----------------- 2 files changed, 45 insertions(+), 41 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 42fdea1..6bfa926 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -134,28 +134,17 @@ def backup_commit(self): def dump_zip_upload(self, dump_args, rev): shard_key = self.get_key(rev) - - gz = '/bin/gzip' - gz_args = [gz] - - # Svn admin dump - p1 = subprocess.Popen((dump_args), stdout=subprocess.PIPE, env=self.env) - # Zip stdout - p2 = subprocess.Popen((gz_args), stdin=p1.stdout, stdout=subprocess.PIPE) - p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. - # Upload zip.stdout to s3 - s3client.upload_fileobj(p2.stdout, BUCKET, shard_key) - #TODO: Do we need to close stuff? - p2.communicate()[0] - p1.poll() - - if p1.returncode != 0: - logging.error('Dumping shard failed (rc=%s): %s', p1.returncode, shard_key) - raise Exception('Dumping shard failed') - - if p2.returncode != 0: - logging.error('Compressing shard failed (rc=%s): %s', p2.returncode, shard_key) - raise Exception('Compressing shard failed') + prefix = '%s_%s_' % (self.repo, rev) + with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: + with gzip.open(gz, 'wb') as svndump: + # Svn admin dump + logging.debug('Dumping and compressing rev: %s', rev) + dump, _, _ = execute(*dump_args, text=False, env=self.env, stdout=svndump) + logging.debug('Uploading %d bytes for rev: %s to: %s', gz.tell(), rev, shard_key) + gz.seek(0) + # Upload the .svndump.gz file to s3 + s3client.upload_fileobj(gz, BUCKET, shard_key) + logging.info('Uploaded rev: %s from repo: %s to: %s', rev, self.repo, shard_key) # Processing one repo, specified in history option @@ -303,7 +292,7 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): # Download from s3 s3client.download_fileobj(BUCKET, shard_key, gz) gz.seek(0) - with gzip.open(gz.name, 'rb') as svndump: + with gzip.open(gz, 'rb') as svndump: # svnadmin load logging.debug('Loading downloaded and decompressed shard from: %s' % shard_key) load, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=svndump) diff --git a/svnpubsub/util.py b/svnpubsub/util.py index 9a35cc8..b7c7d96 100644 --- a/svnpubsub/util.py +++ b/svnpubsub/util.py @@ -21,6 +21,7 @@ import subprocess from select import select from threading import Thread +from shutil import copyfileobj from subprocess import Popen, PIPE, CalledProcessError # check_output() is only available in Python 2.7. Allow us to run with @@ -45,15 +46,22 @@ def is_file_like(variable) -> bool: return False -def execute(*args, text=True, env=None, throw=True, stdin=None): +def execute(*args, text=True, env=None, throw=True, stdin=None, stdout=None, stderr=None): process = None arguments = [*args] rlist = wlist = xlist = [] chunk_size = 1 * 1024 * 1024 # 1 MB - stdout_buffer = [] if text else bytes() - stderr_buffer = [] if text else bytes() + stdout_buffer = None if stdout is not None else [] if text else bytes() + stderr_buffer = None if stderr is not None else [] if text else bytes() stdin_writer = None + if stdin is not None and not is_file_like(stdin): + raise ValueError("stdin must be a file-like object") + if stdout is not None and not is_file_like(stdout): + raise ValueError("stdout must be a file-like object") + if stderr is not None and not is_file_like(stderr): + raise ValueError("stderr must be a file-like object") + logging.debug("Running: %s", " ".join(arguments)) def writer(source, destination): @@ -64,12 +72,7 @@ def writer(source, destination): @param destination: A destination file-like object typically the stdin stream of the process """ source.seek(0) - while True: - data = source.read(chunk_size) - if not data: - break - destination.write(data) - destination.flush() + copyfileobj(source, destination, chunk_size) destination.close() try: @@ -83,21 +86,31 @@ def writer(source, destination): rlist += [process.stdout.fileno(), process.stderr.fileno()] for fd in [item for sublist in select(rlist, wlist, xlist) for item in sublist]: if fd == process.stdout.fileno(): - if text: - line = process.stdout.readline() - stdout_buffer.append(line.rstrip()) + if stdout is None: + if text: + line = process.stdout.readline() + stdout_buffer.append(line.rstrip()) + else: + chunk = process.stdout.read(chunk_size) + if chunk: + stdout_buffer += chunk else: chunk = process.stdout.read(chunk_size) if chunk: - stdout_buffer += chunk + stdout.write(chunk) if fd == process.stderr.fileno(): - if text: - line = process.stderr.readline() - stderr_buffer.append(line.rstrip()) + if stderr is None: + if text: + line = process.stderr.readline() + stderr_buffer.append(line.rstrip()) + else: + chunk = process.stderr.read(chunk_size) + if chunk: + stderr_buffer += chunk else: chunk = process.stderr.read(chunk_size) if chunk: - stdout_buffer += chunk + stderr.write(chunk) if process.returncode and throw: raise subprocess.CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) except Exception: @@ -105,4 +118,6 @@ def writer(source, destination): raise RuntimeError(os.linesep.join(stderr_buffer) if text else stderr_buffer.decode('utf-8')).with_traceback(traceback) if stdin_writer is not None and stdin_writer.is_alive(): raise ChildProcessError("The child stdin writer process did not terminate successfully") - return process, os.linesep.join(stdout_buffer) if text else stdout_buffer, os.linesep.join(stderr_buffer) if text else stderr_buffer + return (process, + None if stdout is not None else os.linesep.join(stdout_buffer) if text else stdout_buffer, + None if stderr is not None else os.linesep.join(stderr_buffer) if text else stderr_buffer) From 7d2cb04c762034bad7436dbf6effbb825eb893f4 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Thu, 21 Mar 2024 12:16:02 +0100 Subject: [PATCH 10/19] Added a check that makes sure the modified paths in the restored shard0 revision matches those in the dump file. --- svndumpsub.py | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 6bfa926..d311579 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -282,10 +282,9 @@ def _load_shard(self, shard): def _load_zip(self, from_rev, to_rev) -> (int, int): start = end = None shard_key = self.get_key(str(from_rev)) - path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] - + changed_args = [SVNLOOK, 'changed', '-r', str(from_rev), path] prefix = '%s_%s_' % (self.repo, from_rev) with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: logging.debug('Downloading shard from: %s to temporary file: %s', shard_key, gz.name) @@ -293,9 +292,19 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): s3client.download_fileobj(BUCKET, shard_key, gz) gz.seek(0) with gzip.open(gz, 'rb') as svndump: + changed_paths_after = set() + changed_paths_before = set() + if self.shard_size == 'shard0': + logging.debug('Extracting the changed paths from the dump file: %s', gz.name) + for line in svndump: + if line.startswith(b'Node-path:'): + change = line.split(b':', 1)[1].decode('utf-8').strip() + logging.debug(change) + changed_paths_before.add(change) # svnadmin load - logging.debug('Loading downloaded and decompressed shard from: %s' % shard_key) - load, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=svndump) + svndump.seek(0) + logging.debug('Decompressing and loading shard from: %s', gz.name) + _, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=svndump) for line in load_stdout.decode("utf-8").splitlines(): logging.debug(line.rstrip()) # Parse the revision from the output lines similar to: ------- Committed revision 4999 >>> @@ -315,12 +324,27 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): return start, end youngest = self._get_head(self.repo) if youngest != end: + logging.error('The last committed revision is not the youngest (%d != %d)', end, youngest) raise Exception('The last committed revision is not the youngest (%d != %d)' % (end, youngest)) if from_rev == to_rev: logging.info('Loaded rev: %d from shard: %d to repo: %s', end, from_rev, self.repo) else: logging.info('Loaded revs: (%d-%d) from shard: %d to repo: %s', start, end, from_rev, self.repo) + if self.shard_size == 'shard0': + logging.debug('Extracting the changed paths in the restored revision...') + _, changes_stdout, _ = execute(*changed_args, text=True, env=self.env) + for line in changes_stdout.splitlines(): + match = re.search(r"(A|D|U|_U|UU)\s+(.*)", line) + if match and len(match.groups()) > 1: + change = match.group(2).rstrip().rstrip('/') + changed_paths_after.add(change) + logging.debug(change) + if changed_paths_before != changed_paths_after: + logging.error('Not all expected changed paths were successfully restored: %s != %s', + "{" + ", ".join(changed_paths_before) + "}", + "{" + ", ".join(changed_paths_after) + "}") + raise Exception('Not all expected changed paths were successfully restored.') return start, end @@ -465,8 +489,6 @@ def prepare_logging(logfile, level): # Remove all existing handlers and apply the new handler to the root logger root = logging.getLogger() - for handler in root.handlers: - root.removeHandler(handler) root.addHandler(handler) root.setLevel(level) From 4c1cfe7e4f35c235558902e176911da7443be0ac Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Fri, 22 Mar 2024 11:24:47 +0100 Subject: [PATCH 11/19] Improved the execute() function fixing an issue that prevented it from reading the entire stdout/stderr. --- svnpubsub/util.py | 95 +++++++++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 33 deletions(-) diff --git a/svnpubsub/util.py b/svnpubsub/util.py index b7c7d96..bc66f34 100644 --- a/svnpubsub/util.py +++ b/svnpubsub/util.py @@ -47,12 +47,14 @@ def is_file_like(variable) -> bool: def execute(*args, text=True, env=None, throw=True, stdin=None, stdout=None, stderr=None): + rlist = [] + wlist = [] + xlist = [] process = None arguments = [*args] - rlist = wlist = xlist = [] chunk_size = 1 * 1024 * 1024 # 1 MB - stdout_buffer = None if stdout is not None else [] if text else bytes() - stderr_buffer = None if stderr is not None else [] if text else bytes() + stdout_buffer = None if stdout is not None else [] if text else bytearray() + stderr_buffer = None if stderr is not None else [] if text else bytearray() stdin_writer = None if stdin is not None and not is_file_like(stdin): @@ -64,7 +66,7 @@ def execute(*args, text=True, env=None, throw=True, stdin=None, stdout=None, std logging.debug("Running: %s", " ".join(arguments)) - def writer(source, destination): + def write(source, destination): """ Copy the source file-like object to the destination file-like object and close the destination file-like object. @@ -75,42 +77,69 @@ def writer(source, destination): copyfileobj(source, destination, chunk_size) destination.close() + def read(p, f, o, e) -> (bool, bool): + """ + Reads a chunk or a line of the available data and add them to the stdout and stderr buffers. + @param p: The process from which to read the stdout or stderr data + @param f: The file descriptor of the corresponding stdout or stderr object + @param o: The stdout buffer to write to + @param e: The stderr buffer to write to + @return: A boolean tuple indicating whether there's more to read for either stdout or stderr + """ + more_o = False + more_e = False + if f == p.stdout.fileno(): + if stdout is None: + if text: + line = p.stdout.readline() + if line: + o.append(line.rstrip()) + more_o = True + else: + chunk = p.stdout.read(chunk_size) + if chunk: + o += chunk + more_o = True + else: + chunk = p.stdout.read(chunk_size) + if chunk: + stdout.write(chunk) + more_o = True + if f == p.stderr.fileno(): + if stderr is None: + if text: + line = p.stderr.readline() + if line: + e.append(line.rstrip()) + more_e = True + else: + chunk = p.stderr.read(chunk_size) + if chunk: + e += chunk + more_e = True + else: + chunk = p.stderr.read(chunk_size) + if chunk: + stderr.write(chunk) + more_e = True + + return more_o, more_e + try: process = Popen(arguments, text=text, universal_newlines=text, env=env, stdin=PIPE if stdin is not None else None, stdout=PIPE, stderr=PIPE) + if stdin is not None and process.poll() is None: # If stdin was supplied, copy its contents to the stdin stream of the process in a separate thread - stdin_writer = Thread(target=writer, args=[stdin, process.stdin]) + stdin_writer = Thread(target=write, args=[stdin, process.stdin]) stdin_writer.start() while process.poll() is None: rlist += [process.stdout.fileno(), process.stderr.fileno()] for fd in [item for sublist in select(rlist, wlist, xlist) for item in sublist]: - if fd == process.stdout.fileno(): - if stdout is None: - if text: - line = process.stdout.readline() - stdout_buffer.append(line.rstrip()) - else: - chunk = process.stdout.read(chunk_size) - if chunk: - stdout_buffer += chunk - else: - chunk = process.stdout.read(chunk_size) - if chunk: - stdout.write(chunk) - if fd == process.stderr.fileno(): - if stderr is None: - if text: - line = process.stderr.readline() - stderr_buffer.append(line.rstrip()) - else: - chunk = process.stderr.read(chunk_size) - if chunk: - stderr_buffer += chunk - else: - chunk = process.stderr.read(chunk_size) - if chunk: - stderr.write(chunk) + while True: + more_stdout, more_stderr = read(process, fd, stdout_buffer, stderr_buffer) + if not more_stdout and not more_stderr: + break if process.returncode and throw: raise subprocess.CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) except Exception: @@ -119,5 +148,5 @@ def writer(source, destination): if stdin_writer is not None and stdin_writer.is_alive(): raise ChildProcessError("The child stdin writer process did not terminate successfully") return (process, - None if stdout is not None else os.linesep.join(stdout_buffer) if text else stdout_buffer, - None if stderr is not None else os.linesep.join(stderr_buffer) if text else stderr_buffer) + None if stdout is not None else os.linesep.join(stdout_buffer) if text else bytes(stdout_buffer), + None if stderr is not None else os.linesep.join(stderr_buffer) if text else bytes(stderr_buffer)) From 1d67c5f16d213ceec79ccc72cf783cba3374ef3d Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Mon, 25 Mar 2024 11:18:12 +0100 Subject: [PATCH 12/19] Simplified the execute() logic. --- svnpubsub/util.py | 72 ++++++++++++++++++----------------------------- 1 file changed, 28 insertions(+), 44 deletions(-) diff --git a/svnpubsub/util.py b/svnpubsub/util.py index bc66f34..214fdb8 100644 --- a/svnpubsub/util.py +++ b/svnpubsub/util.py @@ -77,53 +77,33 @@ def write(source, destination): copyfileobj(source, destination, chunk_size) destination.close() - def read(p, f, o, e) -> (bool, bool): + def read(stream, buffer, file=None) -> bool: """ - Reads a chunk or a line of the available data and add them to the stdout and stderr buffers. - @param p: The process from which to read the stdout or stderr data - @param f: The file descriptor of the corresponding stdout or stderr object - @param o: The stdout buffer to write to - @param e: The stderr buffer to write to - @return: A boolean tuple indicating whether there's more to read for either stdout or stderr + Reads a chunk or a line of the available data and add it to the buffer. + @param stream: The process from which to read the stdout or stderr data + @param buffer: The buffer to write to + @param file: The file-like object to write to instead of the buffer if a direct output is required + @return: True if there's more to read from the stream, False otherwise """ - more_o = False - more_e = False - if f == p.stdout.fileno(): - if stdout is None: - if text: - line = p.stdout.readline() - if line: - o.append(line.rstrip()) - more_o = True - else: - chunk = p.stdout.read(chunk_size) - if chunk: - o += chunk - more_o = True + eof = True + if file is None: + if text: + line = stream.readline() + if line: + buffer.append(line.rstrip()) + eof = False else: - chunk = p.stdout.read(chunk_size) + chunk = stream.read(chunk_size) if chunk: - stdout.write(chunk) - more_o = True - if f == p.stderr.fileno(): - if stderr is None: - if text: - line = p.stderr.readline() - if line: - e.append(line.rstrip()) - more_e = True - else: - chunk = p.stderr.read(chunk_size) - if chunk: - e += chunk - more_e = True - else: - chunk = p.stderr.read(chunk_size) - if chunk: - stderr.write(chunk) - more_e = True + buffer += chunk + eof = False + else: + chunk = stream.read(chunk_size) + if chunk: + file.write(chunk) + eof = False - return more_o, more_e + return not eof try: process = Popen(arguments, text=text, universal_newlines=text, env=env, @@ -137,8 +117,12 @@ def read(p, f, o, e) -> (bool, bool): rlist += [process.stdout.fileno(), process.stderr.fileno()] for fd in [item for sublist in select(rlist, wlist, xlist) for item in sublist]: while True: - more_stdout, more_stderr = read(process, fd, stdout_buffer, stderr_buffer) - if not more_stdout and not more_stderr: + more = False + if fd == process.stdout.fileno(): + more |= read(process.stdout, stdout_buffer, stdout) + if fd == process.stderr.fileno(): + more |= read(process.stderr, stderr_buffer, stderr) + if not more: break if process.returncode and throw: raise subprocess.CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) From a33d773cdb382ab4edffca5328c1f61dc58afc72 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 26 Mar 2024 11:19:59 +0100 Subject: [PATCH 13/19] Clean up corresponding shard0 after successfully dumping a shard3. --- svndumpsub.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index d311579..f8ef830 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -67,7 +67,7 @@ def __init__(self, repo, rev, head): def get_key(self, rev): # /v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz - return '%s/%s' % (self._get_s3_base(rev=rev), self.get_name(rev)) + return '%s/%s' % (self.get_s3_base(self.shard_size, rev), self.get_name(rev)) def get_name(self, rev): revStr = str(rev) @@ -76,7 +76,7 @@ def get_name(self, rev): # reponame-0000001000.svndump.gz return name - def _get_s3_base(self, rev): + def get_s3_base(self, shard_size, rev): # Always using 1000 for folders, can not yet support >shard3. d = int(rev) / 1000 @@ -85,7 +85,7 @@ def _get_s3_base(self, rev): version = 'v1' # v1/CLOUDID/demo1/shard0/0000000000 - return '%s/%s/%s/%s/%s' % (version, CLOUDID, self.repo, self.shard_size, shard_number) + return '%s/%s/%s/%s/%s' % (version, CLOUDID, self.repo, shard_size, shard_number) def _get_svn_dump_args(self, from_rev, to_rev): path = '%s/%s' % (SVNROOT, self.repo) @@ -132,7 +132,7 @@ def backup_commit(self): logging.info('Dumping and uploading rev: %s from repo: %s' % (self.rev, self.repo)) self.dump_zip_upload(self._get_svn_dump_args(self.rev, self.rev), self.rev) - def dump_zip_upload(self, dump_args, rev): + def dump_zip_upload(self, dump_args, rev) -> bool: shard_key = self.get_key(rev) prefix = '%s_%s_' % (self.repo, rev) with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: @@ -146,6 +146,17 @@ def dump_zip_upload(self, dump_args, rev): s3client.upload_fileobj(gz, BUCKET, shard_key) logging.info('Uploaded rev: %s from repo: %s to: %s', rev, self.repo, shard_key) + def cleanup_shards(self, shard_size, shard): + prefix = self.get_s3_base(shard_size, shard) + try: + s3 = boto3.resource('s3') + bucket = s3.Bucket(BUCKET) + logging.debug('Cleaning up shards: %s/*', prefix) + bucket.objects.filter(Prefix=prefix).delete() + logging.info('Cleaned up shards: %s/*', prefix) + except Exception: + logging.exception('Failed to cleanup: %s/*', prefix) + # Processing one repo, specified in history option class JobMulti(Job): @@ -219,9 +230,11 @@ def _backup_shard(self, shard): logging.info('Dumping and uploading shard: %s from repo: %s' % (shard, self.repo)) from_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) - svn_args = self._get_svn_dump_args(from_rev, to_rev) self.dump_zip_upload(svn_args, from_rev) + # Clean up corresponding shard0 after successfully dumping a shard3. + if self.shard_size == 'shard3': + self.cleanup_shards('shard0', shard) class JobMultiLoad(JobMulti): @@ -298,7 +311,7 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): logging.debug('Extracting the changed paths from the dump file: %s', gz.name) for line in svndump: if line.startswith(b'Node-path:'): - change = line.split(b':', 1)[1].decode('utf-8').strip() + change = str(line.split(b':', 1)[1].decode('utf-8').strip()) logging.debug(change) changed_paths_before.add(change) # svnadmin load @@ -337,7 +350,7 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): for line in changes_stdout.splitlines(): match = re.search(r"(A|D|U|_U|UU)\s+(.*)", line) if match and len(match.groups()) > 1: - change = match.group(2).rstrip().rstrip('/') + change = str(match.group(2).rstrip().rstrip('/')) changed_paths_after.add(change) logging.debug(change) if changed_paths_before != changed_paths_after: From f328aadbb60432a5f20bcc0fe9f6ddabb5de9cf7 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Wed, 27 Mar 2024 10:59:04 +0100 Subject: [PATCH 14/19] Use the common bgworker and daemon modules instead of the duplicated code. --- svndumpsub.py | 176 ++++++++---------------------------------- svnpubsub/bgworker.py | 36 ++++----- 2 files changed, 49 insertions(+), 163 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index f8ef830..043e2f0 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -31,8 +31,6 @@ import gzip import boto3 import tempfile -import threading -import subprocess import logging.handlers try: @@ -41,29 +39,32 @@ import queue as Queue import optparse -import daemonize -import svnpubsub.client -import svnpubsub.util from svnpubsub.util import execute +from svnpubsub.client import Commit +from svnpubsub.daemon import Daemon, DaemonTask +from svnpubsub.bgworker import BackgroundJob -HOST = "127.0.0.1" PORT = 2069 -# Will not handle commits if repo starts with any name included in REPO_EXCLUDES -REPO_EXCLUDES = ['demo', 'repo'] -# Start logging warnings if the work backlog reaches this many items -BACKLOG_TOO_HIGH = 500 +HOST = "127.0.0.1" +# Will not handle commits if repo starts with any name included in EXCLUDED_REPOS +EXCLUDED_REPOS = ['demo', 'repo'] s3client = boto3.client('s3') -class Job(object): +class Job(BackgroundJob): + + def __init__(self, commit: Commit): + super().__init__(repo=commit.repositoryname, rev=commit.id, head=commit.id, commit=commit) - def __init__(self, repo, rev, head): - self.repo = repo - self.rev = rev - self.head = head - self.env = {'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'} self.shard_size = 'shard0' + self.env = {'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'} + + def validate(self) -> bool: + return self.validate_rev(self.rev) + + def run(self): + self.backup_commit() def get_key(self, rev): # /v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz @@ -77,12 +78,10 @@ def get_name(self, rev): return name def get_s3_base(self, shard_size, rev): - # Always using 1000 for folders, can not yet support >shard3. d = int(rev) / 1000 d = str(int(d)) + '000' shard_number = d.zfill(10) - version = 'v1' # v1/CLOUDID/demo1/shard0/0000000000 return '%s/%s/%s/%s/%s' % (version, CLOUDID, self.repo, shard_size, shard_number) @@ -362,130 +361,17 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): return start, end -class BigDoEverythingClasss(object): - # removed the config object from __init__. - def __init__(self): - self.streams = ["http://%s:%d/commits" % (HOST, PORT)] +class Task(DaemonTask): - self.hook = None - self.svnbin = SVNADMIN - self.worker = BackgroundWorker(self.svnbin, self.hook) - self.watch = [] + def __init__(self): + super().__init__(urls=["http://%s:%d/commits" % (HOST, PORT)], excluded_repos=EXCLUDED_REPOS) def start(self): - logging.info('start') - - def commit(self, url, commit): - if commit.type != 'svn' or commit.format != 1: - logging.info("SKIP unknown commit format (%s.%d)", - commit.type, commit.format) - return - logging.info("COMMIT r%d (%d paths) from %s" - % (commit.id, len(commit.changed), url)) - - excluded = False - for repo in REPO_EXCLUDES: - if commit.repositoryname.startswith(repo): - logging.info('Commit in excluded repository, ignoring: %s' % commit.repositoryname) - excluded = True - - if not excluded: - job = Job(commit.repositoryname, commit.id, commit.id) - self.worker.add_job(job) - - -class BackgroundWorker(threading.Thread): - def __init__(self, svnbin, hook): - threading.Thread.__init__(self) + logging.info('Daemon started.') - # The main thread/process should not wait for this thread to exit. - # compat with Python 2.5 - self.setDaemon(True) - - self.svnbin = svnbin - self.hook = hook - self.q = queue.PriorityQueue() - - self.has_started = False - - def run(self): - while True: - # This will block until something arrives - tuple = self.q.get() - job = tuple[1] - - # Warn if the queue is too long. - # (Note: the other thread might have added entries to self.q - # after the .get() and before the .qsize().) - qsize = self.q.qsize() + 1 - if qsize > BACKLOG_TOO_HIGH: - logging.warn('worker backlog is at %d', qsize) - - try: - prev_exists = self._validate(job) - if prev_exists: - job.backup_commit() - else: - logging.info('Rev - 1 has not been dumped, adding it to the queue') - self.add_job(job) - self.add_job(Job(job.repo, job.rev - 1, job.head)) - - self.q.task_done() - except: - logging.exception('Exception in worker') - - def add_job(self, job): - # Start the thread when work first arrives. Thread-start needs to - # be delayed in case the process forks itself to become a daemon. - if not self.has_started: - self.start() - self.has_started = True - - self.q.put((job.rev, job)) - - def _validate(self, job, boot=False): - """Validate the specific job.""" - logging.info("Starting validation of rev: %s in repo: %s" % (job.rev, job.repo)) - return job.validate_rev(job.rev) - - -class Daemon(daemonize.Daemon): - def __init__(self, logfile, pidfile, umask, bdec): - daemonize.Daemon.__init__(self, logfile, pidfile) - - self.umask = umask - self.bdec = bdec - - def setup(self): - # There is no setup which the parent needs to wait for. - pass - - def run(self): - logging.info('svndumpsub started, pid=%d', os.getpid()) - - # Set the umask in the daemon process. Defaults to 000 for - # daemonized processes. Foreground processes simply inherit - # the value from the parent process. - if self.umask is not None: - umask = int(self.umask, 8) - os.umask(umask) - logging.info('umask set to %03o', umask) - - # Start the BDEC (on the main thread), then start the client - self.bdec.start() - - mc = svnpubsub.client.MultiClient(self.bdec.streams, - self.bdec.commit, - self._event) - mc.run_forever() - - def _event(self, url, event_name, event_arg): - if event_name == 'error': - logging.exception('from %s', url) - elif event_name == 'ping': - logging.debug('ping from %s', url) - else: - logging.info('"%s" from %s', event_name, url) + def commit(self, url: str, commit: Commit): + job = Job(commit) + self.worker.queue(job) def prepare_logging(logfile, level): @@ -656,17 +542,17 @@ def main(args): if options.daemon and not options.pidfile: parser.error('PIDFILE is required when running as a daemon') - bdec = BigDoEverythingClasss() - - # We manage the logfile ourselves (along with possible rotation). The - # daemon process can just drop stdout/stderr into /dev/null. - d = Daemon('/dev/null', os.path.abspath(options.pidfile), options.umask, bdec) + daemon = Daemon(name=os.path.basename(__file__), + logfile='/dev/null', + pidfile=os.path.abspath(options.pidfile) if options.pidfile else None, + umask=options.umask, + task=Task()) if options.daemon: # Daemonize the process and call sys.exit() with appropriate code - d.daemonize_exit() + daemon.daemonize_exit() else: # Just run in the foreground (the default) - d.foreground() + daemon.foreground() if __name__ == "__main__": diff --git a/svnpubsub/bgworker.py b/svnpubsub/bgworker.py index d8876b9..8500ff1 100644 --- a/svnpubsub/bgworker.py +++ b/svnpubsub/bgworker.py @@ -5,6 +5,23 @@ BACKLOG_TOO_HIGH = 500 +class BackgroundJob(object): + + def __init__(self, repo, rev, head, **kwargs): + self.repo = repo + self.rev = rev + self.head = head + # Set the kwargs as class attributes + for key, value in kwargs.items(): + setattr(self, key, value) + + def validate(self) -> bool: + raise NotImplementedError("The child class must supply its own implementation!") + + def run(self): + raise NotImplementedError("The child class must supply its own implementation!") + + class BackgroundWorker(Thread): def __init__(self, recursive=False, **kwargs): @@ -38,7 +55,7 @@ def run(self): except Exception: logging.exception('Exception in background worker.') - def queue(self, job): + def queue(self, job: BackgroundJob): # Start the thread when work first arrives. Thread-start needs to # be delayed in case the process forks itself to become a daemon. if not self.started: @@ -51,20 +68,3 @@ def __validate(self, job): logging.info("Validating r%s in: %s" % (job.rev, job.repo)) return job.validate() - -class BackgroundJob(object): - - def __init__(self, repo, rev, head, **kwargs): - self.repo = repo - self.rev = rev - self.head = head - # Set the kwargs as class attributes - for key, value in kwargs.items(): - setattr(self, key, value) - - def validate(self) -> bool: - raise NotImplementedError("The child class must supply its own implementation!") - - def run(self): - raise NotImplementedError("The child class must supply its own implementation!") - From a480a0629c1faef2f997cdd712e1b5fbdf47c7ff Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Mon, 6 May 2024 14:44:10 +0200 Subject: [PATCH 15/19] Improved exception handling when a commit fails to be queued so it wouldn't crash the daemon. --- svndumpsub.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 043e2f0..20d072f 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -370,8 +370,11 @@ def start(self): logging.info('Daemon started.') def commit(self, url: str, commit: Commit): - job = Job(commit) - self.worker.queue(job) + try: + job = Job(commit) + self.worker.queue(job) + except Exception: + logging.exception('Failed to queue a job for r%s in: %s.', job.rev, job.repo) def prepare_logging(logfile, level): From c2d04fee733a23bc73813b50cee01fdca06d1d19 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Wed, 8 May 2024 14:07:55 +0200 Subject: [PATCH 16/19] Improved the validation following the review comments. --- svndumpsub.py | 50 ++++++++++++++++++++++++-------------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 20d072f..a91d93d 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -131,7 +131,7 @@ def backup_commit(self): logging.info('Dumping and uploading rev: %s from repo: %s' % (self.rev, self.repo)) self.dump_zip_upload(self._get_svn_dump_args(self.rev, self.rev), self.rev) - def dump_zip_upload(self, dump_args, rev) -> bool: + def dump_zip_upload(self, dump_args, rev): shard_key = self.get_key(rev) prefix = '%s_%s_' % (self.repo, rev) with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: @@ -289,10 +289,16 @@ def _load_shard(self, shard): from_rev = shard to_rev = ((int(shard / self.shard_div) + 1) * self.shard_div) - 1 logging.info('Loading shard: %s to repo: %s' % (shard, self.repo)) - return self._load_zip(from_rev, to_rev) - - def _load_zip(self, from_rev, to_rev) -> (int, int): - start = end = None + if self.shard_size == 'shard3': + youngest = self._get_head(self.repo) + if (youngest % self.shard_div) != 0: + logging.error('Unable to load %s as the youngest revision: %d is not a multiple of %d', + self.shard_size, youngest, self.shard_div) + raise Exception('Unable to load %s as the youngest revision: %d is not a multiple of %d' % + (self.shard_size, youngest, self.shard_div)) + self._load_zip(from_rev, to_rev) + + def _load_zip(self, from_rev, to_rev): shard_key = self.get_key(str(from_rev)) path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] @@ -319,30 +325,24 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): _, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=svndump) for line in load_stdout.decode("utf-8").splitlines(): logging.debug(line.rstrip()) - # Parse the revision from the output lines similar to: ------- Committed revision 4999 >>> - match = re.search(r"-+ Committed revision (\d+) >+", line) + # Make sure the shard was not loaded over an incorrect revision + match = re.search(r"-+ Committed new rev (\d+) \(loaded from original rev (\d+)\)", line) if match: - rev = int(match.group(1)) - if start is None: - start = rev - else: - start = min(start, rev) - if end is None: - end = rev - else: - end = max(end, rev) - if start is None or end is None: - logging.error('No shards were loaded') - return start, end + new_rev = int(match.group(1)) + original_rev = int(match.group(2)) + logging.error('The original rev: %d was committed to new rev: %d', original_rev, new_rev) + raise Exception('The original rev: %d was committed to new rev: %d' % (original_rev, new_rev)) youngest = self._get_head(self.repo) - if youngest != end: - logging.error('The last committed revision is not the youngest (%d != %d)', end, youngest) - raise Exception('The last committed revision is not the youngest (%d != %d)' % (end, youngest)) + if youngest != to_rev: + logging.error('The youngest revision did not match the expected revision (%d != %d)', + youngest, to_rev) + raise Exception('The youngest revision did not match the expected revision (%d != %d)' % + (youngest, to_rev)) if from_rev == to_rev: - logging.info('Loaded rev: %d from shard: %d to repo: %s', end, from_rev, self.repo) + logging.info('Loaded rev: %d from shard: %d to repo: %s', to_rev, from_rev, self.repo) else: logging.info('Loaded revs: (%d-%d) from shard: %d to repo: %s', - start, end, from_rev, self.repo) + from_rev, to_rev, from_rev, self.repo) if self.shard_size == 'shard0': logging.debug('Extracting the changed paths in the restored revision...') _, changes_stdout, _ = execute(*changed_args, text=True, env=self.env) @@ -358,8 +358,6 @@ def _load_zip(self, from_rev, to_rev) -> (int, int): "{" + ", ".join(changed_paths_after) + "}") raise Exception('Not all expected changed paths were successfully restored.') - return start, end - class Task(DaemonTask): From aba5b3cabc7066dfa3f0be988ff69894d0e1b268 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Fri, 17 May 2024 18:31:57 +0200 Subject: [PATCH 17/19] Reverted the dump_zip_upload to its previous implementation to avoid using the disk. --- svndumpsub.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index a91d93d..5ff8459 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -32,6 +32,7 @@ import boto3 import tempfile import logging.handlers +import subprocess try: import queue @@ -133,17 +134,27 @@ def backup_commit(self): def dump_zip_upload(self, dump_args, rev): shard_key = self.get_key(rev) - prefix = '%s_%s_' % (self.repo, rev) - with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: - with gzip.open(gz, 'wb') as svndump: - # Svn admin dump - logging.debug('Dumping and compressing rev: %s', rev) - dump, _, _ = execute(*dump_args, text=False, env=self.env, stdout=svndump) - logging.debug('Uploading %d bytes for rev: %s to: %s', gz.tell(), rev, shard_key) - gz.seek(0) - # Upload the .svndump.gz file to s3 - s3client.upload_fileobj(gz, BUCKET, shard_key) - logging.info('Uploaded rev: %s from repo: %s to: %s', rev, self.repo, shard_key) + gz_args = ['/bin/gzip'] + # Svn admin dump + logging.debug('Dumping and compressing rev: %s', rev) + with subprocess.Popen(dump_args, stdout=subprocess.PIPE, env=self.env) as dump: + # Zip stdout + with subprocess.Popen(gz_args, stdin=dump.stdout, stdout=subprocess.PIPE) as gz: + dump.stdout.close() # Allow dump to receive a SIGPIPE if gz exits. + # Upload zip.stdout to s3 + logging.debug('Uploading rev: %s to: %s', rev, shard_key) + s3client.upload_fileobj(gz.stdout, BUCKET, shard_key) + logging.info('Uploaded rev: %s from repo: %s to: %s', rev, self.repo, shard_key) + gz.communicate()[0] + dump.poll() + + if dump.returncode != 0: + logging.error('Dumping shard failed (rc=%s): %s', dump.returncode, shard_key) + raise Exception('Dumping shard failed') + + if gz.returncode != 0: + logging.error('Compressing shard failed (rc=%s): %s', gz.returncode, shard_key) + raise Exception('Compressing shard failed') def cleanup_shards(self, shard_size, shard): prefix = self.get_s3_base(shard_size, shard) From 020870c0563e14062c797389d94fa22b637908c7 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 21 May 2024 12:18:57 +0200 Subject: [PATCH 18/19] Comparing changed paths is now done before restoring a shard0 and between the previous shard and the last rev. --- svndumpsub.py | 80 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 5ff8459..095ee0d 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -34,6 +34,8 @@ import logging.handlers import subprocess +import botocore + try: import queue except ImportError: @@ -296,24 +298,69 @@ def _run(self, shards): # Restart or raise the maximum number of shards. raise Exception('Maximum number of shards processed') + def _extract_changed_paths_from_shard(self, shard) -> set: + changed_paths = set() + shard_key = self.get_key(str(shard)) + prefix = '%s_%s_' % (self.repo, shard) + with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: + logging.debug('Downloading shard from: %s to temporary file: %s', shard_key, gz.name) + try: + # Download from s3 + s3client.download_fileobj(BUCKET, shard_key, gz) + except botocore.exceptions.ClientError as e: + logging.info('Shard not found: %s', shard_key) + return None + gz.seek(0) + with gzip.open(gz, 'rb') as svndump: + logging.debug('Extracting the changed paths from shard: %d', shard) + for line in svndump: + if line.startswith(b'Node-path:'): + change = str(line.split(b':', 1)[1].decode('utf-8').strip()) + logging.debug(change) + changed_paths.add(change) + return changed_paths + + def _extract_changed_paths_from_rev(self, rev) -> set: + changed_paths = set() + path = '%s/%s' % (SVNROOT, self.repo) + changed_args = [SVNLOOK, 'changed', '-r', str(rev), path] + logging.debug('Extracting the changed paths from rev: %d', rev) + _, changes_stdout, _ = execute(*changed_args, text=True, env=self.env) + for line in changes_stdout.splitlines(): + match = re.search(r"(A|D|U|_U|UU)\s+(.*)", line) + if match and len(match.groups()) > 1: + change = str(match.group(2).rstrip().rstrip('/')) + changed_paths.add(change) + logging.debug(change) + return changed_paths + def _load_shard(self, shard): from_rev = shard + previous_rev = shard - 1 to_rev = ((int(shard / self.shard_div) + 1) * self.shard_div) - 1 logging.info('Loading shard: %s to repo: %s' % (shard, self.repo)) if self.shard_size == 'shard3': youngest = self._get_head(self.repo) - if (youngest % self.shard_div) != 0: - logging.error('Unable to load %s as the youngest revision: %d is not a multiple of %d', + if youngest != 0 and youngest % self.shard_div != self.shard_div - 1: + logging.error('Unable to load %s as the youngest revision: %d is not exactly or almost a multiple of %d', self.shard_size, youngest, self.shard_div) - raise Exception('Unable to load %s as the youngest revision: %d is not a multiple of %d' % - (self.shard_size, youngest, self.shard_div)) + raise Exception('Unable to load %s as the youngest revision: %d is not exactly or almost a multiple ' + 'of %d' % (self.shard_size, youngest, self.shard_div)) + elif self.shard_size == 'shard0': + changed_paths_rev = self._extract_changed_paths_from_rev(previous_rev) + changed_paths_shard = self._extract_changed_paths_from_shard(previous_rev) + if changed_paths_rev is not None and changed_paths_shard is not None: + if changed_paths_shard != changed_paths_rev: + logging.error('Not all expected changed paths for rev: %s were previously restored: %s != %s', + previous_rev, "{" + ", ".join(changed_paths_shard) + "}", + "{" + ", ".join(changed_paths_rev) + "}") + raise Exception('Not all expected changed paths were previously restored.') self._load_zip(from_rev, to_rev) def _load_zip(self, from_rev, to_rev): shard_key = self.get_key(str(from_rev)) path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] - changed_args = [SVNLOOK, 'changed', '-r', str(from_rev), path] prefix = '%s_%s_' % (self.repo, from_rev) with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: logging.debug('Downloading shard from: %s to temporary file: %s', shard_key, gz.name) @@ -321,15 +368,6 @@ def _load_zip(self, from_rev, to_rev): s3client.download_fileobj(BUCKET, shard_key, gz) gz.seek(0) with gzip.open(gz, 'rb') as svndump: - changed_paths_after = set() - changed_paths_before = set() - if self.shard_size == 'shard0': - logging.debug('Extracting the changed paths from the dump file: %s', gz.name) - for line in svndump: - if line.startswith(b'Node-path:'): - change = str(line.split(b':', 1)[1].decode('utf-8').strip()) - logging.debug(change) - changed_paths_before.add(change) # svnadmin load svndump.seek(0) logging.debug('Decompressing and loading shard from: %s', gz.name) @@ -354,20 +392,6 @@ def _load_zip(self, from_rev, to_rev): else: logging.info('Loaded revs: (%d-%d) from shard: %d to repo: %s', from_rev, to_rev, from_rev, self.repo) - if self.shard_size == 'shard0': - logging.debug('Extracting the changed paths in the restored revision...') - _, changes_stdout, _ = execute(*changed_args, text=True, env=self.env) - for line in changes_stdout.splitlines(): - match = re.search(r"(A|D|U|_U|UU)\s+(.*)", line) - if match and len(match.groups()) > 1: - change = str(match.group(2).rstrip().rstrip('/')) - changed_paths_after.add(change) - logging.debug(change) - if changed_paths_before != changed_paths_after: - logging.error('Not all expected changed paths were successfully restored: %s != %s', - "{" + ", ".join(changed_paths_before) + "}", - "{" + ", ".join(changed_paths_after) + "}") - raise Exception('Not all expected changed paths were successfully restored.') class Task(DaemonTask): From 517b9fc009820faf47923464d216eb99c191032f Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Wed, 22 May 2024 12:29:35 +0200 Subject: [PATCH 19/19] Added support for the PID file locking to svnloadsub. --- svndumpsub.py | 52 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 095ee0d..76445b0 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -342,8 +342,9 @@ def _load_shard(self, shard): if self.shard_size == 'shard3': youngest = self._get_head(self.repo) if youngest != 0 and youngest % self.shard_div != self.shard_div - 1: - logging.error('Unable to load %s as the youngest revision: %d is not exactly or almost a multiple of %d', - self.shard_size, youngest, self.shard_div) + logging.error( + 'Unable to load %s as the youngest revision: %d is not exactly or almost a multiple of %d', + self.shard_size, youngest, self.shard_div) raise Exception('Unable to load %s as the youngest revision: %d is not exactly or almost a multiple ' 'of %d' % (self.shard_size, youngest, self.shard_div)) elif self.shard_size == 'shard0': @@ -434,6 +435,30 @@ def prepare_logging(logfile, level): logging.getLogger('urllib3').setLevel(logging.INFO) +def create_pid_file(name): + if os.path.isfile(name): + with open(name, 'r') as pidfile: + try: + pid = int(pidfile.read().strip()) + if pid and os.path.exists(f"/proc/{pid}"): + logging.error("Another instance of the daemon is already running with PID %d.", pid) + sys.exit(1) + except ValueError: + pass + + with open(name, 'w') as pidfile: + pid = os.getpid() + pidfile.write(str(pid)) + logging.info('pid %d written to %s', pid, name) + + +def remove_pid_file(name): + try: + os.remove(name) + except OSError: + pass + + def handle_options(options): if not options.aws: raise ValueError('A valid --aws has to be provided (path to aws executable)') @@ -482,21 +507,6 @@ def handle_options(options): # Set up the logging, then process the rest of the options. - # In daemon mode, we let the daemonize module handle the pidfile. - # Otherwise, we should write this (foreground) PID into the file. - if options.pidfile and not options.daemon: - pid = os.getpid() - # Be wary of symlink attacks - try: - os.remove(options.pidfile) - except OSError: - pass - fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, - stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) - os.write(fd, b'%d\n' % pid) - os.close(fd) - logging.info('pid %d written to %s', pid, options.pidfile) - if options.gid: try: gid = int(options.gid) @@ -568,6 +578,11 @@ def main(args): # Process any provided options. handle_options(options) + # In daemon mode, we let the daemonize module handle the pidfile. + # Otherwise, we should write this (foreground) PID into the file. + if options.pidfile and not options.daemon: + create_pid_file(options.pidfile) + if options.history: JobMulti(options.history, options.shardsize) elif options.load: @@ -590,6 +605,9 @@ def main(args): # Just run in the foreground (the default) daemon.foreground() + if options.pidfile and not options.daemon: + remove_pid_file(options.pidfile) + if __name__ == "__main__": main(sys.argv[1:])