diff --git a/src/scheduler/Scheduler.py b/src/scheduler/Scheduler.py index 7c1f9cf..8052066 100644 --- a/src/scheduler/Scheduler.py +++ b/src/scheduler/Scheduler.py @@ -6,7 +6,7 @@ import operator import pprint as pp import zmq - +from random import shuffle class Scheduler: ### Object that performs optimization of parameters @@ -16,6 +16,7 @@ def __init__(self, metric, apps, video_desc, model_desc, sigma): self.apps = apps self.video_desc = video_desc self.metric = metric + self.metrics = [] self.model = Schedule.Model(model_desc) self.num_frozen_list = [] self.target_fps_list = [] @@ -123,6 +124,34 @@ def set_schedule_values(self, schedule): return average_metric + def set_schedule_values_metric(self, schedule): + # Same as set_schedule_values except returns list of metric values + metrics = [] + target_fps_list = [] + num_frozen_list = [] + for unit in schedule: + target_fps = unit.target_fps + num_frozen = unit.num_frozen + app_id = unit.app_id + app = unit.app + metric = self.get_metric(app, + num_frozen, + target_fps) + + target_fps_list.append(target_fps) + num_frozen_list.append(num_frozen) + metrics.append(metric) + print "------------- Schedule -------------" + for unit in schedule: + print "App:", unit.app_id, "- num_frozen:", unit.num_frozen, ", target_fps:", unit.target_fps + + ## Set parameters of schedule + self.schedule = schedule + self.num_frozen_list = num_frozen_list + self.target_fps_list = target_fps_list + + return metrics + def set_max_parameters(self): # Makes schedule which uses max sharing and max target_fps # Sets self.schedule, self.num_frozen_list, self.target_fps_list @@ -201,14 +230,13 @@ def optimize_parameters(self, cost_threshold): ## Calculate all possible schedules possible_params = [] + target_fps_options = range(1, self.stream_fps + 1) for num_frozen in sorted(self.apps[0]["accuracies"].keys()): - for target_fps in range(1, self.stream_fps + 1): + for target_fps in target_fps_options: possible_params.append((num_frozen, target_fps)) cost_benefits = {} - target_fps_options = range(1, self.stream_fps + 1) - current_schedule = [] for app in self.apps: @@ -296,18 +324,138 @@ def optimize_parameters(self, cost_threshold): current_schedule = best_new_schedule average_metric = self.set_schedule_values(current_schedule) - return average_metric - def make_streamer_schedule_no_sharing(self): + def optimize_per_app(self, cost_threshold): + # Makes schedule with optimal choices for num_frozen and target_fps + # Sets self.schedule, self.num_frozen_list, self.target_fps_list - s = Schedule.StreamerSchedule() + ## Calculate all possible schedules + possible_params = [] + target_fps_options = range(1, self.stream_fps + 1) + for num_frozen in sorted(self.apps[0]["accuracies"].keys()): + for target_fps in target_fps_options: + possible_params.append((num_frozen, target_fps)) + + cost_benefits = {} + + current_schedule = [] + + app_budget = cost_threshold / len(self.apps) for app in self.apps: - num_frozen = min(app["accuracies"].keys()) - net = Schedule.NeuralNet(s.get_id(), - app["app_id"], - self.model, + app_id = app["app_id"] + cost_benefits[app_id] = {} + num_frozen_options = app["accuracies"].keys() + for num_frozen in reversed(sorted(num_frozen_options)): + if num_frozen not in cost_benefits[app_id].keys(): + cost_benefits[app_id][num_frozen] = {} + for target_fps in target_fps_options: + benefit = self.get_metric(app, + num_frozen, + target_fps) + cost = scheduler_util.get_cost(num_frozen, + target_fps, + self.model.layer_latencies) + cost_benefits[app_id][num_frozen][target_fps] = (cost, + benefit) + cheapest_target_fps = min(target_fps_options) + cheapest_num_frozen = max(num_frozen_options) + current_schedule.append(Schedule.ScheduleUnit(app, + cheapest_target_fps, + cheapest_num_frozen)) + + ## Make moves in order of maximal cost/benefit + ## which decrease the metric and fit the budget + updated = True # Stopping condition + + # After each app has been given its opportunity under equal-budget fairness, + # allow other apps to take advantage of remaining budget by falling back to normal Mainstream + ignore_fairness = False + + while (updated): + updated = False + # Get next best change to schedule + # Upgrade is (target_fps, #frozen) with larger + # cost and largest cost/benefit across all apps + max_cost_benefit = 0 + best_new_unit = -1 + for unit in current_schedule: + cur_target_fps = unit.target_fps + cur_num_frozen = unit.num_frozen + app_id = unit.app_id + app = unit.app + cur_metric = self.get_metric(app, + cur_num_frozen, + cur_target_fps) + + for potential_target_fps in target_fps_options: + for potential_num_frozen in sorted(num_frozen_options): + # Skip if it is not a change + u_apps = [u for u in current_schedule if u.app_id == app_id] + if (u_apps[0].num_frozen == potential_num_frozen and + u_apps[0].target_fps == potential_target_fps): + continue + + cost_benefit_tup = \ + cost_benefits[app_id][potential_num_frozen][potential_target_fps] + cost_benefit = cost_benefit_tup[1] / float(cost_benefit_tup[0]) + potential_metric = self.get_metric(app, + potential_num_frozen, + potential_target_fps) + if potential_metric < cur_metric and cost_benefit > max_cost_benefit: + + # Check that move its within budget + potential_unit = Schedule.ScheduleUnit(app, + potential_target_fps, + potential_num_frozen) + potential_schedule = [] + for c_unit in current_schedule: + if c_unit.app_id == potential_unit.app_id: + potential_schedule.append(potential_unit) + else: + copy_unit = Schedule.ScheduleUnit(c_unit.app, + c_unit.target_fps, + c_unit.num_frozen) + potential_schedule.append(copy_unit) + potential_sched_cost = scheduler_util.get_cost_schedule(potential_schedule, + self.model.layer_latencies, + self.model.final_layer) + ##### Fairness modifications##### + scheduled_cost_app = scheduler_util.get_cost_per_app(self.apps, + potential_schedule, + self.model.layer_latencies, + self.model.final_layer) + + # TODO: Refactor away duplicated code. + if potential_sched_cost <= cost_threshold and (scheduled_cost_app[app_id] <= app_budget or ignore_fairness): + #if potential_sched_cost <= cost_threshold: + cost = potential_sched_cost + max_cost_benefit = cost_benefit + best_new_unit = potential_unit + best_new_schedule = potential_schedule + updated = True + ##### End Fairness modifications ##### + + if updated: + current_schedule = best_new_schedule + + if not updated and not ignore_fairness: + ignore_fairness = True + updated = True + + average_metric = self.set_schedule_values(current_schedule) + return average_metric + + def make_streamer_schedule_no_sharing(self): + + s = Schedule.StreamerSchedule() + + for app in self.apps: + num_frozen = min(app["accuracies"].keys()) + net = Schedule.NeuralNet(s.get_id(), + app["app_id"], + self.model, -1, 1, self.model.final_layer, diff --git a/src/scheduler/run_scheduler_sim_fair.py b/src/scheduler/run_scheduler_sim_fair.py new file mode 100644 index 0000000..ca10cba --- /dev/null +++ b/src/scheduler/run_scheduler_sim_fair.py @@ -0,0 +1,72 @@ +import sys +sys.path.append('src/scheduler') +import Scheduler +sys.path.append('data') +import app_data_mobilenets as app_data +import pprint as pp +import numpy as np +import time +import zmq +from itertools import combinations, combinations_with_replacement +import os +import csv +import random +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('num_apps_range', type=int) + parser.add_argument('x_vote', type=int, default=0) + parser.add_argument('outfile') + parser.add_argument('--agg', default='mean', help='agg (mean, min)') + parser.add_argument('-m', '--metric', default='f1') + parser.add_argument('-f', '--fairness', action='store_true') + + args = parser.parse_args() + num_apps_range = args.num_apps_range + x_vote = args.x_vote + outfile = args.outfile + '-mainstream-simulator' + min_metric = args.metric + fairness = args.fairness + agg = args.agg + + if agg == 'min': + agg_func = np.min + else: + agg_func = np.average + print agg, agg_func + + with open(outfile, "wb", 0) as f: + for num_apps in range(len(app_data.app_options), \ + num_apps_range+1, \ + len(app_data.app_options)): + # Get Schedule + apps = [] + + for i in range(1, num_apps + 1): + index = i % len(app_data.app_options) + app = dict(app_data.app_options[index]) + app["app_id"] = i + app["x_vote"] = x_vote + apps.append(app) + + s = Scheduler.Scheduler(min_metric, apps, app_data.video_desc, + app_data.model_desc, 0) + + metrics = [s.optimize_per_app(350)] + avg_metric = sum(metrics) / len(metrics) + rel_accs = s.get_relative_accuracies() + avg_rel_acc = agg_func(rel_accs) + print 'Rel accs: ', rel_accs + print 'FNRs:', s.metrics + print "Agg: {}, Num Apps: {}, FNR: {}, Avg Rel Acc: {}, Frozen: {}, FPS: {}".format(agg, num_apps, avg_metric, avg_rel_acc, s.num_frozen_list, s.target_fps_list) + + row = [ + str(num_apps), + str(round(avg_metric, 4)), + str(round(avg_rel_acc, 4)), + "_".join(map(str, s.metrics)), + "_".join(map(str, rel_accs)), + ] + line = ",".join(row) + "\n" + f.write(line) diff --git a/src/scheduler/run_scheduler_simulator.py b/src/scheduler/run_scheduler_simulator.py index b69d7a4..c0fb92d 100644 --- a/src/scheduler/run_scheduler_simulator.py +++ b/src/scheduler/run_scheduler_simulator.py @@ -24,6 +24,7 @@ def get_args(simulator=True): app_names = [app["name"] for app in app_data.app_options] parser.add_argument("-d", "--datasets", nargs='+', choices=app_names, required=True, help='provide one or multiple dataset names') parser.add_argument("-m", "--metric", default="f1") + parser.add_argument("-f", "--fairness", action='store_true') parser.add_argument("-x", "--x-vote", type=int, default=0) # For combinations parser.add_argument("-c", "--combs", action='store_true') @@ -39,6 +40,7 @@ def main(): all_apps = [app_data.apps_by_name[app_name] for app_name in args.datasets] x_vote = args.x_vote min_metric = args.metric + fairness = args.fairness if x_vote > 0: outfile = args.outfile_prefix + "-x" + str(x_vote) + "-mainstream-simulator" @@ -55,7 +57,7 @@ def main(): writer = csv.writer(f) for entry_id, app_ids in app_combs: apps = apps_from_ids(app_ids, all_apps, x_vote) - s, stats = run_simulator(min_metric, apps) + s, stats = run_simulator(min_metric, apps, fairness=args.fairness) writer.writerow(get_eval(entry_id, s, stats)) f.flush() @@ -117,14 +119,16 @@ def apps_hybrid(all_apps, num_apps_range): return list(zip(entry_ids, app_combinations)) -def run_simulator(min_metric, apps): +def run_simulator(min_metric, apps, fairness=None): s = Scheduler.Scheduler(min_metric, apps, app_data.video_desc, app_data.model_desc, 0) - stats = { - "metric": s.optimize_parameters(350), - "rel_accs": s.get_relative_accuracies(), - } + stats = {} + if fairness: + stats["metric"] = s.optimize_per_app(350) + else: + stats["metric"] = s.optimize_parameters(350) + stats["rel_accs"] = s.get_relative_accuracies() # Get streamer schedule sched = s.make_streamer_schedule() @@ -139,7 +143,7 @@ def run_simulator(min_metric, apps): def get_eval(entry_id, s, stats): if "metric" in stats: - print "(Metric: {metric}, FNR: {fnr}, FPR: {fpr} \n \ + print "(Metric(s): {metric}, FNR: {fnr}, FPR: {fpr} \n \ Frozen: {frozen}, FPS: {fps}, Cost: {cost}) ".format(**stats) else: print "(Observed FNR: {fnr}, FPR: {fpr} \n \ diff --git a/src/scheduler/scheduler_util.py b/src/scheduler/scheduler_util.py index 131533d..eca5704 100644 --- a/src/scheduler/scheduler_util.py +++ b/src/scheduler/scheduler_util.py @@ -55,10 +55,46 @@ def get_cost_schedule(schedule, layer_latencies, num_layers): return cost +def get_alloc_cost_per_app(apps, cost_threshold): + # Gets cost allocated per app with fairness + costs = {} + cost = cost_threshold // len(apps) + for app in apps: + costs[app["app_id"]] = cost + return costs + +def get_cost_per_app(apps, schedule, layer_latencies, num_layers): + # Gets cost of each app with stems weighted by the number of apps that stem + # from it + branch_points = list(set([unit.num_frozen for unit in schedule])) + branch_points.append(num_layers) + seg_start = 0 + costs = {} + for app in apps: + costs[app["app_id"]] = 0 + for seg_end in branch_points: + seg_latency = sum([layer_latencies[i] for i in range(seg_start, seg_end)]) + + apps_branched, apps_not_branched = get_apps_branched(schedule, seg_end) + seg_fps = 0 + not_branched_fpses = [unit.target_fps for unit in apps_not_branched] + idx = 0 + for app in apps_branched: + task_fps = apps_branched[idx].target_fps + idx += 1 + costs[app.app_id] += task_fps * seg_latency + for app in apps_not_branched: + base_fps = max(not_branched_fpses) + costs[app.app_id] += ((base_fps * seg_latency / len(apps_not_branched))) + + seg_start = seg_end + + return costs + def get_acc_dist(accuracy, sigma): # Make a distribution of accuracies, centered around accuracy value # Represents different accuracies for difference instances of events. - # E.g. a train classifier has 70% accuracy. But for trains at night, + # E.g. a train classifier has 70% accuracy. But for trains at night, # it's 60% accurate, and in the daytime 80% accurate num_events = 10000 acc_dist = [random.gauss(accuracy, sigma) for i in range(num_events)] diff --git a/src/scripts/run_fairness.sh b/src/scripts/run_fairness.sh new file mode 100644 index 0000000..2395a3d --- /dev/null +++ b/src/scripts/run_fairness.sh @@ -0,0 +1,13 @@ +#!/bin/bash +# Run fairness for F1-score +MAX_NUM_APPS=32 +METRIC=f1 +OUTFILE_PREFIX=../mainstream-analysis/output/streamer/scheduler/atc/$METRIC/$METRIC-fairness-4hybrid +# Archive old file +mv $OUTFILE_PREFIX-mainstream-simulator $OUTFILE_PREFIX-mainstream-simulator-`date +%Y%m%d-%H%M` +python src/scheduler/run_scheduler_simulator.py \ + $MAX_NUM_APPS \ + $OUTFILE_PREFIX \ + --metric $METRIC \ + --datasets pedestrian cars flowers cats \ + --fairness