From f4cafd08310c347241a2b1606b8eda3633518771 Mon Sep 17 00:00:00 2001 From: Pauline Dufour Date: Fri, 10 Nov 2017 16:59:28 +0100 Subject: [PATCH 1/3] update stats class --- ltu/engine/stat.py | 176 ++++++++++++++++++++++++++++++--------------- 1 file changed, 120 insertions(+), 56 deletions(-) diff --git a/ltu/engine/stat.py b/ltu/engine/stat.py index 061b962..8b072c7 100644 --- a/ltu/engine/stat.py +++ b/ltu/engine/stat.py @@ -1,4 +1,5 @@ import logging +import threading logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -8,68 +9,131 @@ class Stat(object): """ def __init__(self): + self.dict = {} - #images - self.treated = {} #how many images could have been successfully treated per action - self.treated["add"] = 0 - self.treated["delete"] = 0 - self.treated["search"] = 0 + #mutex + self.mutex = threading.Lock() - self.to_treat = 0 #images to treat - offset - self.queries_to_treat = 0 #complete number of queries - self.submitted = 0 #images in the repertory - already treated - self.already = 0 #how many images already treated - - self.nb_errors = {} #how many images failed to be treated - self.nb_errors["total"] = 0 - self.nb_errors["add"] = 0 - self.nb_errors["delete"] = 0 - self.nb_errors["search"] = 0 - - self.time = {} - self.time["add"] = 1 - self.time["delete"] = 1 - self.time["search"] = 1 - - def add_error(self, action): - """add one more error for one specified action""" - self.nb_errors[action] += self.nb_errors[action] + def incremente(self, path, step=1): + """incremente of value step a counter for each key contained in path + """ + self.mutex.acquire() + keys = path.split('.') + current_element = self.dict + for key in keys: + if '_count' not in current_element: + #we create a new counter + current_element['_count'] = step + else: + # we increment existing key/value + current_element['_count'] = current_element['_count'] + step + if key not in current_element: + current_element[key] = {'_count': 0} + # check last key + if key == keys[-1]: + # we reach the key of the path + # we increment existing key/value + current_element[key]['_count'] = current_element[key]['_count'] + step + else: + # we reach the next key of the path + current_element = current_element[key] - def get_nb_errors(self): - """return the number of errors""" - nbe = 0 - for action in self.nb_errors: - nbe += self.nb_errors[action] - return nbe + logger.debug(self.dict) + self.mutex.release() - def get_nb_queries_treated(self): - """return how manies queries succed""" - nbq = 0 - for action in self.treated: - nbq += self.treated[action] - return nbq + def add_and_average_stat(self, path, value): + """ add a value to a statistic. The second time we add this value, we make an average """ + self.mutex.acquire() + keys = path.split('.') + current_element = self.dict + for key in keys: + # check last key + if key == keys[-1]: + # we reach the key of the path + # we add key/value + if key in current_element: + current_element[key] += value + # we make an average of the value + current_element[key] /= 2 + else: + current_element[key] = value + else: + if key not in current_element: + # create a new dict + current_element[key] = {} + # we reach the next key of the path + current_element = current_element[key] + logger.debug(self.dict) + self.mutex.release() - def set_result_per_action(self, action, end_time): - """save in a list the result of an action""" - self.time[action] += end_time - self.time[action] /= 2 + def get_count(self, path): + """ retour the count value of a stat""" + keys = path.split('.') + current_element = self.dict + for key in keys: + if key in current_element: + if key == keys[-1]: + return current_element[key]['_count'] + else: + current_element = current_element[key] + else: + return 0 + return 0 - def print_result_per_action(self, nb_threads): - """print stat per actions""" - logger.info("Result per actions called:") - for action in self.treated: - if self.treated[action] > 0: - bench = "%s done: %d images in %f sec on %d threads, %f images per sec, %d failded" % (action, self.treated[action], self.time[action], nb_threads, self.treated[action]/ self.time[action], self.nb_errors[action]) + def get_value(self, path): + """ retour the value af a stat""" + keys = path.split('.') + current_element = self.dict + for key in keys: + if key in current_element: + if key == keys[-1]: + if key in current_element: + return current_element[key] + return 0 + else: + current_element = current_element[key] else: - bench = "%s done: 0 images, %d failed" % (action, self.nb_errors[action]) + return 0 + return 0 + + def get_total(self, key, path=None, data=None, count=0 ): + """ return the sum of all the count values for key from a path + if no path is specified, the count is totalized in all the dict + """ + if data == None: + data = self.dict + elif path != None: + #starting path + keys = path.split('.') + current_element = self.dict + for k in keys: + if k in current_element: + data = current_element[k] + else: + continue - logger.info(bench) + for k, value in data.items(): + if type(value) != dict: + continue + if "_count" not in value: + continue + if k == key: + count = count + value['_count'] + return count + elif k == '_count': + continue + count = self.get_total(key, None, value, count) + return count - def print_stat_global(self): - """print the global statistics""" - logger.info("") - logger.info("Queries Statistics: ") - logger.info("{} images to process".format(self.to_treat)) - logger.info("{} queries have been correctly performed on the {} to treat".format(self.get_nb_queries_treated(), self.queries_to_treat)) - logger.info("{} actions failed to be performed: {} add, {} search and {} delete".format(self.get_nb_errors(), self.nb_errors["add"], self.nb_errors["search"], self.nb_errors["delete"])) - logger.info("{} actions have been already treated for an image and not forced to be processed again".format(self.already)) + def print_stats(self, data=None, indent=0): + """ print all the values of the dict""" + if data == None: + data = self.dict + for key, value in data.items(): + if key == '_count': + continue + if type(value)==dict: + print("{} - {}: {}".format(" " * indent, key, value['_count'])) + self.print_stats(value, indent+2) + else: + print("{} - {}: {}".format(" " * indent, key, value)) From fef555e35612b51640e409af5c8a9b9fcba5273b Mon Sep 17 00:00:00 2001 From: Pauline Dufour Date: Fri, 10 Nov 2017 15:36:33 +0100 Subject: [PATCH 2/3] refactoring of statistic class --- ltu/engine/cli.py | 67 ++++++++++++++++++++++++++++++++++------------ ltu/engine/stat.py | 7 ----- 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/ltu/engine/cli.py b/ltu/engine/cli.py index c1ef342..e50f447 100755 --- a/ltu/engine/cli.py +++ b/ltu/engine/cli.py @@ -18,15 +18,47 @@ logger.setLevel(logging.DEBUG) # TODO: Stat class is not thread safe, we should update it. -global stat -stat = Stat() +global oStat +oStat = Stat() + +def print_result_per_action(nb_threads, actions): + """print stat per actions""" + logger.info("Result per actions called:") + for action in actions: + # global time to excecute all the queries per an action + time = oStat.get_value('queries.' +action +'._time') + stat_path = 'queries.' + action + '.already' + #how many images to perform + nbImages = oStat.get_total(action, 'queries') - oStat.get_count(stat_path) + if time == 0: + imagesPerSec = nbImages / 1 + else: + imagesPerSec = nbImages/time + + stat_path = "queries." + action + ".errors" + if nbImages > 0: + bench = "{} done: {} images in {} sec on {} threads, {} images per sec, {} failded".format(action, nbImages , time, nb_threads, imagesPerSec, oStat.get_count(stat_path)) + else: + bench = "{} done: 0 images, {} failed".format(action, oStat.get_count(stat_path)) + + logger.info(bench) + +def print_stat_global(): + """print the global statistics""" + logger.info("") + logger.info("Queries Statistics: ") + logger.info("{} images to process".format(oStat.get_count('images'))) + logger.info("{} queries have been correctly performed on the {} to excute".format(oStat.get_total('ok', 'queries'), oStat.get_count('queries'))) + logger.info("{} actions failed to be performed: {} add, {} search and {} delete".format(oStat.get_total('errors', 'queries'), oStat.get_count('queries.add.errors'), oStat.get_count('queries.search.errors'), oStat.get_count('queries.delete.errors'))) + logger.info("{} actions have been already processed and not forced to be performed again".format(oStat.get_total('already', 'quieries'))) -def print_stat(nb_threads): +def print_stat(nb_threads, actions): """ print all the statistics global and per action """ - stat.print_stat_global() + print_stat_global() logger.info("") - stat.print_result_per_action(nb_threads) + print_result_per_action(nb_threads, actions) + logger.debug(oStat.print_stats()) def get_action_name_from_function(function): @@ -52,9 +84,11 @@ def run_single_task(item): logger.debug("Finish with status %s" %(result.status_code)) if result.status_code < 0: logger.debug('An issue occuted with the file {}. Consult the json result. file'.format(in_file)) - stat.nb_errors[action] += 1 + stat_path = "queries." + action + ".errors." + str(result.status_code) + oStat.incremente(stat_path) else: - stat.treated[action] += 1 + stat_path = "queries." + action + ".ok" + oStat.incremente(stat_path) except Exception as e: logger.critical('An issue has occured. Could not perform the action {}. The process is stopped: {}'.format(action,e)) @@ -120,8 +154,7 @@ def generate_actions_list_per_images(actions_list, input_dir, force): sys.exit(-1) files = [] - b_file = False # indicate if there are files to performed - untreated = 0 + b_file = False # indicate if there are files to performe image_path = os.path.basename(input_dir) @@ -155,6 +188,7 @@ def generate_actions_list_per_images(actions_list, input_dir, force): # files_path[action]: result json file path per action if not file == ".DS_Store": # Except Mac store file + oStat.incremente("images") b_file = True files_path = {} b_action = False @@ -164,11 +198,11 @@ def generate_actions_list_per_images(actions_list, input_dir, force): # the imge will be processed if not os.path.exists(json_path) or force: b_action = True - stat.queries_to_treat += 1 files_path[action] = json_path else: # the image won't be performed - stat.already += 1 + stat_path = "queries." + action + ".already" + oStat.incremente(stat_path) logger.debug("%s action already performed for this file. You can consult the result in the Json file. To generate new result, delete the Json File or force the %s action by adding the --force parameter in the command." %(action, action)) if b_action: @@ -227,18 +261,16 @@ def ltuengine_process_dir(actions: "A list(separate each action by a comma) of a if files: # nb images to treat nb_files = len(files) - offset - stat.submitted += len(files) - stat.to_treat += nb_files # create client logger.info("") modifyClient = ModifyClient(application_key, server_url=host) for nb_threads in all_threads: - nb_errors_before_treatment = stat.nb_errors for action in actions_list: logger.info("") start_time = time.time() + nb_files = - oStat.get_count('queries.'+action+'.ok') - oStat.get_count('queries.'+action+'.errors') # get the appropriate function to run the task # - run_task_mono_thread will run on 1 thread and show some logs # - run_task_multi_thread will run on multiple threads and use a progress bar @@ -255,11 +287,12 @@ def ltuengine_process_dir(actions: "A list(separate each action by a comma) of a logger.info("Searching directory %s images into application %s" % (input_dir, application_key)) run_task(queryClient.search_image, files, "Searching image", nb_threads, offset) + nb_files += oStat.get_count('queries.'+action+'.ok') + oStat.get_count('queries.'+action+'.errors') end_time = (time.time() - start_time) - # save action statistics per - stat.set_result_per_action(action, end_time) + stat_path = "queries." + action + "._time" + oStat.add_and_average_stat(stat_path, end_time) bench = "%s done, %d images, in %f sec on %d threads, %f images per sec" % (action, nb_files, end_time, nb_threads, nb_files/end_time) logger.debug(bench) - print_stat(nb_threads) + print_stat(nb_threads, actions) diff --git a/ltu/engine/stat.py b/ltu/engine/stat.py index 8b072c7..a71e63d 100644 --- a/ltu/engine/stat.py +++ b/ltu/engine/stat.py @@ -11,13 +11,9 @@ class Stat(object): def __init__(self): self.dict = {} - #mutex - self.mutex = threading.Lock() - def incremente(self, path, step=1): """incremente of value step a counter for each key contained in path """ - self.mutex.acquire() keys = path.split('.') current_element = self.dict for key in keys: @@ -39,11 +35,9 @@ def incremente(self, path, step=1): current_element = current_element[key] logger.debug(self.dict) - self.mutex.release() def add_and_average_stat(self, path, value): """ add a value to a statistic. The second time we add this value, we make an average """ - self.mutex.acquire() keys = path.split('.') current_element = self.dict for key in keys: @@ -64,7 +58,6 @@ def add_and_average_stat(self, path, value): # we reach the next key of the path current_element = current_element[key] logger.debug(self.dict) - self.mutex.release() def get_count(self, path): """ retour the count value of a stat""" From 04ba293b76fb65db17d49bc7a4e7da9ba2ea55b8 Mon Sep 17 00:00:00 2001 From: Pauline Dufour Date: Fri, 10 Nov 2017 16:10:48 +0100 Subject: [PATCH 3/3] add mutex in stat class --- ltu/engine/cli.py | 8 -------- ltu/engine/stat.py | 7 +++++++ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ltu/engine/cli.py b/ltu/engine/cli.py index e50f447..156b275 100755 --- a/ltu/engine/cli.py +++ b/ltu/engine/cli.py @@ -17,7 +17,6 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -# TODO: Stat class is not thread safe, we should update it. global oStat oStat = Stat() @@ -52,21 +51,17 @@ def print_stat_global(): logger.info("{} actions failed to be performed: {} add, {} search and {} delete".format(oStat.get_total('errors', 'queries'), oStat.get_count('queries.add.errors'), oStat.get_count('queries.search.errors'), oStat.get_count('queries.delete.errors'))) logger.info("{} actions have been already processed and not forced to be performed again".format(oStat.get_total('already', 'quieries'))) - def print_stat(nb_threads, actions): """ print all the statistics global and per action """ print_stat_global() logger.info("") print_result_per_action(nb_threads, actions) - logger.debug(oStat.print_stats()) - def get_action_name_from_function(function): """return from a function (add_image, search_image, delete_image) the name of the action concerned """ return function.__name__.split('_')[0] - def run_single_task(item): """Run given action for one file """ @@ -101,7 +96,6 @@ def run_single_task(item): logger.critical('Could not save the result for the action {}: {}'.format(action,e)) sys.exit(-1) - def run_task_mono_thread(action_function, files, action_label, nb_threads=1, offset=0): """Run given action on every files, one at a time. """ @@ -111,7 +105,6 @@ def run_task_mono_thread(action_function, files, action_label, nb_threads=1, off logger.info("%s: %s" % (action_label, file["in"])) run_single_task(item) - def run_task_multi_thread(action_function, files, action_label, nb_threads=2, offset=0): """Run given action on every files using a threading pool. It uses a progress bar instead of a usual verbose log. @@ -126,7 +119,6 @@ def run_task_multi_thread(action_function, files, action_label, nb_threads=2, of for item in progress_bar_items: pass - def generate_actions_list_per_images(actions_list, input_dir, force): """Generate a list of actions to process per image. For each image are saved: - input path of the image diff --git a/ltu/engine/stat.py b/ltu/engine/stat.py index a71e63d..8b072c7 100644 --- a/ltu/engine/stat.py +++ b/ltu/engine/stat.py @@ -11,9 +11,13 @@ class Stat(object): def __init__(self): self.dict = {} + #mutex + self.mutex = threading.Lock() + def incremente(self, path, step=1): """incremente of value step a counter for each key contained in path """ + self.mutex.acquire() keys = path.split('.') current_element = self.dict for key in keys: @@ -35,9 +39,11 @@ def incremente(self, path, step=1): current_element = current_element[key] logger.debug(self.dict) + self.mutex.release() def add_and_average_stat(self, path, value): """ add a value to a statistic. The second time we add this value, we make an average """ + self.mutex.acquire() keys = path.split('.') current_element = self.dict for key in keys: @@ -58,6 +64,7 @@ def add_and_average_stat(self, path, value): # we reach the next key of the path current_element = current_element[key] logger.debug(self.dict) + self.mutex.release() def get_count(self, path): """ retour the count value of a stat"""