diff --git a/PulsarFeatureLab/Src/Candidate.py b/PulsarFeatureLab/Src/Candidate.py index d2d3b28..4deeb0b 100644 --- a/PulsarFeatureLab/Src/Candidate.py +++ b/PulsarFeatureLab/Src/Candidate.py @@ -186,4 +186,4 @@ def __str__(self): return self.candidateName + "," + self.candidatePath - # **************************************************************************************************** \ No newline at end of file + # **************************************************************************************************** diff --git a/PulsarFeatureLab/Src/DataProcessor.py b/PulsarFeatureLab/Src/DataProcessor.py index 53c0def..2ce93ad 100644 --- a/PulsarFeatureLab/Src/DataProcessor.py +++ b/PulsarFeatureLab/Src/DataProcessor.py @@ -14,14 +14,81 @@ ************************************************************************** """ - # Standard library Imports: import sys,os,fnmatch,datetime +import multiprocessing +import itertools + # Custom file Imports: import Utilities, Candidate +#Helper functions defined outside the class in order to allow for multiprocessing: +#multiprocessing pickles objects, and this doesn't work well with classes. + +def featureMeta(candidate,features): + """ + Returns a string of features with the metadata attached. Strips bad data fields from the list + + Parameters: + + candidate - The name of the candidate the features belong to. + features - A float array of candidate features. + + Return: + modified version of the list + """ + # Join features into single comma separated line.
+ allFeatures = str(",".join(map(str, features))) + entry1 = allFeatures + ",%" + candidate + entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA + entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA + return entry3 + +# **************************************************************************************************** + +def featureNoMeta(candidate,features): + """ + Returns a string of features without the metadata attached. Strips bad data fields from the list + + Parameters: + + candidate - The name of the candidate the features belong to. + features - A float array of candidate features. + + Return: + modified version of the list + """ + + # Join features into single comma separated line. + allFeatures = str(",".join(map(str, features))) + entry1 = allFeatures + entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA + entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA + return entry3 + +def worker( (name,path,feature_type,candidate_type,verbose,meta,arff) ): + """ + Creates a candidate object to perform the processing. If successful, returns the feature + and None,None. If the worker hits an exception, return the exception information and + the name of the candidate to the main thread so it can be printed.
+ + This needs to be defined outside the class so it can be done in parallel + """ + try: + c = Candidate.Candidate(name,path) + features = c.getFeatures(feature_type, candidate_type, verbose) + if (arff and feature_type > 0 and feature_type < 7): + features.append('?') + if meta: + return featureMeta(path,features),None, name + else: + return featureNoMeta(path,features),None, name + except Exception as e: + #return exception information to the main thread + return None,e, name + # **************************************************************************************************** # # CLASS DEFINITION @@ -57,54 +124,29 @@ def __init__(self,debugFlag): self.pfdRegex = "*.pfd" self.featureStore = [] # Variable which stores the features created for a candidate. - # **************************************************************************************************** + - def storeFeatureMeta(self,candidate,features): - """ - Appends candidate features to a list held by this object. This stores - each feature in memory, as opposed to writing them out to a file each time. - - Parameters: - - candidate - The name of the candidate the features belong to. - features - A float array of candidate features. - - Return: - N/A - """ - - # Join features into single comma separated line. - allFeatures = str(",".join(map(str, features))) - entry1 = allFeatures + ",%" + candidate - entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA - entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA - self.featureStore.append(entry3) - # **************************************************************************************************** - - def storeFeatureNoMeta(self,candidate,features): + + def generate_orders(self, directory, fileTypeRegexes, feature_type,candidate_type,verbose,meta,arff): """ - Appends candidate features to a list held by this object. 
This records - each feature in memory as opposed to writing them out to a file each time. - - Parameters: - - candidate - The name of the candidate the features belong to. - features - A float array of candidate features. - - Return: - N/A + Yields the parameters to be dispatched to the subprocess workers: the filename generated from + walking the directory, and the behaviour parameters (feature_type, candidate_type, verbose, meta, arff) """ - - # Join features into single comma separated line. - allFeatures = str(",".join(map(str, features))) - entry1 = allFeatures - entry2 = entry1.replace("nan","0") # Remove NaNs since these cause error for ML tools like WEKA - entry3 = entry2.replace("inf","0") # Remove infinity values since these cause error for ML tools like WEKA - self.featureStore.append(entry3) - - # **************************************************************************************************** - + for filetype in fileTypeRegexes: + # Loop through the specified directory + for root, subFolders, filenames in os.walk(directory): + + # If the file type matches one of those this program recognises + for filename in fnmatch.filter(filenames, filetype): + cand = os.path.join(root, filename) + #yield all the arguments that need to be passed into the worker + yield cand,os.path.join(directory,cand), feature_type, candidate_type, verbose, meta, arff + + # If the file does not have the expected suffix (file extension), skip to the next. + if(cand.endswith(filetype.replace("*",""))==False): + continue + def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff): """ Processes pulsar candidates of the type specified by 'candidate_type'. @@ -149,9 +191,9 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) """ # Used to monitor feature creation statistics. 
- candidatesProcessed = 0; - successes = 0; - failures = 0; + candidatesProcessed = 0 + successes = 0 + failures = 0 print "\n\t*************************" print "\t| Searching Recursively |" @@ -178,74 +220,48 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) start = datetime.datetime.now() # Used to measure feature generation time. - # For each type of file this program recognises - for filetype in fileTypeRegexes: + #spawn a pool of workers to process the individual files + + worker_pool = multiprocessing.Pool(multiprocessing.cpu_count()) #try to utilize all available cores + + #create a generator that feeds the filenames of the candidates to process and the behaviour options to + #the processor workers + orders = self.generate_orders(directory,fileTypeRegexes, feature_type,candidate_type,verbose,meta,arff) + + #dispatch the processes to the worker pool. Use imap_unordered because it doesn't matter what order + #we process the files in + for feature,ex,name in worker_pool.imap_unordered(worker, orders): + if feature is not None: + self.featureStore.append(feature) + successes += 1 + else: + #worker hit an exception: display the kind of exception and the file name. + #Unfortunately, multiprocessing makes it harder to give a better traceback + print "\tError reading candidate data :\n\t" + print "\tEncountered exception: " + print "\t",ex + print "\t",name, " did not have features generated." + failures += 1 + + candidatesProcessed+=1 + if(candidatesProcessed%10000==0):# Every 10,000 candidates - # Loop through the specified directory - for root, subFolders, filenames in os.walk(directory): + # This 'if' statement is used to provide useful feedback on feature + # generation. But it is also used to write the features collected so far, + # to the output file at set intervals. This helps a) reduce memory load, and + # b) reduce disc load (by writing out lots of features in one go, as opposed + # to one by one).
- # If the file type matches one of those this program recognises - for filename in fnmatch.filter(filenames, filetype): - - cand = os.path.join(root, filename) # Gets full path to the candidate. - - # If the file does not have the expected suffix (file extension), skip to the next. - if(cand.endswith(filetype.replace("*",""))==False): - continue - - candidatesProcessed+=1 - - if(candidatesProcessed%10000==0):# Every 10,000 candidates - - # This 'if' statement is used to provide useful feedback on feature - # generation. But it is also used to write the features collected so far, - # to the output file at set intervals. This helps a) reduce memory load, and - # b) reduce disc load (by writing out lots of features in one go, as opposed - # to one by one). - - print "\tCandidates processed: ", candidatesProcessed + print "\tCandidates processed: ", candidatesProcessed + # Write out the features collected so far. + outputText="" + for s in self.featureStore: + outputText+=s+"\n" - # Write out the features collected so far. - outputText="" - for s in self.featureStore: - outputText+=s+"\n" - - self.appendToFile(output, outputText) # Write all 10,000 entries to the output file. - self.featureStore = [] # Clear the feature store, freeing up memory. - - try: - - # Create the candidate object. - c = Candidate.Candidate(cand,str(directory+cand)) - - # Get the features from the candidate. - features = c.getFeatures(feature_type,candidate_type,verbose) - - # If the user would like the output to be in ARFF format, then each candidate - # has to be associated with a label. Since this code cannot know the true label - # of a candidate, here the unknown label '?' is appended as a additional feature. - if(arff and feature_type > 0 and feature_type < 7): - features.append("?") - - # Store the features so it can later be written to the specified output file. 
- if(meta): - # Store with meta information - basically this means including the candidate - # name (full path) with each feature set. This means that - # each set of features will be linked to a candidate, - # useful for certain investigations (i.e. why a specific - # candidate achieved particular feature values). - self.storeFeatureMeta(cand, features) - else: - self.storeFeatureNoMeta(cand, features) # Store only the feature data. - - except Exception as e: # Catch *all* exceptions. - print "\tError reading candidate data :\n\t", sys.exc_info()[0] - print self.format_exception(e) - print "\t",cand, " did not have features generated." - failures+=1 - continue - - successes+=1 + self.appendToFile(output, outputText) # Write all 10,000 entries to the output file. + self.featureStore = [] # Clear the feature store, freeing up memory. + + # Save any remaining features, since its possible that some features # were not written to the output file in the loop above. @@ -275,4 +291,4 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff) print "\tFailures:\t", failures print "\tExecution time: ", str(end - start) - # **************************************************************************************************** \ No newline at end of file + # ****************************************************************************************************