2 changes: 1 addition & 1 deletion PulsarFeatureLab/Src/Candidate.py
@@ -186,4 +186,4 @@ def __str__(self):

        return self.candidateName + "," + self.candidatePath

    # ****************************************************************************************************
244 changes: 130 additions & 114 deletions PulsarFeatureLab/Src/DataProcessor.py
@@ -14,14 +14,81 @@
**************************************************************************

"""

# Standard library imports:
import sys, os, fnmatch, datetime
import multiprocessing
import itertools

# Custom file imports:
import Utilities, Candidate


# Helper functions defined outside the class in order to allow for multiprocessing:
# multiprocessing pickles the callables and arguments it sends to worker processes,
# and bound instance methods cannot be pickled in Python 2.
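# A minimal sketch of the failure this design avoids (Python 2 semantics; the
# names below are illustrative only, not part of this module):
#
#   pool = multiprocessing.Pool()
#   pool.map(self.someMethod, jobs)    # PicklingError: can't pickle instancemethod
#   pool.map(module_level_func, jobs)  # Fine: top-level functions pickle by name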

def featureMeta(candidate, features):
    """
    Returns a string of features with the metadata (the candidate name) attached,
    and strips bad data values from the result.

    Parameters:

    candidate - The name of the candidate the features belong to.
    features  - A float array of candidate features.

    Return:
    A comma-separated feature string, ending in ",%<candidate name>".
    """
    # Join features into a single comma-separated line.
    allFeatures = ",".join(map(str, features))
    entry1 = allFeatures + ",%" + candidate
    # Note: these are plain substring replacements, so they would also alter a
    # candidate name that happens to contain "nan" or "inf".
    entry2 = entry1.replace("nan", "0") # Remove NaNs, since these cause errors for ML tools like WEKA.
    entry3 = entry2.replace("inf", "0") # Remove infinity values, since these cause errors for ML tools like WEKA.
    return entry3
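# An illustrative call (the candidate name is hypothetical):
#
#   featureMeta("cand1.pfd", [1.5, float("nan"), 2.0])
#
# joins the features, appends the metadata, and scrubs the NaN, returning
# "1.5,0,2.0,%cand1.pfd".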

# ****************************************************************************************************

def featureNoMeta(candidate, features):
    """
    Returns a string of features without the metadata attached, and strips bad
    data values from the result.

    Parameters:

    candidate - The name of the candidate the features belong to (unused here).
    features  - A float array of candidate features.

    Return:
    A comma-separated feature string.
    """

    # Join features into a single comma-separated line.
    allFeatures = ",".join(map(str, features))
    entry1 = allFeatures
    entry2 = entry1.replace("nan", "0") # Remove NaNs, since these cause errors for ML tools like WEKA.
    entry3 = entry2.replace("inf", "0") # Remove infinity values, since these cause errors for ML tools like WEKA.
    return entry3

# ****************************************************************************************************

def worker((name, path, feature_type, candidate_type, verbose, meta, arff)):
    """
    Creates a candidate object to perform the processing. If successful, returns
    the feature string, None (no exception), and the candidate name. If the worker
    hits an exception, it returns the exception information and the name of the
    candidate to the main thread, so the failure can be reported.

    This needs to be defined outside the class so it can be run in parallel.

    Note: the parenthesised parameter uses Python 2 tuple unpacking, so each
    order yielded by generate_orders() arrives as a single tuple.
    """
    try:
        c = Candidate.Candidate(name, path)
        features = c.getFeatures(feature_type, candidate_type, verbose)

        # If the user would like ARFF output, each candidate must carry a label.
        # Since the true label cannot be known here, the unknown label '?' is
        # appended as an additional feature.
        if (arff and feature_type > 0 and feature_type < 7):
            features.append('?')

        if meta:
            return featureMeta(path, features), None, name
        else:
            return featureNoMeta(path, features), None, name
    except Exception as e:
        # Return the exception information to the main thread.
        return None, e, name

# ****************************************************************************************************
#
# CLASS DEFINITION
@@ -57,54 +124,29 @@ def __init__(self,debugFlag):
        self.pfdRegex = "*.pfd"
        self.featureStore = [] # Variable which stores the features created for a candidate.

# ****************************************************************************************************


    def generate_orders(self, directory, fileTypeRegexes, feature_type, candidate_type, verbose, meta, arff):
        """
        Yields the parameters to be dispatched to the subprocess workers: the
        candidate filename generated by walking the directory, and the behaviour
        parameters (feature_type, candidate_type, verbose, meta, arff).
        """

        for filetype in fileTypeRegexes:

            # Loop through the specified directory.
            for root, subFolders, filenames in os.walk(directory):

                # If the file type matches one of those this program recognises...
                for filename in fnmatch.filter(filenames, filetype):

                    cand = os.path.join(root, filename) # Gets full path to the candidate.

                    # If the file does not have the expected suffix (file extension),
                    # skip to the next.
                    if(cand.endswith(filetype.replace("*", "")) == False):
                        continue

                    # Yield all the arguments that need to be passed into the worker.
                    yield cand, os.path.join(directory, cand), feature_type, candidate_type, verbose, meta, arff
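    # An illustrative order, assuming a "*.pfd" match under an absolute search
    # directory (the path is hypothetical):
    #
    #   ("/data/cands/cand1.pfd",   # name: full path found by os.walk
    #    "/data/cands/cand1.pfd",   # path: os.path.join leaves absolute paths unchanged
    #    6, 2, False, True, False)  # feature_type, candidate_type, verbose, meta, arff
    #
    # This matches the tuple unpacked by worker().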

    def process(self, directory, output, feature_type, candidate_type, verbose, meta, arff):
        """
        Processes pulsar candidates of the type specified by 'candidate_type'.
@@ -149,9 +191,9 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff)
        """

        # Used to monitor feature creation statistics.
        candidatesProcessed = 0
        successes = 0
        failures = 0

print "\n\t*************************"
print "\t| Searching Recursively |"
Expand All @@ -178,74 +220,48 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff)

start = datetime.datetime.now() # Used to measure feature generation time.

        # Spawn a pool of worker processes to handle the individual files,
        # trying to utilise all available cores.
        worker_pool = multiprocessing.Pool(multiprocessing.cpu_count())

        # Create a generator that feeds the filenames of the candidates to
        # process, and the behaviour options, to the worker processes.
        orders = self.generate_orders(directory, fileTypeRegexes, feature_type, candidate_type, verbose, meta, arff)

        # Dispatch the orders to the worker pool. imap_unordered is used because
        # it does not matter what order the files are processed in.
        for feature, ex, name in worker_pool.imap_unordered(worker, orders):

            if feature is not None:
                self.featureStore.append(feature)
                successes += 1
            else:
                # The worker hit an exception: display the kind of exception and
                # the file name. Unfortunately, multiprocessing makes it harder
                # to give a full traceback.
                print "\tError reading candidate data:"
                print "\tEncountered exception: "
                print "\t", ex
                print "\t", name, " did not have features generated."
                failures += 1

            candidatesProcessed += 1

            if(candidatesProcessed % 10000 == 0): # Every 10,000 candidates.

                # This 'if' statement is used to provide useful feedback on
                # feature generation. But it is also used to write the features
                # collected so far to the output file at set intervals. This
                # helps a) reduce memory load, and b) reduce disc load (by
                # writing out lots of features in one go, as opposed to one
                # by one).

                print "\tCandidates processed: ", candidatesProcessed

                # Write out the features collected so far.
                outputText = ""
                for s in self.featureStore:
                    outputText += s + "\n"

                self.appendToFile(output, outputText) # Write all 10,000 entries to the output file.
                self.featureStore = []                # Clear the feature store, freeing up memory.
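
        # A minimal cleanup sketch, assuming no further work will be dispatched
        # from this pool: close it and wait for the workers to exit, so that
        # idle processes do not linger if process() is called again.
        worker_pool.close() # No more tasks will be submitted.
        worker_pool.join()  # Wait for the worker processes to terminate.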



        # Save any remaining features, since it's possible that some features
        # were not written to the output file in the loop above.
@@ -275,4 +291,4 @@ def process(self,directory,output,feature_type,candidate_type,verbose,meta,arff)
print "\tFailures:\t", failures
print "\tExecution time: ", str(end - start)

    # ****************************************************************************************************