4 changes: 4 additions & 0 deletions distrib_rl/Agents/MARLAgent.py
@@ -17,6 +17,7 @@ def __init__(self, cfg):
self.current_ep_rew = 0
self.policies = None
self.n_agents = cfg["rlgym"]["team_size"] * 2 if cfg["rlgym"]["spawn_opponents"] else cfg["rlgym"]["team_size"]
self.extra_logger = None


@torch.no_grad()
@@ -87,6 +88,9 @@ def gather_timesteps(self, policy, env, num_timesteps=None, num_seconds=None, nu

self.leftover_obs = next_obs.copy()

        if self.extra_logger is not None:
            # Accumulate the step denominator; every rounds_per_aggregate calls this
            # also combines and reports the logged values.
            self.extra_logger.aggregate_data(steps=cumulative_timesteps)

for i in range(agents_to_save):
trajectories[i].final_obs = next_obs[i]
experience_trajectories.append(trajectories[i])
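The `steps` keyword here is only meaningful if some configured value names it as a denominator. A minimal sketch of the pairing, with hypothetical metric names (`logger` stands in for an ExtraLogger instance; the config shape comes from ExtraLog.py below):

    # config side (hypothetical): {"values": {"goals": {"agg": "mean_per", "per": "steps"}}}
    logger.log("goals", goals_this_rollout)       # numerator samples, logged from anywhere
    logger.aggregate_data(steps=steps_collected)  # denominator accumulates each round

Every `rounds_per_aggregate` calls, `aggregate_data` divides the summed numerators by the accumulated denominator and reports `goals_per_steps`.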
2 changes: 2 additions & 0 deletions distrib_rl/Distrib/RedisKeys.py
@@ -24,3 +24,5 @@
MARL_MATCH_RESULTS_KEY = "MARL_MATCH_RESULTS_KEY"

MEAN_POLICY_REWARD_KEY = "MEAN_POLICY_REWARD_KEY"

EXTRA_LOG_AGGREGATE_KEY = "EXTRA_LOG_AGGREGATE_KEY"
7 changes: 6 additions & 1 deletion distrib_rl/Environments/Custom/RocketLeague/RLGymFactory.py
@@ -12,7 +12,7 @@
from distrib_rl.Environments.Custom.RocketLeague.TerminalConditionsFactory import build_terminal_conditions_from_config


def build_rlgym_from_config(config, existing_env=None):
def build_rlgym_from_config(config, existing_env=None, extra_logger=None):
cfg = config["rlgym"]

action_parser = DiscreteAction()
@@ -55,6 +55,11 @@ def build_rlgym_from_config(config, existing_env=None):
reward_fn = build_reward_fn_from_config(cfg["rewards"])
if cfg.get("terminal_conditions", False):
terminal_conditions = build_terminal_conditions_from_config(cfg["terminal_conditions"])

    if extra_logger is not None:
        # Duck-typed injection: only reward functions / obs builders that define
        # _inject_extra_logger receive the logger; everything else is left untouched.
        for fn in [reward_fn, obs_builder]:
            if hasattr(fn, "_inject_extra_logger") and callable(fn._inject_extra_logger):
                fn._inject_extra_logger(extra_logger)

if existing_env:
match = existing_env._match
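Injection is opt-in by convention rather than by interface. A minimal sketch of a compatible reward function, assuming the rlgym `RewardFunction` base class and a hypothetical "ball_touches" metric (neither is part of this change):

    from rlgym.utils.reward_functions import RewardFunction

    class TouchLoggingReward(RewardFunction):
        def __init__(self):
            self.extra_logger = None

        def _inject_extra_logger(self, extra_logger):
            # Called by build_rlgym_from_config when an extra_logger is configured.
            self.extra_logger = extra_logger

        def reset(self, initial_state):
            pass

        def get_reward(self, player, state, previous_action):
            reward = float(player.ball_touched)
            if self.extra_logger is not None:
                self.extra_logger.log("ball_touches", reward)
            return reward

        def get_final_reward(self, player, state, previous_action):
            return self.get_reward(player, state, previous_action)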
@@ -118,8 +118,8 @@ def configure(self):
self.cfg = self.client.get_cfg()

self.env, self.experience, gradient_builder, policy_gradient_optimizer, value_gradient_optimizer, \
self.agent, self.policy, self.strategy_optimizer, adaptive_omega, self.value_net, \
novelty_gradient_optimizer, learner = Configurator.build_vars(self.cfg, existing_env=env)
self.agent, self.policy, self.strategy_optimizer, adaptive_omega, value_net, \
novelty_gradient_optimizer, learner, extra_logger = Configurator.build_vars(self.cfg, existing_env=env)

self.env.reset()
self.transmit_env_spaces()
@@ -6,6 +6,7 @@
from distrib_rl.Utils import AdaptiveOmega
from distrib_rl.PolicyOptimization.Learners import *
import distrib_rl.Environments.Custom
from distrib_rl.Utils.ExtraLog import ExtraLogger
import gym
import numpy as np
import random
@@ -16,17 +17,17 @@
np.random.seed(0)
torch.manual_seed(0)

def build_env(cfg, existing_env=None):
def build_env(cfg, existing_env=None, extra_logger=None):
env_name = cfg["env_id"].lower()
if existing_env is None:
if "rocket" in env_name:
from Environments.Custom.RocketLeague import RLGymFactory
env = RLGymFactory.build_rlgym_from_config(cfg)
from distrib_rl.Environments.Custom.RocketLeague import RLGymFactory
env = RLGymFactory.build_rlgym_from_config(cfg, extra_logger=extra_logger)
else:
env = gym.make(cfg["env_id"])
elif "rocket" in env_name:
from Environments.Custom.RocketLeague import RLGymFactory
env = RLGymFactory.build_rlgym_from_config(cfg, existing_env=existing_env)
from distrib_rl.Environments.Custom.RocketLeague import RLGymFactory
env = RLGymFactory.build_rlgym_from_config(cfg, existing_env=existing_env, extra_logger=extra_logger)
else:
env = existing_env

@@ -38,8 +39,14 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None):
seed = cfg["seed"]
cfg["rng"] = np.random.RandomState(seed)
device = cfg["device"]

if "extra_log" in cfg:
extra_logger = ExtraLogger(cfg["extra_log"])
else:
extra_logger = None

if env_space_shapes is None:
env = build_env(cfg, existing_env=existing_env)
env = build_env(cfg, existing_env=existing_env, extra_logger=extra_logger)
else:
env = None

@@ -50,6 +57,8 @@

experience = ExperienceReplay(cfg)
agent = AgentFactory.get_from_cfg(cfg)
if extra_logger is not None:
agent.extra_logger = extra_logger

models = PolicyFactory.get_from_cfg(cfg, env=env, env_space_shapes=env_space_shapes)

@@ -86,4 +95,4 @@ def build_vars(cfg, existing_env=None, env_space_shapes=None):
# learner = PPONS(strategy_optimizer, cfg, policy, value_net, policy_gradient_optimizer, value_gradient_optimizer, gradient_builder, omega)

return env, experience, gradient_builder, policy_gradient_optimizer, value_gradient_optimizer, agent, policy, \
strategy_optimizer, omega, value_net, novelty_gradient_optimizer, learner
strategy_optimizer, omega, value_net, novelty_gradient_optimizer, learner, extra_logger
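For context, a hedged sketch of the `extra_log` config block that triggers this construction, reconstructed from the keys ExtraLogger reads (metric names and the csv directory are illustrative, not part of the change):

    cfg["extra_log"] = {
        "rounds_per_aggregate": 60,     # aggregate_data calls per report
        "log_to_csv": True,
        "csv_path": "logs/extra",       # hypothetical directory
        "wandb_via_redis": True,
        "values": {
            "ball_touches": {"agg": "mean_per", "per": "steps"},  # reported as ball_touches_per_steps
            "boost_used": {"agg": "rms"},                         # reported as rms_boost_used
            "goal_speed": {"agg": "mean"},                        # reported as goal_speed
        },
    }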
@@ -19,6 +19,7 @@ def __init__(self):
self.gradient_builder = None
self.adaptive_omega = None
self.policy_reward = None
self.extra_logger = None
self.exp_manager = None
self.experience = None
self.wandb_run = None
@@ -63,6 +64,10 @@ def step(self):
self.server.redis.set(RedisKeys.RUNNING_REWARD_MEAN_KEY, float(self.exp_manager.rew_mean))
self.server.redis.set(RedisKeys.RUNNING_REWARD_STD_KEY, float(self.exp_manager.rew_std))
self.epoch_info["steps_per_second"] = int(round(self.exp_manager.steps_per_second))

if self.extra_logger is not None:
self.epoch_info["extra_log"] = self.extra_logger.pop_redis_mean_aggregates()

self.report_epoch()

self.epoch_info.clear()
@@ -222,7 +227,8 @@ def configure(self, cfg):
self.adaptive_omega, \
self.value_net, \
self.novelty_gradient_optimizer, \
self.learner = Configurator.build_vars(cfg, env_space_shapes=(in_shape, out_shape))
self.learner, \
self.extra_logger = Configurator.build_vars(cfg, env_space_shapes=(in_shape, out_shape))

print("Starting new experience manager...")
self.exp_manager = ParallelExperienceManager(cfg)
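The server simply averages whatever aggregate dicts the clients pushed since the last epoch. A minimal sketch of that reduction, assuming two client payloads with a hypothetical key:

    results = [{"ball_touches_per_steps": 0.02}, {"ball_touches_per_steps": 0.04}]
    combined = {}
    for res in results:
        for k, v in res.items():
            combined.setdefault(k, []).append(v)
    means = {k: sum(vs) / len(vs) for k, vs in combined.items()}
    # -> {"ball_touches_per_steps": 0.03}, which lands in epoch_info["extra_log"]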
199 changes: 199 additions & 0 deletions distrib_rl/Utils/ExtraLog.py
@@ -0,0 +1,199 @@
import numpy as np
from distrib_rl.Distrib import RedisClient, RedisKeys

from datetime import datetime
import csv
from pathlib import Path
from time import sleep

class ExtraLogger:
def __init__(self, cfg):

# How many times aggregate_data will be called before combining and sending values
self.report_every = cfg.get("rounds_per_aggregate", 60)

values_cfg = cfg["values"]
self.keys = values_cfg.keys()

self.agg_funcs = {}
self.per_funcs = {}
self.per_vals = {}
self.out_names = {}

for key, k_cfg in values_cfg.items():
agg = k_cfg.get("agg", None)
if agg is None or agg == "mean":
self.agg_funcs[key] = np.mean
self.out_names[key] = ""
elif agg == "mean_per":
self.agg_funcs[key] = np.sum
self.out_names[key] = ""
elif agg == "std":
self.agg_funcs[key] = np.std
self.out_names[key] = "std_"
elif agg == "rms":
self.agg_funcs[key] = _root_mean_square
self.out_names[key] = "rms_"

per_value = None
if "per" in k_cfg:
self.per_funcs[key] = _per
per_value = k_cfg["per"]
per_name = f"{key}_per_{per_value}"
elif "inv_per" in k_cfg:
self.per_funcs[key] = _inv_per
per_value = k_cfg["inv_per"]
per_name = f"{per_value}_per_{key}"

if per_value is None or per_value == "instance":
self.per_funcs[key] = None
self.out_names[key] += key
else:
self.per_vals[key] = per_value
self.out_names[key] += per_name

self.reset()

if cfg.get("log_to_csv", False):
self.csv_path = None
self.csv = True

if "csv_path" in cfg:
self.csv_dir = Path(cfg["csv_path"])
self.csv_dir.mkdir(exist_ok=True)
else:
self.csv_dir = Path(".")
else:
self.csv = False

if cfg.get("wandb_via_redis", False):
self.redis_client = RedisClient()
self.redis_client.connect()
else:
self.redis_client = None


def reset(self):
self.running_data = { k: [] for k in self.keys }
self.running_per_vals = { k: 0 for k in set(self.per_vals.values())}
self.last_report = 1


def log(self, key, data):
if key in self.keys:
            self.running_data[key].append(float(data))

def log_multi(self, key_data_dict):
log_keys = key_data_dict.keys()
for key in set(self.keys) & log_keys:
            self.running_data[key].append(float(key_data_dict[key]))


def aggregate_data(self, **kwargs):
"""
Keywords expected to contain values to aggregate values against.
i.e. aggregate_data(steps=1000) for data with config {"per": "step"}
"""

for k in kwargs.keys():
if k in self.running_per_vals:
self.running_per_vals[k] += kwargs[k]
else:
                # Debug, take out eventually
                print(f"extra_log: denominator '{k}' sent to logger but no configured value uses it")

if self.last_report < self.report_every:
self.last_report += 1
return None

agg_data = {}
for key in self.keys:
out_name = self.out_names[key]
data = self.running_data[key]
if len(data) == 0:
agg_data[out_name] = 0
else:
x = self.agg_funcs[key](data)
if self.per_funcs[key] is not None:
per_value = self.per_vals[key]
x = self.per_funcs[key]( x, self.running_per_vals[per_value] )
agg_data[out_name] = x

if self.redis_client is not None:
self.redis_client.push_data(RedisKeys.EXTRA_LOG_AGGREGATE_KEY, agg_data)

if self.csv:
self._write_to_csv(agg_data)

self.reset()

return agg_data


def pop_redis_mean_aggregates(self):
if self.redis_client is None:
return None

results = self.redis_client.atomic_pop_all(RedisKeys.EXTRA_LOG_AGGREGATE_KEY)
if len(results) == 0:
return None

combined_agg = {}
for res in results:
            for k, v in res.items():
                combined_agg.setdefault(k, []).append(v)

for k in combined_agg.keys():
combined_agg[k] = np.mean(combined_agg[k])

return combined_agg


def _start_csv(self):
        # Multiple processes kept grabbing the same filename, even at microsecond
        # resolution, so stagger them with a random offset.
        sleep(np.random.uniform(0, 1))
self.csv_path = self.csv_dir / datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f.csv")

with open(self.csv_path, "a", newline='') as csvfile:
writer = csv.writer(csvfile)
row = ["datetime"] + [ self.out_names[k] for k in self.keys ]
writer.writerow(row)

        ## Closing files is apparently a big performance penalty on Windows, so let's try
        ## leaving the file open (even if not ideal).
        ## Update: this is mitigated now by only writing every N updates.
        ## https://gregoryszorc.com/blog/2015/10/22/append-i/o-performance-on-windows/
# try:
# del self.writer
# self.cur_file.close()
# except:
# pass

# self.csv_file = open(self.csv_path, "a", newline='')
# self.writer = csv.writer(self.csv_file)
# row = ["datetime"] + [ self.out_names[k] for k in self.keys ]
# self.writer.writerow(row)

def _write_to_csv(self, agg_data):
if (self.csv_path is None) or (not self.csv_path.exists()):
self._start_csv()

row = [ datetime.now() ] + [ agg_data[self.out_names[k]] for k in self.keys ]

with open(self.csv_path, "a", newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(row)

# self.writer.writerow(row)


def _per(x, per_value):
    # x per unit of the denominator, e.g. goals / steps
    return x / per_value

def _inv_per(x, per_value):
    # denominator per unit of x, e.g. steps / goal
    return per_value / x

def _root_mean_square(x):
    return np.sqrt(np.mean(np.square(x)))
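
A minimal local usage sketch (no Redis, no CSV), showing the accumulate-then-report cycle:

    logger = ExtraLogger({
        "rounds_per_aggregate": 2,
        "values": {"ball_touches": {"agg": "mean_per", "per": "steps"}},
    })
    logger.log("ball_touches", 3)
    logger.log("ball_touches", 1)
    print(logger.aggregate_data(steps=100))  # None: round 1 of 2 only accumulates
    logger.log("ball_touches", 2)
    print(logger.aggregate_data(steps=100))  # {'ball_touches_per_steps': 0.03} = (3 + 1 + 2) / 200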