diff --git a/rl/hierarchical_environment.py b/rl/hierarchical_environment.py
new file mode 100644
index 00000000..3a2f1c55
--- /dev/null
+++ b/rl/hierarchical_environment.py
@@ -0,0 +1,248 @@
+import gymnasium as gym
+import numpy as np
+import ray
+from ray.rllib.env.multi_agent_env import MultiAgentEnv
+from ray.rllib.utils.typing import MultiAgentDict
+
+from mlagents_envs.base_env import ActionTuple
+from mlagents_envs.environment import UnityEnvironment
+from mlagents_envs.side_channel.engine_configuration_channel import (
+    EngineConfigurationChannel,
+)
+
+class HierarchicalUnityMultiAgentEnv(MultiAgentEnv):
+    def __init__(self, config, *args, **kwargs):
+        super().__init__()
+        self.unity_env_handle = config["unity_env_handle"]
+        self.initial_agent_count = config.get("initial_agent_count", 2)
+        self.max_episode_steps = config.get("episode_horizon", 1000)
+        self.episode_timesteps = 0
+
+        # Set up the Unity environment
+        ray.get(self.unity_env_handle.set_float_property.remote("initialAgentCount", self.initial_agent_count))
+        ray.get(self.unity_env_handle.set_float_property.remote("MaxSteps", self.max_episode_steps))
+        ray.get(self.unity_env_handle.reset.remote())
+
+        # Get behavior specifications
+        behavior_specs = ray.get(self.unity_env_handle.get_behavior_specs.remote())
+        self.behavior_name = list(behavior_specs.keys())[0]
+        self.behavior_spec = behavior_specs[self.behavior_name]
+
+        # Get initial number of agents
+        decision_steps, _ = ray.get(self.unity_env_handle.get_steps.remote(self.behavior_name))
+        self.num_agents = len(decision_steps)
+
+        # Set up agent IDs for high-level and low-level policies
+        self.high_level_agent_ids = [f"high_level_agent_{i}" for i in range(self.num_agents)]
+        self.low_level_agent_ids = [f"low_level_agent_{i}" for i in range(self.num_agents)]
+        self._agent_ids = set(self.high_level_agent_ids + self.low_level_agent_ids)
+
+        # Set up observation and action spaces
+        self.size_of_single_agent_obs = self.behavior_spec.observation_specs[0].shape[0]
+        self._update_spaces()
+
+    def _update_spaces(self):
+        # High-level policy
+        high_level_obs_space = gym.spaces.Box(
+            low=-np.inf, high=np.inf, shape=(self.size_of_single_agent_obs,), dtype=np.float32
+        )
+        high_level_action_space = gym.spaces.Discrete(5)
+
+        # Low-level policy
+        low_level_obs_space = gym.spaces.Tuple((
+            gym.spaces.Box(low=-np.inf, high=np.inf, shape=(self.size_of_single_agent_obs,), dtype=np.float32),
+            gym.spaces.Discrete(5)
+        ))
+        low_level_action_space = gym.spaces.Box(
+            low=np.array([-0.67, 0.0, -8.0]),
+            high=np.array([0.67, 4.5, 0.0]),
+            shape=(3,),
+            dtype=np.float32
+        )
+
+        self.observation_spaces = {}
+        self.action_spaces = {}
+
+        for agent_id in self.high_level_agent_ids:
+            self.observation_spaces[agent_id] = high_level_obs_space
+            self.action_spaces[agent_id] = high_level_action_space
+
+        for agent_id in self.low_level_agent_ids:
+            self.observation_spaces[agent_id] = low_level_obs_space
+            self.action_spaces[agent_id] = low_level_action_space
+
+    def reset(self, *, seed=None, options=None):
+        if seed is not None:
+            np.random.seed(seed)
+
+        self.episode_timesteps = 0
+        ray.get(self.unity_env_handle.reset.remote())
+
+        decision_steps, _ = ray.get(self.unity_env_handle.get_steps.remote(self.behavior_name))
+        self.num_agents = len(decision_steps)
+
+        obs_dict = {}
+        for i, agent_id in enumerate(decision_steps.agent_id):
+            obs = decision_steps[agent_id].obs[0].astype(np.float32)
+            obs_dict[f"high_level_agent_{i}"] = obs
+            obs_dict[f"low_level_agent_{i}"] = (obs, 0)  # Initial high-level action is 0
+
+        return obs_dict, {}
+
+    def step(self, action_dict: MultiAgentDict):
+        high_level_actions = {}
+        low_level_actions = {}
+
+        for agent_id, action in action_dict.items():
+            if agent_id.startswith("high_level_agent_"):
+                high_level_actions[agent_id] = action
+            elif agent_id.startswith("low_level_agent_"):
+                low_level_actions[agent_id] = action
+
+        # Combine high-level and low-level actions
+        combined_actions = {}
+        for i in range(self.num_agents):
+            high_level_action = high_level_actions.get(f"high_level_agent_{i}", 0)
+            low_level_action = low_level_actions.get(f"low_level_agent_{i}", np.zeros(3))
+            combined_actions[f"agent_{i}"] = (high_level_action, low_level_action)
+
+        # Convert combined actions to Unity's ActionTuple
+        action_tuple = self._convert_to_action_tuple(combined_actions)
+        ray.get(self.unity_env_handle.set_actions.remote(self.behavior_name, action_tuple))
+
+        # Step the Unity environment
+        for _ in range(25):
+            ray.get(self.unity_env_handle.step.remote())
+
+        obs_dict, rewards_dict, terminateds_dict, truncateds_dict, infos_dict = self._get_step_results()
+
+        # Separate results for high-level and low-level policies
+        high_level_obs = {}
+        low_level_obs = {}
+        high_level_rewards = {}
+        low_level_rewards = {}
+        high_level_terminateds = {}
+        low_level_terminateds = {}
+        high_level_truncateds = {}
+        low_level_truncateds = {}
+        high_level_infos = {}
+        low_level_infos = {}
+
+        for i in range(self.num_agents):
+            agent_obs = obs_dict[f"agent_{i}"]
+            high_level_obs[f"high_level_agent_{i}"] = agent_obs
+            low_level_obs[f"low_level_agent_{i}"] = (agent_obs, high_level_actions.get(f"high_level_agent_{i}", 0))
+
+            reward = rewards_dict[f"agent_{i}"]
+            high_level_rewards[f"high_level_agent_{i}"] = reward
+            low_level_rewards[f"low_level_agent_{i}"] = reward
+
+            terminated = terminateds_dict[f"agent_{i}"]
+            high_level_terminateds[f"high_level_agent_{i}"] = terminated
+            low_level_terminateds[f"low_level_agent_{i}"] = terminated
+
+            truncated = truncateds_dict[f"agent_{i}"]
+            high_level_truncateds[f"high_level_agent_{i}"] = truncated
+            low_level_truncateds[f"low_level_agent_{i}"] = truncated
+
+            info = infos_dict[f"agent_{i}"]
+            high_level_infos[f"high_level_agent_{i}"] = info
+            low_level_infos[f"low_level_agent_{i}"] = info
+
+        # Combine results
+        obs_dict = {**high_level_obs, **low_level_obs}
+        rewards_dict = {**high_level_rewards, **low_level_rewards}
+        terminateds_dict = {**high_level_terminateds, **low_level_terminateds}
+        truncateds_dict = {**high_level_truncateds, **low_level_truncateds}
+        infos_dict = {**high_level_infos, **low_level_infos}
+
+        # Check for episode end
+        self.episode_timesteps += 1
+        if self.episode_timesteps > self.max_episode_steps:
+            truncateds_dict = dict({"__all__": True}, **{agent_id: True for agent_id in self._agent_ids})
+
+        terminateds_dict["__all__"] = all(terminateds_dict.values())
+        truncateds_dict["__all__"] = any(truncateds_dict.values())
+
+        return obs_dict, rewards_dict, terminateds_dict, truncateds_dict, infos_dict
+
+    def _convert_to_action_tuple(self, actions_dict):
+        """
+        Converts the given actions dictionary to an ActionTuple for Unity.
+
+        Args:
+            actions_dict (Dict[str, Any]): The actions dictionary to convert.
+
+        Returns:
+            ActionTuple: The corresponding ActionTuple.
+ """ + # Split the actions into continuous and discrete actions + continuous_actions = [] + discrete_actions = [] + + for agent_id, agent_actions in actions_dict.items(): + # Agent actions is a tuple where the first element is discrete and the rest are continuous + discrete_action, continuous_action = agent_actions + + discrete_actions.append([discrete_action]) + continuous_actions.append(list(continuous_action)) + + # Convert to numpy arrays + discrete_actions = np.array(discrete_actions, dtype=np.int32) + continuous_actions = np.array(continuous_actions, dtype=np.float32) + + # Alternative use of ActionTuple.add_discrete(discrete_actions) and ActionTuple.add_continuous(continuous_actions) + return ActionTuple(continuous=continuous_actions, discrete=discrete_actions) + + def _get_step_results(self): + """Collects those agents' obs/rewards that have to act in next `step`. + + Returns: + Tuple: + obs: Multi-agent observation dict. + Only those observations for which to get new actions are + returned. + rewards: Rewards dict matching `obs`. + dones: Done dict with only an __all__ multi-agent entry in it. + __all__=True, if episode is done for all agents. + """ + # Process observations, rewards, and done flags + obs_dict = {} + rewards_dict = {} + terminateds_dict = {} + truncateds_dict = {} + infos_dict = {} + + # Get the new state + decision_steps, terminal_steps = ray.get( + self.unity_env_handle.get_steps.remote(self.behavior_name) + ) + + # Alternative, decision_steps.agent_id_to_index + for agent_id in decision_steps.agent_id: + agent_key = f"agent_{agent_id}" + obs_dict[agent_key] = decision_steps[agent_id].obs[0].astype(np.float32) + rewards_dict[agent_key] = decision_steps[agent_id].reward + terminateds_dict[agent_key] = False + truncateds_dict[agent_key] = ( + False # Assume not truncated if in decision_steps + ) + infos_dict[agent_key] = {} + + for agent_id in terminal_steps.agent_id: + agent_key = f"agent_{agent_id}" + obs_dict[agent_key] = terminal_steps[agent_id].obs[0].astype(np.float32) + rewards_dict[agent_key] = terminal_steps[agent_id].reward + terminateds_dict[agent_key] = True + truncateds_dict[agent_key] = terminal_steps[agent_id].interrupted + infos_dict[agent_key] = {} + + + # All Agents Done Check: Only use dones if all agents are done, then we should do a reset. 
+ terminateds_dict["__all__"] = len(terminal_steps) == self.num_agents + truncateds_dict["__all__"] = all(truncateds_dict.values()) + + return obs_dict, rewards_dict, terminateds_dict, truncateds_dict, infos_dict + + def close(self): + ray.get(self.unity_env_handle.close.remote()) diff --git a/rl/hierarchical_trainer.py b/rl/hierarchical_trainer.py new file mode 100644 index 00000000..cc042cba --- /dev/null +++ b/rl/hierarchical_trainer.py @@ -0,0 +1,289 @@ +import os +import yaml +import yamale + +import gymnasium as gym +import mlflow +import numpy as np +from mlagents_envs.base_env import ActionTuple +from mlagents_envs.environment import UnityEnvironment +from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel +from mlagents_envs.side_channel.float_properties_channel import FloatPropertiesChannel +import ray +from ray import train +from ray import tune +from ray.air.integrations.mlflow import MLflowLoggerCallback +from ray.rllib.algorithms.ppo import PPO +from ray.rllib.env.env_context import EnvContext +from ray.rllib.env.multi_agent_env import MultiAgentEnv +from ray.rllib.policy.policy import PolicySpec +from ray.rllib.policy.policy import Policy +from ray.rllib.utils.typing import AgentID, MultiAgentDict, PolicyID +from ray.train import RunConfig +from ray.tune import Tuner +from ray.tune.registry import register_env +from hierarchical_environment import HierarchicalUnityMultiAgentEnv +from utils import create_unity_env + +from mlflow.models.signature import ModelSignature +from mlflow.types.schema import Schema, TensorSpec + +# Suppress DeprecationWarnings from output +os.environ["PYTHONWARNINGS"] = "ignore::DeprecationWarning" + +# Set it for the current notebook environment +version = "0" # Default "1" +os.environ["RAY_AIR_NEW_OUTPUT"] = version +os.environ['RAY_AIR_RICH_LAYOUT'] = version +gym.logger.set_level(gym.logger.DISABLED) + + +def validate_yaml_schema(data_path, schema_path): + try: + schema = yamale.make_schema(schema_path) + data = yamale.make_data(data_path) + yamale.validate(schema, data) + print("YAML is valid according to the schema!") + return True + except yamale.YamaleError as e: + print(f"Validation failed: {e}") + return False + +# Define environment creator function +def env_creator(env_config): + return HierarchicalUnityMultiAgentEnv(env_config) + +def policy_mapping_fn(agent_id, episode, worker, **kwargs): + return "shared_policy" + +def main(): + """ + Main function to initialize the Unity environment and start the training process. 
+
+    Returns:
+        None
+    """
+    # Determine the current directory where the script is running
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+
+    # Set YAML file paths
+    config_path = os.path.join(current_dir, "config.yaml")
+    config_schema_path = os.path.join(current_dir, "config_schema.yaml")
+
+    # Validate YAML file
+    validate_yaml_schema(config_path, config_schema_path)
+
+    # Load configuration from YAML file
+    with open(config_path, "r") as config_file:
+        config_data = yaml.safe_load(config_file)
+
+    # Set up MLflow logging
+    mlflow.set_experiment(config_data["mlflow"]["experiment_name"])
+
+    # Initialize Ray
+    ray.init(
+        ignore_reinit_error=True,
+        num_cpus=config_data["ray"]["num_cpus"],
+        runtime_env={
+            "env_vars": {
+                "RAY_AIR_NEW_OUTPUT": version,
+                "RAY_AIR_RICH_LAYOUT": version,
+            }
+        }
+    )
+
+    # Get the base directory by moving up one level (assuming the script is in the 'rl' folder)
+    base_dir = os.path.dirname(current_dir)
+
+    # Construct the full path to the Unity executable
+    unity_executable_path = os.path.join(base_dir, "libReplicantDriveSim.app")
+
+    # Create Unity environment
+    unity_env_handle = create_unity_env(
+        file_name=unity_executable_path,
+        worker_id=0,
+        base_port=config_data["unity_env"]["base_port"],
+        no_graphics=config_data["unity_env"]["no_graphics"],
+    )
+
+    # Register the environment with RLlib
+    env_name = "HierarchicalUnityMultiAgentEnv"
+    register_env(env_name, env_creator)
+
+    # Environment configuration passed to each environment instance
+    env_config = {
+        "initial_agent_count": config_data["env_config"]["initial_agent_count"],
+        "unity_env_handle": unity_env_handle,
+        "episode_horizon": config_data["env_config"]["episode_horizon"],
+    }
+
+    # Create a local instance of the environment to read its observation/action spaces
+    env = HierarchicalUnityMultiAgentEnv(config=env_config)
+
+    # Define the configuration for the PPO algorithm
+    config = PPO.get_default_config()
+    config = config.environment(
+        env=env_name,
+        env_config=env_config,
+        disable_env_checking=config_data["environment"]["disable_env_checking"]  # Source: https://discuss.ray.io/t/agent-ids-that-are-not-the-names-of-the-agents-in-the-env/6964/3
+    )
+    config = config.framework(config_data["ppo_config"]["framework"])
+    config = config.resources(num_gpus=config_data["ppo_config"]["num_gpus"])
+
+    # Multi-agent configuration
+    config = config.multi_agent(
+        policies={
+            "high_level_policy": PolicySpec(
+                policy_class=None,  # Auto-select
+                observation_space=env.observation_spaces["high_level_agent_0"],
+                action_space=env.action_spaces["high_level_agent_0"],
+                config={}
+            ),
+            "low_level_policy": PolicySpec(
+                policy_class=None,  # Auto-select
+                observation_space=env.observation_spaces["low_level_agent_0"],
+                action_space=env.action_spaces["low_level_agent_0"],
+                config={}
+            )
+        },
+        policy_mapping_fn=lambda agent_id, episode, **kwargs: (
+            "high_level_policy" if agent_id.startswith("high_level_agent_") else "low_level_policy"
+        )
+    )
+
+    # Rollout configuration
+    # Setting num_rollout_workers=0 creates only the local worker, which then performs both
+    # sample collection and training. Setting num_rollout_workers=5 creates the local worker
+    # (responsible for training updates) plus 5 remote workers (responsible for sample collection).
+    config = config.rollouts(
+        num_rollout_workers=config_data["rollouts"]["num_rollout_workers"],
+        num_envs_per_worker=config_data["rollouts"]["num_envs_per_worker"],
+        rollout_fragment_length=config_data["rollouts"]["rollout_fragment_length"],
+        batch_mode=config_data["rollouts"]["batch_mode"],
+    )
+
+    # Training configuration
+    config = config.training(
+        train_batch_size=config_data["training"]["train_batch_size"],
+        sgd_minibatch_size=config_data["training"]["sgd_minibatch_size"],
+        num_sgd_iter=config_data["training"]["num_sgd_iter"],
+        lr=config_data["training"]["lr"],
+        gamma=config_data["training"]["gamma"],
+        lambda_=config_data["training"]["lambda"],
+        clip_param=config_data["training"]["clip_param"],
+        vf_clip_param=config_data["training"]["vf_clip_param"],
+        entropy_coeff=config_data["training"]["entropy_coeff"],
+        kl_coeff=config_data["training"]["kl_coeff"],
+        vf_loss_coeff=config_data["training"]["vf_loss_coeff"],
+    )
+    config = config.checkpointing(export_native_model_files=True)
+    config = config.debugging(log_level="ERROR")
+
+    tags = {
+        "user_name": "chrisjcc",
+        "git_commit_hash": "c15d456f12bb54180b25dfa8e0d2268694dd1a9e"
+    }
+
+    tuner = Tuner(
+        PPO,
+        param_space=config,
+        run_config=RunConfig(
+            name="PPO_Highway_Experiment",
+            local_dir="./ray_results",
+            checkpoint_config=train.CheckpointConfig(
+                num_to_keep=1,
+                checkpoint_frequency=1,
+                # checkpoint_at_end=True,
+            ),
+            callbacks=[
+                MLflowLoggerCallback(
+                    experiment_name=config_data["mlflow"]["experiment_name"],
+                    tracking_uri=mlflow.get_tracking_uri(),
+                    registry_uri=None,
+                    save_artifact=True,
+                    tags=tags,
+                )
+            ],
+            # Stop after the configured number of training iterations; each iteration can
+            # span multiple episodes, depending on how rollout and batch sizes are configured.
+            stop={"training_iteration": config_data["stop"]["training_iteration"]},
+        ),
+        tune_config=tune.TuneConfig(
+            num_samples=1,
+            max_concurrent_trials=1,
+            metric="episode_reward_mean",
+            mode="max",
+        ),
+    )
+
+    results = tuner.fit()
+
+    # Print the training results to inspect their structure
+    print("Training results: ", results)
+
+    # Check if results is not empty
+    if results:
+        # Collect the checkpoints reported by the trials
+        checkpoints = [result.checkpoint for result in results if result.checkpoint]
+
+        if checkpoints:
+            # Default to the last checkpoint
+            last_checkpoint = checkpoints[-1]
+            checkpoint_to_use = last_checkpoint
+
+            # Try to get the best result
+            try:
+                best_result = results.get_best_result(metric="episode_reward_mean", mode="max")
+                if best_result and best_result.checkpoint:
+                    print("Setting the checkpoint to load the best trial result")
+                    checkpoint_to_use = best_result.checkpoint
+            except Exception as e:
+                print(f"Error getting best result: {e}. Using last checkpoint.")
+
+            # Load a policy from the selected checkpoint. The algorithm checkpoint holds one
+            # policy per policy ID ("high_level_policy" and "low_level_policy"); the low-level
+            # (continuous control) policy is exported here.
+            try:
+                policy = Policy.from_checkpoint(checkpoint_to_use)["low_level_policy"]
+                print(f"Loaded policy: {policy}")
+
+                policy.export_model(
+                    "saved_model",
+                    onnx=None,  # None exports the native model; pass an ONNX opset number (e.g. 14 or 15) to export to ONNX instead.
+                )
+
+                # The most recent run of the experiment is the first row returned by search_runs
+                experiment = mlflow.get_experiment_by_name(config_data["mlflow"]["experiment_name"])
+                experiment_id = experiment.experiment_id
+                runs = mlflow.search_runs(experiment_ids=[experiment_id])
+                run_id = runs.iloc[0].run_id
+                run_name = f"PPO_HierarchicalUnityMultiAgentEnv_{checkpoint_to_use.trial_id}"
+
+                input_schema = Schema([
+                    TensorSpec(np.dtype(np.float32), (-1, env.size_of_single_agent_obs), agent_id)
+                    for agent_id in env._agent_ids
+                ])
+
+                model_signature = ModelSignature(inputs=input_schema)
+
+                # Register the model by resuming the run with the specified run ID
+                with mlflow.start_run(run_name=run_name, experiment_id=experiment_id, run_id=run_id, tags=tags) as run:
+                    mlflow.pytorch.log_model(
+                        pytorch_model=policy.model,
+                        artifact_path="ppo_model",
+                        registered_model_name="PPO_Highway_Model",
+                        signature=model_signature,
+                    )
+                    print(f"Model registered with run ID: {run.info.run_id}")
+            except Exception as e:
+                print(f"Error loading policy or registering model: {e}")
+        else:
+            print("No checkpoints found. Model registration skipped.")
+    else:
+        print("No results returned from tuner.fit(). Model registration skipped.")
+
+    # Make sure to close the Unity environment at the end
+    ray.get(unity_env_handle.close.remote())
+    ray.shutdown()
+
+if __name__ == "__main__":
+    main()
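
Note on configuration: hierarchical_trainer.py reads its settings from config.yaml, which is not part of this diff. The sketch below shows, as the Python dict that yaml.safe_load would return, the keys the trainer actually looks up; every value is an illustrative placeholder, not a setting shipped with this change.

    # Minimal sketch of the config_data structure expected by hierarchical_trainer.py
    # (key names taken from the config_data[...] lookups above; values are placeholders)
    config_data = {
        "mlflow": {"experiment_name": "ppo_highway"},
        "ray": {"num_cpus": 4},
        "unity_env": {"base_port": 5004, "no_graphics": True},
        "env_config": {"initial_agent_count": 2, "episode_horizon": 1000},
        "environment": {"disable_env_checking": True},
        "ppo_config": {"framework": "torch", "num_gpus": 0},
        "rollouts": {
            "num_rollout_workers": 1,
            "num_envs_per_worker": 1,
            "rollout_fragment_length": 200,
            "batch_mode": "truncate_episodes",
        },
        "training": {
            "train_batch_size": 4000,
            "sgd_minibatch_size": 128,
            "num_sgd_iter": 10,
            "lr": 3.0e-4,
            "gamma": 0.99,
            "lambda": 0.95,
            "clip_param": 0.2,
            "vf_clip_param": 10.0,
            "entropy_coeff": 0.01,
            "kl_coeff": 0.2,
            "vf_loss_coeff": 1.0,
        },
        "stop": {"training_iteration": 100},
    }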