diff --git a/svndumpsub.py b/svndumpsub.py index 7ddc8c0..76445b0 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -16,7 +16,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - # # SvnDumpSub - Subscribe to a SvnPubSub stream, dumps commits, zips and uploads them to AWS S3. # @@ -25,116 +24,97 @@ # # On startup SvnDumpSub starts listening to commits in all repositories. # - -from io import BytesIO -import subprocess -import threading import sys import stat import os -import tempfile import re -import json -import socket +import gzip import boto3 +import tempfile import logging.handlers +import subprocess + +import botocore + try: - import queue + import queue except ImportError: - import queue as Queue + import queue as Queue import optparse -import daemonize -import svnpubsub.client -import svnpubsub.util +from svnpubsub.util import execute +from svnpubsub.client import Commit +from svnpubsub.daemon import Daemon, DaemonTask +from svnpubsub.bgworker import BackgroundJob -HOST = "127.0.0.1" PORT = 2069 -#Will not handle commits if repo starts with any name icluded in REPO_EXCLUDES -REPO_EXCLUDES = ['demo', 'repo'] +HOST = "127.0.0.1" +# Will not handle commits if repo starts with any name included in EXCLUDED_REPOS +EXCLUDED_REPOS = ['demo', 'repo'] s3client = boto3.client('s3') -assert hasattr(subprocess, 'check_call') -def check_call(*args, **kwds): - """Wrapper around subprocess.check_call() that logs stderr upon failure, - with an optional list of exit codes to consider non-failure.""" - assert 'stderr' not in kwds - if '__okayexits' in kwds: - __okayexits = kwds['__okayexits'] - del kwds['__okayexits'] - else: - __okayexits = set([0]) # EXIT_SUCCESS - kwds.update(stderr=subprocess.PIPE) - pipe = subprocess.Popen(*args, **kwds) - output, errput = pipe.communicate() - if pipe.returncode not in __okayexits: - cmd = args[0] if len(args) else kwds.get('args', '(no command)') - logging.error('Command failed: returncode=%d command=%r stderr=%r', - pipe.returncode, cmd, errput) - raise subprocess.CalledProcessError(pipe.returncode, args) - return pipe.returncode # is EXIT_OK - -class Job(object): - - def __init__(self, repo, rev, head): - self.repo = repo - self.rev = rev - self.head = head - self.env = {'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'} +class Job(BackgroundJob): + + def __init__(self, commit: Commit): + super().__init__(repo=commit.repositoryname, rev=commit.id, head=commit.id, commit=commit) + self.shard_size = 'shard0' + self.env = {'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'} + + def validate(self) -> bool: + return self.validate_rev(self.rev) + + def run(self): + self.backup_commit() def get_key(self, rev): - #/v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz - return '%s/%s' % (self._get_s3_base(rev = rev), self.get_name(rev)) + # /v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz + return '%s/%s' % (self.get_s3_base(self.shard_size, rev), self.get_name(rev)) def get_name(self, rev): revStr = str(rev) revStr = revStr.zfill(10) name = self.repo + '-' + revStr + '.svndump.gz' - #reponame-0000001000.svndump.gz + # reponame-0000001000.svndump.gz return name - def _get_s3_base(self, rev): - + def get_s3_base(self, shard_size, rev): # Always using 1000 for folders, can not yet support >shard3. 
d = int(rev) / 1000 d = str(int(d)) + '000' shard_number = d.zfill(10) - version = 'v1' # v1/CLOUDID/demo1/shard0/0000000000 - return '%s/%s/%s/%s/%s' % (version, CLOUDID, self.repo, self.shard_size, shard_number) + return '%s/%s/%s/%s/%s' % (version, CLOUDID, self.repo, shard_size, shard_number) def _get_svn_dump_args(self, from_rev, to_rev): path = '%s/%s' % (SVNROOT, self.repo) dump_rev = '-r%s:%s' % (from_rev, to_rev) - #svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 + # svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 return [SVNADMIN, 'dump', '--incremental', '--deltas', path, dump_rev] - - #def _get_aws_cp_args(self, rev): - # # aws s3 cp - s3://cms-review-jandersson/v1/jandersson/demo1/shard0/0000000000/demo1-0000000363.svndump.gz - # return [AWS, 's3', 'cp', '-', 's3://%s/%s' % (BUCKET, self.get_key(rev))] + # def _get_aws_cp_args(self, rev): + # # aws s3 cp - s3://cms-review-jandersson/v1/jandersson/demo1/shard0/0000000000/demo1-0000000363.svndump.gz + # return [AWS, 's3', 'cp', '-', 's3://%s/%s' % (BUCKET, self.get_key(rev))] def _validate_shard(self, rev): key = self.get_key(rev) try: response = s3client.head_object(Bucket=BUCKET, Key=key) logging.debug('Shard key exists: %s' % key) - if (not response["ContentLength"] > 0): + if not response["ContentLength"] > 0: logging.warning('Dump file empty: %s' % key) return False - #logging.info(response) + # logging.info(response) return True except Exception as err: logging.debug("S3 exception: {0}".format(err)) logging.info('Shard key does not exist: s3://%s/%s' % (BUCKET, key)) return False - - #Will recursively check a bucket if (rev - 1) exists until it finds a rev dump. + # Will recursively check a bucket if (rev - 1) exists until it finds a rev dump. def validate_rev(self, rev): validate_to_rev = self._get_validate_to_rev() @@ -146,40 +126,48 @@ def validate_rev(self, rev): return self._validate_shard(rev_to_validate) - def _get_validate_to_rev(self): rev_round_down = int((self.head - 1) / 1000) return rev_round_down * 1000 - def _backup_commit(self): + def backup_commit(self): logging.info('Dumping and uploading rev: %s from repo: %s' % (self.rev, self.repo)) self.dump_zip_upload(self._get_svn_dump_args(self.rev, self.rev), self.rev) - def dump_zip_upload(self, dump_args, rev): shard_key = self.get_key(rev) - - gz = '/bin/gzip' - gz_args = [gz] - + gz_args = ['/bin/gzip'] # Svn admin dump - p1 = subprocess.Popen((dump_args), stdout=subprocess.PIPE, env=self.env) - # Zip stdout - p2 = subprocess.Popen((gz_args), stdin=p1.stdout, stdout=subprocess.PIPE) - p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. - # Upload zip.stdout to s3 - s3client.upload_fileobj(p2.stdout, BUCKET, shard_key) - #TODO: Do we need to close stuff? - p2.communicate()[0] - p1.poll() - - if p1.returncode != 0: - logging.error('Dumping shard failed (rc=%s): %s', p1.returncode, shard_key) - raise Exception('Dumping shard failed') - - if p2.returncode != 0: - logging.error('Compressing shard failed (rc=%s): %s', p2.returncode, shard_key) - raise Exception('Compressing shard failed') + logging.debug('Dumping and compressing rev: %s', rev) + with subprocess.Popen(dump_args, stdout=subprocess.PIPE, env=self.env) as dump: + # Zip stdout + with subprocess.Popen(gz_args, stdin=dump.stdout, stdout=subprocess.PIPE) as gz: + dump.stdout.close() # Allow dump to receive a SIGPIPE if gz exits. 
+ # Upload zip.stdout to s3 + logging.debug('Uploading rev: %s to: %s', rev, shard_key) + s3client.upload_fileobj(gz.stdout, BUCKET, shard_key) + logging.info('Uploaded rev: %s from repo: %s to: %s', rev, self.repo, shard_key) + gz.communicate()[0] + dump.poll() + + if dump.returncode != 0: + logging.error('Dumping shard failed (rc=%s): %s', dump.returncode, shard_key) + raise Exception('Dumping shard failed') + + if gz.returncode != 0: + logging.error('Compressing shard failed (rc=%s): %s', gz.returncode, shard_key) + raise Exception('Compressing shard failed') + + def cleanup_shards(self, shard_size, shard): + prefix = self.get_s3_base(shard_size, shard) + try: + s3 = boto3.resource('s3') + bucket = s3.Bucket(BUCKET) + logging.debug('Cleaning up shards: %s/*', prefix) + bucket.objects.filter(Prefix=prefix).delete() + logging.info('Cleaned up shards: %s/*', prefix) + except Exception: + logging.exception('Failed to cleanup: %s/*', prefix) # Processing one repo, specified in history option @@ -196,7 +184,7 @@ def __init__(self, repo, shard_size): self.rev_min = 0 elif shard_size == 'shard0': self.shard_div = 1 - self.shard_div_next = 1000 # next larger shard + self.shard_div_next = 1000 # next larger shard self.rev_min = int(self.head / self.shard_div_next) * self.shard_div_next # shard0 revisions might be skipped when using only --history without svnpubsub # revision will be dumped in shard3 but can be a problem for slave servers loading single revisions @@ -206,7 +194,7 @@ def __init__(self, repo, shard_size): logging.error('Unsupported shard type: %s' % shard_size) raise Exception('Unsupported shard type') - logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) + logging.info('Processing repo: %s with head revision: %s' % (self.repo, self.head)) shards = self._get_shards(self.head) self._run(shards) @@ -227,20 +215,17 @@ def _get_head(self, repo): raise Exception('Repository does not exist') # Considered using svn to enable rdump in the future. - #fqdn = socket.getfqdn() - #url = 'http://%s/svn/%s' % (fqdn, repo) + # fqdn = socket.getfqdn() + # url = 'http://%s/svn/%s' % (fqdn, repo) - #args = [SVN, 'info', url] - #grep_args = ['/bin/grep', 'Revision:'] + # args = [SVN, 'info', url] + # grep_args = ['/bin/grep', 'Revision:'] - args = [SVNLOOK, 'youngest', path] - grep_args = ['/bin/grep', '^[0-9]\+'] - - p1 = subprocess.Popen((args), stdout=subprocess.PIPE) - output = subprocess.check_output((grep_args), stdin=p1.stdout) - - rev = int(''.join(filter(str.isdigit, output.decode("utf-8")))) - logging.info('Repository %s youngest: %s' % (repo, rev)) + svnlook_args = [SVNLOOK, 'youngest', path] + svnlook, svnlook_stdout, _ = execute(*svnlook_args, text=True, env=self.env) + match = re.match(r'^(\d+)', svnlook_stdout) + rev = int(match.group(1)) if match else None + logging.info('Repository: %s youngest: %s' % (repo, rev)) return rev def _get_shards(self, head): @@ -253,14 +238,15 @@ def _get_shards(self, head): # Upper limit must be +1 before division (both shard3 and shard0). 
return list(range(self.rev_min, int((head + 1) / self.shard_div) * self.shard_div, self.shard_div)) - def _backup_shard(self, shard): logging.info('Dumping and uploading shard: %s from repo: %s' % (shard, self.repo)) - start_rev = str(shard) + from_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) - - svn_args = self._get_svn_dump_args(start_rev, to_rev) - self.dump_zip_upload(svn_args, start_rev) + svn_args = self._get_svn_dump_args(from_rev, to_rev) + self.dump_zip_upload(svn_args, from_rev) + # Clean up corresponding shard0 after successfully dumping a shard3. + if self.shard_size == 'shard3': + self.cleanup_shards('shard0', shard) class JobMultiLoad(JobMulti): @@ -275,7 +261,7 @@ def __init__(self, repo): else: self.rev_min = self.head + 1 - logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) + logging.info('Processing repo: %s with head revision: %s' % (self.repo, self.head)) # First process large shards if local head is divisible with shard size. if self.rev_min % 1000 == 0: self.shard_size = 'shard3' @@ -283,7 +269,6 @@ def __init__(self, repo): shards = self._get_shards(self.rev_min + 1000 * self.shard_div) self._run(shards) - # Refresh head after potentially loading large dumps. self.head = self._get_head(self.repo) if self.head == 0: @@ -300,229 +285,191 @@ def __init__(self, repo): self._run(shards) def _run(self, shards): - logging.info('Shards length %s' % len(shards)) + logging.info('Shards length: %s' % len(shards)) for shard in shards: dump_exists = self._validate_shard(shard) if dump_exists: - logging.info('Shard exists, will load shard %s' % shard) + logging.info('Shard exists, will load shard: %s' % shard) self._load_shard(shard) continue else: - logging.info('Shard does not exist, done for now - %s' % shard) - break - logging.warning('Maximum number of shards processed, terminating.') - # Restart or raise the maximum number of shards. - raise Exception('Maximum number of shards processed') - + logging.info('Shard: %s does not exist, done for now' % shard) + return + # Restart or raise the maximum number of shards. 
+ raise Exception('Maximum number of shards processed') + + def _extract_changed_paths_from_shard(self, shard) -> set: + changed_paths = set() + shard_key = self.get_key(str(shard)) + prefix = '%s_%s_' % (self.repo, shard) + with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: + logging.debug('Downloading shard from: %s to temporary file: %s', shard_key, gz.name) + try: + # Download from s3 + s3client.download_fileobj(BUCKET, shard_key, gz) + except botocore.exceptions.ClientError as e: + logging.info('Shard not found: %s', shard_key) + return None + gz.seek(0) + with gzip.open(gz, 'rb') as svndump: + logging.debug('Extracting the changed paths from shard: %d', shard) + for line in svndump: + if line.startswith(b'Node-path:'): + change = str(line.split(b':', 1)[1].decode('utf-8').strip()) + logging.debug(change) + changed_paths.add(change) + return changed_paths + + def _extract_changed_paths_from_rev(self, rev) -> set: + changed_paths = set() + path = '%s/%s' % (SVNROOT, self.repo) + changed_args = [SVNLOOK, 'changed', '-r', str(rev), path] + logging.debug('Extracting the changed paths from rev: %d', rev) + _, changes_stdout, _ = execute(*changed_args, text=True, env=self.env) + for line in changes_stdout.splitlines(): + match = re.search(r"(A|D|U|_U|UU)\s+(.*)", line) + if match and len(match.groups()) > 1: + change = str(match.group(2).rstrip().rstrip('/')) + changed_paths.add(change) + logging.debug(change) + return changed_paths def _load_shard(self, shard): - logging.info('Loading shard: %s from repo: %s' % (shard, self.repo)) - start_rev = str(shard) - to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) - - logging.info('Loading shard %s' % shard) - self.load_zip(start_rev) - - - def load_zip(self, rev): - shard_key = self.get_key(rev) - - gz = '/bin/gunzip' - gz_args = [gz, '-c'] - + from_rev = shard + previous_rev = shard - 1 + to_rev = ((int(shard / self.shard_div) + 1) * self.shard_div) - 1 + logging.info('Loading shard: %s to repo: %s' % (shard, self.repo)) + if self.shard_size == 'shard3': + youngest = self._get_head(self.repo) + if youngest != 0 and youngest % self.shard_div != self.shard_div - 1: + logging.error( + 'Unable to load %s as the youngest revision: %d is not exactly or almost a multiple of %d', + self.shard_size, youngest, self.shard_div) + raise Exception('Unable to load %s as the youngest revision: %d is not exactly or almost a multiple ' + 'of %d' % (self.shard_size, youngest, self.shard_div)) + elif self.shard_size == 'shard0': + changed_paths_rev = self._extract_changed_paths_from_rev(previous_rev) + changed_paths_shard = self._extract_changed_paths_from_shard(previous_rev) + if changed_paths_rev is not None and changed_paths_shard is not None: + if changed_paths_shard != changed_paths_rev: + logging.error('Not all expected changed paths for rev: %s were previously restored: %s != %s', + previous_rev, "{" + ", ".join(changed_paths_shard) + "}", + "{" + ", ".join(changed_paths_rev) + "}") + raise Exception('Not all expected changed paths were previously restored.') + self._load_zip(from_rev, to_rev) + + def _load_zip(self, from_rev, to_rev): + shard_key = self.get_key(str(from_rev)) path = '%s/%s' % (SVNROOT, self.repo) load_args = [SVNADMIN, 'load', path] - - # Temporary file - # TODO: Use specific tmp dir. 
- prefix = '%s_%s_' % (self.repo, rev) - fp = tempfile.NamedTemporaryFile(prefix=prefix, suffix='.svndump.gz', delete=True) - logging.debug('Downloading shard to temporary file: %s', fp.name) - # Download from s3 - s3client.download_fileobj(BUCKET, shard_key, fp) - fp.seek(0) - # gunzip - p1 = subprocess.Popen((gz_args), stdin=fp, stdout=subprocess.PIPE, env=self.env) - # svnadmin load - p2 = subprocess.Popen((load_args), stdin=p1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) - p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. - #TODO: Do we need to close stuff? - p2.communicate()[0] - logging.debug('Load return code: %s', p2.returncode) - p1.poll() - logging.debug('Gunzip return code: %s', p1.returncode) - - # Closing tmp file should delete it. - fp.close() - - if p1.returncode != 0: - logging.error('Decompressing shard failed (rc=%s): %s', p1.returncode, shard_key) - raise Exception('Decompressing shard failed') - - if p2.returncode != 0: - logging.error('Loading shard failed (rc=%s): %s', p2.returncode, shard_key) - raise Exception('Loading shard failed') - - # TODO Analyze output, should conclude with (ensure revision is correct for shard): - # ------- Committed revision 4999 >>> - -class BigDoEverythingClasss(object): - #removed the config object from __init__. - def __init__(self): - self.streams = ["http://%s:%d/commits" %(HOST, PORT)] - - self.hook = None - self.svnbin = SVNADMIN - self.worker = BackgroundWorker(self.svnbin, self.hook) - self.watch = [ ] - - def start(self): - logging.info('start') - - def commit(self, url, commit): - if commit.type != 'svn' or commit.format != 1: - logging.info("SKIP unknown commit format (%s.%d)", - commit.type, commit.format) - return - logging.info("COMMIT r%d (%d paths) from %s" - % (commit.id, len(commit.changed), url)) - - excluded = False - for repo in REPO_EXCLUDES: - if commit.repositoryname.startswith(repo): - logging.info('Commit in excluded repository, ignoring: %s' % commit.repositoryname) - excluded = True - - if not excluded: - job = Job(commit.repositoryname, commit.id, commit.id) - self.worker.add_job(job) - -# Start logging warnings if the work backlog reaches this many items -BACKLOG_TOO_HIGH = 500 - -class BackgroundWorker(threading.Thread): - def __init__(self, svnbin, hook): - threading.Thread.__init__(self) - - # The main thread/process should not wait for this thread to exit. - ### compat with Python 2.5 - self.setDaemon(True) - - self.svnbin = svnbin - self.hook = hook - self.q = queue.PriorityQueue() - - self.has_started = False - - def run(self): - while True: - # This will block until something arrives - tuple = self.q.get() - job = tuple[1] - - # Warn if the queue is too long. - # (Note: the other thread might have added entries to self.q - # after the .get() and before the .qsize().) 
- qsize = self.q.qsize()+1 - if qsize > BACKLOG_TOO_HIGH: - logging.warn('worker backlog is at %d', qsize) - - try: - prev_exists = self._validate(job) - if prev_exists: - job._backup_commit() + prefix = '%s_%s_' % (self.repo, from_rev) + with tempfile.NamedTemporaryFile(dir=TEMPDIR, prefix=prefix, suffix='.svndump.gz', delete=True) as gz: + logging.debug('Downloading shard from: %s to temporary file: %s', shard_key, gz.name) + # Download from s3 + s3client.download_fileobj(BUCKET, shard_key, gz) + gz.seek(0) + with gzip.open(gz, 'rb') as svndump: + # svnadmin load + svndump.seek(0) + logging.debug('Decompressing and loading shard from: %s', gz.name) + _, load_stdout, _ = execute(*load_args, text=False, env=self.env, stdin=svndump) + for line in load_stdout.decode("utf-8").splitlines(): + logging.debug(line.rstrip()) + # Make sure the shard was not loaded over an incorrect revision + match = re.search(r"-+ Committed new rev (\d+) \(loaded from original rev (\d+)\)", line) + if match: + new_rev = int(match.group(1)) + original_rev = int(match.group(2)) + logging.error('The original rev: %d was committed to new rev: %d', original_rev, new_rev) + raise Exception('The original rev: %d was committed to new rev: %d' % (original_rev, new_rev)) + youngest = self._get_head(self.repo) + if youngest != to_rev: + logging.error('The youngest revision did not match the expected revision (%d != %d)', + youngest, to_rev) + raise Exception('The youngest revision did not match the expected revision (%d != %d)' % + (youngest, to_rev)) + if from_rev == to_rev: + logging.info('Loaded rev: %d from shard: %d to repo: %s', to_rev, from_rev, self.repo) else: - logging.info('Rev - 1 has not been dumped, adding it to the queue') - self.add_job(job) - self.add_job(Job(job.repo, job.rev - 1, job.head)) - - self.q.task_done() - except: - logging.exception('Exception in worker') + logging.info('Loaded revs: (%d-%d) from shard: %d to repo: %s', + from_rev, to_rev, from_rev, self.repo) - def add_job(self, job): - # Start the thread when work first arrives. Thread-start needs to - # be delayed in case the process forks itself to become a daemon. - if not self.has_started: - self.start() - self.has_started = True - self.q.put((job.rev, job)) +class Task(DaemonTask): - def _validate(self, job, boot=False): - "Validate the specific job." - logging.info("Starting validation of rev: %s in repo: %s" % (job.rev, job.repo)) - return job.validate_rev(job.rev) - -class Daemon(daemonize.Daemon): - def __init__(self, logfile, pidfile, umask, bdec): - daemonize.Daemon.__init__(self, logfile, pidfile) - - self.umask = umask - self.bdec = bdec + def __init__(self): + super().__init__(urls=["http://%s:%d/commits" % (HOST, PORT)], excluded_repos=EXCLUDED_REPOS) - def setup(self): - # There is no setup which the parent needs to wait for. - pass + def start(self): + logging.info('Daemon started.') - def run(self): - logging.info('svndumpsub started, pid=%d', os.getpid()) - - # Set the umask in the daemon process. Defaults to 000 for - # daemonized processes. Foreground processes simply inherit - # the value from the parent process. 
- if self.umask is not None: - umask = int(self.umask, 8) - os.umask(umask) - logging.info('umask set to %03o', umask) - - # Start the BDEC (on the main thread), then start the client - self.bdec.start() - - mc = svnpubsub.client.MultiClient(self.bdec.streams, - self.bdec.commit, - self._event) - mc.run_forever() - - def _event(self, url, event_name, event_arg): - if event_name == 'error': - logging.exception('from %s', url) - elif event_name == 'ping': - logging.debug('ping from %s', url) - else: - logging.info('"%s" from %s', event_name, url) + def commit(self, url: str, commit: Commit): + try: + job = Job(commit) + self.worker.queue(job) + except Exception: + logging.exception('Failed to queue a job for r%s in: %s.', commit.id, commit.repositoryname) -def prepare_logging(logfile): - "Log to the specified file, or to stdout if None." +def prepare_logging(logfile, level): + """Log to the specified file, or to stdout if None.""" if logfile: # Rotate logs daily, keeping 7 days worth. - handler = logging.handlers.TimedRotatingFileHandler( - logfile, when='midnight', backupCount=7, - ) + handler = logging.handlers.TimedRotatingFileHandler(logfile, when='midnight', backupCount=7) else: - handler = logging.StreamHandler(sys.stdout) + handler = logging.getLogger().handlers[0] # Add a timestamp to the log records - formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', - '%Y-%m-%d %H:%M:%S') + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S') handler.setFormatter(formatter) - # Apply the handler to the root logger + # Apply the new handler to the root logger and set its level root = logging.getLogger() root.addHandler(handler) + root.setLevel(level) - ### use logging.INFO for now. switch to cmdline option or a config? - root.setLevel(logging.INFO) + # Suppress the unnecessary boto3 logs + logging.getLogger('boto3').setLevel(logging.INFO) + logging.getLogger('botocore').setLevel(logging.INFO) + logging.getLogger('s3transfer').setLevel(logging.INFO) + logging.getLogger('urllib3').setLevel(logging.INFO) -def handle_options(options): +def create_pid_file(name): + if os.path.isfile(name): + with open(name, 'r') as pidfile: + try: + pid = int(pidfile.read().strip()) + if pid and os.path.exists(f"/proc/{pid}"): + logging.error("Another instance of the daemon is already running with PID %d.", pid) + sys.exit(1) + except ValueError: + pass + + with open(name, 'w') as pidfile: + pid = os.getpid() + pidfile.write(str(pid)) + logging.info('pid %d written to %s', pid, name) + + +def remove_pid_file(name): + try: + os.remove(name) + except OSError: + pass + + +def handle_options(options): if not options.aws: raise ValueError('A valid --aws has to be provided (path to aws executable)') else: global AWS AWS = options.aws + if options.tempdir: + global TEMPDIR + TEMPDIR = options.tempdir + if not options.svnadmin: raise ValueError('A valid --svnadmin has to be provided (path to svnadmin executable)') else: @@ -560,22 +507,6 @@ def handle_options(options): # Set up the logging, then process the rest of the options. - - # In daemon mode, we let the daemonize module handle the pidfile. - # Otherwise, we should write this (foreground) PID into the file.
- if options.pidfile and not options.daemon: - pid = os.getpid() - # Be wary of symlink attacks - try: - os.remove(options.pidfile) - except OSError: - pass - fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, - stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) - os.write(fd, b'%d\n' % pid) - os.close(fd) - logging.info('pid %d written to %s', pid, options.pidfile) - if options.gid: try: gid = int(options.gid) @@ -594,52 +525,64 @@ def handle_options(options): logging.info('setting uid %d', uid) os.setuid(uid) - prepare_logging(options.logfile) + prepare_logging(options.logfile, options.log_level) + def main(args): parser = optparse.OptionParser( description='An SvnPubSub client to keep working copies synchronized ' 'with a repository.', usage='Usage: %prog [options] CONFIG_FILE', - ) + ) parser.add_option('--logfile', - help='filename for logging') + help='filename for logging') + parser.add_option('--log-level', type=int, metavar='level', default=logging.INFO, + help='debug logging level (DEBUG: %d | INFO: %d | WARNING: %d | ERROR: %d | CRITICAL: %d) ' + '(default: %d)' % (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, + logging.CRITICAL, logging.INFO)) parser.add_option('--pidfile', - help="the process' PID will be written to this file") + help="the process' PID will be written to this file") + parser.add_option('--tempdir', + help="temporary directory for storing downloaded files", default=tempfile.gettempdir()) parser.add_option('--uid', - help='switch to this UID before running') + help='switch to this UID before running') parser.add_option('--gid', - help='switch to this GID before running') + help='switch to this GID before running') parser.add_option('--daemon', action='store_true', - help='run as a background daemon') + help='run as a background daemon') parser.add_option('--umask', - help='set this (octal) umask before running') + help='set this (octal) umask before running') parser.add_option('--history', - help='Will dump and backup repository in shard3 ranges (even thousands), e.g --history reponame') + help='Will dump and backup repository in shard3 ranges (even thousands), e.g --history reponame') parser.add_option('--shardsize', default='shard3', - help='Shard size used by --history. Assumes that shard3 is executed before shard0.') + help='Shard size used by --history. 
Assumes that shard3 is executed before shard0.') parser.add_option('--load', - help='Will load repository from shards in size order (shard3 then shard0), e.g --load reponame') + help='Will load repository from shards in size order (shard3 then shard0), e.g --load reponame') parser.add_option('--aws', - help='path to aws executable e.g /usr/bin/aws') + help='path to aws executable e.g /usr/bin/aws') parser.add_option('--svnadmin', - help='path to svnadmin executable e.g /usr/bin/svnadmin') + help='path to svnadmin executable e.g /usr/bin/svnadmin') parser.add_option('--svnlook', - help='path to svnlook executable e.g /usr/bin/svnlook') + help='path to svnlook executable e.g /usr/bin/svnlook') parser.add_option('--svnroot', - help='path to repository locations /srv/cms/svn') + help='path to repository locations /srv/cms/svn') parser.add_option('--svn', - help='path to svn executable only required when combined with --history e.g /usr/bin/svn') + help='path to svn executable only required when combined with --history e.g /usr/bin/svn') parser.add_option('--bucket', - help='name of S3 bucket where dumps will be stored') + help='name of S3 bucket where dumps will be stored') parser.add_option('--cloudid', - help='AWS cloud-id') + help='AWS cloud-id') options, extra = parser.parse_args(args) # Process any provided options. handle_options(options) + # In daemon mode, we let the daemonize module handle the pidfile. + # Otherwise, we should write this (foreground) PID into the file. + if options.pidfile and not options.daemon: + create_pid_file(options.pidfile) + if options.history: JobMulti(options.history, options.shardsize) elif options.load: @@ -650,18 +593,20 @@ def main(args): if options.daemon and not options.pidfile: parser.error('PIDFILE is required when running as a daemon') - bdec = BigDoEverythingClasss() - - # We manage the logfile ourselves (along with possible rotation). The - # daemon process can just drop stdout/stderr into /dev/null. - d = Daemon('/dev/null', os.path.abspath(options.pidfile), - options.umask, bdec) + daemon = Daemon(name=os.path.basename(__file__), + logfile='/dev/null', + pidfile=os.path.abspath(options.pidfile) if options.pidfile else None, + umask=options.umask, + task=Task()) if options.daemon: # Daemonize the process and call sys.exit() with appropriate code - d.daemonize_exit() + daemon.daemonize_exit() else: # Just run in the foreground (the default) - d.foreground() + daemon.foreground() + + if options.pidfile and not options.daemon: + remove_pid_file(options.pidfile) if __name__ == "__main__": diff --git a/svnpubsub/bgworker.py b/svnpubsub/bgworker.py index d8876b9..8500ff1 100644 --- a/svnpubsub/bgworker.py +++ b/svnpubsub/bgworker.py @@ -5,6 +5,23 @@ BACKLOG_TOO_HIGH = 500 +class BackgroundJob(object): + + def __init__(self, repo, rev, head, **kwargs): + self.repo = repo + self.rev = rev + self.head = head + # Set the kwargs as class attributes + for key, value in kwargs.items(): + setattr(self, key, value) + + def validate(self) -> bool: + raise NotImplementedError("The child class must supply its own implementation!") + + def run(self): + raise NotImplementedError("The child class must supply its own implementation!") + + class BackgroundWorker(Thread): def __init__(self, recursive=False, **kwargs): @@ -38,7 +55,7 @@ def run(self): except Exception: logging.exception('Exception in background worker.') - def queue(self, job): + def queue(self, job: BackgroundJob): # Start the thread when work first arrives. 
Thread-start needs to # be delayed in case the process forks itself to become a daemon. if not self.started: @@ -51,20 +68,3 @@ def __validate(self, job): logging.info("Validating r%s in: %s" % (job.rev, job.repo)) return job.validate() - -class BackgroundJob(object): - - def __init__(self, repo, rev, head, **kwargs): - self.repo = repo - self.rev = rev - self.head = head - # Set the kwargs as class attributes - for key, value in kwargs.items(): - setattr(self, key, value) - - def validate(self) -> bool: - raise NotImplementedError("The child class must supply its own implementation!") - - def run(self): - raise NotImplementedError("The child class must supply its own implementation!") - diff --git a/svnpubsub/util.py b/svnpubsub/util.py index 9e05b44..214fdb8 100644 --- a/svnpubsub/util.py +++ b/svnpubsub/util.py @@ -14,53 +14,123 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import io import os import sys import logging -import subprocess as __subprocess +import subprocess +from select import select +from threading import Thread +from shutil import copyfileobj +from subprocess import Popen, PIPE, CalledProcessError # check_output() is only available in Python 2.7. Allow us to run with # earlier versions try: - __check_output = __subprocess.check_output def check_output(args, env=None, universal_newlines=False): - return __check_output(args, shell=False, env=env, - universal_newlines=universal_newlines) + return subprocess.check_output(args, shell=False, env=env, universal_newlines=universal_newlines) except AttributeError: def check_output(args, env=None, universal_newlines=False): # note: we only use these three args - pipe = __subprocess.Popen(args, shell=False, env=env, - stdout=__subprocess.PIPE, - universal_newlines=universal_newlines) + pipe = Popen(args, shell=False, env=env, stdout=PIPE, universal_newlines=universal_newlines) output, _ = pipe.communicate() if pipe.returncode: - raise __subprocess.CalledProcessError(pipe.returncode, args) + raise CalledProcessError(pipe.returncode, args) return output -def execute(*args, text=True): - stdout = [] - stderr = [] +def is_file_like(variable) -> bool: + try: + return hasattr(variable, 'fileno') + except AttributeError: + return False + + +def execute(*args, text=True, env=None, throw=True, stdin=None, stdout=None, stderr=None): + rlist = [] + wlist = [] + xlist = [] process = None arguments = [*args] + chunk_size = 1 * 1024 * 1024 # 1 MB + stdout_buffer = None if stdout is not None else [] if text else bytearray() + stderr_buffer = None if stderr is not None else [] if text else bytearray() + stdin_writer = None + + if stdin is not None and not is_file_like(stdin): + raise ValueError("stdin must be a file-like object") + if stdout is not None and not is_file_like(stdout): + raise ValueError("stdout must be a file-like object") + if stderr is not None and not is_file_like(stderr): + raise ValueError("stderr must be a file-like object") logging.debug("Running: %s", " ".join(arguments)) + def write(source, destination): + """ + Copy the source file-like object to the destination file-like object and close + the destination file-like object. 
+ @param source: A source file-like object typically the source of the stdin + @param destination: A destination file-like object typically the stdin stream of the process + """ + source.seek(0) + copyfileobj(source, destination, chunk_size) + destination.close() + + def read(stream, buffer, file=None) -> bool: + """ + Reads a chunk or a line of the available data and adds it to the buffer. + @param stream: The stdout or stderr stream of the process to read from + @param buffer: The buffer to write to + @param file: The file-like object to write to instead of the buffer if a direct output is required + @return: True if there's more to read from the stream, False otherwise + """ + eof = True + if file is None: + if text: + line = stream.readline() + if line: + buffer.append(line.rstrip()) + eof = False + else: + chunk = stream.read(chunk_size) + if chunk: + buffer += chunk + eof = False + else: + chunk = stream.read(chunk_size) + if chunk: + file.write(chunk) + eof = False + + return not eof + try: - process = __subprocess.Popen(arguments, text=text, universal_newlines=text, - stdout=__subprocess.PIPE, stderr=__subprocess.PIPE) - if text: - for line in process.stdout.readlines(): - stdout.append(line.rstrip()) - for line in process.stderr.readlines(): - stderr.append(line.rstrip()) - logging.debug(os.linesep.join(stdout)) - if process.returncode: - raise __subprocess.CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) + process = Popen(arguments, text=text, universal_newlines=text, env=env, + stdin=PIPE if stdin is not None else None, stdout=PIPE, stderr=PIPE) + + if stdin is not None and process.poll() is None: + # If stdin was supplied, copy its contents to the stdin stream of the process in a separate thread + stdin_writer = Thread(target=write, args=[stdin, process.stdin]) + stdin_writer.start() + while process.poll() is None: + rlist = [process.stdout.fileno(), process.stderr.fileno()] + for fd in [item for sublist in select(rlist, wlist, xlist) for item in sublist]: + while True: + more = False + if fd == process.stdout.fileno(): + more |= read(process.stdout, stdout_buffer, stdout) + if fd == process.stderr.fileno(): + more |= read(process.stderr, stderr_buffer, stderr) + if not more: + break + if process.returncode and throw: + raise subprocess.CalledProcessError(process.returncode, process.args, process.stdout, process.stderr) except Exception: _, value, traceback = sys.exc_info() - if not text: - stderr.extend(line.decode('utf-8').rstrip() for line in process.stderr.readlines()) - raise RuntimeError(os.linesep.join(stderr)).with_traceback(traceback) - return process, os.linesep.join(stdout) if text else process.stdout.read(), os.linesep.join(stderr) if text else process.stderr.read() + raise RuntimeError(os.linesep.join(stderr_buffer) if text else stderr_buffer.decode('utf-8')).with_traceback(traceback) + if stdin_writer is not None and stdin_writer.is_alive(): + raise ChildProcessError("The stdin writer thread did not terminate successfully") + return (process, + None if stdout is not None else os.linesep.join(stdout_buffer) if text else bytes(stdout_buffer), + None if stderr is not None else os.linesep.join(stderr_buffer) if text else bytes(stderr_buffer))
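For reference, and not part of the patch itself: a minimal sketch of how the reworked svnpubsub.util.execute() helper is meant to be called, modelled on the call sites in svndumpsub.py above (_get_head and _load_zip). The binary and repository paths are placeholders.

from svnpubsub.util import execute

# Text mode: stdout and stderr are returned as strings; a non-zero exit raises RuntimeError
# (wrapping the captured stderr) because throw defaults to True.
process, youngest, _ = execute('/usr/bin/svnlook', 'youngest', '/srv/cms/svn/demo1', text=True)

# Binary mode with a file-like stdin, mirroring how _load_zip() streams a dump into svnadmin load;
# stdout is returned as bytes.
with open('demo1-0000001000.svndump', 'rb') as dump:
    process, output, _ = execute('/usr/bin/svnadmin', 'load', '/srv/cms/svn/demo1', text=False, stdin=dump)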