From a37083b94befdb13ac400784ccc58aae636ca80b Mon Sep 17 00:00:00 2001 From: Frances Liu Date: Tue, 29 Oct 2024 13:11:36 -0700 Subject: [PATCH] Upgrade gym --- python/requirements.txt | 2 +- .../ml/rllib-test-requirements.txt | 35 +- python/requirements_compiled.txt | 31 +- python/setup.py | 2 +- .../byod/requirements_byod_3.9.txt | 14 +- rllib/BUILD | 4 +- rllib/algorithms/algorithm_config.py | 2 +- rllib/algorithms/dreamerv3/README.md | 2 +- .../dreamerv3/tests/test_dreamerv3.py | 2 +- .../algorithms/dreamerv3/utils/env_runner.py | 376 +++++++------- rllib/algorithms/ppo/tests/test_ppo.py | 2 +- .../ppo/tests/test_ppo_old_api_stack.py | 4 +- .../ppo/tests/test_ppo_rl_module.py | 4 +- .../algorithms/tests/test_algorithm_config.py | 6 +- .../tests/test_callbacks_on_env_runner.py | 6 +- rllib/benchmarks/ppo/benchmark_atari_ppo.py | 110 ++-- .../torch_compile/run_inference_bm.py | 2 +- .../run_ppo_with_inference_bm.py | 2 +- rllib/env/multi_agent_env_runner.py | 25 +- rllib/env/single_agent_env_runner.py | 471 ++++++------------ rllib/env/single_agent_episode.py | 6 + .../env/tests/test_single_agent_env_runner.py | 24 +- rllib/env/utils/__init__.py | 7 + rllib/env/wrappers/atari_wrappers.py | 7 +- rllib/env/wrappers/kaggle_wrapper.py | 189 ------- rllib/env/wrappers/model_vector_env.py | 164 ------ rllib/env/wrappers/recsim.py | 270 ---------- rllib/env/wrappers/recsim_wrapper.py | 14 - rllib/env/wrappers/uncertainty_wrappers.py | 23 - .../_old_api_stack/custom_keras_model.py | 4 +- rllib/examples/connectors/frame_stacking.py | 2 +- .../euclidian_distance_based_curiosity.py | 9 +- ...trinsic_curiosity_model_based_curiosity.py | 6 +- .../envs/env_rendering_and_recording.py | 15 +- .../examples/evaluation/custom_evaluation.py | 4 +- .../metrics/custom_metrics_in_env_runners.py | 2 +- rllib/examples/ray_tune/custom_experiment.py | 2 +- .../rl_modules/custom_cnn_rl_module.py | 2 +- rllib/models/tests/test_preprocessors.py | 4 +- .../bc/benchmark_atari_pong_bc.py | 2 +- 
rllib/tuned_examples/impala/pong_impala.py | 2 +- .../impala/pong_impala_pb2_hyperopt.py | 2 +- rllib/tuned_examples/ppo/atari_ppo.py | 5 +- rllib/utils/error.py | 2 +- .../utils/exploration/tests/test_curiosity.py | 204 +------- 45 files changed, 530 insertions(+), 1543 deletions(-) delete mode 100644 rllib/env/wrappers/kaggle_wrapper.py delete mode 100644 rllib/env/wrappers/model_vector_env.py delete mode 100644 rllib/env/wrappers/recsim.py delete mode 100644 rllib/env/wrappers/recsim_wrapper.py delete mode 100644 rllib/env/wrappers/uncertainty_wrappers.py diff --git a/python/requirements.txt b/python/requirements.txt index e565575a238d..baad08de44db 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -38,7 +38,7 @@ colorful rich opentelemetry-sdk fastapi -gymnasium==0.28.1 +gymnasium==1.0.0 virtualenv!=20.21.1,>=20.0.24 opentelemetry-api opencensus diff --git a/python/requirements/ml/rllib-test-requirements.txt b/python/requirements/ml/rllib-test-requirements.txt index 1c47364f6b65..887d515d96c7 100644 --- a/python/requirements/ml/rllib-test-requirements.txt +++ b/python/requirements/ml/rllib-test-requirements.txt @@ -3,43 +3,28 @@ # Environment adapters. # --------------------- # Atari -gymnasium==0.28.1; python_version < "3.12" -imageio; python_version < "3.12" -ale_py==0.8.1; python_version < "3.12" +imageio==2.34.2 +ale_py==0.10.1 # For testing MuJoCo envs with gymnasium. -mujoco==2.3.6; python_version < "3.12" +mujoco==3.2.4 dm_control==1.0.12; python_version < "3.12" # For tests on PettingZoo's multi-agent envs. -pettingzoo==1.23.1 -# When installing pettingzoo, chess is missing, even though its a dependancy -# TODO: remove if a future pettingzoo and/or ray version fixes this dependancy issue. -chess==1.7.0 +pettingzoo==1.24.3 pymunk==6.2.1 -supersuit==3.8.0; python_version < "3.12" -tinyscaler==1.2.6; python_version < "3.12" -shimmy - -# Kaggle envs. 
-kaggle_environments==1.7.11 -# Unity3D testing -# TODO(sven): Add this back to rllib-requirements.txt once mlagents no longer pins torch<1.9.0 version. -#mlagents==0.28.0 -mlagents_envs==0.28.0 +tinyscaler==1.2.8 +shimmy==2.0.0 +supersuit==3.9.3 # For tests on minigrid. -minigrid -# For tests on RecSim and Kaggle envs. -# Explicitly depends on `tensorflow` and doesn't accept `tensorflow-macos` -recsim==0.2.4; (sys_platform != 'darwin' or platform_machine != 'arm64') and python_version < "3.12" -# recsim depends on dopamine-rl, but dopamine-rl pins gym <= 0.25.2, which break some envs -dopamine-rl==4.0.5; (sys_platform != 'darwin' or platform_machine != 'arm64') and python_version < "3.12" +minigrid==2.3.1 tensorflow_estimator # DeepMind's OpenSpiel open-spiel==1.4 +# Unity3D testing +mlagents_envs==0.28.0 # Requires libtorrent which is unavailable for arm64 -autorom[accept-rom-license]; platform_machine != "arm64" h5py==3.10.0 # Requirements for rendering. diff --git a/python/requirements_compiled.txt b/python/requirements_compiled.txt index a1043afc5b51..1347afee24c5 100644 --- a/python/requirements_compiled.txt +++ b/python/requirements_compiled.txt @@ -75,10 +75,10 @@ aiosqlite==0.19.0 # via ypy-websocket alabaster==0.7.13 # via sphinx -ale-py==0.8.1 ; python_version < "3.12" +ale-py==0.10.1 # via # -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt - # gym + # gymnasium alembic==1.12.1 # via # aim @@ -272,8 +272,6 @@ charset-normalizer==3.3.2 # via # requests # snowflake-connector-python -chess==1.7.0 - # via -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt chex==0.1.7 # via optax clang-format==12.0.1 @@ -306,7 +304,6 @@ cloudpickle==2.2.0 # -r /ray/ci/../python/requirements/test-requirements.txt # dask # distributed - # gym # gymnasium # hyperopt # mlagents-envs @@ -704,13 +701,7 @@ gsutil==5.27 # via -r /ray/ci/../python/requirements/docker/ray-docker-requirements.txt gunicorn==20.1.0 # via mlflow -gym==0.26.2 - # via - # 
dopamine-rl - # recsim -gym-notices==0.0.8 - # via gym -gymnasium==0.28.1 ; python_version < "3.12" +gymnasium==1.0.0 # via # -r /ray/ci/../python/requirements.txt # -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt @@ -1126,7 +1117,7 @@ msrestazure==0.6.4 # via # -r /ray/ci/../python/requirements/test-requirements.txt # azure-cli-core -mujoco==2.3.6 ; python_version < "3.12" +mujoco==3.2.4 # via # -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt # dm-control @@ -1246,7 +1237,6 @@ numpy==1.26.4 # flax # gpy # gradio - # gym # gymnasium # h5py # hpbandster @@ -1290,7 +1280,6 @@ numpy==1.26.4 # pyro-ppl # pytorch-lightning # raydp - # recsim # scikit-image # scikit-learn # scipy @@ -1489,7 +1478,7 @@ pbr==6.0.0 # sarif-om peewee==3.17.0 # via semgrep -pettingzoo==1.23.1 +pettingzoo==1.24.3 # via -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt pexpect==4.8.0 # via @@ -1862,8 +1851,6 @@ querystring-parser==1.2.4 # via raydp raydp==1.7.0b20231020.dev0 # via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt -recsim==0.2.4 ; (sys_platform != "darwin" or platform_machine != "arm64") and python_version < "3.12" - # via -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt redis==4.4.2 # via -r /ray/ci/../python/requirements/test-requirements.txt regex==2024.5.15 @@ -2049,7 +2036,7 @@ shellcheck-py==0.7.1.1 # via -r /ray/ci/../python/requirements/lint-requirements.txt shellingham==1.5.4 # via typer -shimmy==1.3.0 +shimmy==2.0.0 # via -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt shortuuid==1.0.1 # via -r /ray/ci/../python/requirements/ml/tune-test-requirements.txt @@ -2167,9 +2154,7 @@ statsmodels==0.14.0 # via # hpbandster # statsforecast -strictyaml==1.7.3 - # via pyiceberg -supersuit==3.8.0 ; python_version < "3.12" +supersuit==3.9.3 # via -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt sympy==1.13.1 # via @@ -2256,7 +2241,7 @@ timm==0.9.2 # via -r 
/ray/ci/../python/requirements/ml/tune-test-requirements.txt tinycss2==1.3.0 # via nbconvert -tinyscaler==1.2.6 ; python_version < "3.12" +tinyscaler==1.2.8 # via # -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt # supersuit diff --git a/python/setup.py b/python/setup.py index 92b9d5c8adea..1a2e67885e2a 100644 --- a/python/setup.py +++ b/python/setup.py @@ -299,7 +299,7 @@ def get_packages(self): setup_spec.extras["rllib"] = setup_spec.extras["tune"] + [ "dm_tree", - "gymnasium==0.28.1", + "gymnasium==1.0.0", "lz4", "scikit-image", "pyyaml", diff --git a/release/ray_release/byod/requirements_byod_3.9.txt b/release/ray_release/byod/requirements_byod_3.9.txt index d55e3d79a7a8..1806b5686e91 100644 --- a/release/ray_release/byod/requirements_byod_3.9.txt +++ b/release/ray_release/byod/requirements_byod_3.9.txt @@ -116,7 +116,7 @@ aiosignal==1.3.1 \ # via # -c release/ray_release/byod/requirements_compiled.txt # aiohttp -ale-py==0.8.1 \ +ale-py==0.9.0 \ --hash=sha256:0006d80dfe7745eb5a93444492337203c8bc7eb594a2c24c6a651c5c5b0eaf09 \ --hash=sha256:0856ca777473ec4ae8a59f3af9580259adb0fd4a47d586a125a440c62e82fc10 \ --hash=sha256:0ffecb5c956749596030e464827642945162170a132d093c3d4fa2d7e5725c18 \ @@ -1242,17 +1242,6 @@ gsutil==5.27 \ # via # -c release/ray_release/byod/requirements_compiled.txt # -r release/ray_release/byod/requirements_byod_3.9.in -gym[atari]==0.26.2 \ - --hash=sha256:e0d882f4b54f0c65f203104c24ab8a38b039f1289986803c7d02cdbe214fbcc4 - # via - # -c release/ray_release/byod/requirements_compiled.txt - # -r release/ray_release/byod/requirements_byod_3.9.in -gym-notices==0.0.8 \ - --hash=sha256:ad25e200487cafa369728625fe064e88ada1346618526102659b4640f2b4b911 \ - --hash=sha256:e5f82e00823a166747b4c2a07de63b6560b1acb880638547e0cabf825a01e463 - # via - # -c release/ray_release/byod/requirements_compiled.txt - # gym h5py==3.10.0 \ --hash=sha256:012ab448590e3c4f5a8dd0f3533255bc57f80629bf7c5054cf4c87b30085063c \ 
--hash=sha256:212bb997a91e6a895ce5e2f365ba764debeaef5d2dca5c6fb7098d66607adf99 \ @@ -1739,7 +1728,6 @@ numpy==1.26.4 \ # ale-py # bokeh # dask - # gym # h5py # lightgbm # ml-dtypes diff --git a/rllib/BUILD b/rllib/BUILD index 9854e95adc98..d41d0a43b3ab 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -2543,8 +2543,8 @@ py_test( name = "examples/envs/env_rendering_and_recording", srcs = ["examples/envs/env_rendering_and_recording.py"], tags = ["team:rllib", "exclusive", "examples"], - size = "small", - args = ["--enable-new-api-stack", "--env=CartPole-v1", "--stop-iters=3"] + size = "medium", + args = ["--enable-new-api-stack", "--env=CartPole-v1", "--stop-iters=2"] ) #@OldAPIStack diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index f4a3a3fad2b3..124a0d07be43 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -3559,7 +3559,7 @@ def is_atari(self) -> bool: # Not yet determined, try to figure this out. if self._is_atari is None: # Atari envs are usually specified via a string like "PongNoFrameskip-v4" - # or "ALE/Breakout-v5". + # or "ale_py:ALE/Breakout-v5". # We do NOT attempt to auto-detect Atari env for other specified types like # a callable, to avoid running heavy logics in validate(). # For these cases, users can explicitly set `environment(atari=True)`. 
diff --git a/rllib/algorithms/dreamerv3/README.md b/rllib/algorithms/dreamerv3/README.md index a92918273f64..13a773bb02dd 100644 --- a/rllib/algorithms/dreamerv3/README.md +++ b/rllib/algorithms/dreamerv3/README.md @@ -49,7 +49,7 @@ in combination with the following scripts and command lines in order to run RLli ### [Atari100k](../../tuned_examples/dreamerv3/atari_100k.py) ```shell $ cd ray/rllib/tuned_examples/dreamerv3/ -$ python atari_100k.py --env ALE/Pong-v5 +$ python atari_100k.py --env ale_py:ALE/Pong-v5 ``` ### [DeepMind Control Suite (vision)](../../tuned_examples/dreamerv3/dm_control_suite_vision.py) diff --git a/rllib/algorithms/dreamerv3/tests/test_dreamerv3.py b/rllib/algorithms/dreamerv3/tests/test_dreamerv3.py index 7fbb8fd55c2a..87c46e2a2eac 100644 --- a/rllib/algorithms/dreamerv3/tests/test_dreamerv3.py +++ b/rllib/algorithms/dreamerv3/tests/test_dreamerv3.py @@ -63,7 +63,7 @@ def test_dreamerv3_compilation(self): for env in [ "FrozenLake-v1", "CartPole-v1", - "ALE/MsPacman-v5", + "ale_py:ALE/MsPacman-v5", "Pendulum-v1", ]: print("Env={}".format(env)) diff --git a/rllib/algorithms/dreamerv3/utils/env_runner.py b/rllib/algorithms/dreamerv3/utils/env_runner.py index df725f39f4b2..19e906bdaaf9 100644 --- a/rllib/algorithms/dreamerv3/utils/env_runner.py +++ b/rllib/algorithms/dreamerv3/utils/env_runner.py @@ -12,6 +12,7 @@ from typing import Collection, List, Optional, Tuple, Union import gymnasium as gym +from gymnasium.wrappers.vector import DictInfoToList import numpy as np import tree # pip install dm_tree @@ -75,7 +76,7 @@ def __init__( # Create the gym.vector.Env object. # Atari env. - if self.config.env.startswith("ALE/"): + if self.config.env.startswith("ale_py:ALE/"): # TODO (sven): This import currently causes a Tune test to fail. Either way, # we need to figure out how to properly setup the CI environment with # the correct versions of all gymnasium-related packages. 
@@ -114,17 +115,21 @@ def _entry_point(): gym.register("rllib-single-agent-env-v0", entry_point=_entry_point) - self.env = gym.vector.make( - "rllib-single-agent-env-v0", - num_envs=self.config.num_envs_per_env_runner, - asynchronous=self.config.remote_worker_envs, - wrappers=[ - partial(gym.wrappers.TimeLimit, max_episode_steps=108000), - partial(resize_v1, x_size=64, y_size=64), # resize to 64x64 - NormalizedImageEnv, - NoopResetEnv, - MaxAndSkipEnv, - ], + self.env = DictInfoToList( + gym.make_vec( + "rllib-single-agent-env-v0", + num_envs=self.config.num_envs_per_env_runner, + vectorization_mode=( + "async" if self.config.remote_worker_envs else "sync" + ), + wrappers=[ + partial(gym.wrappers.TimeLimit, max_episode_steps=108000), + partial(resize_v1, x_size=64, y_size=64), # resize to 64x64 + NormalizedImageEnv, + NoopResetEnv, + MaxAndSkipEnv, + ], + ) ) # DeepMind Control. elif self.config.env.startswith("DMC/"): @@ -139,12 +144,16 @@ def _entry_point(): parts[1], parts[2], from_pixels=from_pixels, channels_first=False ), ) - self.env = gym.vector.make( - "dmc_env-v0", - wrappers=[ActionClip], - num_envs=self.config.num_envs_per_env_runner, - asynchronous=self.config.remote_worker_envs, - **dict(self.config.env_config), + self.env = DictInfoToList( + gym.make_vec( + "dmc_env-v0", + wrappers=[ActionClip], + num_envs=self.config.num_envs_per_env_runner, + vectorization_mode=( + "async" if self.config.remote_worker_envs else "sync" + ), + **dict(self.config.env_config), + ) ) # All other envs (gym or `tune.register_env()`'d by the user). else: @@ -162,11 +171,15 @@ def _entry_point(): env_descriptor=self.config.env, ), ) - # Create the vectorized gymnasium env. - self.env = gym.vector.make( - "dreamerv3-custom-env-v0", - num_envs=self.config.num_envs_per_env_runner, - asynchronous=False, # self.config.remote_worker_envs, + # Wrap into `DictInfoToList` wrapper to get infos as lists. 
+ self.env = DictInfoToList( + gym.make_vec( + "dreamerv3-custom-env-v0", + num_envs=self.config.num_envs_per_env_runner, + vectorization_mode=( + "async" if self.config.remote_worker_envs else "sync" + ), + ) ) self.num_envs = self.env.num_envs assert self.num_envs == self.config.num_envs_per_env_runner @@ -185,6 +198,8 @@ def _entry_point(): # TODO (sven): DreamerV3 is currently single-agent only. self.module = self.multi_rl_module_spec.build()[DEFAULT_MODULE_ID] + self._cached_to_module = None + self.metrics = MetricsLogger() self._device = None @@ -258,7 +273,7 @@ def sample( # Sample n timesteps. if num_timesteps is not None: - return self._sample_timesteps( + return self._sample( num_timesteps=num_timesteps, explore=explore, random_actions=random_actions, @@ -269,7 +284,7 @@ def sample( # `_sample_episodes` returns only one list (with completed episodes) # return empty list for incomplete ones. return ( - self._sample_episodes( + self._sample( num_episodes=num_episodes, explore=explore, random_actions=random_actions, @@ -277,18 +292,18 @@ def sample( [], ) - def _sample_timesteps( + def _sample( self, - num_timesteps: int, + *, + num_timesteps: Optional[int] = None, + num_episodes: Optional[int] = None, explore: bool = True, random_actions: bool = False, force_reset: bool = False, ) -> List[SingleAgentEpisode]: - """Helper method to run n timesteps. + """Helper method to sample n timesteps or m episodes.""" - See docstring of self.sample() for more details. - """ - done_episodes_to_return = [] + done_episodes_to_return: List[SingleAgentEpisode] = [] # Get initial states for all `batch_size_B` rows in the forward batch. initial_states = tree.map_structure( @@ -297,193 +312,151 @@ def _sample_timesteps( ) # Have to reset the env (on all vector sub-envs). 
- if force_reset or self._needs_initial_reset: - obs, _ = self.env.reset() + if force_reset or num_episodes is not None or self._needs_initial_reset: + episodes = self._episodes = [None for _ in range(self.num_envs)] + self._reset_envs(episodes, initial_states) + # We just reset the env. Don't have to force this again in the next + # call to `self._sample()`. self._needs_initial_reset = False - self._episodes = [SingleAgentEpisode() for _ in range(self.num_envs)] - # Set initial obs and states in the episodes. for i in range(self.num_envs): - self._episodes[i].add_env_reset(observation=obs[i]) self._states[i] = None - - # Don't reset existing envs; continue in already started episodes. else: - # Pick up stored observations and states from previous timesteps. - obs = np.stack([eps.observations[-1] for eps in self._episodes]) + episodes = self._episodes - # Loop through env for n timesteps. + # Loop through `num_timesteps` timesteps or `num_episodes` episodes. ts = 0 - while ts < num_timesteps: + eps = 0 + while ( + (ts < num_timesteps) if num_timesteps is not None else (eps < num_episodes) + ): # Act randomly. if random_actions: actions = self.env.action_space.sample() - # Compute an action using our RLModule. + # Compute an action using the RLModule. else: - is_first = np.zeros((self.num_envs,)) - for i, eps in enumerate(self._episodes): - if self._states[i] is None: - is_first[i] = 1.0 - self._states[i] = {k: s[i] for k, s in initial_states.items()} - to_module = { - Columns.STATE_IN: tree.map_structure( - lambda s: self.convert_to_tensor(s), batch(self._states) - ), - Columns.OBS: self.convert_to_tensor(obs), - "is_first": self.convert_to_tensor(is_first), - } - # Explore or not. + # Env-to-module connector (already cached). + to_module = self._cached_to_module + assert to_module is not None + self._cached_to_module = None + + # RLModule forward pass: Explore or not. 
if explore: - outs = self.module.forward_exploration(to_module) + to_env = self.module.forward_exploration(to_module) else: - outs = self.module.forward_inference(to_module) + to_env = self.module.forward_inference(to_module) # Model outputs one-hot actions (if discrete). Convert to int actions # as well. - actions = convert_to_numpy(outs[Columns.ACTIONS]) + actions = convert_to_numpy(to_env[Columns.ACTIONS]) if isinstance(self.env.single_action_space, gym.spaces.Discrete): actions = np.argmax(actions, axis=-1) - self._states = unbatch(convert_to_numpy(outs[Columns.STATE_OUT])) + self._states = unbatch(convert_to_numpy(to_env[Columns.STATE_OUT])) - obs, rewards, terminateds, truncateds, infos = self.env.step(actions) - ts += self.num_envs + observations, rewards, terminateds, truncateds, infos = self.env.step( + actions + ) - for i in range(self.num_envs): - # The last entry in self.observations[i] is already the reset - # obs of the new episode. - if terminateds[i] or truncateds[i]: - # Finish the episode with the actual terminal observation stored in - # the info dict. - self._episodes[i].add_env_step( - observation=infos["final_observation"][i], - action=actions[i], - reward=rewards[i], - terminated=terminateds[i], - truncated=truncateds[i], + call_on_episode_start = set() + for env_index in range(self.num_envs): + # Episode has no data in it yet -> Was just reset and needs to be called + # with its `add_env_reset()` method. + if not episodes[env_index].is_reset: + episodes[env_index].add_env_reset( + observation=observations[env_index], + infos=infos[env_index], ) - self._states[i] = None - done_episodes_to_return.append(self._episodes[i]) - # Create a new episode object. - self._episodes[i] = SingleAgentEpisode(observations=[obs[i]]) + call_on_episode_start.add(env_index) + self._states[env_index] = None + + # Call `add_env_step()` method on episode. 
else: - self._episodes[i].add_env_step( - observation=obs[i], - action=actions[i], - reward=rewards[i], + # Only increase ts when we actually stepped (not reset'd as a reset + # does not count as a timestep). + ts += 1 + episodes[env_index].add_env_step( + observation=observations[env_index], + action=actions[env_index], + reward=rewards[env_index], + infos=infos[env_index], + terminated=terminateds[env_index], + truncated=truncateds[env_index], ) - # Return done episodes ... - self._done_episodes_for_metrics.extend(done_episodes_to_return) - # ... and all ongoing episode chunks. Also, make sure, we return - # a copy and start new chunks so that callers of this function - # don't alter our ongoing and returned Episode objects. - ongoing_episodes = self._episodes - self._episodes = [eps.cut() for eps in self._episodes] - for eps in ongoing_episodes: - self._ongoing_episodes_for_metrics[eps.id_].append(eps) - - self._increase_sampled_metrics(ts) - - return done_episodes_to_return + ongoing_episodes - - def _sample_episodes( - self, - num_episodes: int, - explore: bool = True, - random_actions: bool = False, - ) -> List[SingleAgentEpisode]: - """Helper method to run n episodes. - - See docstring of `self.sample()` for more details. - """ - done_episodes_to_return = [] - - obs, _ = self.env.reset() - episodes = [SingleAgentEpisode() for _ in range(self.num_envs)] - - # Multiply states n times according to our vector env batch size (num_envs). - states = tree.map_structure( - lambda s: np.repeat(s, self.num_envs, axis=0), - convert_to_numpy(self.module.get_initial_state()), - ) - is_first = np.ones((self.num_envs,)) - - for i in range(self.num_envs): - episodes[i].add_env_reset(observation=obs[i]) - - eps = 0 - while eps < num_episodes: - if random_actions: - actions = self.env.action_space.sample() - else: - batch = { + # Cache results as we will do the RLModule forward pass only in the next + # `while`-iteration. 
+ if self.module is not None: + is_first = np.zeros((self.num_envs,)) + for env_index, episode in enumerate(episodes): + if self._states[env_index] is None: + is_first[env_index] = 1.0 + self._states[env_index] = { + k: s[env_index] for k, s in initial_states.items() + } + self._cached_to_module = { Columns.STATE_IN: tree.map_structure( - lambda s: self.convert_to_tensor(s), states + lambda s: self.convert_to_tensor(s), batch(self._states) ), - Columns.OBS: self.convert_to_tensor(obs), + Columns.OBS: self.convert_to_tensor(observations), "is_first": self.convert_to_tensor(is_first), } - if explore: - outs = self.module.forward_exploration(batch) - else: - outs = self.module.forward_inference(batch) + for env_index in range(self.num_envs): + # Episode is not done. + if not episodes[env_index].is_done: + continue - actions = convert_to_numpy(outs[Columns.ACTIONS]) - if isinstance(self.env.single_action_space, gym.spaces.Discrete): - actions = np.argmax(actions, axis=-1) - states = convert_to_numpy(outs[Columns.STATE_OUT]) + eps += 1 - obs, rewards, terminateds, truncateds, infos = self.env.step(actions) + # Then finalize (numpy'ize) the episode. + done_episodes_to_return.append(episodes[env_index].finalize()) - for i in range(self.num_envs): - # The last entry in self.observations[i] is already the reset - # obs of the new episode. - if terminateds[i] or truncateds[i]: - eps += 1 - - episodes[i].add_env_step( - observation=infos["final_observation"][i], - action=actions[i], - reward=rewards[i], - terminated=terminateds[i], - truncated=truncateds[i], - ) - done_episodes_to_return.append(episodes[i]) - - # Also early-out if we reach the number of episodes within this - # for-loop. - if eps == num_episodes: - break - - # Reset h-states to the model's initial ones b/c we are starting a - # new episode. 
- for k, v in convert_to_numpy( - self.module.get_initial_state() - ).items(): - states[k][i] = v - is_first[i] = True - - episodes[i] = SingleAgentEpisode(observations=[obs[i]]) - else: - episodes[i].add_env_step( - observation=obs[i], - action=actions[i], - reward=rewards[i], - ) - is_first[i] = False + # Also early-out if we reach the number of episodes within this + # for-loop. + if eps == num_episodes: + break + + # Create a new episode object with no data in it and execute + # `on_episode_created` callback (before the `env.reset()` call). + episodes[env_index] = SingleAgentEpisode( + observation_space=self.env.single_observation_space, + action_space=self.env.single_action_space, + ) + # Return done episodes ... + # TODO (simon): Check, how much memory this attribute uses. self._done_episodes_for_metrics.extend(done_episodes_to_return) + # ... and all ongoing episode chunks. - # If user calls sample(num_timesteps=..) after this, we must reset again - # at the beginning. - self._needs_initial_reset = True + # Also, make sure we start new episode chunks (continuing the ongoing episodes + # from the to-be-returned chunks). + ongoing_episodes_to_return = [] + # Only if we are doing individual timesteps: We have to maybe cut an ongoing + # episode and continue building it on the next call to `sample()`. + if num_timesteps is not None: + ongoing_episodes_continuations = [ + episode.cut(len_lookback_buffer=self.config.episode_lookback_horizon) + for episode in episodes + ] + + for episode in episodes: + # Just started Episodes do not have to be returned. There is no data + # in them anyway. + if episode.t == 0: + continue + episode.validate() + self._ongoing_episodes_for_metrics[episode.id_].append(episode) + # Return finalized (numpy'ized) Episodes. + ongoing_episodes_to_return.append(episode.finalize()) + + # Continue collecting into the cut Episode chunks. 
+ self._episodes = ongoing_episodes_continuations - ts = sum(map(len, done_episodes_to_return)) self._increase_sampled_metrics(ts) - return done_episodes_to_return + # Return collected episode data. + return done_episodes_to_return + ongoing_episodes_to_return def get_spaces(self): return { @@ -564,6 +537,51 @@ def stop(self): # Close our env object via gymnasium's API. self.env.close() + def _reset_envs(self, episodes, initial_states): + # Create n new episodes and make the `on_episode_created` callbacks. + for env_index in range(self.num_envs): + self._new_episode(env_index, episodes) + + # Erase all cached ongoing episodes (these will never be completed and + # would thus never be returned/cleaned by `get_metrics` and cause a memory + # leak). + self._ongoing_episodes_for_metrics.clear() + + observations, infos = self.env.reset() + observations = unbatch(observations) + + # Set initial obs and infos in the episodes. + for env_index in range(self.num_envs): + episodes[env_index].add_env_reset( + observation=observations[env_index], + infos=infos[env_index], + ) + + # Run the env-to-module connector to make sure the reset-obs/infos have + # properly been processed (if applicable). + self._cached_to_module = None + if self.module: + is_first = np.zeros((self.num_envs,)) + for i, eps in enumerate(self._episodes): + if self._states[i] is None: + is_first[i] = 1.0 + self._states[i] = {k: s[i] for k, s in initial_states.items()} + self._cached_to_module = { + Columns.STATE_IN: tree.map_structure( + lambda s: self.convert_to_tensor(s), batch(self._states) + ), + Columns.OBS: self.convert_to_tensor(observations), + "is_first": self.convert_to_tensor(is_first), + } + # self._cached_to_module = TODO!! 
+ + def _new_episode(self, env_index, episodes=None): + episodes = episodes if episodes is not None else self._episodes + episodes[env_index] = SingleAgentEpisode( + observation_space=self.env.single_observation_space, + action_space=self.env.single_action_space, + ) + def _increase_sampled_metrics(self, num_steps): # Per sample cycle stats. self.metrics.log_value( diff --git a/rllib/algorithms/ppo/tests/test_ppo.py b/rllib/algorithms/ppo/tests/test_ppo.py index ae51de75389d..3febf97fb2ca 100644 --- a/rllib/algorithms/ppo/tests/test_ppo.py +++ b/rllib/algorithms/ppo/tests/test_ppo.py @@ -98,7 +98,7 @@ def test_ppo_compilation_and_schedule_mixins(self): # "CliffWalking-v0", "CartPole-v1", "Pendulum-v1", - ]: # "ALE/Breakout-v5"]: + ]: # "ale_py:ALE/Breakout-v5"]: print("Env={}".format(env)) for lstm in [False]: print("LSTM={}".format(lstm)) diff --git a/rllib/algorithms/ppo/tests/test_ppo_old_api_stack.py b/rllib/algorithms/ppo/tests/test_ppo_old_api_stack.py index 24453758f6f0..edb2b3b3122e 100644 --- a/rllib/algorithms/ppo/tests/test_ppo_old_api_stack.py +++ b/rllib/algorithms/ppo/tests/test_ppo_old_api_stack.py @@ -155,7 +155,7 @@ def test_ppo_compilation_w_connectors(self): num_iterations = 2 - for env in ["FrozenLake-v1", "ALE/MsPacman-v5"]: + for env in ["FrozenLake-v1", "ale_py:ALE/MsPacman-v5"]: print("Env={}".format(env)) for lstm in [False, True]: print("LSTM={}".format(lstm)) @@ -216,7 +216,7 @@ def test_ppo_compilation_and_schedule_mixins(self): num_iterations = 2 - for env in ["FrozenLake-v1", "ALE/MsPacman-v5"]: + for env in ["FrozenLake-v1", "ale_py:ALE/MsPacman-v5"]: print("Env={}".format(env)) for lstm in [False, True]: print("LSTM={}".format(lstm)) diff --git a/rllib/algorithms/ppo/tests/test_ppo_rl_module.py b/rllib/algorithms/ppo/tests/test_ppo_rl_module.py index de3d3f42f424..2b1df1bf33e8 100644 --- a/rllib/algorithms/ppo/tests/test_ppo_rl_module.py +++ b/rllib/algorithms/ppo/tests/test_ppo_rl_module.py @@ -63,7 +63,7 @@ def tearDownClass(cls): 
def test_rollouts(self): # TODO: Add FrozenLake-v1 to cover LSTM case. - env_names = ["CartPole-v1", "Pendulum-v1", "ALE/Breakout-v5"] + env_names = ["CartPole-v1", "Pendulum-v1", "ale_py:ALE/Breakout-v5"] fwd_fns = ["forward_exploration", "forward_inference"] lstm = [True, False] config_combinations = [env_names, fwd_fns, lstm] @@ -98,7 +98,7 @@ def test_rollouts(self): def test_forward_train(self): # TODO: Add FrozenLake-v1 to cover LSTM case. - env_names = ["CartPole-v1", "Pendulum-v1", "ALE/Breakout-v5"] + env_names = ["CartPole-v1", "Pendulum-v1", "ale_py:ALE/Breakout-v5"] lstm = [False, True] config_combinations = [env_names, lstm] for config in itertools.product(*config_combinations): diff --git a/rllib/algorithms/tests/test_algorithm_config.py b/rllib/algorithms/tests/test_algorithm_config.py index 1d7a32e87a2a..11d55a741be3 100644 --- a/rllib/algorithms/tests/test_algorithm_config.py +++ b/rllib/algorithms/tests/test_algorithm_config.py @@ -145,11 +145,11 @@ def test_rollout_fragment_length(self): def test_detect_atari_env(self): """Tests that we can properly detect Atari envs.""" config = AlgorithmConfig().environment( - env="ALE/Breakout-v5", env_config={"frameskip": 1} + env="ale_py:ALE/Breakout-v5", env_config={"frameskip": 1} ) self.assertTrue(config.is_atari) - config = AlgorithmConfig().environment(env="ALE/Pong-v5") + config = AlgorithmConfig().environment(env="ale_py:ALE/Pong-v5") self.assertTrue(config.is_atari) config = AlgorithmConfig().environment(env="CartPole-v1") @@ -158,7 +158,7 @@ def test_detect_atari_env(self): config = AlgorithmConfig().environment( env=lambda ctx: gym.make( - "ALE/Breakout-v5", + "ale_py:ALE/Breakout-v5", frameskip=1, ) ) diff --git a/rllib/algorithms/tests/test_callbacks_on_env_runner.py b/rllib/algorithms/tests/test_callbacks_on_env_runner.py index 42abf7091841..ae8443b5b811 100644 --- a/rllib/algorithms/tests/test_callbacks_on_env_runner.py +++ b/rllib/algorithms/tests/test_callbacks_on_env_runner.py @@ -24,19 
+24,19 @@ def on_environment_created(self, *args, env_runner, metrics_logger, env, **kwarg def on_episode_start(self, *args, env_runner, metrics_logger, env, **kwargs): assert isinstance(env_runner, EnvRunner) assert isinstance(metrics_logger, MetricsLogger) - assert isinstance(env, gym.Env) + assert isinstance(env, (gym.Env, gym.vector.VectorEnv)) self.counts.update({"start": 1}) def on_episode_step(self, *args, env_runner, metrics_logger, env, **kwargs): assert isinstance(env_runner, EnvRunner) assert isinstance(metrics_logger, MetricsLogger) - assert isinstance(env, gym.Env) + assert isinstance(env, (gym.Env, gym.vector.VectorEnv)) self.counts.update({"step": 1}) def on_episode_end(self, *args, env_runner, metrics_logger, env, **kwargs): assert isinstance(env_runner, EnvRunner) assert isinstance(metrics_logger, MetricsLogger) - assert isinstance(env, gym.Env) + assert isinstance(env, (gym.Env, gym.vector.VectorEnv)) self.counts.update({"end": 1}) def on_sample_end(self, *args, env_runner, metrics_logger, **kwargs): diff --git a/rllib/benchmarks/ppo/benchmark_atari_ppo.py b/rllib/benchmarks/ppo/benchmark_atari_ppo.py index 0b697ff4b902..e434f2ac078f 100644 --- a/rllib/benchmarks/ppo/benchmark_atari_ppo.py +++ b/rllib/benchmarks/ppo/benchmark_atari_ppo.py @@ -6,7 +6,7 @@ --num-gpus=4 --num-env-runners=95` In order to only run individual or lists of envs, you can provide a list of env-strings -under the `--env` arg, such as `--env ALE/Pong-v5,ALE/Breakout-v5`. +under the `--env` arg, such as `--env=ale_py:ALE/Pong-v5,ale_py:ALE/Breakout-v5`. For logging to your WandB account, use: `--wandb-key=[your WandB API key] --wandb-project=[some project name] @@ -34,60 +34,60 @@ # rainbow). # Note that for PPO, we simply run everything for 6M ts. 
benchmark_envs = { - "ALE/Alien-v5": (6022.9, 200000000), - "ALE/Amidar-v5": (202.8, 200000000), - "ALE/Assault-v5": (14491.7, 200000000), - "ALE/Asterix-v5": (280114.0, 200000000), - "ALE/Asteroids-v5": (2249.4, 200000000), - "ALE/Atlantis-v5": (814684.0, 200000000), - "ALE/BankHeist-v5": (826.0, 200000000), - "ALE/BattleZone-v5": (52040.0, 200000000), - "ALE/BeamRider-v5": (21768.5, 200000000), - "ALE/Berzerk-v5": (1793.4, 200000000), - "ALE/Bowling-v5": (39.4, 200000000), - "ALE/Boxing-v5": (54.9, 200000000), - "ALE/Breakout-v5": (379.5, 200000000), - "ALE/Centipede-v5": (7160.9, 200000000), - "ALE/ChopperCommand-v5": (10916.0, 200000000), - "ALE/CrazyClimber-v5": (143962.0, 200000000), - "ALE/Defender-v5": (47671.3, 200000000), - "ALE/DemonAttack-v5": (109670.7, 200000000), - "ALE/DoubleDunk-v5": (-0.6, 200000000), - "ALE/Enduro-v5": (2061.1, 200000000), - "ALE/FishingDerby-v5": (22.6, 200000000), - "ALE/Freeway-v5": (29.1, 200000000), - "ALE/Frostbite-v5": (4141.1, 200000000), - "ALE/Gopher-v5": (72595.7, 200000000), - "ALE/Gravitar-v5": (567.5, 200000000), - "ALE/Hero-v5": (50496.8, 200000000), - "ALE/IceHockey-v5": (-11685.8, 200000000), - "ALE/Kangaroo-v5": (10841.0, 200000000), - "ALE/Krull-v5": (6715.5, 200000000), - "ALE/KungFuMaster-v5": (28999.8, 200000000), - "ALE/MontezumaRevenge-v5": (154.0, 200000000), - "ALE/MsPacman-v5": (2570.2, 200000000), - "ALE/NameThisGame-v5": (11686.5, 200000000), - "ALE/Phoenix-v5": (103061.6, 200000000), - "ALE/Pitfall-v5": (-37.6, 200000000), - "ALE/Pong-v5": (19.0, 200000000), - "ALE/PrivateEye-v5": (1704.4, 200000000), - "ALE/Qbert-v5": (18397.6, 200000000), - "ALE/RoadRunner-v5": (54261.0, 200000000), - "ALE/Robotank-v5": (55.2, 200000000), - "ALE/Seaquest-v5": (19176.0, 200000000), - "ALE/Skiing-v5": (-11685.8, 200000000), - "ALE/Solaris-v5": (2860.7, 200000000), - "ALE/SpaceInvaders-v5": (12629.0, 200000000), - "ALE/StarGunner-v5": (123853.0, 200000000), - "ALE/Surround-v5": (7.0, 200000000), - "ALE/Tennis-v5": 
(-2.2, 200000000), - "ALE/TimePilot-v5": (11190.5, 200000000), - "ALE/Tutankham-v5": (126.9, 200000000), - "ALE/Venture-v5": (45.0, 200000000), - "ALE/VideoPinball-v5": (506817.2, 200000000), - "ALE/WizardOfWor-v5": (14631.5, 200000000), - "ALE/YarsRevenge-v5": (93007.9, 200000000), - "ALE/Zaxxon-v5": (19658.0, 200000000), + "ale_py:ALE/Alien-v5": (6022.9, 200000000), + "ale_py:ALE/Amidar-v5": (202.8, 200000000), + "ale_py:ALE/Assault-v5": (14491.7, 200000000), + "ale_py:ALE/Asterix-v5": (280114.0, 200000000), + "ale_py:ALE/Asteroids-v5": (2249.4, 200000000), + "ale_py:ALE/Atlantis-v5": (814684.0, 200000000), + "ale_py:ALE/BankHeist-v5": (826.0, 200000000), + "ale_py:ALE/BattleZone-v5": (52040.0, 200000000), + "ale_py:ALE/BeamRider-v5": (21768.5, 200000000), + "ale_py:ALE/Berzerk-v5": (1793.4, 200000000), + "ale_py:ALE/Bowling-v5": (39.4, 200000000), + "ale_py:ALE/Boxing-v5": (54.9, 200000000), + "ale_py:ALE/Breakout-v5": (379.5, 200000000), + "ale_py:ALE/Centipede-v5": (7160.9, 200000000), + "ale_py:ALE/ChopperCommand-v5": (10916.0, 200000000), + "ale_py:ALE/CrazyClimber-v5": (143962.0, 200000000), + "ale_py:ALE/Defender-v5": (47671.3, 200000000), + "ale_py:ALE/DemonAttack-v5": (109670.7, 200000000), + "ale_py:ALE/DoubleDunk-v5": (-0.6, 200000000), + "ale_py:ALE/Enduro-v5": (2061.1, 200000000), + "ale_py:ALE/FishingDerby-v5": (22.6, 200000000), + "ale_py:ALE/Freeway-v5": (29.1, 200000000), + "ale_py:ALE/Frostbite-v5": (4141.1, 200000000), + "ale_py:ALE/Gopher-v5": (72595.7, 200000000), + "ale_py:ALE/Gravitar-v5": (567.5, 200000000), + "ale_py:ALE/Hero-v5": (50496.8, 200000000), + "ale_py:ALE/IceHockey-v5": (-11685.8, 200000000), + "ale_py:ALE/Kangaroo-v5": (10841.0, 200000000), + "ale_py:ALE/Krull-v5": (6715.5, 200000000), + "ale_py:ALE/KungFuMaster-v5": (28999.8, 200000000), + "ale_py:ALE/MontezumaRevenge-v5": (154.0, 200000000), + "ale_py:ALE/MsPacman-v5": (2570.2, 200000000), + "ale_py:ALE/NameThisGame-v5": (11686.5, 200000000), + "ale_py:ALE/Phoenix-v5": 
(103061.6, 200000000), + "ale_py:ALE/Pitfall-v5": (-37.6, 200000000), + "ale_py:ALE/Pong-v5": (19.0, 200000000), + "ale_py:ALE/PrivateEye-v5": (1704.4, 200000000), + "ale_py:ALE/Qbert-v5": (18397.6, 200000000), + "ale_py:ALE/RoadRunner-v5": (54261.0, 200000000), + "ale_py:ALE/Robotank-v5": (55.2, 200000000), + "ale_py:ALE/Seaquest-v5": (19176.0, 200000000), + "ale_py:ALE/Skiing-v5": (-11685.8, 200000000), + "ale_py:ALE/Solaris-v5": (2860.7, 200000000), + "ale_py:ALE/SpaceInvaders-v5": (12629.0, 200000000), + "ale_py:ALE/StarGunner-v5": (123853.0, 200000000), + "ale_py:ALE/Surround-v5": (7.0, 200000000), + "ale_py:ALE/Tennis-v5": (-2.2, 200000000), + "ale_py:ALE/TimePilot-v5": (11190.5, 200000000), + "ale_py:ALE/Tutankham-v5": (126.9, 200000000), + "ale_py:ALE/Venture-v5": (45.0, 200000000), + "ale_py:ALE/VideoPinball-v5": (506817.2, 200000000), + "ale_py:ALE/WizardOfWor-v5": (14631.5, 200000000), + "ale_py:ALE/YarsRevenge-v5": (93007.9, 200000000), + "ale_py:ALE/Zaxxon-v5": (19658.0, 200000000), } diff --git a/rllib/benchmarks/torch_compile/run_inference_bm.py b/rllib/benchmarks/torch_compile/run_inference_bm.py index a92e49b9cb50..e15b87be5965 100644 --- a/rllib/benchmarks/torch_compile/run_inference_bm.py +++ b/rllib/benchmarks/torch_compile/run_inference_bm.py @@ -92,7 +92,7 @@ def main(pargs): json.dump(config, f) # Create the environment. 
- env = wrap_atari_for_new_api_stack(gym.make("ALE/Breakout-v5")) + env = wrap_atari_for_new_api_stack(gym.make("ale_py:ALE/Breakout-v5")) # setup RLModule model_cfg = MODEL_DEFAULTS.copy() diff --git a/rllib/benchmarks/torch_compile/run_ppo_with_inference_bm.py b/rllib/benchmarks/torch_compile/run_ppo_with_inference_bm.py index fa046b05285d..23c0cba79676 100644 --- a/rllib/benchmarks/torch_compile/run_ppo_with_inference_bm.py +++ b/rllib/benchmarks/torch_compile/run_ppo_with_inference_bm.py @@ -29,7 +29,7 @@ def main(pargs): config = ( PPOConfig() .environment( - "ALE/Breakout-v5", + "ale_py:ALE/Breakout-v5", clip_rewards=True, env_config={ "frameskip": 1, diff --git a/rllib/env/multi_agent_env_runner.py b/rllib/env/multi_agent_env_runner.py index 8cc4c6e4e2df..03b8105fbedb 100644 --- a/rllib/env/multi_agent_env_runner.py +++ b/rllib/env/multi_agent_env_runner.py @@ -90,7 +90,9 @@ def __init__(self, config: AlgorithmConfig, **kwargs): self.make_env() # Create the env-to-module connector pipeline. - self._env_to_module = self.config.build_env_to_module_connector(self.env) + self._env_to_module = self.config.build_env_to_module_connector( + self.env.unwrapped + ) # Cached env-to-module results taken at the end of a `_sample_timesteps()` # call to make sure the final observation (before an episode cut) gets properly # processed (and maybe postprocessed and re-stored into the episode). @@ -104,7 +106,7 @@ def __init__(self, config: AlgorithmConfig, **kwargs): # Construct the MultiRLModule. try: module_spec: MultiRLModuleSpec = self.config.get_multi_rl_module_spec( - env=self.env, spaces=self.get_spaces(), inference_only=True + env=self.env.unwrapped, spaces=self.get_spaces(), inference_only=True ) # Build the module from its spec. self.module = module_spec.build() @@ -114,7 +116,9 @@ def __init__(self, config: AlgorithmConfig, **kwargs): self.module = None # Create the two connector pipelines: env-to-module and module-to-env. 
- self._module_to_env = self.config.build_module_to_env_connector(self.env) + self._module_to_env = self.config.build_module_to_env_connector( + self.env.unwrapped + ) self._needs_initial_reset: bool = True self._episode: Optional[MultiAgentEpisode] = None @@ -259,7 +263,7 @@ def _sample_timesteps( to_env = { Columns.ACTIONS: [ { - aid: self.env.get_action_space(aid).sample() + aid: self.env.unwrapped.get_action_space(aid).sample() for aid in self._episode.get_agents_to_act() } ] @@ -461,7 +465,7 @@ def _sample_episodes( to_env = { Columns.ACTIONS: [ { - aid: self.env.get_action_space(aid).sample() + aid: self.env.unwrapped.get_action_space(aid).sample() for aid in self._episode.get_agents_to_act() } ] @@ -869,7 +873,7 @@ def make_env(self): self._callbacks.on_environment_created( env_runner=self, metrics_logger=self.metrics, - env=self.env, + env=self.env.unwrapped, env_context=env_ctx, ) @@ -889,11 +893,12 @@ def _setup_metrics(self): def _new_episode(self): return MultiAgentEpisode( observation_space={ - aid: self.env.get_observation_space(aid) - for aid in self.env.possible_agents + aid: self.env.unwrapped.get_observation_space(aid) + for aid in self.env.unwrapped.possible_agents }, action_space={ - aid: self.env.get_action_space(aid) for aid in self.env.possible_agents + aid: self.env.unwrapped.get_action_space(aid) + for aid in self.env.unwrapped.possible_agents }, agent_to_module_mapping_fn=self.config.policy_mapping_fn, ) @@ -904,7 +909,7 @@ def _make_on_episode_callback(self, which: str, episode=None): episode=episode, env_runner=self, metrics_logger=self.metrics, - env=self.env, + env=self.env.unwrapped, rl_module=self.module, env_index=0, ) diff --git a/rllib/env/single_agent_env_runner.py b/rllib/env/single_agent_env_runner.py index 967d4ec174b3..14bf1fd635b8 100644 --- a/rllib/env/single_agent_env_runner.py +++ b/rllib/env/single_agent_env_runner.py @@ -1,10 +1,12 @@ -import time from collections import defaultdict from functools import partial import 
logging +import time from typing import Collection, DefaultDict, List, Optional, Union import gymnasium as gym +from gymnasium.wrappers.vector import DictInfoToList +from gymnasium.envs.registration import VectorizeMode from ray.rllib.algorithms.algorithm_config import AlgorithmConfig from ray.rllib.algorithms.callbacks import DefaultCallbacks @@ -81,7 +83,7 @@ def __init__(self, config: AlgorithmConfig, **kwargs): self._callbacks: DefaultCallbacks = self.config.callbacks_class() # Create the vectorized gymnasium env. - self.env: Optional[gym.Wrapper] = None + self.env: Optional[gym.vector.VectorEnvWrapper] = None self.num_envs: int = 0 self.make_env() @@ -100,7 +102,7 @@ def __init__(self, config: AlgorithmConfig, **kwargs): # Create the RLModule. try: module_spec: RLModuleSpec = self.config.get_rl_module_spec( - env=self.env, spaces=self.get_spaces(), inference_only=True + env=self.env.unwrapped, spaces=self.get_spaces(), inference_only=True ) # Build the module from its spec. self.module = module_spec.build() @@ -186,7 +188,7 @@ def sample( # Sample n timesteps. if num_timesteps is not None: - samples = self._sample_timesteps( + samples = self._sample( num_timesteps=num_timesteps, explore=explore, random_actions=random_actions, @@ -194,19 +196,16 @@ def sample( ) # Sample m episodes. elif num_episodes is not None: - samples = self._sample_episodes( + samples = self._sample( num_episodes=num_episodes, explore=explore, random_actions=random_actions, ) - # For complete episodes mode, sample a single episode and - # leave coordination of sampling to `synchronous_parallel_sample`. - # TODO (simon, sven): The coordination will eventually move - # to `EnvRunnerGroup` in the future. So from the algorithm one - # would do `EnvRunnerGroup.sample()`. + # For complete episodes mode, sample as long as the number of timesteps + # done is smaller than the `train_batch_size`. 
else: - samples = self._sample_episodes( - num_episodes=1, + samples = self._sample( + num_episodes=self.num_envs, explore=explore, random_actions=random_actions, ) @@ -222,57 +221,40 @@ def sample( return samples - def _sample_timesteps( + def _sample( self, - num_timesteps: int, + *, + num_timesteps: Optional[int] = None, + num_episodes: Optional[int] = None, explore: bool, random_actions: bool = False, force_reset: bool = False, ) -> List[SingleAgentEpisode]: - """Helper method to sample n timesteps.""" + """Helper method to sample n timesteps or m episodes.""" done_episodes_to_return: List[SingleAgentEpisode] = [] # Have to reset the env (on all vector sub_envs). - if force_reset or self._needs_initial_reset: - # Create n new episodes. - # TODO (sven): Add callback `on_episode_created` as soon as - # `gymnasium-v1.0.0a2` PR is coming. - self._episodes = [] - for env_index in range(self.num_envs): - self._episodes.append(self._new_episode()) - self._shared_data = {} - - # Erase all cached ongoing episodes (these will never be completed and - # would thus never be returned/cleaned by `get_metrics` and cause a memory - # leak). - self._ongoing_episodes_for_metrics.clear() - - # Try resetting the environment. - # TODO (simon): Check, if we need here the seed from the config. - obs, infos = self._try_env_reset() - obs = unbatch(obs) - self._cached_to_module = None - - # Call `on_episode_start()` callbacks. - for env_index in range(self.num_envs): - self._make_on_episode_callback("on_episode_start", env_index) - + if force_reset or num_episodes is not None or self._needs_initial_reset: + episodes = self._episodes = [None for _ in range(self.num_envs)] + shared_data = self._shared_data = {} + self._reset_envs(episodes, shared_data, explore) # We just reset the env. Don't have to force this again in the next # call to `self._sample_timesteps()`. 
self._needs_initial_reset = False + else: + episodes = self._episodes + shared_data = self._shared_data - # Set initial obs and infos in the episodes. - for env_index in range(self.num_envs): - self._episodes[env_index].add_env_reset( - observation=obs[env_index], - infos=infos[env_index], - ) + if num_episodes is not None: + self._needs_initial_reset = True - # Loop through timesteps. + # Loop through `num_timesteps` timesteps or `num_episodes` episodes. ts = 0 - - while ts < num_timesteps: + eps = 0 + while ( + (ts < num_timesteps) if num_timesteps is not None else (eps < num_episodes) + ): # Act randomly. if random_actions: to_env = { @@ -280,13 +262,9 @@ def _sample_timesteps( } # Compute an action using the RLModule. else: - # Env-to-module connector. - to_module = self._cached_to_module or self._env_to_module( - rl_module=self.module, - episodes=self._episodes, - explore=explore, - shared_data=self._shared_data, - ) + # Env-to-module connector (already cached). + to_module = self._cached_to_module + assert to_module is not None self._cached_to_module = None # RLModule forward pass: Explore or not. @@ -305,9 +283,9 @@ def _sample_timesteps( to_env = self._module_to_env( rl_module=self.module, batch=to_env, - episodes=self._episodes, + episodes=episodes, explore=explore, - shared_data=self._shared_data, + shared_data=shared_data, ) # Extract the (vectorized) actions (to be sent to the env) from the @@ -320,264 +298,78 @@ def _sample_timesteps( # Try stepping the environment. 
results = self._try_env_step(actions_for_env) if results == ENV_STEP_FAILURE: - return self._sample_timesteps( + return self._sample( num_timesteps=num_timesteps, + num_episodes=num_episodes, explore=explore, random_actions=random_actions, force_reset=True, ) - obs, rewards, terminateds, truncateds, infos = results - obs, actions = unbatch(obs), unbatch(actions) - - ts += self.num_envs + observations, rewards, terminateds, truncateds, infos = results + observations, actions = unbatch(observations), unbatch(actions) + call_on_episode_start = set() for env_index in range(self.num_envs): - # TODO (simon): This might be unfortunate if a user needs to set a - # certain env parameter during different episodes (for example for - # benchmarking). extra_model_output = {k: v[env_index] for k, v in to_env.items()} extra_model_output[WEIGHTS_SEQ_NO] = self._weights_seq_no - # In inference, we have only the action logits. - if terminateds[env_index] or truncateds[env_index]: - # Finish the episode with the actual terminal observation stored in - # the info dict. - self._episodes[env_index].add_env_step( - # Gym vector env provides the `"final_observation"`. - # Pop these out of the infos dict so this information doesn't - # appear in the next episode as well (at index=0). - infos[env_index].pop("final_observation"), - actions[env_index], - rewards[env_index], - infos=infos[env_index].pop("final_info"), - terminated=terminateds[env_index], - truncated=truncateds[env_index], - extra_model_outputs=extra_model_output, - ) - # Make the `on_episode_step` and `on_episode_end` callbacks (before - # finalizing the episode object). - self._make_on_episode_callback("on_episode_step", env_index) - - # We have to perform an extra env-to-module pass here, just in case - # the user's connector pipeline performs (permanent) transforms - # on each observation (including this final one here). 
Without such - # a call and in case the structure of the observations change - # sufficiently, the following `finalize()` call on the episode will - # fail. - if self.module is not None: - self._env_to_module( - episodes=[self._episodes[env_index]], - explore=explore, - rl_module=self.module, - shared_data=self._shared_data, - ) - - self._make_on_episode_callback("on_episode_end", env_index) - - # Then finalize (numpy'ize) the episode. - done_episodes_to_return.append(self._episodes[env_index].finalize()) - - # Create a new episode object with already the reset data in it. - self._episodes[env_index] = SingleAgentEpisode( - observations=[obs[env_index]], - infos=[infos[env_index]], - observation_space=self.env.single_observation_space, - action_space=self.env.single_action_space, + # Episode has no data in it yet -> Was just reset and needs to be called + # with its `add_env_reset()` method. + if not self._episodes[env_index].is_reset: + episodes[env_index].add_env_reset( + observation=observations[env_index], + infos=infos[env_index], ) + call_on_episode_start.add(env_index) - # Make the `on_episode_start` callback. - self._make_on_episode_callback("on_episode_start", env_index) - + # Call `add_env_step()` method on episode. else: - self._episodes[env_index].add_env_step( - obs[env_index], - actions[env_index], - rewards[env_index], + # Only increase ts when we actually stepped (not reset'd as a reset + # does not count as a timestep). + ts += 1 + episodes[env_index].add_env_step( + observation=observations[env_index], + action=actions[env_index], + reward=rewards[env_index], infos=infos[env_index], + terminated=terminateds[env_index], + truncated=truncateds[env_index], extra_model_outputs=extra_model_output, ) - # Make the `on_episode_step` callback. - self._make_on_episode_callback("on_episode_step", env_index) - - # Already perform env-to-module connector call for next call to - # `_sample_timesteps()`. See comment in c'tor for `self._cached_to_module`. 
- if self.module is not None: - self._cached_to_module = self._env_to_module( - rl_module=self.module, - episodes=self._episodes, - explore=explore, - shared_data=self._shared_data, - ) - - # Return done episodes ... - # TODO (simon): Check, how much memory this attribute uses. - self._done_episodes_for_metrics.extend(done_episodes_to_return) - # ... and all ongoing episode chunks. - - # Also, make sure we start new episode chunks (continuing the ongoing episodes - # from the to-be-returned chunks). - ongoing_episodes_continuations = [ - eps.cut(len_lookback_buffer=self.config.episode_lookback_horizon) - for eps in self._episodes - ] - - ongoing_episodes_to_return = [] - for eps in self._episodes: - # Just started Episodes do not have to be returned. There is no data - # in them anyway. - if eps.t == 0: - continue - eps.validate() - self._ongoing_episodes_for_metrics[eps.id_].append(eps) - # Return finalized (numpy'ized) Episodes. - ongoing_episodes_to_return.append(eps.finalize()) - - # Continue collecting into the cut Episode chunks. - self._episodes = ongoing_episodes_continuations - - self._increase_sampled_metrics(ts) - - # Return collected episode data. - return done_episodes_to_return + ongoing_episodes_to_return - - def _sample_episodes( - self, - num_episodes: int, - explore: bool, - random_actions: bool = False, - ) -> List[SingleAgentEpisode]: - """Helper method to run n episodes. - - See docstring of `self.sample()` for more details. - """ - # If user calls sample(num_timesteps=..) after this, we must reset again - # at the beginning. - self._needs_initial_reset = True - - done_episodes_to_return: List[SingleAgentEpisode] = [] - - episodes = [] - for env_index in range(self.num_envs): - episodes.append(self._new_episode()) - # TODO (sven): Add callback `on_episode_created` as soon as - # `gymnasium-v1.0.0a2` PR is coming. - _shared_data = {} - - # Try resetting the environment. - # TODO (simon): Check, if we need here the seed from the config. 
- obs, infos = self._try_env_reset() - for env_index in range(self.num_envs): - episodes[env_index].add_env_reset( - observation=unbatch(obs)[env_index], - infos=infos[env_index], - ) - self._make_on_episode_callback("on_episode_start", env_index, episodes) - - # Loop over episodes. - eps = 0 - ts = 0 - while eps < num_episodes: - # Act randomly. - if random_actions: - to_env = { - Columns.ACTIONS: self.env.action_space.sample(), - } - # Compute an action using the RLModule. - else: - # Env-to-module connector. - to_module = self._env_to_module( - rl_module=self.module, + # Env-to-module connector pass (cache results as we will do the RLModule + # forward pass only in the next `while`-iteration). + if self.module is not None: + self._cached_to_module = self._env_to_module( episodes=episodes, explore=explore, - shared_data=_shared_data, - ) - - # RLModule forward pass: Explore or not. - if explore: - env_steps_lifetime = ( - self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0) - + ts - ) - to_env = self.module.forward_exploration( - to_module, t=env_steps_lifetime - ) - else: - to_env = self.module.forward_inference(to_module) - - # Module-to-env connector. - to_env = self._module_to_env( rl_module=self.module, - batch=to_env, - episodes=episodes, - explore=explore, - shared_data=_shared_data, + shared_data=shared_data, ) - # Extract the (vectorized) actions (to be sent to the env) from the - # module/connector output. Note that these actions are fully ready (e.g. - # already unsquashed/clipped) to be sent to the environment) and might not - # be identical to the actions produced by the RLModule/distribution, which - # are the ones stored permanently in the episode objects. - actions = to_env.pop(Columns.ACTIONS) - actions_for_env = to_env.pop(Columns.ACTIONS_FOR_ENV, actions) - # Try stepping the environment.
- results = self._try_env_step(actions_for_env) - if results == ENV_STEP_FAILURE: - return self._sample_episodes( - num_episodes=num_episodes, - explore=explore, - random_actions=random_actions, - ) - obs, rewards, terminateds, truncateds, infos = results - obs, actions = unbatch(obs), unbatch(actions) - ts += self.num_envs - for env_index in range(self.num_envs): - extra_model_output = {k: v[env_index] for k, v in to_env.items()} - extra_model_output[WEIGHTS_SEQ_NO] = self._weights_seq_no - - if terminateds[env_index] or truncateds[env_index]: - eps += 1 - - episodes[env_index].add_env_step( - infos[env_index].pop("final_observation"), - actions[env_index], - rewards[env_index], - infos=infos[env_index].pop("final_info"), - terminated=terminateds[env_index], - truncated=truncateds[env_index], - extra_model_outputs=extra_model_output, + # Call `on_episode_start()` callback (always after reset). + if env_index in call_on_episode_start: + self._make_on_episode_callback( + "on_episode_start", env_index, episodes ) - # Make `on_episode_step` and `on_episode_end` callbacks before - # finalizing the episode. + # Make the `on_episode_step` callbacks. + else: self._make_on_episode_callback( "on_episode_step", env_index, episodes ) - # We have to perform an extra env-to-module pass here, just in case - # the user's connector pipeline performs (permanent) transforms - # on each observation (including this final one here). Without such - # a call and in case the structure of the observations change - # sufficiently, the following `finalize()` call on the episode will - # fail. - if self.module is not None: - self._env_to_module( - episodes=[episodes[env_index]], - explore=explore, - rl_module=self.module, - shared_data=_shared_data, - ) - - # Make the `on_episode_end` callback (before finalizing the episode, - # but after(!) the last env-to-module connector call has been made. 
- # -> All obs (even the terminal one) should have been processed now - # (by the connector, if applicable). + # Episode is done. + if episodes[env_index].is_done: + eps += 1 + + # Make the `on_episode_end` callbacks (before finalizing the episode + # object). self._make_on_episode_callback( "on_episode_end", env_index, episodes ) - # Finalize (numpy'ize) the episode. + # Then finalize (numpy'ize) the episode. done_episodes_to_return.append(episodes[env_index].finalize()) # Also early-out if we reach the number of episodes within this @@ -585,38 +377,46 @@ def _sample_episodes( if eps == num_episodes: break - # Create a new episode object. + # Create a new episode object with no data in it and execute + # `on_episode_created` callback (before the `env.reset()` call). episodes[env_index] = SingleAgentEpisode( - observations=[obs[env_index]], - infos=[infos[env_index]], observation_space=self.env.single_observation_space, action_space=self.env.single_action_space, ) - # Make `on_episode_start` callback. - self._make_on_episode_callback( - "on_episode_start", env_index, episodes - ) - else: - episodes[env_index].add_env_step( - obs[env_index], - actions[env_index], - rewards[env_index], - infos=infos[env_index], - extra_model_outputs=extra_model_output, - ) - # Make `on_episode_step` callback. - self._make_on_episode_callback( - "on_episode_step", env_index, episodes - ) + # Return done episodes ... + # TODO (simon): Check, how much memory this attribute uses. self._done_episodes_for_metrics.extend(done_episodes_to_return) + # ... and all ongoing episode chunks. - # Initialized episodes have to be removed as they lack `extra_model_outputs`. - samples = [episode for episode in done_episodes_to_return if episode.t > 0] + # Also, make sure we start new episode chunks (continuing the ongoing episodes + # from the to-be-returned chunks). 
+ ongoing_episodes_to_return = [] + # Only if we are doing individual timesteps: We have to maybe cut an ongoing + # episode and continue building it on the next call to `sample()`. + if num_timesteps is not None: + ongoing_episodes_continuations = [ + eps.cut(len_lookback_buffer=self.config.episode_lookback_horizon) + for eps in self._episodes + ] + + for eps in self._episodes: + # Just started Episodes do not have to be returned. There is no data + # in them anyway. + if eps.t == 0: + continue + eps.validate() + self._ongoing_episodes_for_metrics[eps.id_].append(eps) + # Return finalized (numpy'ized) Episodes. + ongoing_episodes_to_return.append(eps.finalize()) + + # Continue collecting into the cut Episode chunks. + self._episodes = ongoing_episodes_continuations self._increase_sampled_metrics(ts) - return samples + # Return collected episode data. + return done_episodes_to_return + ongoing_episodes_to_return @override(EnvRunner) def get_spaces(self): @@ -820,12 +620,15 @@ def make_env(self) -> None: ) gym.register("rllib-single-agent-env-v0", entry_point=entry_point) - # Wrap into `VectorListInfo`` wrapper to get infos as lists. - self.env: gym.Wrapper = gym.wrappers.VectorListInfo( - gym.vector.make( + self.env = DictInfoToList( + gym.make_vec( "rllib-single-agent-env-v0", num_envs=self.config.num_envs_per_env_runner, - asynchronous=self.config.remote_worker_envs, + vectorization_mode=( + VectorizeMode.ASYNC + if self.config.remote_worker_envs + else VectorizeMode.SYNC + ), ) ) @@ -839,7 +642,7 @@ def make_env(self) -> None: self._callbacks.on_environment_created( env_runner=self, metrics_logger=self.metrics, - env=self.env, + env=self.env.unwrapped, env_context=env_ctx, ) @@ -848,19 +651,57 @@ def stop(self): # Close our env object via gymnasium's API. self.env.close() - def _new_episode(self): - return SingleAgentEpisode( + def _reset_envs(self, episodes, shared_data, explore): + # Create n new episodes and make the `on_episode_created` callbacks. 
+ for env_index in range(self.num_envs): + self._new_episode(env_index, episodes) + + # Erase all cached ongoing episodes (these will never be completed and + # would thus never be returned/cleaned by `get_metrics` and cause a memory + # leak). + self._ongoing_episodes_for_metrics.clear() + + # Try resetting the environment. + # TODO (simon): Check, if we need here the seed from the config. + observations, infos = self._try_env_reset() + observations = unbatch(observations) + + # Set initial obs and infos in the episodes. + for env_index in range(self.num_envs): + episodes[env_index].add_env_reset( + observation=observations[env_index], + infos=infos[env_index], + ) + + # Run the env-to-module connector to make sure the reset-obs/infos have + # properly been processed (if applicable). + self._cached_to_module = None + if self.module: + self._cached_to_module = self._env_to_module( + rl_module=self.module, + episodes=episodes, + explore=explore, + shared_data=shared_data, + ) + + # Call `on_episode_start()` callbacks (always after reset). 
+ for env_index in range(self.num_envs): + self._make_on_episode_callback("on_episode_start", env_index, episodes) + + def _new_episode(self, env_index, episodes=None): + episodes = episodes if episodes is not None else self._episodes + episodes[env_index] = SingleAgentEpisode( observation_space=self.env.single_observation_space, action_space=self.env.single_action_space, ) + self._make_on_episode_callback("on_episode_created", env_index, episodes) - def _make_on_episode_callback(self, which: str, idx: int, episodes=None): - episodes = episodes if episodes is not None else self._episodes + def _make_on_episode_callback(self, which: str, idx: int, episodes): getattr(self._callbacks, which)( episode=episodes[idx], env_runner=self, metrics_logger=self.metrics, - env=self.env, + env=self.env.unwrapped, rl_module=self.module, env_index=idx, ) diff --git a/rllib/env/single_agent_episode.py b/rllib/env/single_agent_episode.py index dd4f48039470..b11cdd678374 100644 --- a/rllib/env/single_agent_episode.py +++ b/rllib/env/single_agent_episode.py @@ -362,6 +362,7 @@ def add_env_reset( observation: The initial observation returned by `env.reset()`. infos: An (optional) info dict returned by `env.reset()`. """ + assert not self.is_reset assert not self.is_done assert len(self.observations) == 0 # Assume that this episode is completely empty and has not stepped yet. 
@@ -485,6 +486,11 @@ def validate(self) -> None: for k, v in self.extra_model_outputs.items(): assert len(v) == len(self.observations) - 1 + @property + def is_reset(self) -> bool: + """Returns True if `self.add_env_reset()` has already been called.""" + return len(self.observations) > 0 + @property def is_finalized(self) -> bool: """True, if the data in this episode is already stored as numpy arrays.""" diff --git a/rllib/env/tests/test_single_agent_env_runner.py b/rllib/env/tests/test_single_agent_env_runner.py index d6dbf7082985..4d5f8808aa84 100644 --- a/rllib/env/tests/test_single_agent_env_runner.py +++ b/rllib/env/tests/test_single_agent_env_runner.py @@ -9,6 +9,7 @@ from ray.rllib.env.single_agent_env_runner import SingleAgentEnvRunner from ray.rllib.env.utils import _gym_env_creator from ray.rllib.examples.envs.classes.simple_corridor import SimpleCorridor +from ray.rllib.utils.test_utils import check class TestSingleAgentEnvRunner(unittest.TestCase): @@ -53,7 +54,7 @@ def test_sample(self): # Sample 10 episodes (5 per env) 100 times. for _ in range(100): episodes = env_runner.sample(num_episodes=10, random_actions=True) - self.assertTrue(len(episodes) == 10) + check(len(episodes), 10) # Since we sampled complete episodes, there should be no ongoing episodes # being returned. self.assertTrue(all(e.is_done for e in episodes)) @@ -61,20 +62,22 @@ def test_sample(self): # Sample 10 timesteps (5 per env) 100 times. for _ in range(100): episodes = env_runner.sample(num_timesteps=10, random_actions=True) - # Check, whether the sum of lengths of all episodes returned is 20 - self.assertTrue(sum(len(e) for e in episodes) == 10) + # Check the sum of lengths of all episodes returned. + sum_ = sum(map(len, episodes)) + self.assertTrue(sum_ in [10, 11]) # Sample (by default setting: rollout_fragment_length=64) 10 times. 
for _ in range(100): episodes = env_runner.sample(random_actions=True) # Check, whether the sum of lengths of all episodes returned is 128 # 2 (num_env_per_worker) * 64 (rollout_fragment_length). - self.assertTrue(sum(len(e) for e in episodes) == 128) + sum_ = sum(map(len, episodes)) + self.assertTrue(sum_ in [128, 129]) def test_async_vector_env(self): """Tests, whether SingleAgentGymEnvRunner can run with vector envs.""" - for env in ["TestEnv-v0", "CartPole-v1", SimpleCorridor, "tune-registered"]: + for env in ["CartPole-v1", SimpleCorridor, "tune-registered"]: config = ( AlgorithmConfig().environment(env) # Vectorize x5 and by default, rollout 64 timesteps per individual env. @@ -110,7 +113,7 @@ def test_distributed_env_runner(self): for env_spec in ["tune-registered", "CartPole-v1", SimpleCorridor]: config = ( AlgorithmConfig().environment(env_spec) - # Vectorize x5 and by default, rollout 64 timesteps per individual + # Vectorize x5 and by default, rollout 10 timesteps per individual # env. .env_runners( num_env_runners=5, @@ -129,9 +132,14 @@ def test_distributed_env_runner(self): # Loop over individual EnvRunner Actor's results and inspect each. for episodes in results: # Assert length of all fragments is `rollout_fragment_length`. - self.assertEqual( + self.assertIn( sum(len(e) for e in episodes), - config.num_envs_per_env_runner * config.rollout_fragment_length, + [ + config.num_envs_per_env_runner + * config.rollout_fragment_length + + i + for i in range(config.num_envs_per_env_runner) + ], ) diff --git a/rllib/env/utils/__init__.py b/rllib/env/utils/__init__.py index 67dc49efd76b..09dfbe227e5a 100644 --- a/rllib/env/utils/__init__.py +++ b/rllib/env/utils/__init__.py @@ -103,6 +103,13 @@ def _gym_env_creator( except (AttributeError, ModuleNotFoundError, ImportError): pass + # If env descriptor is a str, starting with "ale_py:ALE/", for now, register all ALE + # envs from ale_py. 
+ if isinstance(env_descriptor, str) and env_descriptor.startswith("ale_py:ALE/"): + import ale_py + + gym.register_envs(ale_py) + # Try creating a gym env. If this fails we can output a # decent error message. try: diff --git a/rllib/env/wrappers/atari_wrappers.py b/rllib/env/wrappers/atari_wrappers.py index 2edefd58208b..3bb0f3ff7719 100644 --- a/rllib/env/wrappers/atari_wrappers.py +++ b/rllib/env/wrappers/atari_wrappers.py @@ -13,7 +13,8 @@ def is_atari(env: Union[gym.Env, str]) -> bool: """Returns, whether a given env object or env descriptor (str) is an Atari env. Args: - env: The gym.Env object or a string descriptor of the env (e.g. "ALE/Pong-v5"). + env: The gym.Env object or a string descriptor of the env (for example, + "ale_py:ALE/Pong-v5"). Returns: Whether `env` is an Atari environment. @@ -28,9 +29,9 @@ def is_atari(env: Union[gym.Env, str]) -> bool: ): return False return "AtariEnv None: - """Initializes a Kaggle football environment. - - Args: - configuration (Optional[Dict[str, Any]]): configuration of the - football environment. 
For detailed information, see: - https://github.com/Kaggle/kaggle-environments/blob/master/kaggle_\ - environments/envs/football/football.json - """ - super().__init__() - self.kaggle_env = kaggle_environments.make( - "football", configuration=configuration or {} - ) - self.last_cumulative_reward = None - - def reset( - self, - *, - seed: Optional[int] = None, - options: Optional[dict] = None, - ) -> Tuple[MultiAgentDict, MultiAgentDict]: - kaggle_state = self.kaggle_env.reset() - self.last_cumulative_reward = None - return { - f"agent{idx}": self._convert_obs(agent_state["observation"]) - for idx, agent_state in enumerate(kaggle_state) - if agent_state["status"] == "ACTIVE" - }, {} - - def step( - self, action_dict: Dict[AgentID, int] - ) -> Tuple[ - MultiAgentDict, MultiAgentDict, MultiAgentDict, MultiAgentDict, MultiAgentDict - ]: - # Convert action_dict (used by RLlib) to a list of actions (used by - # kaggle_environments) - action_list = [None] * len(self.kaggle_env.state) - for idx, agent_state in enumerate(self.kaggle_env.state): - if agent_state["status"] == "ACTIVE": - action = action_dict[f"agent{idx}"] - action_list[idx] = [action] - self.kaggle_env.step(action_list) - - # Parse (obs, reward, terminated, truncated, info) from kaggle's "state" - # representation. 
- obs = {} - cumulative_reward = {} - terminated = {"__all__": self.kaggle_env.done} - truncated = {"__all__": False} - info = {} - for idx in range(len(self.kaggle_env.state)): - agent_state = self.kaggle_env.state[idx] - agent_name = f"agent{idx}" - if agent_state["status"] == "ACTIVE": - obs[agent_name] = self._convert_obs(agent_state["observation"]) - cumulative_reward[agent_name] = agent_state["reward"] - terminated[agent_name] = agent_state["status"] != "ACTIVE" - truncated[agent_name] = False - info[agent_name] = agent_state["info"] - # Compute the step rewards from the cumulative rewards - if self.last_cumulative_reward is not None: - reward = { - agent_id: agent_reward - self.last_cumulative_reward[agent_id] - for agent_id, agent_reward in cumulative_reward.items() - } - else: - reward = cumulative_reward - self.last_cumulative_reward = cumulative_reward - return obs, reward, terminated, truncated, info - - def _convert_obs(self, obs: Dict[str, Any]) -> Dict[str, Any]: - """Convert raw observations - - These conversions are necessary to make the observations fall into the - observation space defined below. - """ - new_obs = deepcopy(obs) - if new_obs["players_raw"][0]["ball_owned_team"] == -1: - new_obs["players_raw"][0]["ball_owned_team"] = 2 - if new_obs["players_raw"][0]["ball_owned_player"] == -1: - new_obs["players_raw"][0]["ball_owned_player"] = 11 - new_obs["players_raw"][0]["steps_left"] = [ - new_obs["players_raw"][0]["steps_left"] - ] - return new_obs - - def build_agent_spaces(self) -> Tuple[Space, Space]: - """Construct the action and observation spaces - - Description of actions and observations: - https://github.com/google-research/football/blob/master/gfootball/doc/ - observation.md - """ # noqa: E501 - action_space = Discrete(19) - # The football field's corners are [+-1., +-0.42]. However, the players - # and balls may get out of the field. Thus we multiply those limits by - # a factor of 2. 
- xlim = 1.0 * 2 - ylim = 0.42 * 2 - num_players: int = 11 - xy_space = Box( - np.array([-xlim, -ylim], dtype=np.float32), - np.array([xlim, ylim], dtype=np.float32), - ) - xyz_space = Box( - np.array([-xlim, -ylim, 0], dtype=np.float32), - np.array([xlim, ylim, np.inf], dtype=np.float32), - ) - observation_space = DictSpace( - { - "controlled_players": Discrete(2), - "players_raw": TupleSpace( - [ - DictSpace( - { - # ball information - "ball": xyz_space, - "ball_direction": Box(-np.inf, np.inf, (3,)), - "ball_rotation": Box(-np.inf, np.inf, (3,)), - "ball_owned_team": Discrete(3), - "ball_owned_player": Discrete(num_players + 1), - # left team - "left_team": TupleSpace([xy_space] * num_players), - "left_team_direction": TupleSpace( - [xy_space] * num_players - ), - "left_team_tired_factor": Box(0.0, 1.0, (num_players,)), - "left_team_yellow_card": MultiBinary(num_players), - "left_team_active": MultiBinary(num_players), - "left_team_roles": MultiDiscrete([10] * num_players), - # right team - "right_team": TupleSpace([xy_space] * num_players), - "right_team_direction": TupleSpace( - [xy_space] * num_players - ), - "right_team_tired_factor": Box( - 0.0, 1.0, (num_players,) - ), - "right_team_yellow_card": MultiBinary(num_players), - "right_team_active": MultiBinary(num_players), - "right_team_roles": MultiDiscrete([10] * num_players), - # controlled player information - "active": Discrete(num_players), - "designated": Discrete(num_players), - "sticky_actions": MultiBinary(10), - # match state - "score": Box(-np.inf, np.inf, (2,)), - "steps_left": Box(0, np.inf, (1,)), - "game_mode": Discrete(7), - } - ) - ] - ), - } - ) - return action_space, observation_space diff --git a/rllib/env/wrappers/model_vector_env.py b/rllib/env/wrappers/model_vector_env.py deleted file mode 100644 index 8facedab25e8..000000000000 --- a/rllib/env/wrappers/model_vector_env.py +++ /dev/null @@ -1,164 +0,0 @@ -import logging -from gymnasium.spaces import Discrete -import numpy as np - -from 
ray.rllib.utils.annotations import override -from ray.rllib.env.vector_env import VectorEnv -from ray.rllib.evaluation.rollout_worker import get_global_worker -from ray.rllib.env.base_env import BaseEnv, convert_to_base_env -from ray.rllib.utils.typing import EnvType - -logger = logging.getLogger(__name__) - - -def model_vector_env(env: EnvType) -> BaseEnv: - """Returns a VectorizedEnv wrapper around the given environment. - - To obtain worker configs, one can call get_global_worker(). - - Args: - env: The input environment (of any supported environment - type) to be convert to a _VectorizedModelGymEnv (wrapped as - an RLlib BaseEnv). - - Returns: - BaseEnv: The BaseEnv converted input `env`. - """ - worker = get_global_worker() - worker_index = worker.worker_index - if worker_index: - env = _VectorizedModelGymEnv( - make_env=worker.make_sub_env_fn, - existing_envs=[env], - num_envs=worker.config.num_envs_per_env_runner, - observation_space=env.observation_space, - action_space=env.action_space, - ) - return convert_to_base_env( - env, - make_env=worker.make_sub_env_fn, - num_envs=worker.config.num_envs_per_env_runner, - remote_envs=False, - remote_env_batch_wait_ms=0, - ) - - -class _VectorizedModelGymEnv(VectorEnv): - """Vectorized Environment Wrapper for MB-MPO. - - Primary change is in the `vector_step` method, which calls the dynamics - models for next_obs "calculation" (instead of the actual env). Also, the - actual envs need to have two extra methods implemented: `reward(obs)` and - (optionally) `done(obs)`. If `done` is not implemented, we will assume - that episodes in the env do not terminate, ever. 
- """ - - def __init__( - self, - make_env=None, - existing_envs=None, - num_envs=1, - *, - observation_space=None, - action_space=None, - env_config=None - ): - self.make_env = make_env - self.envs = existing_envs - self.num_envs = num_envs - while len(self.envs) < num_envs: - self.envs.append(self.make_env(len(self.envs))) - self._timesteps = [0 for _ in range(self.num_envs)] - self.cur_obs = [None for _ in range(self.num_envs)] - - super().__init__( - observation_space=observation_space or self.envs[0].observation_space, - action_space=action_space or self.envs[0].action_space, - num_envs=num_envs, - ) - worker = get_global_worker() - self.model, self.device = worker.foreach_policy( - lambda x, y: (x.dynamics_model, x.device) - )[0] - - @override(VectorEnv) - def vector_reset(self, *, seeds=None, options=None): - """Override parent to store actual env obs for upcoming predictions.""" - seeds = seeds or [None] * self.num_envs - options = options or [None] * self.num_envs - reset_results = [ - e.reset(seed=seeds[i], options=options[i]) for i, e in enumerate(self.envs) - ] - self.cur_obs = [io[0] for io in reset_results] - infos = [io[1] for io in reset_results] - self._timesteps = [0 for _ in range(self.num_envs)] - return self.cur_obs, infos - - @override(VectorEnv) - def reset_at(self, index, *, seed=None, options=None): - """Override parent to store actual env obs for upcoming predictions.""" - obs, infos = self.envs[index].reset(seed=seed, options=options) - self.cur_obs[index] = obs - self._timesteps[index] = 0 - return obs, infos - - @override(VectorEnv) - def vector_step(self, actions): - if self.cur_obs is None: - raise ValueError("Need to reset env first") - - for idx in range(self.num_envs): - self._timesteps[idx] += 1 - - # If discrete, need to one-hot actions - if isinstance(self.action_space, Discrete): - act = np.array(actions) - new_act = np.zeros((act.size, act.max() + 1)) - new_act[np.arange(act.size), act] = 1 - actions = 
new_act.astype("float32") - - # Batch the TD-model prediction. - obs_batch = np.stack(self.cur_obs, axis=0) - action_batch = np.stack(actions, axis=0) - # Predict the next observation, given previous a) real obs - # (after a reset), b) predicted obs (any other time). - next_obs_batch = self.model.predict_model_batches( - obs_batch, action_batch, device=self.device - ) - next_obs_batch = np.clip(next_obs_batch, -1000, 1000) - - # Call env's reward function. - # Note: Each actual env must implement one to output exact rewards. - rew_batch = self.envs[0].reward(obs_batch, action_batch, next_obs_batch) - - # If env has a `done` method, use it. - if hasattr(self.envs[0], "done"): - dones_batch = self.envs[0].done(next_obs_batch) - # Our sub-environments have timestep limits. - elif hasattr(self.envs[0], "_max_episode_steps"): - dones_batch = np.array( - [ - self._timesteps[idx] >= self.envs[0]._max_episode_steps - for idx in range(self.num_envs) - ] - ) - # Otherwise, assume the episode does not end. - else: - dones_batch = np.asarray([False for _ in range(self.num_envs)]) - truncateds_batch = [False for _ in range(self.num_envs)] - - info_batch = [{} for _ in range(self.num_envs)] - - self.cur_obs = next_obs_batch - - return ( - list(next_obs_batch), - list(rew_batch), - list(dones_batch), - truncateds_batch, - info_batch, - ) - - @override(VectorEnv) - def get_sub_environments(self): - return self.envs diff --git a/rllib/env/wrappers/recsim.py b/rllib/env/wrappers/recsim.py deleted file mode 100644 index b1d3e749e514..000000000000 --- a/rllib/env/wrappers/recsim.py +++ /dev/null @@ -1,270 +0,0 @@ -"""Tools and utils to create RLlib-ready recommender system envs using RecSim. 
- -For examples on how to generate a RecSim env class (usable in RLlib): -See ray.rllib.examples.envs.classes.recommender_system_envs_with_recsim.py - -For more information on google's RecSim itself: -https://github.com/google-research/recsim -""" - -from collections import OrderedDict -import gymnasium as gym -from gymnasium.spaces import Dict, Discrete, MultiDiscrete -from gymnasium.wrappers import EnvCompatibility -import numpy as np -from recsim.document import AbstractDocumentSampler -from recsim.simulator import environment, recsim_gym -from recsim.user import AbstractUserModel, AbstractResponse -from typing import Callable, List, Optional, Type - -from ray.rllib.env.env_context import EnvContext -from ray.rllib.utils.error import UnsupportedSpaceException -from ray.rllib.utils.spaces.space_utils import convert_element_to_space_type - - -class RecSimObservationSpaceWrapper(gym.ObservationWrapper): - """Fix RecSim environment's observation space - - In RecSim's observation spaces, the "doc" field is a dictionary keyed by - document IDs. Those IDs are changing every step, thus generating a - different observation space in each time. This causes issues for RLlib - because it expects the observation space to remain the same across steps. - - This environment wrapper fixes that by reindexing the documents by their - positions in the list. 
- """ - - def __init__(self, env: gym.Env): - super().__init__(env) - obs_space = self.env.observation_space - doc_space = Dict( - OrderedDict( - [ - (str(k), doc) - for k, (_, doc) in enumerate(obs_space["doc"].spaces.items()) - ] - ) - ) - self.observation_space = Dict( - OrderedDict( - [ - ("user", obs_space["user"]), - ("doc", doc_space), - ("response", obs_space["response"]), - ] - ) - ) - self._sampled_obs = self.observation_space.sample() - self.action_space = self.env.action_space - - def observation(self, obs): - new_obs = OrderedDict() - new_obs["user"] = obs["user"] - new_obs["doc"] = {str(k): v for k, (_, v) in enumerate(obs["doc"].items())} - new_obs["response"] = obs["response"] - new_obs = convert_element_to_space_type(new_obs, self._sampled_obs) - return new_obs - - -class RecSimObservationBanditWrapper(gym.ObservationWrapper): - """Fix RecSim environment's observation format - - RecSim's observations are keyed by document IDs, and nested under - "doc" key. - Our Bandits agent expects the observations to be flat 2D array - and under "item" key. - - This environment wrapper converts obs into the right format. 
- """ - - def __init__(self, env: gym.Env): - super().__init__(env) - obs_space = self.env.observation_space - - num_items = len(obs_space["doc"]) - embedding_dim = next(iter(obs_space["doc"].values())).shape[-1] - self.observation_space = Dict( - OrderedDict( - [ - ( - "item", - gym.spaces.Box( - low=-1.0, high=1.0, shape=(num_items, embedding_dim) - ), - ), - ] - ) - ) - self._sampled_obs = self.observation_space.sample() - self.action_space = self.env.action_space - - def observation(self, obs): - new_obs = OrderedDict() - new_obs["item"] = np.vstack(list(obs["doc"].values())) - new_obs = convert_element_to_space_type(new_obs, self._sampled_obs) - return new_obs - - -class RecSimResetWrapper(gym.Wrapper): - """Fix RecSim environment's reset() and close() function - - RecSim's reset() function returns an observation without the "response" - field, breaking RLlib's check. This wrapper fixes that by assigning a - random "response". - - RecSim's close() function raises NotImplementedError. We change the - behavior to doing nothing. - """ - - def __init__(self, env: gym.Env): - super().__init__(env) - self._sampled_obs = self.env.observation_space.sample() - - def reset(self, *, seed=None, options=None): - obs, info = super().reset() - obs["response"] = self.env.observation_space["response"].sample() - obs = convert_element_to_space_type(obs, self._sampled_obs) - return obs, info - - def close(self): - pass - - -class MultiDiscreteToDiscreteActionWrapper(gym.ActionWrapper): - """Convert the action space from MultiDiscrete to Discrete - - At this moment, RLlib's DQN algorithms only work on Discrete action space. - This wrapper allows us to apply DQN algorithms to the RecSim environment. 
- """ - - def __init__(self, env: gym.Env): - super().__init__(env) - - if not isinstance(env.action_space, MultiDiscrete): - raise UnsupportedSpaceException( - f"Action space {env.action_space} " - f"is not supported by {self.__class__.__name__}" - ) - self.action_space_dimensions = env.action_space.nvec - self.action_space = Discrete(np.prod(self.action_space_dimensions)) - - def action(self, action: int) -> List[int]: - """Convert a Discrete action to a MultiDiscrete action""" - multi_action = [None] * len(self.action_space_dimensions) - for idx, n in enumerate(self.action_space_dimensions): - action, dim_action = divmod(action, n) - multi_action[idx] = dim_action - return multi_action - - -def recsim_gym_wrapper( - recsim_gym_env: gym.Env, - convert_to_discrete_action_space: bool = False, - wrap_for_bandits: bool = False, -) -> gym.Env: - """Makes sure a RecSim gym.Env can ba handled by RLlib. - - In RecSim's observation spaces, the "doc" field is a dictionary keyed by - document IDs. Those IDs are changing every step, thus generating a - different observation space in each time. This causes issues for RLlib - because it expects the observation space to remain the same across steps. - - Also, RecSim's reset() function returns an observation without the - "response" field, breaking RLlib's check. This wrapper fixes that by - assigning a random "response". - - Args: - recsim_gym_env: The RecSim gym.Env instance. Usually resulting from a - raw RecSim env having been passed through RecSim's utility function: - `recsim.simulator.recsim_gym.RecSimGymEnv()`. - convert_to_discrete_action_space: Optional bool indicating, whether - the action space of the created env class should be Discrete - (rather than MultiDiscrete, even if slate size > 1). This is useful - for algorithms that don't support MultiDiscrete action spaces, - such as RLlib's DQN. 
If None, `convert_to_discrete_action_space` - may also be provided via the EnvContext (config) when creating an - actual env instance. - wrap_for_bandits: Bool indicating, whether this RecSim env should be - wrapped for use with our Bandits agent. - - Returns: - An RLlib-ready gym.Env instance. - """ - env = RecSimResetWrapper(recsim_gym_env) - env = RecSimObservationSpaceWrapper(env) - if convert_to_discrete_action_space: - env = MultiDiscreteToDiscreteActionWrapper(env) - if wrap_for_bandits: - env = RecSimObservationBanditWrapper(env) - return env - - -def make_recsim_env( - recsim_user_model_creator: Callable[[EnvContext], AbstractUserModel], - recsim_document_sampler_creator: Callable[[EnvContext], AbstractDocumentSampler], - reward_aggregator: Callable[[List[AbstractResponse]], float], -) -> Type[gym.Env]: - """Creates a RLlib-ready gym.Env class given RecSim user and doc models. - - See https://github.com/google-research/recsim for more information on how to - build the required components from scratch in python using RecSim. - - Args: - recsim_user_model_creator: A callable taking an EnvContext and returning - a RecSim AbstractUserModel instance to use. - recsim_document_sampler_creator: A callable taking an EnvContext and - returning a RecSim AbstractDocumentSampler - to use. This will include a AbstractDocument as well. - reward_aggregator: Callable taking a list of RecSim - AbstractResponse instances and returning a float (aggregated - reward). - - Returns: - An RLlib-ready gym.Env class to use inside an Algorithm. - """ - - class _RecSimEnv(gym.Wrapper): - def __init__(self, config: Optional[EnvContext] = None): - - # Override with default values, in case they are not set by the user. 
- default_config = { - "num_candidates": 10, - "slate_size": 2, - "resample_documents": True, - "seed": 0, - "convert_to_discrete_action_space": False, - "wrap_for_bandits": False, - } - if config is None or isinstance(config, dict): - config = EnvContext(config or default_config, worker_index=0) - config.set_defaults(default_config) - - # Create the RecSim user model instance. - recsim_user_model = recsim_user_model_creator(config) - # Create the RecSim document sampler instance. - recsim_document_sampler = recsim_document_sampler_creator(config) - - # Create a raw RecSim environment (not yet a gym.Env!). - raw_recsim_env = environment.SingleUserEnvironment( - recsim_user_model, - recsim_document_sampler, - config["num_candidates"], - config["slate_size"], - resample_documents=config["resample_documents"], - ) - # Convert raw RecSim env to a gym.Env. - gym_env = recsim_gym.RecSimGymEnv(raw_recsim_env, reward_aggregator) - # Wrap for the new gym API (RecSim does not support this). - gym_env = EnvCompatibility(gym_env) - - # Fix observation space and - if necessary - convert to discrete - # action space (from multi-discrete). - env = recsim_gym_wrapper( - gym_env, - config["convert_to_discrete_action_space"], - config["wrap_for_bandits"], - ) - # Call the super (Wrapper constructor) passing it the created env. - super().__init__(env=env) - - return _RecSimEnv diff --git a/rllib/env/wrappers/recsim_wrapper.py b/rllib/env/wrappers/recsim_wrapper.py deleted file mode 100644 index 3251ea1a3a3e..000000000000 --- a/rllib/env/wrappers/recsim_wrapper.py +++ /dev/null @@ -1,14 +0,0 @@ -# Deprecated module: Use ray.rllib.env.wrappers.recsim instead! 
-from ray.rllib.env.wrappers.recsim import ( # noqa: F401 - make_recsim_env, - MultiDiscreteToDiscreteActionWrapper, - RecSimObservationSpaceWrapper, - RecSimResetWrapper, -) -from ray.rllib.utils.deprecation import deprecation_warning - -deprecation_warning( - old="ray.rllib.env.wrappers.recsim_wrapper", - new="ray.rllib.env.wrappers.recsim", - error=True, -) diff --git a/rllib/env/wrappers/uncertainty_wrappers.py b/rllib/env/wrappers/uncertainty_wrappers.py deleted file mode 100644 index e8e2d1fa4833..000000000000 --- a/rllib/env/wrappers/uncertainty_wrappers.py +++ /dev/null @@ -1,23 +0,0 @@ -########## -# Contribution by the Center on Long-Term Risk: -# https://github.com/longtermrisk/marltoolbox -########## -import numpy as np - - -def add_RewardUncertaintyEnvClassWrapper( - EnvClass, reward_uncertainty_std, reward_uncertainty_mean=0.0 -): - class RewardUncertaintyEnvClassWrapper(EnvClass): - def step(self, action): - observations, rewards, done, info = super().step(action) - return observations, self.reward_wrapper(rewards), done, info - - def reward_wrapper(self, reward_dict): - for k in reward_dict.keys(): - reward_dict[k] += np.random.normal( - loc=reward_uncertainty_mean, scale=reward_uncertainty_std, size=() - ) - return reward_dict - - return RewardUncertaintyEnvClassWrapper diff --git a/rllib/examples/_old_api_stack/custom_keras_model.py b/rllib/examples/_old_api_stack/custom_keras_model.py index cdf1f516ef32..e3ccad874b30 100644 --- a/rllib/examples/_old_api_stack/custom_keras_model.py +++ b/rllib/examples/_old_api_stack/custom_keras_model.py @@ -127,7 +127,9 @@ def on_train_result(self, *, algorithm, result, **kwargs): config = ( get_trainable_cls(args.run) .get_default_config() - .environment("ALE/Breakout-v5" if args.use_vision_network else "CartPole-v1") + .environment( + "ale_py:ALE/Breakout-v5" if args.use_vision_network else "CartPole-v1" + ) .framework("tf") .callbacks(MyCallbacks) .training( diff --git 
a/rllib/examples/connectors/frame_stacking.py b/rllib/examples/connectors/frame_stacking.py index 554bd1c8f20d..103ae8de5f11 100644 --- a/rllib/examples/connectors/frame_stacking.py +++ b/rllib/examples/connectors/frame_stacking.py @@ -97,7 +97,7 @@ # Use Pong by default. parser.set_defaults( enable_new_api_stack=True, - env="ALE/Pong-v5", + env="ale_py:ALE/Pong-v5", ) parser.add_argument( "--num-frames", diff --git a/rllib/examples/curiosity/euclidian_distance_based_curiosity.py b/rllib/examples/curiosity/euclidian_distance_based_curiosity.py index 0d73c6b50c1f..d471c17f1858 100644 --- a/rllib/examples/curiosity/euclidian_distance_based_curiosity.py +++ b/rllib/examples/curiosity/euclidian_distance_based_curiosity.py @@ -67,12 +67,11 @@ ) from ray.tune.registry import get_trainable_cls -# TODO (sven): SB3's PPO does seem to learn MountainCar-v0 until a reward of ~-110. -# We might have to play around some more with different initializations, more -# randomized SGD minibatching (we don't shuffle batch rn), etc.. to get to these -# results as well. +# TODO (sven): SB3's PPO learns MountainCar-v0 until a reward of ~-110. +# We might have to play around some more with different initializations, etc.. +# to get to these results as well. 
parser = add_rllib_example_script_args( - default_reward=-130.0, default_iters=2000, default_timesteps=1000000 + default_reward=-140.0, default_iters=2000, default_timesteps=1000000 ) parser.set_defaults( enable_new_api_stack=True, diff --git a/rllib/examples/curiosity/intrinsic_curiosity_model_based_curiosity.py b/rllib/examples/curiosity/intrinsic_curiosity_model_based_curiosity.py index 323bc20c8a58..b70cc89bdbe7 100644 --- a/rllib/examples/curiosity/intrinsic_curiosity_model_based_curiosity.py +++ b/rllib/examples/curiosity/intrinsic_curiosity_model_based_curiosity.py @@ -73,6 +73,8 @@ """ from collections import defaultdict +import numpy as np + from ray import tune from ray.rllib.algorithms.algorithm_config import AlgorithmConfig from ray.rllib.algorithms.callbacks import DefaultCallbacks @@ -132,9 +134,9 @@ def on_episode_step( rl_module, **kwargs, ): - obs = episode.get_observations(-1) num_rows = env.envs[0].unwrapped.nrow num_cols = env.envs[0].unwrapped.ncol + obs = np.argmax(episode.get_observations(-1)) row = obs // num_cols col = obs % num_rows curr_dist = (row**2 + col**2) ** 0.5 @@ -298,7 +300,7 @@ def on_sample_end( success_key = f"{ENV_RUNNER_RESULTS}/max_dist_travelled_across_running_episodes" stop = { - success_key: 8.0, + success_key: 12.0, f"{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}": args.stop_reward, NUM_ENV_STEPS_SAMPLED_LIFETIME: args.stop_timesteps, } diff --git a/rllib/examples/envs/env_rendering_and_recording.py b/rllib/examples/envs/env_rendering_and_recording.py index ba02f50b7f16..77669649e66c 100644 --- a/rllib/examples/envs/env_rendering_and_recording.py +++ b/rllib/examples/envs/env_rendering_and_recording.py @@ -73,7 +73,10 @@ from ray import tune parser = add_rllib_example_script_args(default_reward=20.0) -parser.set_defaults(env="ALE/Pong-v5") +parser.set_defaults( + enable_new_api_stack=True, + env="ale_py:ALE/Pong-v5", +) class EnvRenderCallback(DefaultCallbacks): @@ -129,10 +132,10 @@ def on_episode_step( # If we have a 
vector env, only render the sub-env at index 0. if isinstance(env.unwrapped, gym.vector.VectorEnv): - image = env.envs[0].render() + image = env.unwrapped.envs[0].render() # Render the gym.Env. else: - image = env.render() + image = env.unwrapped.render() # Original render images for CartPole are 400x600 (hxw). We'll downsize here to # a very small dimension (to save space and bandwidth). @@ -239,14 +242,10 @@ def on_sample_end( if __name__ == "__main__": args = parser.parse_args() - assert ( - args.enable_new_api_stack - ), "Must set --enable-new-api-stack when running this script!" - # Register our environment with tune. def _env_creator(cfg): cfg.update({"render_mode": "rgb_array"}) - if args.env.startswith("ALE/"): + if args.env.startswith("ale_py:ALE/"): cfg.update( { # Make analogous to old v4 + NoFrameskip. diff --git a/rllib/examples/evaluation/custom_evaluation.py b/rllib/examples/evaluation/custom_evaluation.py index a6d4a1c3e029..f4d05ea3bd26 100644 --- a/rllib/examples/evaluation/custom_evaluation.py +++ b/rllib/examples/evaluation/custom_evaluation.py @@ -112,12 +112,12 @@ def custom_eval_function( # `set_corridor_length` method on these. 
eval_workers.foreach_worker( func=lambda worker: ( - env.set_corridor_length( + env.unwrapped.set_corridor_length( args.corridor_length_eval_worker_1 if worker.worker_index == 1 else args.corridor_length_eval_worker_2 ) - for env in worker.env.envs + for env in worker.env.unwrapped.envs ) ) diff --git a/rllib/examples/metrics/custom_metrics_in_env_runners.py b/rllib/examples/metrics/custom_metrics_in_env_runners.py index 3b10ac496641..cba86a50afb6 100644 --- a/rllib/examples/metrics/custom_metrics_in_env_runners.py +++ b/rllib/examples/metrics/custom_metrics_in_env_runners.py @@ -301,7 +301,7 @@ def _get_pacman_yx_pos(self, env): register_env( "env", lambda cfg: wrap_atari_for_new_api_stack( - gym.make("ALE/MsPacman-v5", **cfg, **{"render_mode": "rgb_array"}), + gym.make("ale_py:ALE/MsPacman-v5", **cfg, **{"render_mode": "rgb_array"}), framestack=4, ), ) diff --git a/rllib/examples/ray_tune/custom_experiment.py b/rllib/examples/ray_tune/custom_experiment.py index d0e424911d46..779c5c1fd041 100644 --- a/rllib/examples/ray_tune/custom_experiment.py +++ b/rllib/examples/ray_tune/custom_experiment.py @@ -105,7 +105,7 @@ def my_experiment(config: Dict): # Extract the gymnasium env object from the created algo (its local # SingleAgentEnvRunner worker). Note that the env in this single-agent # case is a gymnasium vector env and that we get its first sub-env here. 
- env = local_env_runner.env.envs[0] + env = local_env_runner.env.unwrapped.envs[0] # The local worker (SingleAgentEnvRunner) rl_module = local_env_runner.module diff --git a/rllib/examples/rl_modules/custom_cnn_rl_module.py b/rllib/examples/rl_modules/custom_cnn_rl_module.py index a8aac2980530..4001f3e21d6b 100644 --- a/rllib/examples/rl_modules/custom_cnn_rl_module.py +++ b/rllib/examples/rl_modules/custom_cnn_rl_module.py @@ -66,7 +66,7 @@ parser = add_rllib_example_script_args(default_iters=100, default_timesteps=600000) parser.set_defaults( enable_new_api_stack=True, - env="ALE/Pong-v5", + env="ale_py:ALE/Pong-v5", ) diff --git a/rllib/models/tests/test_preprocessors.py b/rllib/models/tests/test_preprocessors.py index 51ad457dabe7..03a344de3289 100644 --- a/rllib/models/tests/test_preprocessors.py +++ b/rllib/models/tests/test_preprocessors.py @@ -90,12 +90,12 @@ def test_gym_preprocessors(self): p2 = ModelCatalog.get_preprocessor(gym.make("FrozenLake-v1")) self.assertEqual(type(p2), OneHotPreprocessor) - p3 = ModelCatalog.get_preprocessor(gym.make("ALE/MsPacman-ram-v5")) + p3 = ModelCatalog.get_preprocessor(gym.make("ale_py:ALE/MsPacman-ram-v5")) self.assertEqual(type(p3), AtariRamPreprocessor) p4 = ModelCatalog.get_preprocessor( gym.make( - "ALE/MsPacman-v5", + "ale_py:ALE/MsPacman-v5", frameskip=1, ) ) diff --git a/rllib/tuned_examples/bc/benchmark_atari_pong_bc.py b/rllib/tuned_examples/bc/benchmark_atari_pong_bc.py index f5d7727bb68a..d084f61fb9f4 100644 --- a/rllib/tuned_examples/bc/benchmark_atari_pong_bc.py +++ b/rllib/tuned_examples/bc/benchmark_atari_pong_bc.py @@ -128,7 +128,7 @@ def _make_learner_connector(observation_space, action_space): # in the collection of the `rl_unplugged` data. def _env_creator(cfg): return wrap_atari_for_new_api_stack( - gym.make("ALE/Pong-v5", **cfg), + gym.make("ale_py:ALE/Pong-v5", **cfg), # Perform frame-stacking through ConnectorV2 API. 
framestack=4, dim=84, diff --git a/rllib/tuned_examples/impala/pong_impala.py b/rllib/tuned_examples/impala/pong_impala.py index 8802abf6a3b2..3fe08f9c35ed 100644 --- a/rllib/tuned_examples/impala/pong_impala.py +++ b/rllib/tuned_examples/impala/pong_impala.py @@ -15,7 +15,7 @@ parser = add_rllib_example_script_args() parser.set_defaults( enable_new_api_stack=True, - env="ALE/Pong-v5", + env="ale_py:ALE/Pong-v5", ) parser.add_argument( "--use-tiny-cnn", diff --git a/rllib/tuned_examples/impala/pong_impala_pb2_hyperopt.py b/rllib/tuned_examples/impala/pong_impala_pb2_hyperopt.py index 2f7b100500c6..ca331fe9a861 100644 --- a/rllib/tuned_examples/impala/pong_impala_pb2_hyperopt.py +++ b/rllib/tuned_examples/impala/pong_impala_pb2_hyperopt.py @@ -15,7 +15,7 @@ from ray import tune parser = add_rllib_example_script_args() -parser.set_defaults(env="ALE/Pong-v5") +parser.set_defaults(env="ale_py:ALE/Pong-v5") parser.add_argument( "--use-tiny-cnn", action="store_true", diff --git a/rllib/tuned_examples/ppo/atari_ppo.py b/rllib/tuned_examples/ppo/atari_ppo.py index 7abcfdff245e..ad298550e8a3 100644 --- a/rllib/tuned_examples/ppo/atari_ppo.py +++ b/rllib/tuned_examples/ppo/atari_ppo.py @@ -14,7 +14,10 @@ default_timesteps=3000000, default_iters=100000000000, ) -parser.set_defaults(enable_new_api_stack=True) +parser.set_defaults( + enable_new_api_stack=True, + env="ale_py:ALE/Pong-v5", +) # Use `parser` to add your own custom command line options to this script # and (if needed) use their values toset up `config` below. args = parser.parse_args() diff --git a/rllib/utils/error.py b/rllib/utils/error.py index 5671abc10eef..d2b9db4c351a 100644 --- a/rllib/utils/error.py +++ b/rllib/utils/error.py @@ -67,7 +67,7 @@ class NotSerializable(Exception): 1) Run `pip install gymnasium` on your command line. 
2) Change all your import statements in your code from `import gym` -> `import gymnasium as gym` OR - `from gym.space import Discrete` -> `from gymnasium.spaces import Discrete` + `from gym.spaces import Discrete` -> `from gymnasium.spaces import Discrete` For your custom (single agent) gym.Env classes: 3.1) Either wrap your old Env class via the provided `from gymnasium.wrappers import diff --git a/rllib/utils/exploration/tests/test_curiosity.py b/rllib/utils/exploration/tests/test_curiosity.py index 4531154371f0..bcc603171264 100644 --- a/rllib/utils/exploration/tests/test_curiosity.py +++ b/rllib/utils/exploration/tests/test_curiosity.py @@ -1,23 +1,14 @@ -from collections import deque -import gymnasium as gym -import minigrid import numpy as np import sys import unittest import ray -from ray import air, tune -from ray.air.constants import TRAINING_ITERATION from ray.rllib.algorithms.callbacks import DefaultCallbacks import ray.rllib.algorithms.ppo as ppo -from ray.rllib.utils.test_utils import check_learning_achieved from ray.rllib.utils.metrics import ( ENV_RUNNER_RESULTS, EPISODE_RETURN_MAX, - EPISODE_RETURN_MEAN, ) -from ray.rllib.utils.numpy import one_hot -from ray.tune import register_env class MyCallBack(DefaultCallbacks): @@ -46,96 +37,6 @@ def on_sample_end(self, *, worker, samples, **kwargs): self.deltas = [] -class OneHotWrapper(gym.core.ObservationWrapper): - def __init__(self, env, vector_index, framestack): - super().__init__(env) - self.framestack = framestack - # 49=7x7 field of vision; 11=object types; 6=colors; 3=state types. - # +4: Direction. 
- self.single_frame_dim = 49 * (11 + 6 + 3) + 4 - self.init_x = None - self.init_y = None - self.x_positions = [] - self.y_positions = [] - self.x_y_delta_buffer = deque(maxlen=100) - self.vector_index = vector_index - self.frame_buffer = deque(maxlen=self.framestack) - for _ in range(self.framestack): - self.frame_buffer.append(np.zeros((self.single_frame_dim,))) - - self.observation_space = gym.spaces.Box( - 0.0, 1.0, shape=(self.single_frame_dim * self.framestack,), dtype=np.float32 - ) - - def observation(self, obs): - # Debug output: max-x/y positions to watch exploration progress. - if self.step_count == 0: - for _ in range(self.framestack): - self.frame_buffer.append(np.zeros((self.single_frame_dim,))) - if self.vector_index == 0: - if self.x_positions: - max_diff = max( - np.sqrt( - (np.array(self.x_positions) - self.init_x) ** 2 - + (np.array(self.y_positions) - self.init_y) ** 2 - ) - ) - self.x_y_delta_buffer.append(max_diff) - print( - "100-average dist travelled={}".format( - np.mean(self.x_y_delta_buffer) - ) - ) - self.x_positions = [] - self.y_positions = [] - self.init_x = self.agent_pos[0] - self.init_y = self.agent_pos[1] - - # Are we carrying the key? - # if self.carrying is not None: - # print("Carrying KEY!!") - - self.x_positions.append(self.agent_pos[0]) - self.y_positions.append(self.agent_pos[1]) - - # One-hot the last dim into 11, 6, 3 one-hot vectors, then flatten. - objects = one_hot(obs[:, :, 0], depth=11) - colors = one_hot(obs[:, :, 1], depth=6) - states = one_hot(obs[:, :, 2], depth=3) - # Is the door we see open? 
- # for x in range(7): - # for y in range(7): - # if objects[x, y, 4] == 1.0 and states[x, y, 0] == 1.0: - # print("Door OPEN!!") - - all_ = np.concatenate([objects, colors, states], -1) - all_flat = np.reshape(all_, (-1,)) - direction = one_hot(np.array(self.agent_dir), depth=4).astype(np.float32) - single_frame = np.concatenate([all_flat, direction]) - self.frame_buffer.append(single_frame) - return np.concatenate(self.frame_buffer) - - -def env_maker(config): - name = config.get("name", "MiniGrid-Empty-5x5-v0") - framestack = config.get("framestack", 4) - env = gym.make(name) - # Make it impossible to reach goal by chance. - env = gym.wrappers.TimeLimit(env, max_episode_steps=15) - # Only use image portion of observation (discard goal and direction). - env = minigrid.wrappers.ImgObsWrapper(env) - env = OneHotWrapper( - env, - config.vector_index if hasattr(config, "vector_index") else 0, - framestack=framestack, - ) - return env - - -register_env("mini-grid", env_maker) -CONV_FILTERS = [[16, [11, 11], 3], [32, [9, 9], 3], [64, [5, 5], 3]] - - class TestCuriosity(unittest.TestCase): @classmethod def setUpClass(cls): @@ -187,10 +88,7 @@ def test_curiosity_on_frozen_lake(self): "type": "StochasticSampling", }, }, - ) - # TODO (Kourosh): We need to provide examples on how we do curiosity with - # RLModule API - .training(lr=0.001) + ).training(lr=0.001) ) num_iterations = 10 @@ -207,106 +105,6 @@ def test_curiosity_on_frozen_lake(self): algo.stop() self.assertTrue(learnt) - # Disable this check for now. Add too much flakyness to test. - # if fw == "tf": - # # W/o Curiosity. Expect to learn nothing. 
- # print("Trying w/o curiosity (not expected to learn).") - # config["exploration_config"] = { - # "type": "StochasticSampling", - # } - # algo = ppo.PPO(config=config) - # rewards_wo = 0.0 - # for _ in range(num_iterations): - # result = algo.train() - # rewards_wo += result[ENV_RUNNER_RESULTS][EPISODE_RETURN_MEAN] - # print(result) - # algo.stop() - # self.assertTrue(rewards_wo == 0.0) - # print("Did not reach goal w/o curiosity!") - - def test_curiosity_on_partially_observable_domain(self): - config = ( - ppo.PPOConfig() - .environment( - "mini-grid", - env_config={ - # Also works with: - # - MiniGrid-MultiRoom-N4-S5-v0 - # - MiniGrid-MultiRoom-N2-S4-v0 - "name": "MiniGrid-Empty-8x8-v0", - "framestack": 1, # seems to work even w/o framestacking - }, - ) - .env_runners( - num_envs_per_env_runner=4, - num_env_runners=0, - exploration_config={ - "type": "Curiosity", - # For the feature NN, use a non-LSTM fcnet (same as the one - # in the policy model). - "eta": 0.1, - "lr": 0.0003, # 0.0003 or 0.0005 seem to work fine as well. - "feature_dim": 64, - # No actual feature net: map directly from observations to feature - # vector (linearly). 
- "feature_net_config": { - "fcnet_hiddens": [], - "fcnet_activation": "relu", - }, - "sub_exploration": { - "type": "StochasticSampling", - }, - }, - ) - .training( - model={ - "fcnet_hiddens": [256, 256], - "fcnet_activation": "relu", - }, - num_epochs=8, - ) - ) - - min_reward = 0.001 - stop = { - TRAINING_ITERATION: 25, - f"{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}": min_reward, - } - # To replay: - # algo = ppo.PPO(config=config) - # algo.restore("[checkpoint file]") - # env = env_maker(config["env_config"]) - # obs, info = env.reset() - # for _ in range(10000): - # obs, reward, done, truncated, info = env.step( - # algo.compute_single_action(s) - # ) - # if done: - # obs, info = env.reset() - # env.render() - - results = tune.Tuner( - "PPO", - param_space=config, - run_config=air.RunConfig(stop=stop, verbose=1), - ).fit() - check_learning_achieved(results, min_reward) - iters = results.get_best_result().metrics[TRAINING_ITERATION] - print("Reached in {} iterations.".format(iters)) - - # config_wo = config.copy() - # config_wo["exploration_config"] = {"type": "StochasticSampling"} - # stop_wo = stop.copy() - # stop_wo[TRAINING_ITERATION] = iters - # results = tune.Tuner( - # "PPO", param_space=config_wo, stop=stop_wo, verbose=1).fit() - # try: - # check_learning_achieved(results, min_reward) - # except ValueError: - # print("Did not learn w/o curiosity (expected).") - # else: - # raise ValueError("Learnt w/o curiosity (not expected)!") - if __name__ == "__main__": import pytest