From 50fe8444bc0e77486d720c7f3c6652377cabb2c3 Mon Sep 17 00:00:00 2001 From: Ian Cunnyngham Date: Wed, 8 Jun 2022 13:58:42 -1000 Subject: [PATCH 1/4] Initial version of ExtraLog which just logs aggregated stats to a local csv for now --- distrib_rl/Agents/MARLAgent.py | 4 + .../Custom/RocketLeague/RLGymFactory.py | 6 +- .../DistribPolicyGradients/Configurator.py | 22 ++-- distrib_rl/Utils/ExtraLog.py | 103 ++++++++++++++++++ 4 files changed, 127 insertions(+), 8 deletions(-) create mode 100644 distrib_rl/Utils/ExtraLog.py diff --git a/distrib_rl/Agents/MARLAgent.py b/distrib_rl/Agents/MARLAgent.py index 841f58e..e878c4b 100644 --- a/distrib_rl/Agents/MARLAgent.py +++ b/distrib_rl/Agents/MARLAgent.py @@ -17,6 +17,7 @@ def __init__(self, cfg): self.current_ep_rew = 0 self.policies = None self.n_agents = cfg["rlgym"]["team_size"] * 2 if cfg["rlgym"]["spawn_opponents"] else cfg["rlgym"]["team_size"] + self.extra_logger = None @torch.no_grad() @@ -87,6 +88,9 @@ def gather_timesteps(self, policy, env, num_timesteps=None, num_seconds=None, nu self.leftover_obs = next_obs.copy() + if self.extra_logger is not None: + self.extra_logger.aggregate_data(steps=cumulative_timesteps) + for i in range(agents_to_save): trajectories[i].final_obs = next_obs[i] experience_trajectories.append(trajectories[i]) diff --git a/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py b/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py index 81c46bd..d18b51b 100644 --- a/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py +++ b/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py @@ -12,7 +12,7 @@ from distrib_rl.Environments.Custom.RocketLeague.TerminalConditionsFactory import build_terminal_conditions_from_config -def build_rlgym_from_config(config, existing_env=None): +def build_rlgym_from_config(config, existing_env=None, extra_logger=None): cfg = config["rlgym"] action_parser = DiscreteAction() @@ -55,6 +55,10 @@ def build_rlgym_from_config(config, existing_env=None): reward_fn = build_reward_fn_from_config(cfg["rewards"]) if cfg.get("terminal_conditions", False): terminal_conditions = build_terminal_conditions_from_config(cfg["terminal_conditions"]) + + if extra_logger is not None: + if hasattr(reward_fn, "_inject_extra_logger") and callable(reward_fn._inject_extra_logger): + reward_fn._inject_extra_logger(extra_logger) if existing_env: match = existing_env._match diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py index ffae531..5b9d5dc 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py @@ -5,7 +5,7 @@ from distrib_rl.Strategy import StrategyOptimizer from distrib_rl.Utils import AdaptiveOmega from distrib_rl.PolicyOptimization.Learners import * -import distrib_rl.Environments.Custom +from distrib_rl.Utils.ExtraLog import ExtraLogClient import gym import numpy as np import random @@ -16,17 +16,17 @@ np.random.seed(0) torch.manual_seed(0) -def build_env(cfg, existing_env=None): +def build_env(cfg, existing_env=None, extra_logger=None): env_name = cfg["env_id"].lower() if existing_env is None: if "rocket" in env_name: - from Environments.Custom.RocketLeague import RLGymFactory - env = RLGymFactory.build_rlgym_from_config(cfg) + from distrib_rl.Environments.Custom.RocketLeague import RLGymFactory + env = RLGymFactory.build_rlgym_from_config(cfg, extra_logger=extra_logger) else: env = gym.make(cfg["env_id"]) elif "rocket" in env_name: - from Environments.Custom.RocketLeague import RLGymFactory - env = RLGymFactory.build_rlgym_from_config(cfg, existing_env=existing_env) + from distrib_rl.Environments.Custom.RocketLeague import RLGymFactory + env = RLGymFactory.build_rlgym_from_config(cfg, existing_env=existing_env, extra_logger=extra_logger) else: env = existing_env @@ -38,8 +38,14 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None): seed = cfg["seed"] cfg["rng"] = np.random.RandomState(seed) device = cfg["device"] + + if "extra_logging" in cfg: + extra_logger = ExtraLogClient(cfg["extra_logging"]) + else: + extra_logger = None + if env_space_shapes is None: - env = build_env(cfg, existing_env=existing_env) + env = build_env(cfg, existing_env=existing_env, extra_logger=extra_logger) else: env = None @@ -50,6 +56,8 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None): experience = ExperienceReplay(cfg) agent = AgentFactory.get_from_cfg(cfg) + if extra_logger is not None: + agent.extra_logger = extra_logger models = PolicyFactory.get_from_cfg(cfg, env=env, env_space_shapes=env_space_shapes) diff --git a/distrib_rl/Utils/ExtraLog.py b/distrib_rl/Utils/ExtraLog.py new file mode 100644 index 0000000..84bf55a --- /dev/null +++ b/distrib_rl/Utils/ExtraLog.py @@ -0,0 +1,103 @@ +import numpy as np + +DEBUG_WRITE_TO_CSV = True + +if DEBUG_WRITE_TO_CSV: + from datetime import datetime + import csv + +class ExtraLogClient(): + def __init__(self, cfg): + self.keys = cfg.keys() + self.reset() + + self.agg_funcs = {} + self.per_funcs = {} + self.per_vals = {} + self.out_names = {} + + for key, k_cfg in cfg.items(): + agg = k_cfg.get("agg", None) + if agg is None or agg == "mean": + self.agg_funcs[key] = np.mean + self.out_names[key] = "mean_" + elif agg == "std": + self.agg_funcs[key] = np.std + self.out_names[key] = "std_" + elif agg == "rms": + self.agg_funcs[key] = _root_mean_square + self.out_names[key] = "rms_" + + per_value = None + if "per" in k_cfg: + self.per_funcs[key] = _per + per_value = k_cfg["per"] + per_name = f"{key}_per_{per_value}" + elif "inv_per" in k_cfg: + self.per_funcs[key] = _inv_per + per_value = k_cfg["inv_per"] + per_name = f"{per_value}_per_{key}" + + if per_value is None or per_value == "instance": + self.per_funcs[key] = None + self.out_names[key] += key + else: + self.per_vals[key] = per_value + self.out_names[key] += per_name + + + if DEBUG_WRITE_TO_CSV: + self.csv_path = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.csv") + with open(self.csv_path, "a", newline='') as csvfile: + writer = csv.writer(csvfile) + row = [ self.out_names[k] for k in self.keys ] + writer.writerow(row) + + def reset(self): + self.running_data = { k: [] for k in self.keys } + + def log(self, key, data): + self.running_data[key] += [ float(data) ] + + def aggregate_data(self, **kwargs): + """ + Keywords expected to contain values to aggregate values against. + i.e. aggregate_data(steps=1000) for data with config {"per": "step"} + """ + + agg_data = {} + for key in self.keys: + out_name = self.out_names[key] + data = self.running_data[key] + if len(data) == 0: + agg_data[out_name] = 0 + else: + x = self.agg_funcs[key](data) + if self.per_funcs[key] is not None: + per_value = self.per_vals[key] + x = self.per_funcs[key]( x, kwargs[per_value] ) + agg_data[out_name] = x + + if DEBUG_WRITE_TO_CSV: + self._write_to_csv(agg_data) + + self.reset() + + return agg_data + + def _write_to_csv(self, agg_data): + with open(self.csv_path, "a", newline='') as csvfile: + # writer = csv.DictWriter(csvfile, fieldnames=agg_data.keys()) + # writer.writerow(agg_data) + writer = csv.writer(csvfile) + row = [ agg_data[self.out_names[k]] for k in self.keys ] + writer.writerow(row) + +def _per(x, per_value): + return x / per_value + +def _inv_per(x, per_value): + return per_value / x + +def _root_mean_square(x): + return np.sqrt(np.mean(np.square(x))) \ No newline at end of file From e12542578a749f9ec042dfaf5c93279060c86aeb Mon Sep 17 00:00:00 2001 From: Ian Cunnyngham Date: Sat, 18 Jun 2022 08:30:43 -1000 Subject: [PATCH 2/4] Added aggregating via Redis and logging to wandb. Tons of turd polishing for csvs --- distrib_rl/Distrib/RedisKeys.py | 2 + .../Custom/RocketLeague/RLGymFactory.py | 5 +- .../DistribPolicyGradients/Client.py | 4 +- .../DistribPolicyGradients/Configurator.py | 8 +- .../DistribPolicyGradients/Server.py | 8 +- distrib_rl/Utils/ExtraLog.py | 142 +++++++++++++++--- 6 files changed, 136 insertions(+), 33 deletions(-) diff --git a/distrib_rl/Distrib/RedisKeys.py b/distrib_rl/Distrib/RedisKeys.py index 5b1bdbb..7f2196f 100644 --- a/distrib_rl/Distrib/RedisKeys.py +++ b/distrib_rl/Distrib/RedisKeys.py @@ -24,3 +24,5 @@ MARL_MATCH_RESULTS_KEY = "MARL_MATCH_RESULTS_KEY" MEAN_POLICY_REWARD_KEY = "MEAN_POLICY_REWARD_KEY" + +EXTRA_LOG_AGGREGATE_KEY = "EXTRA_LOG_AGGREGATE_KEY" \ No newline at end of file diff --git a/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py b/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py index d18b51b..742405b 100644 --- a/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py +++ b/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py @@ -57,8 +57,9 @@ def build_rlgym_from_config(config, existing_env=None, extra_logger=None): terminal_conditions = build_terminal_conditions_from_config(cfg["terminal_conditions"]) if extra_logger is not None: - if hasattr(reward_fn, "_inject_extra_logger") and callable(reward_fn._inject_extra_logger): - reward_fn._inject_extra_logger(extra_logger) + for fn in [reward_fn, obs_builder]: + if hasattr(fn, "_inject_extra_logger") and callable(fn._inject_extra_logger): + fn._inject_extra_logger(extra_logger) if existing_env: match = existing_env._match diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py index 6ebdd88..81d8a94 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py @@ -117,8 +117,8 @@ def configure(self): self.cfg = self.client.get_cfg() self.env, self.experience, gradient_builder, policy_gradient_optimizer, value_gradient_optimizer, \ - self.agent, self.policy, self.strategy_optimizer, adaptive_omega, self.value_net, \ - novelty_gradient_optimizer, learner = Configurator.build_vars(self.cfg, existing_env=env) + self.agent, self.policy, self.strategy_optimizer, adaptive_omega, value_net, \ + novelty_gradient_optimizer, learner, extra_logger = Configurator.build_vars(self.cfg, existing_env=env) self.env.reset() self.transmit_env_spaces() diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py index 5b9d5dc..21ae68a 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py @@ -5,7 +5,7 @@ from distrib_rl.Strategy import StrategyOptimizer from distrib_rl.Utils import AdaptiveOmega from distrib_rl.PolicyOptimization.Learners import * -from distrib_rl.Utils.ExtraLog import ExtraLogClient +from distrib_rl.Utils.ExtraLog import ExtraLogger import gym import numpy as np import random @@ -39,8 +39,8 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None): cfg["rng"] = np.random.RandomState(seed) device = cfg["device"] - if "extra_logging" in cfg: - extra_logger = ExtraLogClient(cfg["extra_logging"]) + if "extra_log" in cfg: + extra_logger = ExtraLogger(cfg["extra_log"]) else: extra_logger = None @@ -94,4 +94,4 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None): # learner = PPONS(strategy_optimizer, cfg, policy, value_net, policy_gradient_optimizer, value_gradient_optimizer, gradient_builder, omega) return env, experience, gradient_builder, policy_gradient_optimizer, value_gradient_optimizer, agent, policy, \ - strategy_optimizer, omega, value_net, novelty_gradient_optimizer, learner + strategy_optimizer, omega, value_net, novelty_gradient_optimizer, learner, extra_logger diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py index 67a785d..c13098f 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py @@ -19,6 +19,7 @@ def __init__(self): self.gradient_builder = None self.adaptive_omega = None self.policy_reward = None + self.extra_logger = None self.exp_manager = None self.experience = None self.wandb_run = None @@ -63,6 +64,10 @@ def step(self): self.server.redis.set(RedisKeys.RUNNING_REWARD_MEAN_KEY, float(self.exp_manager.rew_mean)) self.server.redis.set(RedisKeys.RUNNING_REWARD_STD_KEY, float(self.exp_manager.rew_std)) self.epoch_info["steps_per_second"] = int(round(self.exp_manager.steps_per_second)) + + if self.extra_logger is not None: + self.epoch_info["extra_log"] = self.extra_logger.pop_redis_mean_aggregates() + self.report_epoch() self.epoch_info.clear() @@ -212,7 +217,8 @@ def configure(self, cfg): self.adaptive_omega, \ self.value_net, \ self.novelty_gradient_optimizer, \ - self.learner = Configurator.build_vars(cfg, env_space_shapes=(in_shape, out_shape)) + self.learner, \ + self.extra_logger = Configurator.build_vars(cfg, env_space_shapes=(in_shape, out_shape)) print("Starting new experience manager...") self.exp_manager = ParallelExperienceManager(cfg) diff --git a/distrib_rl/Utils/ExtraLog.py b/distrib_rl/Utils/ExtraLog.py index 84bf55a..8bd894c 100644 --- a/distrib_rl/Utils/ExtraLog.py +++ b/distrib_rl/Utils/ExtraLog.py @@ -1,26 +1,33 @@ import numpy as np +from distrib_rl.Distrib import RedisClient, RedisKeys -DEBUG_WRITE_TO_CSV = True +from datetime import datetime +import csv +from pathlib import Path +from time import sleep -if DEBUG_WRITE_TO_CSV: - from datetime import datetime - import csv - -class ExtraLogClient(): +class ExtraLogger(): def __init__(self, cfg): - self.keys = cfg.keys() - self.reset() + + # How many times aggregate_data will be called before combining and sending values + self.report_every = cfg.get("rounds_per_aggregate", 60) + + values_cfg = cfg["values"] + self.keys = values_cfg.keys() self.agg_funcs = {} self.per_funcs = {} self.per_vals = {} self.out_names = {} - for key, k_cfg in cfg.items(): + for key, k_cfg in values_cfg.items(): agg = k_cfg.get("agg", None) if agg is None or agg == "mean": self.agg_funcs[key] = np.mean - self.out_names[key] = "mean_" + self.out_names[key] = "" + elif agg == "mean_per": + self.agg_funcs[key] = np.sum + self.out_names[key] = "" elif agg == "std": self.agg_funcs[key] = np.std self.out_names[key] = "std_" @@ -45,19 +52,40 @@ def __init__(self, cfg): self.per_vals[key] = per_value self.out_names[key] += per_name - - if DEBUG_WRITE_TO_CSV: - self.csv_path = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.csv") - with open(self.csv_path, "a", newline='') as csvfile: - writer = csv.writer(csvfile) - row = [ self.out_names[k] for k in self.keys ] - writer.writerow(row) + self.reset() + + if cfg.get("log_to_csv", False): + self.csv_path = None + self.csv = True + + if "csv_path" in cfg: + self.csv_dir = Path(cfg["csv_path"]) + self.csv_dir.mkdir(exist_ok=True) + else: + self.csv_dir = Path(".") + + if cfg.get("wandb_via_redis", False): + self.redis_client = RedisClient() + self.redis_client.connect() + else: + self.redis_client = None + def reset(self): self.running_data = { k: [] for k in self.keys } + self.running_per_vals = { k: 0 for k in set(self.per_vals.values())} + self.last_report = 1 + def log(self, key, data): - self.running_data[key] += [ float(data) ] + if key in self.keys: + self.running_data[key] += [ float(data) ] + + def log_multi(self, key_data_dict): + log_keys = key_data_dict.keys() + for key in set(self.keys) & log_keys: + self.running_data[key] += [ float(key_data_dict[key]) ] + def aggregate_data(self, **kwargs): """ @@ -65,6 +93,17 @@ def aggregate_data(self, **kwargs): i.e. aggregate_data(steps=1000) for data with config {"per": "step"} """ + for k in kwargs.keys(): + if k in self.running_per_vals: + self.running_per_vals[k] += kwargs[k] + else: + # Debug, take out eventually + print(k, " sent in to logger and not used") + + if self.last_report < self.report_every: + self.last_report += 1 + return None + agg_data = {} for key in self.keys: out_name = self.out_names[key] @@ -75,24 +114,79 @@ def aggregate_data(self, **kwargs): x = self.agg_funcs[key](data) if self.per_funcs[key] is not None: per_value = self.per_vals[key] - x = self.per_funcs[key]( x, kwargs[per_value] ) + x = self.per_funcs[key]( x, self.running_per_vals[per_value] ) agg_data[out_name] = x - if DEBUG_WRITE_TO_CSV: - self._write_to_csv(agg_data) + if self.redis_client is not None: + self.redis_client.push_data(RedisKeys.EXTRA_LOG_AGGREGATE_KEY, agg_data) + if self.csv: + self._write_to_csv(agg_data) + self.reset() return agg_data + + + def pop_redis_mean_aggregates(self): + if self.redis_client is None: + return None + + results = self.redis_client.atomic_pop_all(RedisKeys.EXTRA_LOG_AGGREGATE_KEY) + if len(results) == 0: + return None + + combined_agg = {} + for res in results: + for k,v in res.items(): + if k not in combined_agg: + combined_agg[k] = [v] + else: + combined_agg[k] += [v] + + for k in combined_agg.keys(): + combined_agg[k] = np.mean(combined_agg[k]) + + return combined_agg + + def _start_csv(self): + # Kept running into multiple processes grabbing the same files, even to the microsecond! Mix it up. + sleep(np.random.uniform(0,1)) + self.csv_path = self.csv_dir / datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f.csv") + + with open(self.csv_path, "a", newline='') as csvfile: + writer = csv.writer(csvfile) + row = ["datetime"] + [ self.out_names[k] for k in self.keys ] + writer.writerow(row) + + ## Apparently closing files is a big performance penalty in windows, lets try and leave it open (even if not ideal) + ## Update: This is mitigated now by only updating every N updates + ## https://gregoryszorc.com/blog/2015/10/22/append-i/o-performance-on-windows/ + # try: + # del self.writer + # self.cur_file.close() + # except: + # pass + + # self.csv_file = open(self.csv_path, "a", newline='') + # self.writer = csv.writer(self.csv_file) + # row = ["datetime"] + [ self.out_names[k] for k in self.keys ] + # self.writer.writerow(row) + def _write_to_csv(self, agg_data): + if (self.csv_path is None) or (not self.csv_path.exists()): + self._start_csv() + + row = [ datetime.now() ] + [ agg_data[self.out_names[k]] for k in self.keys ] + with open(self.csv_path, "a", newline='') as csvfile: - # writer = csv.DictWriter(csvfile, fieldnames=agg_data.keys()) - # writer.writerow(agg_data) writer = csv.writer(csvfile) - row = [ agg_data[self.out_names[k]] for k in self.keys ] writer.writerow(row) + # self.writer.writerow(row) + + def _per(x, per_value): return x / per_value From 0394fca4f3760864eb36c1abb637ba7a58770bdb Mon Sep 17 00:00:00 2001 From: Ian Cunnyngham Date: Sat, 18 Jun 2022 09:53:20 -1000 Subject: [PATCH 3/4] fix random line delete from merge --- .../PolicyOptimization/DistribPolicyGradients/Configurator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py index 21ae68a..cba95c6 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py @@ -5,6 +5,7 @@ from distrib_rl.Strategy import StrategyOptimizer from distrib_rl.Utils import AdaptiveOmega from distrib_rl.PolicyOptimization.Learners import * +import distrib_rl.Environments.Custom from distrib_rl.Utils.ExtraLog import ExtraLogger import gym import numpy as np From e0d13754caceec5b55a9b506b1b1fd252e191bd5 Mon Sep 17 00:00:00 2001 From: Ian Cunnyngham Date: Sat, 18 Jun 2022 13:32:12 -1000 Subject: [PATCH 4/4] oops --- distrib_rl/Utils/ExtraLog.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distrib_rl/Utils/ExtraLog.py b/distrib_rl/Utils/ExtraLog.py index 8bd894c..5b08d78 100644 --- a/distrib_rl/Utils/ExtraLog.py +++ b/distrib_rl/Utils/ExtraLog.py @@ -63,6 +63,8 @@ def __init__(self, cfg): self.csv_dir.mkdir(exist_ok=True) else: self.csv_dir = Path(".") + else: + self.csv = False if cfg.get("wandb_via_redis", False): self.redis_client = RedisClient()