From 9c6c286c2fc22f04a78ddc1202a8134d0489ec4c Mon Sep 17 00:00:00 2001 From: Sidharth Verma Date: Tue, 5 Jun 2018 05:12:27 -0400 Subject: [PATCH 1/2] Broke out scheduler to new file --- src/scheduler/run_scheduler_simulator.py | 42 ++++-- src/scheduler/types/DistributedScheduler.py | 147 ++++++++++++++++++++ 2 files changed, 180 insertions(+), 9 deletions(-) create mode 100644 src/scheduler/types/DistributedScheduler.py diff --git a/src/scheduler/run_scheduler_simulator.py b/src/scheduler/run_scheduler_simulator.py index fe3f916..00cc3d7 100644 --- a/src/scheduler/run_scheduler_simulator.py +++ b/src/scheduler/run_scheduler_simulator.py @@ -8,6 +8,7 @@ import sys sys.path.append('src/scheduler/types') import Scheduler +import DistributedScheduler sys.path.append('data') import app_data_mobilenets as app_data import numpy as np @@ -35,6 +36,7 @@ def get_args(simulator=True): parser.add_argument("--combs-no-shuffle", action='store_true') parser.add_argument("-n", "--combs-dry-run", action='store_true') parser.add_argument("--combs-max-samples", type=int) + parser.add_argument("--distributed-nodes", type=int, default=1) return parser.parse_args() @@ -65,15 +67,28 @@ def main(): for _, app_ids in app_combs: entry_id = len(app_ids) apps = apps_from_ids(app_ids, all_apps, x_vote) - s, stats = run_simulator(min_metric, - apps, - app_data.video_desc, - budget=args.budget, - dp=dp, - mode=args.mode, - verbose=args.verbose, - scheduler=args.scheduler, - agg=args.agg) + if args.distributed_nodes <= 1: + s, stats = run_simulator(min_metric, + apps, + app_data.video_desc, + budget=args.budget, + dp=dp, + mode=args.mode, + verbose=args.verbose, + scheduler=args.scheduler, + agg=args.agg) + else: + s,stats = run_distributed_scheduler(min_metric, + apps, + args.distributed_nodes, + app_data.video_desc, + budget=args.budget, + dp=dp, + mode=args.mode, + verbose=args.verbose, + scheduler=args.scheduler, + agg=args.agg) + writer.writerow(get_eval(entry_id, s, stats)) f.flush() @@ -160,6 +175,15 @@ def run_simulator(min_metric, apps, video_desc, budget=350, mode="mainstream", d stats["avg_rel_acc"] = np.average(stats["rel_accs"]) return s, stats +def run_distributed_scheduler(min_metric, apps, distributed_nodes, video_desc, budget=350, mode="mainstream", dp={}, agg='min', **kwargs): + s = DistributedScheduler.DistributedScheduler(min_metric, apps, video_desc, app_data.model_desc, + budget, mode, dp, + distributed_nodes=distributed_nodes) + print("Hello") + + s.generate_schedulers() + (stats, partition) = s.find_best_schedule() + return (None, stats) def get_eval(entry_id, s, stats): stats["recall"] = 1. - stats["fnr"] diff --git a/src/scheduler/types/DistributedScheduler.py b/src/scheduler/types/DistributedScheduler.py new file mode 100644 index 0000000..6380b25 --- /dev/null +++ b/src/scheduler/types/DistributedScheduler.py @@ -0,0 +1,147 @@ +import sys +sys.path.append('src/scheduler') +import scheduler_util +import Schedule +import Scheduler +from itertools import combinations, combinations_with_replacement, product, chain +import itertools +import operator +import pprint as pp +import zmq +import math +from collections import OrderedDict +from collections import Counter +import gc +import numpy as np + + +class DistributedScheduler: + ### Object that performs optimization of parameters + ### and feedback with Streamer + + def __init__(self, min_metric, apps, video_desc, model_desc, budget=350, mode="mainstream", dp=None, agg='avg', distributed_nodes=1, verbose=0, scheduler='greedy'): + self.min_metric = min_metric + self.apps = apps + self.video_desc = video_desc + self.model_desc = model_desc + self.budget = budget + self.mode = mode + self.dp = dp + self.agg = agg + self.distributed_nodes = distributed_nodes + self.verbose = verbose + self.scheduler = scheduler + + def app_permutations(self, apps, distributed_nodes): + partitions = [] + n = len(self.apps) + + all_partitions = list(product(range(distributed_nodes), repeat=n)) + for partition in all_partitions: + assignments = [] + for node in range(distributed_nodes): + assigned_to_node = frozenset([i for i,elem in enumerate(list(partition)) if elem == node]) + assignments.append(assigned_to_node) + + partitions.append(assignments) + return partitions + + def generate_schedulers(self): + app_indices = list(range(len(self.apps))) + app_powerset = list(chain.from_iterable(combinations(app_indices, r) for r in range(len(app_indices)+1))) + self.schedulers = dict() + + for part in app_powerset: + part_set = frozenset(part) + if len(part_set) == 0: + stats = { + "metric": 0, + "rel_accs": [], + "fnr": 0, + "fpr": 0, + "f1": 0, + "cost": 0, + "fps": [], + "frozen": [], + "avg_rel_acc": 0, + } + self.schedulers[part_set] = (None, stats, []) + continue + + part_apps_list = [self.apps[i] for i in part_set] + s = Scheduler.Scheduler(self.min_metric, part_apps_list, self.video_desc, self.model_desc, 0) + stats = { + "metric": s.optimize_parameters(self.budget, mode=self.mode, dp=self.dp), + "rel_accs": s.get_relative_accuracies(), + } + + sched = s.make_streamer_schedule() + stats["fnr"], stats["fpr"], stats["f1"], stats["cost"] = s.get_observed_performance(sched, s.target_fps_list) + stats["fps"] = s.target_fps_list + stats["frozen"] = s.num_frozen_list + stats["avg_rel_acc"] = np.average(stats["rel_accs"]) + + self.schedulers[part_set] = (s,stats, list(part_set)) + + def find_best_schedule(self): + partitions = self.app_permutations(self.apps, self.distributed_nodes) + + best_partition = None + best_metric = -1 + for partition in partitions: + if self.agg == 'min': + met = -1 + for node in partition: + (node_s, (node_stats, node_list)) = self.schedulers[node] + if met == -1: + met = node_stats['metric'] + else: + met = min(met, node_stats['metric']) + else: + met = 0 + for node in partition: + node_tup = self.schedulers[node] + node_s = node_tup[0] + node_stats = node_tup[1] + node_list = node_tup[2] + met = met + node_stats['metric'] * float(len(node)) / float(len(self.apps)) + + if met > best_metric or best_metric == -1: + best_metric = met + best_partition = partition + + stats = { + 'metric': best_metric, + 'fnr': 0, + 'fpr': 0, + 'f1': 0, + 'cost': 0, + 'fps': [0]*len(self.apps), + 'frozen': [0]*len(self.apps), + 'rel_accs': [0]*len(self.apps), + } + + print("Partition: {") + for node in best_partition: + node_tup = self.schedulers[node] + node_s = node_tup[0] + node_stats = node_tup[1] + node_list = node_tup[2] + print(node_list) + stats['fnr'] = stats['fnr'] + node_stats['fnr'] * float(len(node)) / float(len(self.apps)) + stats['fpr'] = stats['fpr'] + node_stats['fpr'] * float(len(node)) / float(len(self.apps)) + stats['f1'] = stats['f1'] + node_stats['f1'] * float(len(node)) / float(len(self.apps)) + stats['cost'] = stats['cost'] + node_stats['cost'] * float(len(node)) / float(len(self.apps)) + + node_index = 0 + for full_index in node_list: + stats['fps'][full_index] = node_stats['fps'][node_index] + stats['frozen'][full_index] = node_stats['frozen'][node_index] + stats['rel_accs'][full_index] = node_stats['rel_accs'][node_index] + node_index = node_index + 1 + + stats['avg_rel_acc'] = np.average(stats['rel_accs']) + print("}") + return (stats, best_partition) + + From 723f8ffae90b51675154caffbefe78b176a29d7b Mon Sep 17 00:00:00 2001 From: Sidharth Verma Date: Fri, 8 Jun 2018 04:00:06 -0400 Subject: [PATCH 2/2] extraneous debug statement --- src/scheduler/run_scheduler_simulator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/scheduler/run_scheduler_simulator.py b/src/scheduler/run_scheduler_simulator.py index 00cc3d7..5bd61f7 100644 --- a/src/scheduler/run_scheduler_simulator.py +++ b/src/scheduler/run_scheduler_simulator.py @@ -179,7 +179,6 @@ def run_distributed_scheduler(min_metric, apps, distributed_nodes, video_desc, b s = DistributedScheduler.DistributedScheduler(min_metric, apps, video_desc, app_data.model_desc, budget, mode, dp, distributed_nodes=distributed_nodes) - print("Hello") s.generate_schedulers() (stats, partition) = s.find_best_schedule()