diff --git a/distrib_rl/Agents/MARLAgent.py b/distrib_rl/Agents/MARLAgent.py index 841f58e..e878c4b 100644 --- a/distrib_rl/Agents/MARLAgent.py +++ b/distrib_rl/Agents/MARLAgent.py @@ -17,6 +17,7 @@ def __init__(self, cfg): self.current_ep_rew = 0 self.policies = None self.n_agents = cfg["rlgym"]["team_size"] * 2 if cfg["rlgym"]["spawn_opponents"] else cfg["rlgym"]["team_size"] + self.extra_logger = None @torch.no_grad() @@ -87,6 +88,9 @@ def gather_timesteps(self, policy, env, num_timesteps=None, num_seconds=None, nu self.leftover_obs = next_obs.copy() + if self.extra_logger is not None: + self.extra_logger.aggregate_data(steps=cumulative_timesteps) + for i in range(agents_to_save): trajectories[i].final_obs = next_obs[i] experience_trajectories.append(trajectories[i]) diff --git a/distrib_rl/Distrib/RedisKeys.py b/distrib_rl/Distrib/RedisKeys.py index 5b1bdbb..7f2196f 100644 --- a/distrib_rl/Distrib/RedisKeys.py +++ b/distrib_rl/Distrib/RedisKeys.py @@ -24,3 +24,5 @@ MARL_MATCH_RESULTS_KEY = "MARL_MATCH_RESULTS_KEY" MEAN_POLICY_REWARD_KEY = "MEAN_POLICY_REWARD_KEY" + +EXTRA_LOG_AGGREGATE_KEY = "EXTRA_LOG_AGGREGATE_KEY" \ No newline at end of file diff --git a/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py b/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py index 81c46bd..742405b 100644 --- a/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py +++ b/distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py @@ -12,7 +12,7 @@ from distrib_rl.Environments.Custom.RocketLeague.TerminalConditionsFactory import build_terminal_conditions_from_config -def build_rlgym_from_config(config, existing_env=None): +def build_rlgym_from_config(config, existing_env=None, extra_logger=None): cfg = config["rlgym"] action_parser = DiscreteAction() @@ -55,6 +55,11 @@ def build_rlgym_from_config(config, existing_env=None): reward_fn = build_reward_fn_from_config(cfg["rewards"]) if cfg.get("terminal_conditions", False): terminal_conditions = build_terminal_conditions_from_config(cfg["terminal_conditions"]) + + if extra_logger is not None: + for fn in [reward_fn, obs_builder]: + if hasattr(fn, "_inject_extra_logger") and callable(fn._inject_extra_logger): + fn._inject_extra_logger(extra_logger) if existing_env: match = existing_env._match diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py index 2d0e4f5..a41b2ff 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Client.py @@ -118,8 +118,8 @@ def configure(self): self.cfg = self.client.get_cfg() self.env, self.experience, gradient_builder, policy_gradient_optimizer, value_gradient_optimizer, \ - self.agent, self.policy, self.strategy_optimizer, adaptive_omega, self.value_net, \ - novelty_gradient_optimizer, learner = Configurator.build_vars(self.cfg, existing_env=env) + self.agent, self.policy, self.strategy_optimizer, adaptive_omega, value_net, \ + novelty_gradient_optimizer, learner, extra_logger = Configurator.build_vars(self.cfg, existing_env=env) self.env.reset() self.transmit_env_spaces() diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py index ffae531..cba95c6 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Configurator.py @@ -6,6 +6,7 @@ from distrib_rl.Utils import AdaptiveOmega from distrib_rl.PolicyOptimization.Learners import * import distrib_rl.Environments.Custom +from distrib_rl.Utils.ExtraLog import ExtraLogger import gym import numpy as np import random @@ -16,17 +17,17 @@ np.random.seed(0) torch.manual_seed(0) -def build_env(cfg, existing_env=None): +def build_env(cfg, existing_env=None, extra_logger=None): env_name = cfg["env_id"].lower() if existing_env is None: if "rocket" in env_name: - from Environments.Custom.RocketLeague import RLGymFactory - env = RLGymFactory.build_rlgym_from_config(cfg) + from distrib_rl.Environments.Custom.RocketLeague import RLGymFactory + env = RLGymFactory.build_rlgym_from_config(cfg, extra_logger=extra_logger) else: env = gym.make(cfg["env_id"]) elif "rocket" in env_name: - from Environments.Custom.RocketLeague import RLGymFactory - env = RLGymFactory.build_rlgym_from_config(cfg, existing_env=existing_env) + from distrib_rl.Environments.Custom.RocketLeague import RLGymFactory + env = RLGymFactory.build_rlgym_from_config(cfg, existing_env=existing_env, extra_logger=extra_logger) else: env = existing_env @@ -38,8 +39,14 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None): seed = cfg["seed"] cfg["rng"] = np.random.RandomState(seed) device = cfg["device"] + + if "extra_log" in cfg: + extra_logger = ExtraLogger(cfg["extra_log"]) + else: + extra_logger = None + if env_space_shapes is None: - env = build_env(cfg, existing_env=existing_env) + env = build_env(cfg, existing_env=existing_env, extra_logger=extra_logger) else: env = None @@ -50,6 +57,8 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None): experience = ExperienceReplay(cfg) agent = AgentFactory.get_from_cfg(cfg) + if extra_logger is not None: + agent.extra_logger = extra_logger models = PolicyFactory.get_from_cfg(cfg, env=env, env_space_shapes=env_space_shapes) @@ -86,4 +95,4 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None): # learner = PPONS(strategy_optimizer, cfg, policy, value_net, policy_gradient_optimizer, value_gradient_optimizer, gradient_builder, omega) return env, experience, gradient_builder, policy_gradient_optimizer, value_gradient_optimizer, agent, policy, \ - strategy_optimizer, omega, value_net, novelty_gradient_optimizer, learner + strategy_optimizer, omega, value_net, novelty_gradient_optimizer, learner, extra_logger diff --git a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py index 72577c7..6cdfc90 100644 --- a/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py +++ b/distrib_rl/PolicyOptimization/DistribPolicyGradients/Server.py @@ -19,6 +19,7 @@ def __init__(self): self.gradient_builder = None self.adaptive_omega = None self.policy_reward = None + self.extra_logger = None self.exp_manager = None self.experience = None self.wandb_run = None @@ -63,6 +64,10 @@ def step(self): self.server.redis.set(RedisKeys.RUNNING_REWARD_MEAN_KEY, float(self.exp_manager.rew_mean)) self.server.redis.set(RedisKeys.RUNNING_REWARD_STD_KEY, float(self.exp_manager.rew_std)) self.epoch_info["steps_per_second"] = int(round(self.exp_manager.steps_per_second)) + + if self.extra_logger is not None: + self.epoch_info["extra_log"] = self.extra_logger.pop_redis_mean_aggregates() + self.report_epoch() self.epoch_info.clear() @@ -222,7 +227,8 @@ def configure(self, cfg): self.adaptive_omega, \ self.value_net, \ self.novelty_gradient_optimizer, \ - self.learner = Configurator.build_vars(cfg, env_space_shapes=(in_shape, out_shape)) + self.learner, \ + self.extra_logger = Configurator.build_vars(cfg, env_space_shapes=(in_shape, out_shape)) print("Starting new experience manager...") self.exp_manager = ParallelExperienceManager(cfg) diff --git a/distrib_rl/Utils/ExtraLog.py b/distrib_rl/Utils/ExtraLog.py new file mode 100644 index 0000000..5b08d78 --- /dev/null +++ b/distrib_rl/Utils/ExtraLog.py @@ -0,0 +1,199 @@ +import numpy as np +from distrib_rl.Distrib import RedisClient, RedisKeys + +from datetime import datetime +import csv +from pathlib import Path +from time import sleep + +class ExtraLogger(): + def __init__(self, cfg): + + # How many times aggregate_data will be called before combining and sending values + self.report_every = cfg.get("rounds_per_aggregate", 60) + + values_cfg = cfg["values"] + self.keys = values_cfg.keys() + + self.agg_funcs = {} + self.per_funcs = {} + self.per_vals = {} + self.out_names = {} + + for key, k_cfg in values_cfg.items(): + agg = k_cfg.get("agg", None) + if agg is None or agg == "mean": + self.agg_funcs[key] = np.mean + self.out_names[key] = "" + elif agg == "mean_per": + self.agg_funcs[key] = np.sum + self.out_names[key] = "" + elif agg == "std": + self.agg_funcs[key] = np.std + self.out_names[key] = "std_" + elif agg == "rms": + self.agg_funcs[key] = _root_mean_square + self.out_names[key] = "rms_" + + per_value = None + if "per" in k_cfg: + self.per_funcs[key] = _per + per_value = k_cfg["per"] + per_name = f"{key}_per_{per_value}" + elif "inv_per" in k_cfg: + self.per_funcs[key] = _inv_per + per_value = k_cfg["inv_per"] + per_name = f"{per_value}_per_{key}" + + if per_value is None or per_value == "instance": + self.per_funcs[key] = None + self.out_names[key] += key + else: + self.per_vals[key] = per_value + self.out_names[key] += per_name + + self.reset() + + if cfg.get("log_to_csv", False): + self.csv_path = None + self.csv = True + + if "csv_path" in cfg: + self.csv_dir = Path(cfg["csv_path"]) + self.csv_dir.mkdir(exist_ok=True) + else: + self.csv_dir = Path(".") + else: + self.csv = False + + if cfg.get("wandb_via_redis", False): + self.redis_client = RedisClient() + self.redis_client.connect() + else: + self.redis_client = None + + + def reset(self): + self.running_data = { k: [] for k in self.keys } + self.running_per_vals = { k: 0 for k in set(self.per_vals.values())} + self.last_report = 1 + + + def log(self, key, data): + if key in self.keys: + self.running_data[key] += [ float(data) ] + + def log_multi(self, key_data_dict): + log_keys = key_data_dict.keys() + for key in set(self.keys) & log_keys: + self.running_data[key] += [ float(key_data_dict[key]) ] + + + def aggregate_data(self, **kwargs): + """ + Keywords expected to contain values to aggregate values against. + i.e. aggregate_data(steps=1000) for data with config {"per": "step"} + """ + + for k in kwargs.keys(): + if k in self.running_per_vals: + self.running_per_vals[k] += kwargs[k] + else: + # Debug, take out eventually + print(k, " sent in to logger and not used") + + if self.last_report < self.report_every: + self.last_report += 1 + return None + + agg_data = {} + for key in self.keys: + out_name = self.out_names[key] + data = self.running_data[key] + if len(data) == 0: + agg_data[out_name] = 0 + else: + x = self.agg_funcs[key](data) + if self.per_funcs[key] is not None: + per_value = self.per_vals[key] + x = self.per_funcs[key]( x, self.running_per_vals[per_value] ) + agg_data[out_name] = x + + if self.redis_client is not None: + self.redis_client.push_data(RedisKeys.EXTRA_LOG_AGGREGATE_KEY, agg_data) + + if self.csv: + self._write_to_csv(agg_data) + + self.reset() + + return agg_data + + + def pop_redis_mean_aggregates(self): + if self.redis_client is None: + return None + + results = self.redis_client.atomic_pop_all(RedisKeys.EXTRA_LOG_AGGREGATE_KEY) + if len(results) == 0: + return None + + combined_agg = {} + for res in results: + for k,v in res.items(): + if k not in combined_agg: + combined_agg[k] = [v] + else: + combined_agg[k] += [v] + + for k in combined_agg.keys(): + combined_agg[k] = np.mean(combined_agg[k]) + + return combined_agg + + + def _start_csv(self): + # Kept running into multiple processes grabbing the same files, even to the microsecond! Mix it up. + sleep(np.random.uniform(0,1)) + self.csv_path = self.csv_dir / datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f.csv") + + with open(self.csv_path, "a", newline='') as csvfile: + writer = csv.writer(csvfile) + row = ["datetime"] + [ self.out_names[k] for k in self.keys ] + writer.writerow(row) + + ## Apparently closing files is a big performance penalty in windows, lets try and leave it open (even if not ideal) + ## Update: This is mitigated now by only updating every N updates + ## https://gregoryszorc.com/blog/2015/10/22/append-i/o-performance-on-windows/ + # try: + # del self.writer + # self.cur_file.close() + # except: + # pass + + # self.csv_file = open(self.csv_path, "a", newline='') + # self.writer = csv.writer(self.csv_file) + # row = ["datetime"] + [ self.out_names[k] for k in self.keys ] + # self.writer.writerow(row) + + def _write_to_csv(self, agg_data): + if (self.csv_path is None) or (not self.csv_path.exists()): + self._start_csv() + + row = [ datetime.now() ] + [ agg_data[self.out_names[k]] for k in self.keys ] + + with open(self.csv_path, "a", newline='') as csvfile: + writer = csv.writer(csvfile) + writer.writerow(row) + + # self.writer.writerow(row) + + +def _per(x, per_value): + return x / per_value + +def _inv_per(x, per_value): + return per_value / x + +def _root_mean_square(x): + return np.sqrt(np.mean(np.square(x))) \ No newline at end of file