Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions ltu/engine/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,51 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# TODO: Stat class is not thread safe, we should update it.
global stat
stat = Stat()

global oStat
oStat = Stat()

def print_result_per_action(nb_threads, actions):
    """Log per-action statistics: throughput and failure counts.

    Args:
        nb_threads: number of threads used for the run (reported only).
        actions: iterable of action names, e.g. ("add", "search", "delete").
    """
    logger.info("Result per actions called:")
    for action in actions:
        # Total time spent executing all the queries for this action.
        total_time = oStat.get_value('queries.' + action + '._time')
        # Images actually processed = total submitted - already-done (skipped).
        nb_images = (oStat.get_total(action, 'queries')
                     - oStat.get_count('queries.' + action + '.already'))
        # Guard against division by zero when the action finished instantly.
        images_per_sec = nb_images if total_time == 0 else nb_images / total_time

        nb_errors = oStat.get_count('queries.' + action + '.errors')
        if nb_images > 0:
            # FIX: "failded" -> "failed" in the log message.
            bench = "{} done: {} images in {} sec on {} threads, {} images per sec, {} failed".format(
                action, nb_images, total_time, nb_threads, images_per_sec, nb_errors)
        else:
            bench = "{} done: 0 images, {} failed".format(action, nb_errors)

        logger.info(bench)

def print_stat_global():
    """Log the global query statistics accumulated in the module-level oStat."""
    logger.info("")
    logger.info("Queries Statistics: ")
    logger.info("{} images to process".format(oStat.get_count('images')))
    # FIX: "excute" -> "execute" in the log message.
    logger.info("{} queries have been correctly performed on the {} to execute".format(
        oStat.get_total('ok', 'queries'), oStat.get_count('queries')))
    logger.info("{} actions failed to be performed: {} add, {} search and {} delete".format(
        oStat.get_total('errors', 'queries'),
        oStat.get_count('queries.add.errors'),
        oStat.get_count('queries.search.errors'),
        oStat.get_count('queries.delete.errors')))
    # BUG FIX: the path was misspelled 'quieries', so the total was computed
    # over the whole stat tree instead of the 'queries' subtree.
    logger.info("{} actions have been already processed and not forced to be performed again".format(
        oStat.get_total('already', 'queries')))

def print_stat(nb_threads):
def print_stat(nb_threads, actions):
    """Print all the statistics: global summary first, then per-action details.

    Args:
        nb_threads: number of threads used for the run.
        actions: iterable of action names to report on.
    """
    # FIX: removed stale leftover calls to the old Stat API
    # (stat.print_stat_global / stat.print_result_per_action) that
    # shadowed the new module-level helpers.
    print_stat_global()
    logger.info("")
    print_result_per_action(nb_threads, actions)

def get_action_name_from_function(function):
    """Return the action name encoded in a function's name.

    For functions named add_image, search_image or delete_image this yields
    "add", "search" or "delete" respectively (everything before the first
    underscore).
    """
    action_name, _, _ = function.__name__.partition('_')
    return action_name


def run_single_task(item):
"""Run given action for one file
"""
Expand All @@ -52,9 +79,11 @@ def run_single_task(item):
logger.debug("Finish with status %s" %(result.status_code))
if result.status_code < 0:
logger.debug('An issue occuted with the file {}. Consult the json result. file'.format(in_file))
stat.nb_errors[action] += 1
stat_path = "queries." + action + ".errors." + str(result.status_code)
oStat.incremente(stat_path)
else:
stat.treated[action] += 1
stat_path = "queries." + action + ".ok"
oStat.incremente(stat_path)

except Exception as e:
logger.critical('An issue has occured. Could not perform the action {}. The process is stopped: {}'.format(action,e))
Expand All @@ -67,7 +96,6 @@ def run_single_task(item):
logger.critical('Could not save the result for the action {}: {}'.format(action,e))
sys.exit(-1)


def run_task_mono_thread(action_function, files, action_label, nb_threads=1, offset=0):
"""Run given action on every files, one at a time.
"""
Expand All @@ -77,7 +105,6 @@ def run_task_mono_thread(action_function, files, action_label, nb_threads=1, off
logger.info("%s: %s" % (action_label, file["in"]))
run_single_task(item)


def run_task_multi_thread(action_function, files, action_label, nb_threads=2, offset=0):
"""Run given action on every files using a threading pool.
It uses a progress bar instead of a usual verbose log.
Expand All @@ -92,7 +119,6 @@ def run_task_multi_thread(action_function, files, action_label, nb_threads=2, of
for item in progress_bar_items:
pass


def generate_actions_list_per_images(actions_list, input_dir, force):
"""Generate a list of actions to process per image. For each image are saved:
- input path of the image
Expand Down Expand Up @@ -120,8 +146,7 @@ def generate_actions_list_per_images(actions_list, input_dir, force):
sys.exit(-1)

files = []
b_file = False # indicate if there are files to performed
untreated = 0
b_file = False # indicate if there are files to performe

image_path = os.path.basename(input_dir)

Expand Down Expand Up @@ -155,6 +180,7 @@ def generate_actions_list_per_images(actions_list, input_dir, force):
# files_path[action]: result json file path per action
if not file == ".DS_Store":
# Except Mac store file
oStat.incremente("images")
b_file = True
files_path = {}
b_action = False
Expand All @@ -164,11 +190,11 @@ def generate_actions_list_per_images(actions_list, input_dir, force):
# the imge will be processed
if not os.path.exists(json_path) or force:
b_action = True
stat.queries_to_treat += 1
files_path[action] = json_path
else:
# the image won't be performed
stat.already += 1
stat_path = "queries." + action + ".already"
oStat.incremente(stat_path)
logger.debug("%s action already performed for this file. You can consult the result in the Json file. To generate new result, delete the Json File or force the %s action by adding the --force parameter in the command." %(action, action))

if b_action:
Expand Down Expand Up @@ -227,18 +253,16 @@ def ltuengine_process_dir(actions: "A list(separate each action by a comma) of a
if files:
# nb images to treat
nb_files = len(files) - offset
stat.submitted += len(files)
stat.to_treat += nb_files

# create client
logger.info("")
modifyClient = ModifyClient(application_key, server_url=host)

for nb_threads in all_threads:
nb_errors_before_treatment = stat.nb_errors
for action in actions_list:
logger.info("")
start_time = time.time()
nb_files = - oStat.get_count('queries.'+action+'.ok') - oStat.get_count('queries.'+action+'.errors')
# get the appropriate function to run the task
# - run_task_mono_thread will run on 1 thread and show some logs
# - run_task_multi_thread will run on multiple threads and use a progress bar
Expand All @@ -255,11 +279,12 @@ def ltuengine_process_dir(actions: "A list(separate each action by a comma) of a
logger.info("Searching directory %s images into application %s" % (input_dir, application_key))
run_task(queryClient.search_image, files, "Searching image", nb_threads, offset)

nb_files += oStat.get_count('queries.'+action+'.ok') + oStat.get_count('queries.'+action+'.errors')
end_time = (time.time() - start_time)

# save action statistics per
stat.set_result_per_action(action, end_time)
stat_path = "queries." + action + "._time"
oStat.add_and_average_stat(stat_path, end_time)
bench = "%s done, %d images, in %f sec on %d threads, %f images per sec" % (action, nb_files, end_time, nb_threads, nb_files/end_time)
logger.debug(bench)

print_stat(nb_threads)
print_stat(nb_threads, actions)
176 changes: 120 additions & 56 deletions ltu/engine/stat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import threading

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand All @@ -8,68 +9,131 @@ class Stat(object):
"""

def __init__(self):
    """Create an empty, lock-protected hierarchical statistics store."""
    # Nested dicts keyed by dotted-path segments; every node carries a
    # '_count' entry maintained by incremente().
    self.dict = {}
    # Mutex serializing updates coming from the worker threads.
    # FIX: removed stale deleted-code residue (old per-action counters
    # self.treated / self.nb_errors / self.time and the old add_error
    # method) left over from the previous Stat implementation.
    self.mutex = threading.Lock()
def incremente(self, path, step=1):
    """Increment by *step* the counter of every node along the dotted *path*.

    Each intermediate container's '_count' is raised as well, so parent
    nodes always hold the aggregate total of their subtree.

    Args:
        path: dotted key path, e.g. "queries.add.ok".
        step: increment value (defaults to 1).
    """
    # BUG FIX: the lock is now a context manager, so it is released even if
    # an exception is raised mid-update (acquire/release had no try/finally).
    with self.mutex:
        keys = path.split('.')
        current_element = self.dict
        last_index = len(keys) - 1
        for index, key in enumerate(keys):
            # Raise the aggregate counter of the current container,
            # creating it on first use.
            current_element['_count'] = current_element.get('_count', 0) + step
            if key not in current_element:
                current_element[key] = {'_count': 0}
            # BUG FIX: compare by position, not value — `key == keys[-1]`
            # misfired on paths with repeated segment names (e.g. "a.b.a").
            if index == last_index:
                # Final segment: bump the leaf counter itself.
                current_element[key]['_count'] += step
            else:
                # Descend one level for the next segment.
                current_element = current_element[key]
        logger.debug(self.dict)

def get_nb_queries_treated(self):
"""return how manies queries succed"""
nbq = 0
for action in self.treated:
nbq += self.treated[action]
return nbq
def add_and_average_stat(self, path, value):
    """Store *value* at the dotted *path*.

    On the first call the value is stored as-is; each subsequent call folds
    the new value into a running average: (stored + value) / 2.

    Args:
        path: dotted key path, e.g. "queries.add._time".
        value: numeric value to record.
    """
    # BUG FIX: use the lock as a context manager so it is released even if
    # an exception is raised mid-update (acquire/release had no try/finally).
    with self.mutex:
        keys = path.split('.')
        current_element = self.dict
        last_index = len(keys) - 1
        for index, key in enumerate(keys):
            # BUG FIX: compare by position, not value — `key == keys[-1]`
            # misfired on paths with repeated segment names.
            if index == last_index:
                # Final segment: store, averaging with any previous value.
                if key in current_element:
                    current_element[key] = (current_element[key] + value) / 2
                else:
                    current_element[key] = value
            else:
                # Create missing intermediate containers and descend.
                if key not in current_element:
                    current_element[key] = {}
                current_element = current_element[key]
        logger.debug(self.dict)

def set_result_per_action(self, action, end_time):
"""save in a list the result of an action"""
self.time[action] += end_time
self.time[action] /= 2
def get_count(self, path):
    """Return the '_count' value of the node at dotted *path*.

    Returns 0 when the path does not exist, when the final node has no
    counter, or when it is a plain leaf value (e.g. an averaged '_time').
    """
    keys = path.split('.')
    current_element = self.dict
    last_index = len(keys) - 1
    for index, key in enumerate(keys):
        if key not in current_element:
            return 0
        # BUG FIX: compare by position — `key == keys[-1]` returned at the
        # wrong level for paths with repeated segment names (e.g. "a.b.a").
        if index == last_index:
            node = current_element[key]
            # .get / isinstance avoid a KeyError on nodes created without a
            # counter (e.g. values written by add_and_average_stat).
            return node.get('_count', 0) if isinstance(node, dict) else 0
        current_element = current_element[key]
    return 0

def print_result_per_action(self, nb_threads):
"""print stat per actions"""
logger.info("Result per actions called:")
for action in self.treated:
if self.treated[action] > 0:
bench = "%s done: %d images in %f sec on %d threads, %f images per sec, %d failded" % (action, self.treated[action], self.time[action], nb_threads, self.treated[action]/ self.time[action], self.nb_errors[action])
def get_value(self, path):
    """Return the raw value stored at dotted *path* (e.g. an averaged time).

    Returns 0 when the path does not exist.
    """
    # FIX: removed a stale deleted-code residue line and a redundant second
    # `key in current_element` check inside the matching branch.
    keys = path.split('.')
    current_element = self.dict
    last_index = len(keys) - 1
    for index, key in enumerate(keys):
        if key not in current_element:
            return 0
        # BUG FIX: compare by position — `key == keys[-1]` returned at the
        # wrong level for paths with repeated segment names.
        if index == last_index:
            return current_element[key]
        current_element = current_element[key]
    return 0

def get_total(self, key, path=None, data=None, count=0):
    """Return the sum of the '_count' values of every node named *key*.

    Args:
        key: node name to total (e.g. "ok", "errors", "already").
        path: optional dotted path restricting the search to a subtree;
              when omitted the whole stat tree is scanned.
        data: internal — subtree to scan (defaults to the resolved path).
        count: internal — starting total carried into the scan.
    """
    if data is None:
        data = self.dict
        # BUG FIX: *path* used to be examined only when *data* was already
        # supplied (an `elif`), which never happens on the initial call, so
        # the restriction was silently ignored. Resolve it here instead;
        # missing segments are skipped (best-effort, as before).
        if path is not None:
            for segment in path.split('.'):
                if segment in data:
                    data = data[segment]

    def _walk(node, total):
        """Accumulate '_count' of nodes named *key* throughout *node*."""
        for name, child in node.items():
            # Only dict nodes carrying a counter are statistics entries;
            # this also skips the '_count' ints themselves.
            if not isinstance(child, dict) or '_count' not in child:
                continue
            if name == key:
                # BUG FIX: the original returned immediately on a match,
                # skipping the remaining siblings' subtrees.
                total += child['_count']
            else:
                total = _walk(child, total)
        return total

    return _walk(data, count)

def print_stat_global(self):
"""print the global statistics"""
logger.info("")
logger.info("Queries Statistics: ")
logger.info("{} images to process".format(self.to_treat))
logger.info("{} queries have been correctly performed on the {} to treat".format(self.get_nb_queries_treated(), self.queries_to_treat))
logger.info("{} actions failed to be performed: {} add, {} search and {} delete".format(self.get_nb_errors(), self.nb_errors["add"], self.nb_errors["search"], self.nb_errors["delete"]))
logger.info("{} actions have been already treated for an image and not forced to be processed again".format(self.already))
def print_stats(self, data=None, indent=0):
    """Pretty-print the statistics tree to stdout, one node per line,
    indenting two extra spaces per nesting level.

    Args:
        data: subtree to print (defaults to the whole stat dict).
        indent: starting indentation width in spaces.
    """
    def _dump(node, depth):
        for key, value in node.items():
            if key == '_count':
                # Counters are shown next to their owning node, not as lines.
                continue
            if isinstance(value, dict):
                # BUG FIX: .get avoids a KeyError on nodes created without a
                # counter (e.g. intermediates from add_and_average_stat).
                print("{} - {}: {}".format(" " * depth, key, value.get('_count', 0)))
                _dump(value, depth + 2)
            else:
                # Leaf value (e.g. an averaged '_time' float).
                print("{} - {}: {}".format(" " * depth, key, value))

    _dump(self.dict if data is None else data, indent)