From bd8907553faec9b320156a52eb6d5ab3649119b2 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 21:23:30 +0000 Subject: [PATCH 01/14] First attempt at parallel processing support --- PulsarFeatureLab/Src/DataProcessor.py | 232 +++++++++++++------------- 1 file changed, 120 insertions(+), 112 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 53c0def..813950d 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -18,10 +18,57 @@ # Standard library Imports: import sys,os,fnmatch,datetime +import multiprocessing + # Custom file Imports: import Utilities, Candidate + +def featureMeta(features): + """ + Appends candidate features to a list held by this object. This stores + each feature in memory, as opposed to writing them out to a file each time. + + Parameters: + + candidate - The name of the candidate the features belong to. + features - A float array of candidate features. + + Return: + N/A + """ + + # Join features into single comma separated line. + allFeatures = str(",".join(map(str, features))) + entry1 = allFeatures + ",%" + candidate + entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA + entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA + return entry3 + +# **************************************************************************************************** + +def featureNoMeta(features): + """ + Appends candidate features to a list held by this object. This records + each feature in memory as opposed to writing them out to a file each time. + + Parameters: + + candidate - The name of the candidate the features belong to. + features - A float array of candidate features. + + Return: + N/A + """ + + # Join features into single comma separated line. + allFeatures = str(",".join(map(str, features))) + entry1 = allFeatures + entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA + entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA + return entry3 + # **************************************************************************************************** # # CLASS DEFINITION @@ -57,54 +104,28 @@ def __init__(self,debugFlag): self.pfdRegex = "*.pfd" self.featureStore = [] # Variable which stores the features created for a candidate. - # **************************************************************************************************** + - def storeFeatureMeta(self,candidate,features): - """ - Appends candidate features to a list held by this object. This stores - each feature in memory, as opposed to writing them out to a file each time. - - Parameters: - - candidate - The name of the candidate the features belong to. - features - A float array of candidate features. - - Return: - N/A - """ - - # Join features into single comma separated line. - allFeatures = str(",".join(map(str, features))) - entry1 = allFeatures + ",%" + candidate - entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA - entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA - self.featureStore.append(entry3) - # **************************************************************************************************** - - def storeFeatureNoMeta(self,candidate,features): + def generate_filenames(self, directory, fileTypeRegexes): """ - Appends candidate features to a list held by this object. This records - each feature in memory as opposed to writing them out to a file each time. - - Parameters: - - candidate - The name of the candidate the features belong to. - features - A float array of candidate features. - - Return: - N/A + A generator that yields the paths of files matching the given regex and thier names, + in the format (filename,full-path-to-file) """ - - # Join features into single comma separated line. - allFeatures = str(",".join(map(str, features))) - entry1 = allFeatures - entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA - entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA - self.featureStore.append(entry3) - - # **************************************************************************************************** - + for filetype in fileTypeRegexes: + # Loop through the specified directory + for root, subFolders, filenames in os.walk(directory): + + # If the file type matches one of those this program recognises + for filename in fnmatch.filter(filenames, filetype): + + yield filename,os.path.join(root, filename) # Gets full path to the candidate. + + # If the file does not have the expected suffix (file extension), skip to the next. + if(cand.endswith(filetype.replace("*",""))==False): + continue + + def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff): """ Processes pulsar candidates of the type specified by 'candidate_type'. @@ -149,9 +170,9 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) """ # Used to monitor feature creation statistics. - candidatesProcessed = 0; - successes = 0; - failures = 0; + candidatesProcessed = 0 + successes = 0 + failures = 0 print "\n\t*************************" print "\t| Searching Recursively |" @@ -178,74 +199,61 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) start = datetime.datetime.now() # Used to measure feature generation time. - # For each type of file this program recognises - for filetype in fileTypeRegexes: + #spawn a pool of workers to process the individual files + + worker_pool = multiprocessing.Pool(multiprocessing.cpu_count()) #try to utilize all avaliable cores + + def worker((name,path)): + """ + This function processes each file, returning the features as a list. It closes over + various neccesary environment variables, preventing them being needed to be passed in + explicitly every time + """ + try: + c = Candidate.Candidate(name,str(path+cand)) + features = c.getFeatures(feature_type, candidate_type, verbose) + if (arff and feature_type > 0 and feature_type < 7): + features.append('?') + if meta: + return featureMeta(features) + else: + return featureNoMeta(features) + except: + """catch all exceptions, printing to stdout""" + print "\tError reading candidate data :\n\t", sys.exc_info()[0] + #print self.format_exception(e) this function doesn't seem to exist? + print "\t",cand, " did not have features generated." + return None + + #dispatch the worker process to the worker pool, feeding in the filenames generated from the generator + #used unordered_map because we don't care what order the candidates are processed in + for feature in worker_pool.imap_unordered(worker, self.generate_filenames(directory, fileTypeRegexes) ): + if feature is not None: + self.featureStore.append(feature) + successes += 1 + else: + #worker wrote to stdout already + failures += 1 + + candidatesProcessed+=1 + if(candidatesProcessed%10000==0):# Every 10,000 candidates - # Loop through the specified directory - for root, subFolders, filenames in os.walk(directory): + # This 'if' statement is used to provide useful feedback on feature + # generation. But it is also used to write the features collected so far, + # to the output file at set intervals. This helps a) reduce memory load, and + # b) reduce disc load (by writing out lots of features in one go, as opposed + # to one by one). - # If the file type matches one of those this program recognises - for filename in fnmatch.filter(filenames, filetype): - - cand = os.path.join(root, filename) # Gets full path to the candidate. - - # If the file does not have the expected suffix (file extension), skip to the next. - if(cand.endswith(filetype.replace("*",""))==False): - continue - - candidatesProcessed+=1 - - if(candidatesProcessed%10000==0):# Every 10,000 candidates - - # This 'if' statement is used to provide useful feedback on feature - # generation. But it is also used to write the features collected so far, - # to the output file at set intervals. This helps a) reduce memory load, and - # b) reduce disc load (by writing out lots of features in one go, as opposed - # to one by one). - - print "\tCandidates processed: ", candidatesProcessed + print "\tCandidates processed: ", candidatesProcessed + # Write out the features collected so far. + outputText="" + for s in self.featureStore: + outputText+=s+"\n" - # Write out the features collected so far. - outputText="" - for s in self.featureStore: - outputText+=s+"\n" - - self.appendToFile(output, outputText) # Write all 10,000 entries to the output file. - self.featureStore = [] # Clear the feature store, freeing up memory. - - try: - - # Create the candidate object. - c = Candidate.Candidate(cand,str(directory+cand)) - - # Get the features from the candidate. - features = c.getFeatures(feature_type,candidate_type,verbose) - - # If the user would like the output to be in ARFF format, then each candidate - # has to be associated with a label. Since this code cannot know the true label - # of a candidate, here the unknown label '?' is appended as a additional feature. - if(arff and feature_type > 0 and feature_type < 7): - features.append("?") - - # Store the features so it can later be written to the specified output file. - if(meta): - # Store with meta information - basically this means including the candidate - # name (full path) with each feature set. This means that - # each set of features will be linked to a candidate, - # useful for certain investigations (i.e. why a specific - # candidate achieved particular feature values). - self.storeFeatureMeta(cand, features) - else: - self.storeFeatureNoMeta(cand, features) # Store only the feature data. - - except Exception as e: # Catch *all* exceptions. - print "\tError reading candidate data :\n\t", sys.exc_info()[0] - print self.format_exception(e) - print "\t",cand, " did not have features generated." - failures+=1 - continue - - successes+=1 + self.appendToFile(output, outputText) # Write all 10,000 entries to the output file. + self.featureStore = [] # Clear the feature store, freeing up memory. + + # Save any remaining features, since its possible that some features # were not written to the output file in the loop above. From bcdc45c8449bbedb9b3998993e308294200f3abf Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 21:28:04 +0000 Subject: [PATCH 02/14] bug fix --- PulsarFeatureLab/Src/DataProcessor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 813950d..d123d38 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -200,7 +200,7 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) start = datetime.datetime.now() # Used to measure feature generation time. #spawn a pool of workers to process the individual files - + worker_pool = multiprocessing.Pool(multiprocessing.cpu_count()) #try to utilize all avaliable cores def worker((name,path)): @@ -227,7 +227,8 @@ def worker((name,path)): #dispatch the worker process to the worker pool, feeding in the filenames generated from the generator #used unordered_map because we don't care what order the candidates are processed in - for feature in worker_pool.imap_unordered(worker, self.generate_filenames(directory, fileTypeRegexes) ): + filename_gen = self.generate_filenames(directory,fileTypeRegexes) + for feature in worker_pool.imap_unordered(worker, filename_gen): if feature is not None: self.featureStore.append(feature) successes += 1 From fc312794c375f5f7f4e8f7795aaf096e02231370 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 21:31:51 +0000 Subject: [PATCH 03/14] moved function out of class to deal with pickling issues --- PulsarFeatureLab/Src/DataProcessor.py | 37 ++++++++++++++------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index d123d38..0f7e8cc 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -69,6 +69,24 @@ def featureNoMeta(features): entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA return entry3 +def generate_filenames(self, directory, fileTypeRegexes): + """ + A generator that yields the paths of files matching the given regex and thier names, + in the format (filename,full-path-to-file) + """ + for filetype in fileTypeRegexes: + # Loop through the specified directory + for root, subFolders, filenames in os.walk(directory): + + # If the file type matches one of those this program recognises + for filename in fnmatch.filter(filenames, filetype): + + yield filename,os.path.join(root, filename) # Gets full path to the candidate. + + # If the file does not have the expected suffix (file extension), skip to the next. + if(cand.endswith(filetype.replace("*",""))==False): + continue + # **************************************************************************************************** # # CLASS DEFINITION @@ -107,23 +125,6 @@ def __init__(self,debugFlag): # **************************************************************************************************** - def generate_filenames(self, directory, fileTypeRegexes): - """ - A generator that yields the paths of files matching the given regex and thier names, - in the format (filename,full-path-to-file) - """ - for filetype in fileTypeRegexes: - # Loop through the specified directory - for root, subFolders, filenames in os.walk(directory): - - # If the file type matches one of those this program recognises - for filename in fnmatch.filter(filenames, filetype): - - yield filename,os.path.join(root, filename) # Gets full path to the candidate. - - # If the file does not have the expected suffix (file extension), skip to the next. - if(cand.endswith(filetype.replace("*",""))==False): - continue def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff): @@ -227,7 +228,7 @@ def worker((name,path)): #dispatch the worker process to the worker pool, feeding in the filenames generated from the generator #used unordered_map because we don't care what order the candidates are processed in - filename_gen = self.generate_filenames(directory,fileTypeRegexes) + filename_gen = generate_filenames(directory,fileTypeRegexes) for feature in worker_pool.imap_unordered(worker, filename_gen): if feature is not None: self.featureStore.append(feature) From 98df140ce901b66dda88d74524a7d03d4dbfef9d Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 21:32:03 +0000 Subject: [PATCH 04/14] moved function out of class to deal with pickling issues --- PulsarFeatureLab/Src/DataProcessor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 0f7e8cc..8aee8df 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -69,7 +69,7 @@ def featureNoMeta(features): entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA return entry3 -def generate_filenames(self, directory, fileTypeRegexes): +def generate_filenames(directory, fileTypeRegexes): """ A generator that yields the paths of files matching the given regex and thier names, in the format (filename,full-path-to-file) From 7c1475baae70e1b6c62879f38cb0aa78ab084c77 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 21:50:21 +0000 Subject: [PATCH 05/14] moved functions outside class in attempt to fix pickling bug --- PulsarFeatureLab/Src/DataProcessor.py | 85 ++++++++++++++------------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 8aee8df..8a066f1 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -19,6 +19,7 @@ import sys,os,fnmatch,datetime import multiprocessing +import itertools # Custom file Imports: import Utilities, Candidate @@ -69,23 +70,29 @@ def featureNoMeta(features): entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA return entry3 -def generate_filenames(directory, fileTypeRegexes): - """ - A generator that yields the paths of files matching the given regex and thier names, - in the format (filename,full-path-to-file) - """ - for filetype in fileTypeRegexes: - # Loop through the specified directory - for root, subFolders, filenames in os.walk(directory): - - # If the file type matches one of those this program recognises - for filename in fnmatch.filter(filenames, filetype): - - yield filename,os.path.join(root, filename) # Gets full path to the candidate. - - # If the file does not have the expected suffix (file extension), skip to the next. - if(cand.endswith(filetype.replace("*",""))==False): - continue +def worker( (name,path,feature_type,candidate_type,verbose,meta,arff) ): + """ + Creates a candidate object to perform the processing. It's execution is controlled + by the generate_orders function of the DataProcessor class. It is defined outside the + class to enable multiprocessing, since the default implementation is unable to pickle + classes and closure-type functions, neccesitating passing in a large number of options + to each worker + """ + try: + c = Candidate.Candidate(name,str(path+cand)) + features = c.getFeatures(feature_type, candidate_type, verbose) + if (arff and feature_type > 0 and feature_type < 7): + features.append('?') + if meta: + return featureMeta(features) + else: + return featureNoMeta(features) + except: + """catch all exceptions, printing to stdout""" + print "\tError reading candidate data :\n\t", sys.exc_info()[0] + #print self.format_exception(e) this function doesn't seem to exist? + print "\t",cand, " did not have features generated." + return None # **************************************************************************************************** # @@ -126,6 +133,24 @@ def __init__(self,debugFlag): # **************************************************************************************************** + def generate_orders(self, directory, fileTypeRegexes, feature_type,candidate_type,verbose,meta,arff): + """ + A generator that yields the paths of files matching the given regex and thier names, + in the format (filename,full-path-to-file) + """ + for filetype in fileTypeRegexes: + # Loop through the specified directory + for root, subFolders, filenames in os.walk(directory): + + # If the file type matches one of those this program recognises + for filename in fnmatch.filter(filenames, filetype): + + #yield all the arguments that need to be passed into the worker + yield filename,os.path.join(root, filename), feature_type, candidate_type, verbose, meta, arff + + # If the file does not have the expected suffix (file extension), skip to the next. + if(cand.endswith(filetype.replace("*",""))==False): + continue def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff): """ @@ -204,32 +229,12 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) worker_pool = multiprocessing.Pool(multiprocessing.cpu_count()) #try to utilize all avaliable cores - def worker((name,path)): - """ - This function processes each file, returning the features as a list. It closes over - various neccesary environment variables, preventing them being needed to be passed in - explicitly every time - """ - try: - c = Candidate.Candidate(name,str(path+cand)) - features = c.getFeatures(feature_type, candidate_type, verbose) - if (arff and feature_type > 0 and feature_type < 7): - features.append('?') - if meta: - return featureMeta(features) - else: - return featureNoMeta(features) - except: - """catch all exceptions, printing to stdout""" - print "\tError reading candidate data :\n\t", sys.exc_info()[0] - #print self.format_exception(e) this function doesn't seem to exist? - print "\t",cand, " did not have features generated." - return None + #dispatch the worker process to the worker pool, feeding in the filenames generated from the generator #used unordered_map because we don't care what order the candidates are processed in - filename_gen = generate_filenames(directory,fileTypeRegexes) - for feature in worker_pool.imap_unordered(worker, filename_gen): + orders = self.generate_orders(directory,fileTypeRegexes, meta, feature_type, candidate_type, verbose) + for feature in worker_pool.imap_unordered(worker, orders): if feature is not None: self.featureStore.append(feature) successes += 1 From b9f15dd63e7bffb83a15ff6737cbca3dec0b5ae8 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 21:53:04 +0000 Subject: [PATCH 06/14] fixed a typo --- PulsarFeatureLab/Src/DataProcessor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 8a066f1..a797b07 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -233,7 +233,7 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) #dispatch the worker process to the worker pool, feeding in the filenames generated from the generator #used unordered_map because we don't care what order the candidates are processed in - orders = self.generate_orders(directory,fileTypeRegexes, meta, feature_type, candidate_type, verbose) + orders = self.generate_orders(directory,fileTypeRegexes, feature_type,candidate_type,verbose,meta,arff) for feature in worker_pool.imap_unordered(worker, orders): if feature is not None: self.featureStore.append(feature) From a85cb0ff10644f3012c77dae915cc2ba893b1e91 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 21:56:03 +0000 Subject: [PATCH 07/14] fixed a typo --- PulsarFeatureLab/Src/DataProcessor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index a797b07..280ba32 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -144,7 +144,7 @@ def generate_orders(self, directory, fileTypeRegexes, feature_type,candidate_typ # If the file type matches one of those this program recognises for filename in fnmatch.filter(filenames, filetype): - + cand = os.path.join(root, filename) #yield all the arguments that need to be passed into the worker yield filename,os.path.join(root, filename), feature_type, candidate_type, verbose, meta, arff From df2e48168b547f9f449ec5801d7a14d2cec6c582 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 22:44:51 +0000 Subject: [PATCH 08/14] added changes --- PulsarFeatureLab/Src/DataProcessor.py | 29 ++++++++++----------------- 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 280ba32..8d55a86 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -25,19 +25,12 @@ import Utilities, Candidate +#Helper functions defined outside the class in order to allow for mutliprocessing: +#multiprocessing pickles objects, and this doesn't work well with classes. -def featureMeta(features): +def featureMeta(candidate,features): """ - Appends candidate features to a list held by this object. This stores - each feature in memory, as opposed to writing them out to a file each time. - - Parameters: - - candidate - The name of the candidate the features belong to. - features - A float array of candidate features. - - Return: - N/A + Take a feature and """ # Join features into single comma separated line. @@ -49,7 +42,7 @@ def featureMeta(features): # **************************************************************************************************** -def featureNoMeta(features): +def featureNoMeta(candidate,features): """ Appends candidate features to a list held by this object. This records each feature in memory as opposed to writing them out to a file each time. @@ -79,19 +72,19 @@ class to enable multiprocessing, since the default implementation is unable to p to each worker """ try: - c = Candidate.Candidate(name,str(path+cand)) + c = Candidate.Candidate(name,path) features = c.getFeatures(feature_type, candidate_type, verbose) if (arff and feature_type > 0 and feature_type < 7): features.append('?') if meta: - return featureMeta(features) + return featureMeta(path,features) else: - return featureNoMeta(features) + return featureNoMeta(path,features) except: """catch all exceptions, printing to stdout""" print "\tError reading candidate data :\n\t", sys.exc_info()[0] #print self.format_exception(e) this function doesn't seem to exist? - print "\t",cand, " did not have features generated." + print "\t",name, " did not have features generated." return None # **************************************************************************************************** @@ -146,7 +139,7 @@ def generate_orders(self, directory, fileTypeRegexes, feature_type,candidate_typ for filename in fnmatch.filter(filenames, filetype): cand = os.path.join(root, filename) #yield all the arguments that need to be passed into the worker - yield filename,os.path.join(root, filename), feature_type, candidate_type, verbose, meta, arff + yield cand,os.path.join(directory,cand), feature_type, candidate_type, verbose, meta, arff # If the file does not have the expected suffix (file extension), skip to the next. if(cand.endswith(filetype.replace("*",""))==False): @@ -290,4 +283,4 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) print "\tFailures:\t", failures print "\tExecution time: ", str(end - start) - # **************************************************************************************************** \ No newline at end of file + # **************************************************************************************************** From d7e7e16ffa73bbd1aea61a61462c63868a651efb Mon Sep 17 00:00:00 2001 From: lewis smith Date: Thu, 23 Feb 2017 22:56:49 +0000 Subject: [PATCH 09/14] actually figured out how to apply a patch --- PulsarFeatureLab/Src/Candidate.py | 2 +- PulsarFeatureLab/Src/DataProcessor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PulsarFeatureLab/Src/Candidate.py b/PulsarFeatureLab/Src/Candidate.py index d2d3b28..4deeb0b 100644 --- a/PulsarFeatureLab/Src/Candidate.py +++ b/PulsarFeatureLab/Src/Candidate.py @@ -186,4 +186,4 @@ def __str__(self): return self.candidateName + "," + self.candidatePath - # **************************************************************************************************** \ No newline at end of file + # **************************************************************************************************** diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 8d55a86..944d6f0 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -26,7 +26,7 @@ #Helper functions defined outside the class in order to allow for mutliprocessing: -#multiprocessing pickles objects, and this doesn't work well with classes. +#multiprocessing pickles objects, and this doesn't work well with classes. def featureMeta(candidate,features): """ From 020dab02d301be9472a4509dd8d7195c9558455c Mon Sep 17 00:00:00 2001 From: lewis smith Date: Fri, 24 Feb 2017 10:23:15 +0000 Subject: [PATCH 10/14] Allowed nice exception reporting again --- PulsarFeatureLab/Src/DataProcessor.py | 34 +++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 944d6f0..dbda862 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -14,7 +14,6 @@ ************************************************************************** """ - # Standard library Imports: import sys,os,fnmatch,datetime @@ -70,6 +69,9 @@ def worker( (name,path,feature_type,candidate_type,verbose,meta,arff) ): class to enable multiprocessing, since the default implementation is unable to pickle classes and closure-type functions, neccesitating passing in a large number of options to each worker + If succesful, returns the feature and None,None. If the worker hits an exception, + return the exception information and the name of the candidate to the main thread + so it can be printed nicely using the Utility class """ try: c = Candidate.Candidate(name,path) @@ -77,15 +79,13 @@ class to enable multiprocessing, since the default implementation is unable to p if (arff and feature_type > 0 and feature_type < 7): features.append('?') if meta: - return featureMeta(path,features) + return featureMeta(path,features),None, name else: - return featureNoMeta(path,features) + return featureNoMeta(path,features),None, name except: """catch all exceptions, printing to stdout""" - print "\tError reading candidate data :\n\t", sys.exc_info()[0] - #print self.format_exception(e) this function doesn't seem to exist? - print "\t",name, " did not have features generated." - return None + + return None,e, name # **************************************************************************************************** # @@ -128,8 +128,8 @@ def __init__(self,debugFlag): def generate_orders(self, directory, fileTypeRegexes, feature_type,candidate_type,verbose,meta,arff): """ - A generator that yields the paths of files matching the given regex and thier names, - in the format (filename,full-path-to-file) + Yields the parameters to be dispatched to the subprocess workers: the filename generated from + walking the directory, and the behaviour parameters (feature_type, candidate_type, verbose, meta, arff) """ for filetype in fileTypeRegexes: # Loop through the specified directory @@ -222,17 +222,21 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) worker_pool = multiprocessing.Pool(multiprocessing.cpu_count()) #try to utilize all avaliable cores - - - #dispatch the worker process to the worker pool, feeding in the filenames generated from the generator - #used unordered_map because we don't care what order the candidates are processed in + #created a generator that feeds the filenames of the candidates to process and the behaviour options to + #the processor workers orders = self.generate_orders(directory,fileTypeRegexes, feature_type,candidate_type,verbose,meta,arff) - for feature in worker_pool.imap_unordered(worker, orders): + + #dispatch the processes to the worker pool. Use imap_unordered because it doesn't matter what order + #we process the files in + for feature,exception,name in worker_pool.imap_unordered(worker, orders): if feature is not None: self.featureStore.append(feature) successes += 1 else: - #worker wrote to stdout already + #process hit an exception: don't throw, but display its information + print "\tError reading candidate data :\n\t", sys.exc_info()[0] + print self.format_exception(exception) + print "\t",name, " did not have features generated." failures += 1 candidatesProcessed+=1 From 59e0de0cdf61f72aef47f6d7a8a049c7e5fa316b Mon Sep 17 00:00:00 2001 From: lewis smith Date: Fri, 24 Feb 2017 10:47:57 +0000 Subject: [PATCH 11/14] Added patches to fix problems found in testing --- PulsarFeatureLab/Src/DataProcessor.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index dbda862..8847a1a 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -82,7 +82,7 @@ class to enable multiprocessing, since the default implementation is unable to p return featureMeta(path,features),None, name else: return featureNoMeta(path,features),None, name - except: + except Exception as e: """catch all exceptions, printing to stdout""" return None,e, name @@ -228,14 +228,16 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) #dispatch the processes to the worker pool. Use imap_unordered because it doesn't matter what order #we process the files in - for feature,exception,name in worker_pool.imap_unordered(worker, orders): + for feature,ex,name in worker_pool.imap_unordered(worker, orders): if feature is not None: self.featureStore.append(feature) successes += 1 else: - #process hit an exception: don't throw, but display its information - print "\tError reading candidate data :\n\t", sys.exc_info()[0] - print self.format_exception(exception) + #worker hit an exception: display the kind of exception and the file name. + #Unfortunatly, multiprocessing makes it harder to give a better traceback + print "\tError reading candidate data :\n\t" + print "\tEncountered exception: " + print "\t",ex print "\t",name, " did not have features generated." failures += 1 From a8b51b0e9c0b80e65265a6866a0fda6eaf298798 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Fri, 24 Feb 2017 10:54:26 +0000 Subject: [PATCH 12/14] fixed a whitespace error --- PulsarFeatureLab/Src/DataProcessor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 8847a1a..b213ce1 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -236,8 +236,8 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) #worker hit an exception: display the kind of exception and the file name. #Unfortunatly, multiprocessing makes it harder to give a better traceback print "\tError reading candidate data :\n\t" - print "\tEncountered exception: " - print "\t",ex + print "\tEncountered exception: " + print "\t",ex print "\t",name, " did not have features generated." failures += 1 From 3f5631d18fa62e2479bcc7d023c043f2c87857f0 Mon Sep 17 00:00:00 2001 From: lewis smith Date: Fri, 24 Feb 2017 11:00:10 +0000 Subject: [PATCH 13/14] updated some comments --- PulsarFeatureLab/Src/DataProcessor.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index b213ce1..7d8627f 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -64,14 +64,11 @@ def featureNoMeta(candidate,features): def worker( (name,path,feature_type,candidate_type,verbose,meta,arff) ): """ - Creates a candidate object to perform the processing. It's execution is controlled - by the generate_orders function of the DataProcessor class. It is defined outside the - class to enable multiprocessing, since the default implementation is unable to pickle - classes and closure-type functions, neccesitating passing in a large number of options - to each worker - If succesful, returns the feature and None,None. If the worker hits an exception, - return the exception information and the name of the candidate to the main thread - so it can be printed nicely using the Utility class + Creates a candidate object to perform the processing. If succesful, returns the feature + and None,None. If the worker hits an exception,return the exception information and + the name of the candidate to the main thread so it can be printed. + + This needs to be defined outside the class so it can be done in parallel """ try: c = Candidate.Candidate(name,path) From 64e60f66e0b8bd97507ed3ecca1d0d85647184aa Mon Sep 17 00:00:00 2001 From: lewis smith Date: Fri, 24 Feb 2017 11:04:25 +0000 Subject: [PATCH 14/14] finished updating comments --- PulsarFeatureLab/Src/DataProcessor.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 7d8627f..2ce93ad 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -29,9 +29,16 @@ def featureMeta(candidate,features): """ - Take a feature and - """ + Returns a string of features with the metadata attached. Strips bad data fields from the list + + Parameters: + + candidate - The name of the candidate the features belong to. + features - A float array of candidate features. + Return: + modified version of the list + """ # Join features into single comma separated line. allFeatures = str(",".join(map(str, features))) entry1 = allFeatures + ",%" + candidate @@ -43,8 +50,7 @@ def featureMeta(candidate,features): def featureNoMeta(candidate,features): """ - Appends candidate features to a list held by this object. This records - each feature in memory as opposed to writing them out to a file each time. + Returns a string of features without the metadata attached. Strips bad data fields from the list Parameters: @@ -52,7 +58,7 @@ def featureNoMeta(candidate,features): features - A float array of candidate features. Return: - N/A + modified version of the list """ # Join features into single comma separated line. @@ -80,8 +86,7 @@ def worker( (name,path,feature_type,candidate_type,verbose,meta,arff) ): else: return featureNoMeta(path,features),None, name except Exception as e: - """catch all exceptions, printing to stdout""" - + #return exception information to the main thread return None,e, name # **************************************************************************************************** @@ -219,7 +224,7 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) worker_pool = multiprocessing.Pool(multiprocessing.cpu_count()) #try to utilize all avaliable cores - #created a generator that feeds the filenames of the candidates to process and the behaviour options to + #create a generator that feeds the filenames of the candidates to process and the behaviour options to #the processor workers orders = self.generate_orders(directory,fileTypeRegexes, feature_type,candidate_type,verbose,meta,arff)