diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..8629ec6 --- /dev/null +++ b/.flake8 @@ -0,0 +1,23 @@ +[flake8] +show-source=True +statistics=True +per-file-ignores=*/__init__.py:F401 +# E402: Module level import not at top of file +# E501: Line too long +# W503: Line break before binary operator +# E203: Whitespace before ':' -> conflicts with black +# D401: First line should be in imperative mood +# R504: Unnecessary variable assignment before return statement. +# R505: Unnecessary elif after return statement +# SIM102: Use a single if-statement instead of nested if-statements +# SIM117: Merge with statements for context managers that have same scope. +# SIM118: Checks for key-existence checks against dict.keys() calls. +ignore=E402,E501,E704,W503,E203,D401,R504,R505,SIM102,SIM117,SIM118 +max-line-length = 120 +max-complexity = 30 +exclude=_*,.vscode,.git,docs/** +# docstrings +docstring-convention=google +# annotations +suppress-none-returning=True +allow-star-arg-any=True diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 0000000..122c64b --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,16 @@ +name: Run linters using pre-commit + +on: + pull_request: + types: [opened, synchronize, reopened] + +jobs: + pre-commit: + name: Run pre-commit checks + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + with: + python-version: "3.11" + - uses: pre-commit/action@v3.0.0 diff --git a/.gitignore b/.gitignore index b7faf40..31e7e2e 100644 --- a/.gitignore +++ b/.gitignore @@ -182,11 +182,11 @@ cython_debug/ .abstra/ # Visual Studio Code -# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore # that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore -# and can be 
added to the global gitignore or merged into this file. However, if you prefer, +# and can be added to the global gitignore or merged into this file. However, if you prefer, # you could uncomment the following to ignore the entire vscode folder -# .vscode/ +.vscode/ # Ruff stuff: .ruff_cache/ @@ -205,3 +205,8 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ + + +# IsaacLab +datasets/ +recordings/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..35ab6c7 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "dependencies/IsaacLab"] + path = dependencies/IsaacLab + url = https://github.com/isaac-sim/IsaacLab.git +[submodule "dependencies/curobo"] + path = dependencies/curobo + url = https://github.com/NVlabs/curobo.git diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..92a4ed3 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,60 @@ +repos: + - repo: https://github.com/python/black + rev: 24.3.0 + hooks: + - id: black + args: ["--line-length", "120", "--unstable"] + - repo: https://github.com/pycqa/flake8 + rev: 7.0.0 + hooks: + - id: flake8 + additional_dependencies: [flake8-simplify, flake8-return] + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + - id: trailing-whitespace + - id: check-symlinks + - id: destroyed-symlinks + - id: check-added-large-files + args: ["--maxkb=2000"] # restrict files more than 2 MB. Should use git-lfs instead. 
+ - id: check-yaml + - id: check-merge-conflict + - id: check-case-conflict + - id: check-executables-have-shebangs + - id: check-toml + - id: end-of-file-fixer + - id: check-shebang-scripts-are-executable + - id: detect-private-key + - id: debug-statements + - repo: https://github.com/pycqa/isort + rev: 5.13.2 + hooks: + - id: isort + name: isort (python) + args: ["--profile", "black", "--filter-files"] + - repo: https://github.com/asottile/pyupgrade + rev: v3.15.1 + hooks: + - id: pyupgrade + args: ["--py310-plus"] + # FIXME: This is a hack because Pytorch does not like: torch.Tensor | dict aliasing + exclude: "source/isaaclab/isaaclab/envs/common.py|source/isaaclab/isaaclab/ui/widgets/image_plot.py|source/isaaclab_tasks/isaaclab_tasks/direct/humanoid_amp/motions/motion_loader.py" + - repo: https://github.com/codespell-project/codespell + rev: v2.2.6 + hooks: + - id: codespell + additional_dependencies: + - tomli + exclude: "CONTRIBUTORS.md|docs/source/setup/walkthrough/concepts_env_design.rst|docs/package-lock.json" + args: ["-L", "reacher"] + # FIXME: Figure out why this is getting stuck under VPN. + # - repo: https://github.com/RobertCraigie/pyright-python + # rev: v1.1.315 + # hooks: + # - id: pyright + - repo: https://github.com/pre-commit/pygrep-hooks + rev: v1.10.0 + hooks: + - id: rst-backticks + - id: rst-directive-colons + - id: rst-inline-touching-normal diff --git a/README.md b/README.md index eecdce7..3e8c9b5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,162 @@ # autosim -autosim + +**autosim: An automated data generation pipeline based on NVIDIA Isaac Lab** + +An automatic simulation data generation pipeline built on Isaac Lab, integrating LLM-based task decomposition, motion planning, and navigation capabilities. 
+ +## Overview + +`autosim` provides an extensible automated data generation pipeline for NVIDIA Isaac Lab environments: + +- Starting from **task code and scene information**, it uses an LLM to **decompose high-level tasks** automatically. +- Maps the decomposition result into a sequence of **atomic skills**. +- Invokes **motion planning (based on cuRobo)** and **navigation** to execute these skills in simulation. +- Produces unified **action sequences / trajectory data** for downstream robot learning and research. + +> In short: `autosim` helps you automatically turn “a task in Isaac Lab” into an executable sequence of skills, and generates data that can be used for training or evaluation. + +## Installation + +Below is a typical setup workflow. `autosim` can be installed as a submodule inside an environment that already contains Isaac Lab. + +```bash +conda create -n autosim python=3.11 + +conda activate autosim + +git clone https://github.com/LightwheelAI/autosim.git + +cd autosim + +git submodule update --init --recursive +``` + +### IsaacLab Installation + +`autosim` depends on Isaac Lab. You can follow the official installation guide [here](https://isaac-sim.github.io/IsaacLab/v2.2.1/source/setup/installation/pip_installation.html), or use the commands below. If you already have an environment with Isaac Lab installed, you can reuse it and skip this step. + +```bash +# Install CUDA toolkit +conda install -c "nvidia/label/cuda-12.8.1" cuda-toolkit + +# Install PyTorch +pip install -U torch==2.7.0 torchvision==0.22.0 --index-url https://download.pytorch.org/whl/cu128 + +# Install IsaacSim +pip install --upgrade pip +pip install "isaacsim[all,extscache]==5.1.0" --extra-index-url https://pypi.nvidia.com + + +# Install IsaacLab +sudo apt install cmake build-essential + +cd dependencies/IsaacLab +./isaaclab.sh --install +``` + +> ⚠️ The installation of Isaac Lab is relatively involved. Please follow the official documentation carefully. 
This repository only works on top of a correctly configured Isaac Lab environment. + +### cuRobo Installation + +Some skills in `autosim` depend on cuRobo. You can follow the official [documentation](https://curobo.org/get_started/1_install_instructions.html), or use the commands below: + +```bash +cd dependencies/curobo + +pip install -e . --no-build-isolation +``` + +### autosim Installation + +Finally, install `autosim` into your environment: + +```bash +pip install -e source/autosim +``` + +## Quick Start + +### Run the Example Pipeline (Franka Cube Lift) + +After completing the installation and configuration steps above, you can directly run the built-in example. + +First, install the `autosim` example package: + +```bash +pip install -e source/autosim_examples +``` + +Then launch the example with: + +```bash +cd autosim + +python examples/run_autosim_example.py \ + --pipeline_id AutoSimPipeline-FrankaCubeLift-v0 +``` + +After launching, you will see in the Isaac Sim UI that the manipulator automatically executes the Cube Lift task. + +## Defining Your Own Pipeline + +For a task that has already been defined in Isaac Lab, you can use `autosim` to define a custom `AutoSimPipeline` for it. A typical workflow looks like this: + +1. **Implement a config class** + + Inherit from `AutoSimPipelineCfg` and adjust components as needed: + + - `decomposer` (e.g., using a different LLM or prompt template) + - `motion_planner` (e.g., different robot model or planner parameters) + - `skills` / `action_adapter`, etc. + +2. **Implement the pipeline class** + + Inherit from `AutoSimPipeline` and implement: + + - `load_env(self) -> ManagerBasedEnv`: load the environment based on Isaac Lab / Gymnasium; this environment should correspond to the pre-defined Task. + - `get_env_extra_info(self) -> EnvExtraInfo`: provide the task name, robot name, end-effector link, and reach targets expressed as poses relative to objects, etc. + +3. 
**Register the pipeline in the package’s `__init__.py`** + + For example: + + ```python + from autosim import register_pipeline + + register_pipeline( + id="MyPipeline-v0", + entry_point="my_package.pipelines.my_pipeline:MyPipeline", + cfg_entry_point="my_package.pipelines.my_pipeline:MyPipelineCfg", + ) + ``` + +4. **Use it from a script or external project** + + ```python + from autosim import make_pipeline + + pipeline = make_pipeline("MyPipeline-v0") + output = pipeline.run() + ``` + +> You can refer to `source/autosim_examples/autosim_examples/autosim/pipelines/franka_lift_cube.py` for a minimal working example. + +## Contributing + +Issues, feature requests, and pull requests are all welcome! + +Before submitting contributions, we recommend: + +- First verify that the example pipeline runs correctly in your local environment. +- Follow the existing code style in this repository (Black + isort, see the root `pyproject.toml`). +- Whenever possible, add tests or minimal reproducible examples for new features or bug fixes. + +## Acknowledgements + +`autosim` is built on top of the following excellent open-source projects: + +- **cuRobo**: GPU-accelerated robot motion planning. +- **Isaac Lab**: NVIDIA’s framework for robot simulation and reinforcement learning. +- And other dependencies and upstream projects used in this repository. + +We sincerely thank the authors and communities behind these projects. 
diff --git a/dependencies/IsaacLab b/dependencies/IsaacLab new file mode 160000 index 0000000..3c6e67b --- /dev/null +++ b/dependencies/IsaacLab @@ -0,0 +1 @@ +Subproject commit 3c6e67bb5c7ada942a6d1884ab69338f57596f77 diff --git a/dependencies/curobo b/dependencies/curobo new file mode 160000 index 0000000..ebb7170 --- /dev/null +++ b/dependencies/curobo @@ -0,0 +1 @@ +Subproject commit ebb71702f3f70e767f40fd8e050674af0288abe8 diff --git a/examples/run_autosim_example.py b/examples/run_autosim_example.py new file mode 100644 index 0000000..18c7e25 --- /dev/null +++ b/examples/run_autosim_example.py @@ -0,0 +1,36 @@ +"""Script to run autosim example.""" + +import argparse + +from isaaclab.app import AppLauncher + +# add argparse arguments +parser = argparse.ArgumentParser(description="run autosim example pipeline.") +parser.add_argument( + "--pipeline_id", type=str, default="AutoSimPipeline-FrankaCubeLift-v0", help="Name of the autosim pipeline." +) + +# append AppLauncher cli args +AppLauncher.add_app_launcher_args(parser) +# parse the arguments +args_cli = parser.parse_args() + +app_launcher_args = vars(args_cli) + +# launch omniverse app +app_launcher = AppLauncher(app_launcher_args) +simulation_app = app_launcher.app + + +import autosim_examples # noqa: F401 +from autosim import make_pipeline + + +def main(): + pipeline = make_pipeline(args_cli.pipeline_id) + result = pipeline.run() + print(result) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7dd8f3e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[tool.black] +line-length = 120 +target-version = ["py311"] + +[tool.isort] +profile = "black" +known_first_party = ["autosim", "autosim_examples"] diff --git a/source/autosim/autosim/__init__.py b/source/autosim/autosim/__init__.py new file mode 100644 index 0000000..2f6f1b6 --- /dev/null +++ b/source/autosim/autosim/__init__.py @@ -0,0 +1,2 @@ +from .core import * +from .skills import * 
diff --git a/source/autosim/autosim/capabilities/__init__.py b/source/autosim/autosim/capabilities/__init__.py new file mode 100644 index 0000000..0b227a1 --- /dev/null +++ b/source/autosim/autosim/capabilities/__init__.py @@ -0,0 +1,2 @@ +from .motion_planning import * +from .navigation import * diff --git a/source/autosim/autosim/capabilities/motion_planning/__init__.py b/source/autosim/autosim/capabilities/motion_planning/__init__.py new file mode 100644 index 0000000..d5736d3 --- /dev/null +++ b/source/autosim/autosim/capabilities/motion_planning/__init__.py @@ -0,0 +1,4 @@ +from .curobo.curobo_planner import CuroboPlanner +from .curobo.curobo_planner_cfg import CuroboPlannerCfg + +__all__ = ["CuroboPlanner", "CuroboPlannerCfg"] diff --git a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py new file mode 100644 index 0000000..acd2d74 --- /dev/null +++ b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +import torch +from curobo.cuda_robot_model.util import load_robot_yaml +from curobo.geom.types import WorldConfig +from curobo.types.base import TensorDeviceType +from curobo.types.file_path import ContentPath +from curobo.types.math import Pose +from curobo.types.state import JointState +from curobo.util.logger import setup_curobo_logger +from curobo.util.usd_helper import UsdHelper +from curobo.util_file import get_assets_path, get_configs_path +from curobo.wrap.reacher.motion_gen import ( + MotionGen, + MotionGenConfig, + MotionGenPlanConfig, +) +from isaaclab.assets import Articulation +from isaaclab.envs import ManagerBasedEnv + +from autosim.core.logger import AutoSimLogger + +if TYPE_CHECKING: + from .curobo_planner_cfg import CuroboPlannerCfg + + +class CuroboPlanner: + """Motion planner for robot manipulation 
using cuRobo.""" + + def __init__( + self, + env: ManagerBasedEnv, + robot: Articulation, + cfg: CuroboPlannerCfg, + env_id: int = 0, + ) -> None: + """Initialize the motion planner for a specific environment.""" + + self._env = env + self._robot = robot + self._env_id = env_id + + self.cfg: CuroboPlannerCfg = cfg + + # Initialize logger + log_level = logging.DEBUG if self.cfg.debug_planner else logging.INFO + self._logger = AutoSimLogger("CuroboPlanner", log_level) + setup_curobo_logger("warn") + + # Configuration operations + self._refine_config_from_env(env) + + # Load robot configuration + self.robot_cfg: dict[str, Any] = self._load_robot_config() + + # Create motion generator + world_cfg = WorldConfig() + motion_gen_config = MotionGenConfig.load_from_robot_config( + self.robot_cfg, + world_cfg, + self.tensor_args, + interpolation_dt=self.cfg.interpolation_dt, + collision_checker_type=self.cfg.collision_checker_type, + collision_cache=self.cfg.collision_cache, + collision_activation_distance=self.cfg.collision_activation_distance, + num_trajopt_seeds=self.cfg.num_trajopt_seeds, + num_graph_seeds=self.cfg.num_graph_seeds, + use_cuda_graph=self.cfg.use_cuda_graph, + fixed_iters_trajopt=True, + maximum_trajectory_dt=0.5, + ik_opt_iters=500, + ) + self.motion_gen: MotionGen = MotionGen(motion_gen_config) + + self.target_joint_names = self.motion_gen.kinematics.joint_names + + # Create plan configuration with parameters from configuration + self.plan_config: MotionGenPlanConfig = MotionGenPlanConfig( + enable_graph=self.cfg.enable_graph, + enable_graph_attempt=self.cfg.enable_graph_attempt, + max_attempts=self.cfg.max_planning_attempts, + time_dilation_factor=self.cfg.time_dilation_factor, + ) + + # Create USD helper + self.usd_helper = UsdHelper() + self.usd_helper.load_stage(env.scene.stage) + + # Warm up planner + self._logger.info("Warming up motion planner...") + self.motion_gen.warmup(enable_graph=True, warmup_js_trajopt=False) + + # Read static world geometry 
once + self._initialize_static_world() + + # Define supported cuRobo primitive types for object discovery and pose synchronization + self.primitive_types: list[str] = ["mesh", "cuboid", "sphere", "capsule", "cylinder", "voxel", "blox"] + + def _refine_config_from_env(self, env: ManagerBasedEnv): + """Refine the config from the environment.""" + + # Force cuRobo to always use CUDA device regardless of Isaac Lab device + # This isolates the motion planner from Isaac Lab's device configuration + if torch.cuda.is_available(): + idx = self.cfg.cuda_device if self.cfg.cuda_device is not None else torch.cuda.current_device() + self.tensor_args = TensorDeviceType(device=torch.device(f"cuda:{idx}"), dtype=torch.float32) + self._logger.debug(f"cuRobo motion planner initialized on CUDA device {idx}") + else: + self.tensor_args = TensorDeviceType() + self._logger.warning("CUDA not available, cuRobo using CPU - this may cause device compatibility issues") + + # refine interpolation dt + self.cfg.interpolation_dt = env.cfg.sim.dt * env.cfg.decimation + + def _load_robot_config(self): + """Load robot configuration from file or dictionary.""" + + if isinstance(self.cfg.robot_config_file, str): + self._logger.info(f"Loading robot configuration from {self.cfg.robot_config_file}") + + curobo_config_path = self.cfg.curobo_config_path or f"{get_configs_path()}/robot" + curobo_asset_path = self.cfg.curobo_asset_path or get_assets_path() + + content_path = ContentPath( + robot_config_root_path=curobo_config_path, + robot_urdf_root_path=curobo_asset_path, + robot_asset_root_path=curobo_asset_path, + robot_config_file=self.cfg.robot_config_file, + ) + robot_cfg = load_robot_yaml(content_path) + robot_cfg["robot_cfg"]["kinematics"]["external_asset_path"] = curobo_asset_path + + return robot_cfg + else: + self._logger.info("Using custom robot configuration dictionary.") + + return self.cfg.robot_config_file + + def _to_curobo_device(self, tensor: torch.Tensor) -> torch.Tensor: + """Convert 
tensor to cuRobo device for isolated device management.""" + + return tensor.to(device=self.tensor_args.device, dtype=self.tensor_args.dtype) + + def _initialize_static_world(self) -> None: + """Initialize static world geometry from USD stage (only called once).""" + + env_prim_path = f"/World/envs/env_{self._env_id}" + robot_prim_path = self.cfg.robot_prim_path or f"{env_prim_path}/Robot" + + only_paths = [f"{env_prim_path}/{sub}" for sub in self.cfg.world_only_subffixes] + + ignore_list = [f"{env_prim_path}/{sub}" for sub in self.cfg.world_ignore_subffixes] or [ + f"{env_prim_path}/target", + "/World/defaultGroundPlane", + "/curobo", + ] + ignore_list.append(robot_prim_path) + + self._static_world_config = self.usd_helper.get_obstacles_from_stage( + only_paths=only_paths, + reference_prim_path=robot_prim_path, + ignore_substring=ignore_list, + ) + self._static_world_config = self._static_world_config.get_collision_check_world() + self.motion_gen.update_world(self._static_world_config) + + def plan_motion( + self, + target_pos: torch.Tensor, + target_quat: torch.Tensor, + current_q: torch.Tensor, + current_qd: torch.Tensor | None = None, + link_goals: dict[str, torch.Tensor] | None = None, + ) -> JointState | None: + """ + Plan a trajectory to reach a target pose from a current joint state. 
+ + Args: + target_pos: Target position [x, y, z] + target_quat: Target quaternion [qw, qx, qy, qz] + current_q: Current joint positions + current_qd: Current joint velocities + link_goals: Optional dictionary mapping link names to target poses for other links + + Returns: + JointState of the trajectory or None if planning failed + """ + + if current_qd is None: + current_qd = torch.zeros_like(current_q) + dof_needed = len(self.target_joint_names) + + # adjust the joint number + if len(current_q) < dof_needed: + pad = torch.zeros(dof_needed - len(current_q), dtype=current_q.dtype) + current_q = torch.concatenate([current_q, pad], axis=0) + current_qd = torch.concatenate([current_qd, torch.zeros_like(pad)], axis=0) + elif len(current_q) > dof_needed: + current_q = current_q[:dof_needed] + current_qd = current_qd[:dof_needed] + + # build the target pose + goal = Pose( + position=self._to_curobo_device(target_pos), + quaternion=self._to_curobo_device(target_quat), + ) + + # build the current state + state = JointState( + position=self._to_curobo_device(current_q), + velocity=self._to_curobo_device(current_qd) * 0.0, + acceleration=self._to_curobo_device(current_qd) * 0.0, + jerk=self._to_curobo_device(current_qd) * 0.0, + joint_names=self.target_joint_names, + ) + + current_joint_state: JointState = state.get_ordered_joint_state(self.target_joint_names) + + # Prepare link_poses for multi-arm robots + link_poses = None + if link_goals is not None: + # Use provided link goals + link_poses = { + link_name: Pose(position=self._to_curobo_device(pose[:3]), quaternion=self._to_curobo_device(pose[3:])) + for link_name, pose in link_goals.items() + } + + # execute planning + result = self.motion_gen.plan_single( + current_joint_state.unsqueeze(0), + goal, + self.plan_config, + link_poses=link_poses, + ) + + if result.success.item(): + current_plan = result.get_interpolated_plan() + motion_plan = current_plan.get_ordered_joint_state(self.target_joint_names) + + 
self._logger.debug(f"planning succeeded with {len(motion_plan.position)} waypoints") + return motion_plan + else: + self._logger.warning(f"planning failed: {result.status}") + return None + + def reset(self): + """reset the planner state""" + + self.motion_gen.reset() + + def get_ee_pose(self, current_q: torch.Tensor) -> Pose: + """Get the end-effector pose of the robot.""" + + current_joint_state = JointState( + position=self._to_curobo_device(current_q), joint_names=self.target_joint_names + ) + kin_state = self.motion_gen.compute_kinematics(current_joint_state) + return kin_state.link_poses[self.motion_gen.kinematics.ee_link] diff --git a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner_cfg.py b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner_cfg.py new file mode 100644 index 0000000..fb8daa7 --- /dev/null +++ b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner_cfg.py @@ -0,0 +1,62 @@ +from dataclasses import MISSING + +from curobo.geom.sdf.world import CollisionCheckerType +from isaaclab.utils.configclass import configclass + +from .curobo_planner import CuroboPlanner + + +@configclass +class CuroboPlannerCfg: + """Configuration for the Curobo motion planner.""" + + class_type: type = CuroboPlanner + """The class type of the Curobo motion planner.""" + + # Curobo robot configuration + robot_config_file: str | dict = MISSING + """cuRobo robot configuration file (path or dictionary).""" + curobo_config_path: str | None = None + """Path to the curobo config directory.""" + curobo_asset_path: str | None = None + """Path to the curobo asset directory.""" + + # Motion planning parameters + collision_checker_type: CollisionCheckerType = CollisionCheckerType.MESH + """Type of collision checker to use.""" + collision_cache: dict[str, int] = {"obb": 1000, "mesh": 500} + """Collision cache for different collision types.""" + collision_activation_distance: float = 0.05 + """Distance at which 
collision constraints are activated.""" + interpolation_dt: float = 0.05 + """Time step for interpolating.""" + num_trajopt_seeds: int = 12 + """Number of seeds for trajectory optimization.""" + num_graph_seeds: int = 12 + """Number of seeds for graph search.""" + + # Planning configuration + enable_graph: bool = True + """Whether to enable graph-based planning.""" + enable_graph_attempt: int = 4 + """Number of graph planning attempts.""" + use_cuda_graph: bool = True + """Whether to use CUDA graph for planning.""" + max_planning_attempts: int = 10 + """Maximum number of planning attempts.""" + time_dilation_factor: float = 0.5 + """Time dilation factor for planning.""" + + # Optional prim path configuration + robot_prim_path: str | None = None + """Absolute USD prim path to the robot root for world extraction; None derives it from environment root.""" + world_only_subffixes: list[str] | None = None + """List of subffixes to only extract world obstacles from.""" + world_ignore_subffixes: list[str] | None = None + """List of subffixes to ignore when extracting world obstacles.""" + + # Debug and visualization + debug_planner: bool = False + """Enable detailed motion planning debug information.""" + cuda_device: int | None = 0 + """Preferred CUDA device index; None uses torch.cuda.current_device() (respects CUDA_VISIBLE_DEVICES).""" diff --git a/source/autosim/autosim/capabilities/navigation/__init__.py b/source/autosim/autosim/capabilities/navigation/__init__.py new file mode 100644 index 0000000..1091176 --- /dev/null +++ b/source/autosim/autosim/capabilities/navigation/__init__.py @@ -0,0 +1,5 @@ +from .a_star.a_star_planner import AStarPlanner +from .a_star.a_star_planner_cfg import AStarPlannerCfg +from .dwa.dwa_planner import DWAPlanner +from .dwa.dwa_planner_cfg import DWAPlannerCfg +from .occupancy_map import OccupancyMapCfg, get_occupancy_map diff --git a/source/autosim/autosim/capabilities/navigation/a_star/a_star_planner.py 
b/source/autosim/autosim/capabilities/navigation/a_star/a_star_planner.py new file mode 100644 index 0000000..3ff8309 --- /dev/null +++ b/source/autosim/autosim/capabilities/navigation/a_star/a_star_planner.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +import heapq +from typing import TYPE_CHECKING + +import numpy as np +import torch +from scipy.ndimage import distance_transform_edt + +from autosim.core.logger import AutoSimLogger + +if TYPE_CHECKING: + from autosim.core.types import OccupancyMap + + from .a_star_planner_cfg import AStarPlannerCfg + + +class AStarPlanner: + """A* global path planner with distance field guidance""" + + # TODO: refine this class to be more efficient and robust + + def __init__(self, cfg: AStarPlannerCfg, occupancy_map: OccupancyMap) -> None: + self._cfg = cfg + self._occupancy_map = occupancy_map + self._device = self._occupancy_map.occupancy_map.device + + self._logger = AutoSimLogger("AStarPlanner") + + free_space = (self._occupancy_map.occupancy_map == 0).cpu().numpy() + self._distance_field = distance_transform_edt(free_space) * self._occupancy_map.resolution + + def plan(self, start: torch.Tensor, goal: torch.Tensor) -> torch.Tensor | None: + """ + A* planning from start to goal + + Args: + start: Tensor of shape [2] with world-frame [x, y]. + goal: Tensor of shape [2] with world-frame [x, y]. + + Returns: + path: Tensor of shape [N, 2] with world-frame waypoints, or None if planning failed. 
+ """ + + # All internal computation is done in numpy + grid = self._occupancy_map.occupancy_map.cpu().numpy() + + start_np = start.detach().cpu().numpy() + goal_np = goal.detach().cpu().numpy() + + # convert world-frame numpy inputs to grid indices + start_grid = self._world_to_grid(start_np) + goal_grid = self._world_to_grid(goal_np) + + # Check bounds + if not self._is_valid_grid_pos(start_grid, grid.shape): + self._logger.warning(f"Start position {start[:2]} -> grid {start_grid} is out of bounds {grid.shape}") + return None + if not self._is_valid_grid_pos(goal_grid, grid.shape): + self._logger.warning(f"Goal position {goal[:2]} -> grid {goal_grid} is out of bounds {grid.shape}") + return None + + # A* search + path_grid = self._astar_search(grid, start_grid, goal_grid) + if path_grid is None: + return None + + # Convert back to world coordinates + path_points = [self._grid_to_world(p) for p in path_grid] + # Simplify path (remove redundant waypoints) + path_points = self._simplify_path(path_points) + + if not path_points: + return None + + path_np = np.stack(path_points, axis=0) + return torch.as_tensor(path_np, device=self._device, dtype=torch.float32) + + def _world_to_grid(self, pos: np.ndarray) -> np.ndarray: + """Convert world coordinates to grid coordinates (row, col) + + Args: + pos: [x, y] in world frame + + Returns: + grid_pos: numpy array [row, col] in grid frame. + """ + + x, y = float(pos[0]), float(pos[1]) + col = int((x - self._occupancy_map.origin[0]) / self._occupancy_map.resolution) + row = int((y - self._occupancy_map.origin[1]) / self._occupancy_map.resolution) + return np.array([row, col], dtype=np.int64) + + def _grid_to_world(self, pos: np.ndarray) -> np.ndarray: + """Convert grid coordinates to world coordinates + + Args: + pos: [row, col] in grid frame + + Returns: + world_pos: numpy array [x, y] in world frame. 
+ """ + + row, col = int(pos[0]), int(pos[1]) + x = self._occupancy_map.origin[0] + (col + 0.5) * self._occupancy_map.resolution + y = self._occupancy_map.origin[1] + (row + 0.5) * self._occupancy_map.resolution + return np.array([x, y], dtype=np.float32) + + def _is_valid_grid_pos(self, pos: np.ndarray, shape: tuple[int, int]) -> bool: + """Check if grid position is within bounds (internal numpy representation)""" + row, col = int(pos[0]), int(pos[1]) + return 0 <= row < shape[0] and 0 <= col < shape[1] + + def _astar_search(self, grid: np.ndarray, start: np.ndarray, goal: np.ndarray) -> list[np.ndarray] | None: + """A* search algorithm with distance field guidance""" + + start_tuple = tuple(start) + goal_tuple = tuple(goal) + + # Check if start or goal is in obstacle + if grid[start_tuple[0], start_tuple[1]] == 1: + self._logger.warning(f"Start position is in obstacle {start_tuple}") + return None + if grid[goal_tuple[0], goal_tuple[1]] == 1: + self._logger.warning(f"Goal position is in obstacle {goal_tuple}") + return None + + # Priority queue: (f_score, counter, position, g_score) + counter = 0 + heap = [(0, counter, start_tuple, 0)] + visited = set() + came_from = {} # For path reconstruction + g_scores = {start_tuple: 0} + + while heap: + f_score, _, current_tuple, current_g = heapq.heappop(heap) + current = np.array(current_tuple) + + if current_tuple in visited: + continue + visited.add(current_tuple) + + # Check if reached goal + if np.linalg.norm(current - goal) < self._cfg.goal_tolerance: + # Reconstruct path + path = [goal] + node = current_tuple + while node in came_from: + path.append(np.array(node)) + node = came_from[node] + path.append(start) + return path[::-1] + + # Explore neighbors (8-connected) + for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1), (-1, -1), (-1, 1), (1, -1), (1, 1)]: + next_pos = current + np.array([dx, dy]) + next_tuple = tuple(next_pos) + + # Check bounds + if not self._is_valid_grid_pos(next_pos, grid.shape): + continue + # 
Check obstacle + if grid[next_tuple[0], next_tuple[1]] == 1: + continue + if next_tuple in visited: + continue + + # Calculate move cost (diagonal moves cost sqrt(2)) + move_cost = 1.414 if (dx != 0 and dy != 0) else 1.0 + + # Add proximity penalty (except for start and goal) + proximity_penalty = 0.0 + if self._distance_field is not None and next_tuple != goal_tuple: + dist_to_obstacle = self._distance_field[next_tuple[0], next_tuple[1]] + if dist_to_obstacle < self._cfg.safety_distance: + # Penalty increases as we get closer to obstacles + proximity_penalty = self._cfg.proximity_weight * (self._cfg.safety_distance - dist_to_obstacle) + + tentative_g = current_g + move_cost + proximity_penalty + + if next_tuple not in g_scores or tentative_g < g_scores[next_tuple]: + g_scores[next_tuple] = tentative_g + came_from[next_tuple] = current_tuple + h_score = np.linalg.norm(next_pos - goal) + f_score = tentative_g + h_score + + counter += 1 + heapq.heappush(heap, (f_score, counter, next_tuple, tentative_g)) + + self._logger.warning(f"No path found from {start} to {goal}") + return None # No path found + + def _simplify_path(self, path: list[np.ndarray]) -> list[np.ndarray]: + """Simplify path by removing redundant waypoints""" + + if len(path) <= 2: + return path + + simplified = [path[0]] + for i in range(1, len(path) - 1): + # Check if current point is on the line between previous and next + v1 = path[i] - path[i - 1] + v2 = path[i + 1] - path[i] + v1_norm = np.linalg.norm(v1) + v2_norm = np.linalg.norm(v2) + + if v1_norm > 0 and v2_norm > 0: + cos_angle = np.dot(v1, v2) / (v1_norm * v2_norm) + if cos_angle < 1.0 - self._cfg.angle_threshold: # Significant direction change + simplified.append(path[i]) + + simplified.append(path[-1]) + return simplified diff --git a/source/autosim/autosim/capabilities/navigation/a_star/a_star_planner_cfg.py b/source/autosim/autosim/capabilities/navigation/a_star/a_star_planner_cfg.py new file mode 100644 index 0000000..af7818a --- 
@configclass
class AStarPlannerCfg:
    """Configuration for the A* planner."""

    class_type: type = AStarPlanner
    """The planner class instantiated from this configuration."""

    safety_distance: float = 0.5
    """Distance (meters) below which cells near an obstacle start accruing a penalty."""
    proximity_weight: float = 1.0
    """Scale of the obstacle-proximity penalty; larger values keep paths further from obstacles."""
    angle_threshold: float = 0.1
    """Cosine-based threshold used when removing near-collinear waypoints from a path."""
    goal_tolerance: float = 1.5
    """Radius around the goal, in grid cells, within which the search terminates."""
self._cfg.max_linear_velocity + self._cfg.v_resolution, self._cfg.v_resolution) + w_samples = np.arange( + -self._cfg.max_angular_velocity, + self._cfg.max_angular_velocity + self._cfg.w_resolution, + self._cfg.w_resolution, + ) + + best_v, best_w = 0.0, 0.0 + best_score = -float("inf") + + for v in v_samples: + for w in w_samples: + # Predict trajectory + trajectory = self._predict_trajectory(current_pose, v, w) + # Evaluate trajectory + score = self._evaluate_trajectory(trajectory, target) + + if score > best_score: + best_score = score + best_v, best_w = v, w + + return np.array([best_v, best_w]) + + def _predict_trajectory(self, pose: np.ndarray, v: float, w: float) -> np.ndarray: + """Predict trajectory given velocity""" + + trajectory = [] + x, y, yaw = pose + + steps = int(self._cfg.predict_time / self._cfg.dt) + for _ in range(steps): + x += v * np.cos(yaw) * self._cfg.dt + y += v * np.sin(yaw) * self._cfg.dt + yaw += w * self._cfg.dt + trajectory.append([x, y, yaw]) + + return np.array(trajectory) + + def _evaluate_trajectory(self, trajectory: np.ndarray, target: np.ndarray) -> float: + """Evaluate trajectory quality using grid-based collision check""" + + # Distance to target (minimize) + final_pos = trajectory[-1, :2] + dist_to_target = np.linalg.norm(final_pos - target) + target_score = 1.0 / (dist_to_target + 0.1) + + # Collision check using occupancy grid (O(1) per point) + # Skip first few points to allow escaping from near-obstacle positions + check_start = min(3, len(trajectory)) + for point in trajectory[check_start:]: + if self._is_collision(point[0], point[1]): + return -1000.0 # Collision, reject trajectory + + # Velocity score (prefer higher velocity toward target) + velocity_score = np.linalg.norm(trajectory[-1, :2] - trajectory[0, :2]) if len(trajectory) > 1 else 0 + + # Combined score + return target_score * 1.5 + velocity_score * 0.5 + + def _is_collision(self, x: float, y: float) -> bool: + """Check collision using occupancy grid (O(1) 
lookup)""" + + col = int((x - self._occupancy_map.origin[0]) / self._occupancy_map.resolution) + row = int((y - self._occupancy_map.origin[1]) / self._occupancy_map.resolution) + if ( + 0 <= row < self._occupancy_map.occupancy_map.shape[0] + and 0 <= col < self._occupancy_map.occupancy_map.shape[1] + ): + return self._occupancy_map.occupancy_map[row, col] == 1 + # Out of bounds - assume free if close to boundary, collision if far + return False # Allow movement outside map diff --git a/source/autosim/autosim/capabilities/navigation/dwa/dwa_planner_cfg.py b/source/autosim/autosim/capabilities/navigation/dwa/dwa_planner_cfg.py new file mode 100644 index 0000000..9e984b9 --- /dev/null +++ b/source/autosim/autosim/capabilities/navigation/dwa/dwa_planner_cfg.py @@ -0,0 +1,30 @@ +from isaaclab.utils import configclass + +from .dwa_planner import DWAPlanner + + +@configclass +class DWAPlannerCfg: + """Configuration for the DWA planner.""" + + class_type: type = DWAPlanner + """The class type of the DWA planner.""" + + max_linear_velocity: float = 1.0 + """The maximum linear velocity of the robot base.""" + max_angular_velocity: float = 1.0 + """The maximum angular velocity of the robot base.""" + max_linear_acceleration: float = 0.5 + """The maximum linear acceleration of the robot base.""" + max_angular_acceleration: float = 1.0 + """The maximum angular acceleration of the robot base.""" + dt: float | None = None + """The time step of the navigation. 
@configclass
class OccupancyMapCfg:
    """Configuration for the occupancy map."""

    floor_prim_suffix: str = MISSING
    """Suffix of the floor prim (appended to the per-env prim path when locating the floor)."""
    max_world_extent: float = 100.0
    """Largest accepted floor extent in meters; larger (or non-finite) bounds are rejected."""
    max_map_size: int = 2000
    """Upper bound on map size in cells; the cell size is coarsened to respect it."""
    min_xy_extent: float = 0.01
    """Minimum xy extent for a prim to count as an obstacle (1cm by default)."""
    cell_size: float = 0.05
    """Edge length of one grid cell in meters."""
    sample_height: float = 0.5
    """Height above the floor, in meters, at which occupancy is sampled."""
    height_tolerance: float = 0.2
    """Half-height of the vertical sampling band around ``sample_height``."""
def _collect_collision_prims(
    stage, floor_prim_path: str, sample_height_min: float, sample_height_max: float, min_xy_extent: float = 0.01
) -> list:
    """Collect world-space bounding boxes of obstacle prims in the scene.

    The floor itself, the robot, and non-geometry prims (lights, cameras,
    material scopes) are skipped. Only prims whose bounding box overlaps the
    vertical band [sample_height_min, sample_height_max] and has a
    non-negligible xy footprint are kept.

    Args:
        stage: USD stage to traverse.
        floor_prim_path: Prim path of the floor (excluded from obstacles).
        sample_height_min: Lower edge of the vertical sampling band.
        sample_height_max: Upper edge of the vertical sampling band.
        min_xy_extent: Minimum xy extent (meters) for a prim to count.

    Returns:
        List of dicts with keys "path", "min", "max" (numpy xyz bounds).
    """
    collision_prims = []
    bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_])

    for prim in stage.Traverse():
        path_str = str(prim.GetPath())

        # Skip the floor itself and robot
        if floor_prim_path in path_str or "Robot" in path_str or "robot" in path_str.lower():
            continue

        # Skip lights, cameras, and other non-geometry prims
        if any(skip in path_str.lower() for skip in ["light", "camera", "looks", "material"]):
            continue

        # Check if prim has geometry
        has_geometry = (
            prim.IsA(UsdGeom.Mesh)
            or prim.IsA(UsdGeom.Cube)
            or prim.IsA(UsdGeom.Cylinder)
            or prim.IsA(UsdGeom.Sphere)
            or prim.IsA(UsdGeom.Capsule)
        )
        if not has_geometry and prim.IsA(UsdGeom.Xform):
            # An Xform may be a group whose direct children carry the geometry.
            for child in prim.GetChildren():
                if child.IsA(UsdGeom.Mesh) or child.IsA(UsdGeom.Cube):
                    has_geometry = True
                    break

        if not has_geometry:
            continue

        # Keep the try minimal: only the USD bound computation can fail here.
        try:
            bbox = bbox_cache.ComputeWorldBound(prim)
            aligned_box = bbox.ComputeAlignedBox()
            prim_min = aligned_box.GetMin()
            prim_max = aligned_box.GetMax()
        except Exception:
            # Best effort: skip prims whose bounds cannot be computed.
            continue

        # Check if this prim intersects our sampling height range
        if prim_min[2] <= sample_height_max and prim_max[2] >= sample_height_min:
            # Only include if it has significant XY extent
            xy_extent_x = prim_max[0] - prim_min[0]
            xy_extent_y = prim_max[1] - prim_min[1]
            if xy_extent_x > min_xy_extent and xy_extent_y > min_xy_extent:
                collision_prims.append({
                    "path": path_str,
                    "min": np.array([prim_min[0], prim_min[1], prim_min[2]]),
                    "max": np.array([prim_max[0], prim_max[1], prim_max[2]]),
                })

    return collision_prims


def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMap:
    """Generate occupancy map from IsaacLab environment.

    Args:
        env: The IsaacLab environment.
        cfg: The configuration for the occupancy map.

    Returns:
        The occupancy map.

    Raises:
        ValueError: If the floor bounds are non-finite, non-positive, or larger
            than ``cfg.max_world_extent``.
    """
    stage = env.scene.stage

    floor_prim_path = f"/World/envs/env_0/{cfg.floor_prim_suffix}"

    min_bound, max_bound = _get_prim_bounds(stage, floor_prim_path)

    # Validate bounds - check for unreasonable values (inf, nan, or too large)
    world_extent_x = max_bound[0] - min_bound[0]
    world_extent_y = max_bound[1] - min_bound[1]

    bounds_invalid = (
        not np.isfinite(world_extent_x)
        or not np.isfinite(world_extent_y)
        or world_extent_x > cfg.max_world_extent
        or world_extent_y > cfg.max_world_extent
        or world_extent_x <= 0
        or world_extent_y <= 0
    )
    if bounds_invalid:
        raise ValueError(f"Floor bounds invalid or too large: extent_x={world_extent_x}, extent_y={world_extent_y}")

    # Calculate map bounds (use floor bounds)
    map_min_x, map_max_x = min_bound[0], max_bound[0]
    map_min_y, map_max_y = min_bound[1], max_bound[1]

    # Fix: use a local cell size instead of mutating the caller's cfg object.
    # The original wrote the adjusted value back into cfg.cell_size, silently
    # changing shared configuration as a side effect of this call.
    cell_size = cfg.cell_size
    map_width = int((map_max_x - map_min_x) / cell_size) + 1
    map_height = int((map_max_y - map_min_y) / cell_size) + 1

    # Clamp map size to prevent memory issues by coarsening the grid.
    if map_width > cfg.max_map_size or map_height > cfg.max_map_size:
        _logger.warning(f"Map size {map_width}x{map_height} exceeds max {cfg.max_map_size}")
        new_cell_size = max((map_max_x - map_min_x) / cfg.max_map_size, (map_max_y - map_min_y) / cfg.max_map_size)
        _logger.info(f"Adjusting cell_size from {cell_size:.3f}m to {new_cell_size:.3f}m")
        cell_size = new_cell_size
        map_width = int((map_max_x - map_min_x) / cell_size) + 1
        map_height = int((map_max_y - map_min_y) / cell_size) + 1

    _logger.info(
        f"Generating map: {map_width}x{map_height} cells, bounds: x=[{map_min_x:.2f}, {map_max_x:.2f}],"
        f" y=[{map_min_y:.2f}, {map_max_y:.2f}]"
    )

    # Initialize occupancy map (0 = free, 1 = occupied)
    occupancy_map = np.zeros((map_height, map_width), dtype=np.int8)

    # Calculate height range for sampling (band around sample_height above the floor)
    sample_height_min = min_bound[2] + cfg.sample_height - cfg.height_tolerance
    sample_height_max = min_bound[2] + cfg.sample_height + cfg.height_tolerance
    _logger.info(f"Sampling height range: [{sample_height_min:.2f}, {sample_height_max:.2f}]")

    # Collect collision primitives
    collision_prims = _collect_collision_prims(
        stage, floor_prim_path, sample_height_min, sample_height_max, cfg.min_xy_extent
    )
    _logger.info(f"Found {len(collision_prims)} collision primitives")

    # Rasterize each obstacle's bounding box into the grid.
    for prim_info in collision_prims:
        prim_min = prim_info["min"]
        prim_max = prim_info["max"]

        # Grid index range covered by this prim's bounding box (rows = y, cols = x).
        min_i = max(0, int((prim_min[1] - map_min_y) / cell_size))
        max_i = min(map_height - 1, int((prim_max[1] - map_min_y) / cell_size) + 1)
        min_j = max(0, int((prim_min[0] - map_min_x) / cell_size))
        max_j = min(map_width - 1, int((prim_max[0] - map_min_x) / cell_size) + 1)

        # Mark all cells in this bounding box as occupied
        occupancy_map[min_i : max_i + 1, min_j : max_j + 1] = 1

    return OccupancyMap(
        occupancy_map=torch.from_numpy(occupancy_map).to(env.device),
        origin=(map_min_x, map_min_y),
        resolution=cell_size,
        map_bounds=MapBounds(min_x=map_min_x, max_x=map_max_x, min_y=map_min_y, max_y=map_max_y),
        floor_bounds=MapBounds(min_x=min_bound[0], max_x=max_bound[0], min_y=min_bound[1], max_y=max_bound[1]),
    )
class ApplyMethodProtocol(Protocol):
    """Protocol for apply methods - fixed signature that users must follow."""

    def __call__(self, skill_output: SkillOutput, env: ManagerBasedEnv) -> torch.Tensor:
        """Apply method signature - fixed parameters."""
        ...


@configclass
class ActionAdapterCfg:
    """Configuration for the action adapter."""

    class_type: type = MISSING
    """The class type of the action adapter."""
    skip_apply_skills: list[str] = []
    """Skills whose actions should not be applied, e.g. "moveto" when the robot is fixed to the environment."""


class ActionAdapterBase:
    """Base class for all action adapters.

    Users should define instance methods and register them using `register_apply_method()`.
    All apply methods must follow this fixed signature:

        def method_name(self, skill_output: SkillOutput, env: ManagerBasedEnv) -> torch.Tensor

    Example:
        >>> class MyActionAdapter(ActionAdapterBase):
        >>>     def __init__(self, cfg: ActionAdapterCfg) -> None:
        >>>         super().__init__(cfg)
        >>>         self.register_apply_method("grasp", self.handle_grasp)

        >>>     def handle_grasp(self, skill_output: SkillOutput, env: ManagerBasedEnv) -> torch.Tensor:
        >>>         # Your implementation here
        >>>         ...
    """

    cfg: ActionAdapterCfg
    """The configuration of the action adapter."""

    def __init__(self, cfg: ActionAdapterCfg) -> None:
        self.cfg = cfg
        # Maps skill name -> registered apply method.
        self._apply_map: dict[str, ApplyMethodProtocol] = {}
        self._skip_apply_skills = self.cfg.skip_apply_skills
        self._logger = AutoSimLogger("ActionAdapter")

    def register_apply_method(self, skill_name: str, method: ApplyMethodProtocol) -> None:
        """Register an apply method for a specific skill."""
        self._apply_map[skill_name] = method

    def should_skip_apply(self, skill: Skill) -> bool:
        """
        Check if the skill should be skipped for applying the action.

        Args:
            skill: The skill instance.
        """
        return skill.cfg.name in self._skip_apply_skills

    def apply(self, skill: Skill, skill_output: SkillOutput, env: ManagerBasedEnv) -> torch.Tensor:
        """
        Apply the skill output to the environment.

        Args:
            skill: The skill instance.
            skill_output: The output of the skill.
            env: The environment.

        Returns:
            The applied action. [action_dim]
        """
        # Dispatch to the registered handler, falling back to the default.
        handler = self._apply_map.get(skill.cfg.name, self._default_apply)
        return handler(skill_output, env)

    def _default_apply(self, skill_output: SkillOutput, env: ManagerBasedEnv) -> torch.Tensor:
        """
        Default apply method for the skill.

        Args:
            skill_output: The output of the skill.
            env: The environment.
        """
        self._logger.warning("Action adapter for skill not implemented. Using default apply.")
        return skill_output.action
@configclass
class DecomposerCfg:
    """Configuration for the decomposer."""

    class_type: type = MISSING
    """The class type of the decomposer."""

    cache_dir: str = "~/.cache/autosim/decomposer_cache"
    """The cache directory for the decomposer."""


class Decomposer(ABC):
    """Base class for task decomposers with a JSON file cache keyed by task name."""

    def __init__(self, cfg: DecomposerCfg) -> None:
        self.cfg = cfg
        # Expand "~" and make sure the cache directory exists up front.
        self._cache_dir = Path(self.cfg.cache_dir).expanduser()
        self._cache_dir.mkdir(parents=True, exist_ok=True)
        self._logger = AutoSimLogger("Decomposer")

    @abstractmethod
    def decompose(self, extra_info: EnvExtraInfo) -> DecomposeResult:
        """Decompose the task with the given extra information."""
        raise NotImplementedError(f"{self.__class__.__name__}.decompose() must be implemented.")

    def _cache_path(self, task_name: str) -> Path:
        """Return the cache file path for the given task name."""
        # Single place that defines the task-name -> file mapping.
        return self._cache_dir / f"{task_name}.json"

    def is_cache_hit(self, task_name: str) -> bool:
        """Check if the cache hit for the given task name."""
        return self._cache_path(task_name).exists()

    def write_cache(self, task_name: str, decompose_result: DecomposeResult) -> None:
        """Write the cache for the given task name."""
        with open(self._cache_path(task_name), "w") as f:
            json.dump(asdict(decompose_result), f, indent=4)

    def read_cache(self, task_name: str) -> DecomposeResult:
        """Read the cache for the given task name.

        Raises:
            FileNotFoundError: If no cache entry exists for ``task_name``.
        """
        # EAFP: open directly instead of exists()+open, which had a TOCTOU
        # race between the existence check and the open.
        try:
            with open(self._cache_path(task_name)) as f:
                return from_dict(DecomposeResult, json.load(f))
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Cache file not found for task name: {task_name} in storage: {self.cfg.cache_dir}"
            ) from None
class AutoSimLogger:
    """Thin wrapper around the stdlib logging module for AutoSim pipeline debugging and monitoring."""

    def __init__(self, name: str, level: int = logging.INFO):
        self._name = name
        self._level = level
        self._logger = None  # created lazily on first access

    @property
    def logger(self):
        """Lazily create and configure the underlying ``logging.Logger``."""
        if self._logger is None:
            inner = logging.getLogger(self._name)
            # Configure only on first use of this logger name; a logger that
            # already has handlers is reused as-is.
            if not inner.handlers:
                stream = logging.StreamHandler()
                stream.setFormatter(logging.Formatter("[%(name)s] %(levelname)s: %(message)s"))
                inner.addHandler(stream)
                inner.setLevel(self._level)
                inner.propagate = False
            self._logger = inner
        return self._logger

    def debug(self, msg, *args, **kwargs):
        """Log ``msg`` at DEBUG level."""
        self.logger.debug(msg, *args, **kwargs)

    def info(self, msg, *args, **kwargs):
        """Log ``msg`` at INFO level."""
        self.logger.info(msg, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        """Log ``msg`` at WARNING level."""
        self.logger.warning(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        """Log ``msg`` at ERROR level."""
        self.logger.error(msg, *args, **kwargs)
@configclass
class AutoSimPipelineCfg:
    """Configuration for the AutoSim pipeline."""

    decomposer: DecomposerCfg = DecomposerCfg()
    """The decomposer for the AutoSim pipeline."""

    motion_planner: CuroboPlannerCfg = CuroboPlannerCfg()
    """The motion planner for the AutoSim pipeline."""

    occupancy_map: OccupancyMapCfg = OccupancyMapCfg()
    """The occupancy map for the AutoSim pipeline."""

    skills: AutoSimSkillsExtraCfg = AutoSimSkillsExtraCfg()
    """The skills for the AutoSim pipeline."""

    action_adapter: ActionAdapterCfg = ActionAdapterCfg()
    """The action adapter for the AutoSim pipeline."""

    max_steps: int = 500
    """The maximum number of steps to execute one skill."""


class AutoSimPipeline(ABC):
    """Abstract pipeline: decompose a task into skills, then execute them in the env.

    Subclasses must implement :meth:`load_env` and :meth:`get_env_extra_info`.
    """

    def __init__(self, cfg: AutoSimPipelineCfg) -> None:
        self.cfg = cfg
        self._logger = AutoSimLogger("AutoSimPipeline")

        self._decomposer: Decomposer = self.cfg.decomposer.class_type(self.cfg.decomposer)

        # load the environment and extra information
        self._env: ManagerBasedEnv = self.load_env()
        self._env_extra_info: EnvExtraInfo = self.get_env_extra_info()
        self._env_id = 0

        # robot and env related information (link indices resolved by name)
        self._robot_name = self._env_extra_info.robot_name
        self._robot = self._env.scene[self._robot_name]
        self._eef_link_name = self._env_extra_info.ee_link_name
        self._eef_link_idx = self._robot.data.body_names.index(self._eef_link_name)
        self._robot_base_link_name = self._env_extra_info.robot_base_link_name
        self._robot_base_link_idx = self._robot.data.body_names.index(self._robot_base_link_name)

        # initialize the motion planner
        self._motion_planner: CuroboPlanner = self.cfg.motion_planner.class_type(
            env=self._env,
            robot=self._robot,
            cfg=self.cfg.motion_planner,
            env_id=self._env_id,
        )

        # initialize the occupancy map
        self._occupancy_map: OccupancyMap = get_occupancy_map(self._env, self.cfg.occupancy_map)

        # initialize the action adapter
        self._action_adapter: ActionAdapterBase = self.cfg.action_adapter.class_type(self.cfg.action_adapter)

        # save generated actions (one env-wide action tensor per step)
        self._generated_actions = []

    def run(self) -> PipelineOutput:
        """Run the AutoSim pipeline."""

        # decompose the task with cache hit check
        decompose_result: DecomposeResult = self.decompose()

        # execute the pipeline
        pipeline_output = self.execute_skill_sequence(decompose_result)

        return pipeline_output

    @abstractmethod
    def load_env(self) -> ManagerBasedEnv:
        """Load the environment in isaaclab."""

        raise NotImplementedError(f"{self.__class__.__name__}.load_env() must be implemented.")

    @abstractmethod
    def get_env_extra_info(self) -> EnvExtraInfo:
        """Get the extra information from the environment."""

        raise NotImplementedError(f"{self.__class__.__name__}.get_env_extra_info() must be implemented.")

    def reset_env(self) -> None:
        """Reset the environment."""

        self._env.reset()

    def decompose(self) -> DecomposeResult:
        """Decompose the task, reading from / writing to the decomposer cache."""

        if self._decomposer.is_cache_hit(self._env_extra_info.task_name):
            decompose_result: DecomposeResult = self._decomposer.read_cache(self._env_extra_info.task_name)
        else:
            decompose_result: DecomposeResult = self._decomposer.decompose(self._env_extra_info)
            self._decomposer.write_cache(self._env_extra_info.task_name, decompose_result)
        return decompose_result

    def execute_skill_sequence(self, decompose_result: DecomposeResult):
        """Execute the skill sequence.

        Raises:
            ValueError: If any skill fails to complete within ``cfg.max_steps``.
        """

        self._check_skill_extra_cfg()
        self.reset_env()

        # TODO: add retry mechanism for skill execution
        for subtask in decompose_result.subtasks:
            for skill_info in subtask.skills:

                skill = SkillRegistry.create(
                    skill_info.skill_type, self.cfg.skills.get(skill_info.skill_type).extra_cfg
                )

                if self._action_adapter.should_skip_apply(skill):
                    self._logger.info(f"Skill {skill_info.skill_type} skipped due to action adapter setting.")
                    continue

                goal = skill.extract_goal_from_info(skill_info, self._env, self._env_extra_info)
                success, steps = self._execute_single_skill(skill, goal)

                if not success:
                    self._logger.error(f"Skill {skill_info.skill_type} execution failed with {steps} steps.")
                    raise ValueError(f"Skill {skill_info.skill_type} execution failed with {steps} steps.")
                self._logger.info(f"Skill {skill_info.skill_type} executed successfully.({steps} steps)")
            # NOTE(review): indentation of this log was ambiguous in the source;
            # placed after the inner loop (once per subtask) — confirm intended.
            self._logger.info(
                f"Subtask {subtask.subtask_name} executed successfully with {len(subtask.skills)} skills."
            )

        self.reset_env()

        # build pipeline output
        return PipelineOutput(success=True, generated_actions=self._generated_actions)

    def _check_skill_extra_cfg(self) -> None:
        """modify the extra configuration of the skills."""

        # Derive the local planner dt from the env when not explicitly configured.
        if self.cfg.skills.moveto.extra_cfg.use_dwa and self.cfg.skills.moveto.extra_cfg.local_planner.dt is None:
            physics_dt = self._env.cfg.sim.dt
            decimation = self._env.cfg.decimation
            self.cfg.skills.moveto.extra_cfg.local_planner.dt = physics_dt * decimation
        # Inject shared resources (planner / occupancy map) into matching skill cfgs.
        for skill_cfg_field in fields(self.cfg.skills):
            skill_cfg = self.cfg.skills.get(skill_cfg_field.name)
            if isinstance(skill_cfg.extra_cfg, CuroboSkillExtraCfg):
                skill_cfg.extra_cfg.curobo_planner = self._motion_planner
            if isinstance(skill_cfg.extra_cfg, NavigateSkillExtraCfg):
                skill_cfg.extra_cfg.occupancy_map = self._occupancy_map

    def _execute_single_skill(self, skill: Skill, goal: SkillGoal) -> tuple[bool, int]:
        """Execute a single skill. Returns (success, steps_taken)."""

        world_state: WorldState = self._build_world_state()
        plan_success = skill.plan(world_state, goal)

        steps = 0
        while plan_success and steps < self.cfg.max_steps:
            world_state = self._build_world_state()

            output = skill.step(world_state)

            # Only this pipeline's env slot receives the adapted action.
            action = torch.zeros(self._env.action_space.shape, device=self._env.device)
            action[self._env_id, :] = self._action_adapter.apply(skill, output, self._env)

            self._env.step(action)
            self._generated_actions.append(action)

            steps += 1
            if output.done:
                return True, steps

        return False, steps

    def _build_world_state(self) -> WorldState:
        """Build the world state."""

        # Clamp joint positions into their limits before handing them to planners.
        joint_pos_limits = self._robot.data.joint_pos_limits[self._env_id, :, :]
        lower, upper = joint_pos_limits[:, 0], joint_pos_limits[:, 1]
        robot_joint_pos = torch.clamp(self._robot.data.joint_pos[self._env_id, :], min=lower, max=upper)

        robot_joint_vel = self._robot.data.joint_vel[self._env_id, :]
        robot_ee_pose = self._robot.data.body_link_pose_w[self._env_id, self._eef_link_idx]

        robot_base_pose = self._robot.data.body_link_pose_w[
            self._env_id, self._robot_base_link_idx
        ]  # [x, y, z, qw, qx, qy, qz]
        # Extract yaw from the (w, x, y, z) quaternion components.
        w, x, y, z = robot_base_pose[3:7]
        sin_yaw = 2 * (w * z + x * y)
        cos_yaw = 1 - 2 * (y**2 + z**2)
        yaw = torch.atan2(sin_yaw, cos_yaw)
        robot_base_pose = torch.stack((robot_base_pose[0], robot_base_pose[1], yaw))  # [x, y, yaw]

        sim_joint_names = self._robot.data.joint_names

        # Root poses of every scene entity that exposes one, excluding the robot.
        objects_dict = dict()
        for obj_name in self._env.scene.keys():
            obj = self._env.scene[obj_name]
            if hasattr(obj, "data") and hasattr(obj.data, "root_pose_w") and obj_name != self._robot_name:
                objects_dict[obj_name] = obj.data.root_pose_w[self._env_id]

        return WorldState(
            robot_joint_pos=robot_joint_pos,
            robot_joint_vel=robot_joint_vel,
            robot_ee_pose=robot_ee_pose,
            robot_base_pose=robot_base_pose,
            sim_joint_names=sim_joint_names,
            objects=objects_dict,
        )
import annotations + +import importlib +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Protocol + +if TYPE_CHECKING: + from autosim.core.pipeline import AutoSimPipeline as Pipeline + from autosim.core.skill import Skill, SkillExtraCfg + + +"""Pipeline Registration System + +This section provides the registration and instantiation system for +pipelines. Pipelines can be registered with an ID and entry points, then +created using make_pipeline(). + +Usage: + # 1. Register a pipeline + >>> register_pipeline( + >>> id="MyPipeline-v0", + >>> entry_point="autosim.pipelines:MyPipeline", + >>> cfg_entry_point="autosim.pipelines:MyPipelineCfg", + >>> ) + + # 2. Create a pipeline instance + >>> pipeline = make_pipeline("MyPipeline-v0") + >>> pipeline.run() + + # 3. List all registered pipelines + >>> pipeline_ids = list_pipelines() + + # 4. Unregister a pipeline + >>> unregister_pipeline("MyPipeline-v0") +""" + + +class PipelineCreator(Protocol): + """Function that creates a pipeline instance.""" + + def __call__(self, **kwargs: Any) -> Pipeline: ... + + +class ConfigCreator(Protocol): + """Function that creates a configuration instance.""" + + def __call__(self, **kwargs: Any) -> Any: ... + + +@dataclass +class PipelineEntry: + """Entry for a pipeline in the registry. + + Attributes: + id: Unique identifier for the pipeline (e.g., "MyPipeline-v0"). + entry_point: String pointing to the Pipeline class or a callable that creates a pipeline instance. Format: "module.path:ClassName". + cfg_entry_point: String pointing to the configuration class or a callable that creates a config instance. Format: "module.path:ConfigClass". 
+ """ + + id: str + entry_point: PipelineCreator | str | None = field(default=None) + cfg_entry_point: ConfigCreator | str | None = field(default=None) + + +# Global registry for pipelines +pipeline_registry: dict[str, PipelineEntry] = {} + + +def register_pipeline( + id: str, + entry_point: PipelineCreator | str | None = None, + cfg_entry_point: ConfigCreator | str | None = None, +) -> None: + """Register a pipeline in the global registry.""" + + assert entry_point is not None, "Entry point must be provided." + assert cfg_entry_point is not None, "Configuration entry point must be provided." + + if id in pipeline_registry: + raise ValueError( + f"Pipeline with id '{id}' is already registered. To register a new version, use a different id (e.g.," + f" '{id}-v1')." + ) + + entry = PipelineEntry( + id=id, + entry_point=entry_point, + cfg_entry_point=cfg_entry_point, + ) + pipeline_registry[entry.id] = entry + + +def _load_entry_point(entry_point: str) -> Any: + """Load a class or function from an entry point string.""" + + try: + mod_name, attr_name = entry_point.split(":") + mod = importlib.import_module(mod_name) + obj = getattr(mod, attr_name) + return obj + except (ValueError, ModuleNotFoundError, AttributeError) as e: + raise ValueError( + f"Could not resolve entry point '{entry_point}'. Expected format: 'module.path:ClassName'. Error: {e}" + ) from e + + +def _load_creator(creator: str | PipelineCreator | ConfigCreator) -> PipelineCreator | ConfigCreator: + + if isinstance(creator, str): + return _load_entry_point(creator) + else: + return creator + + +def make_pipeline( + id: str, +) -> Pipeline: + """Create a pipeline instance from the registry.""" + + if id not in pipeline_registry: + raise ValueError( + f"Pipeline '{id}' not found in registry. You can list all registered pipelines with list_pipelines()." 
+ ) + + entry = pipeline_registry[id] + + pipeline_creator = _load_creator(entry.entry_point) + cfg_creator = _load_creator(entry.cfg_entry_point) + + # Instantiate the pipeline + try: + cfg = cfg_creator() + pipeline = pipeline_creator(cfg=cfg) + except TypeError as e: + entry_point_str = entry.entry_point if isinstance(entry.entry_point, str) else str(entry.entry_point) + raise TypeError( + f"Failed to instantiate pipeline '{id}' with entry point '{entry_point_str}'. Error: {e}" + ) from e + + return pipeline + + +def list_pipelines() -> list[str]: + """List all registered pipeline IDs.""" + + return sorted(pipeline_registry.keys()) + + +def unregister_pipeline(id: str) -> None: + """Unregister a pipeline from the registry.""" + + if id not in pipeline_registry: + raise ValueError(f"Pipeline '{id}' not found in registry.") + del pipeline_registry[id] + + +"""Skill Registration System +This section provides the registration and instantiation system for skills. +Skills can be registered manually, then created using SkillRegistry.create(). + +Usage: + # 1. Using decorator (recommended) + >>> @register_skill("reach", "Reach to target pose", ["curobo"]) + >>> class ReachSkill(Skill): + >>> ... + + # 2. Manual registration + >>> class MySkill(Skill): + >>> cfg = SkillCfg(name="my_skill", description="My custom skill") + >>> SkillRegistry.register(MySkill) + + # 3. Create a skill instance + >>> skill = SkillRegistry.create("reach", extra_cfg=SkillExtraCfg(param="value")) + + # 4. 
class SkillRegistry:
    """Singleton registry mapping skill names to registered Skill classes."""

    _instance: "SkillRegistry | None" = None
    # Maps skill name -> registered Skill subclass (stores classes, not instances).
    _skills: "dict[str, type[Skill]]" = {}

    def __new__(cls) -> "SkillRegistry":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def register(cls, skill_cls: type) -> type:
        """Register a Skill subclass under its configured name.

        Raises:
            TypeError: If ``skill_cls`` does not inherit from Skill.
            ValueError: If the name is already taken by a different class.
        """
        from autosim.core.skill import Skill

        if not issubclass(skill_cls, Skill):
            raise TypeError(f"Skill class '{skill_cls.__name__}' must inherit from Skill.")
        if skill_cls.cfg.name in cls._skills and cls._skills[skill_cls.cfg.name] != skill_cls:
            raise ValueError(
                f"Skill with name '{skill_cls.cfg.name}' already registered with different class:"
                f" {cls._skills[skill_cls.cfg.name]}"
            )

        cls._skills[skill_cls.cfg.name] = skill_cls
        return skill_cls

    @classmethod
    def get(cls, name: str) -> type:
        """Return the skill class registered under ``name``.

        Raises:
            ValueError: If no skill with that name is registered.
        """
        if name not in cls._skills:
            raise ValueError(f"Skill '{name}' not found in registry.")
        return cls._skills[name]

    @classmethod
    def create(cls, name: str, extra_cfg: "SkillExtraCfg") -> "Skill":
        """Instantiate the skill ``name``; ``extra_cfg`` overwrites the default extra configuration."""
        return cls.get(name)(extra_cfg)

    @classmethod
    def list_skills(cls) -> list:
        """Return the ``SkillCfg`` of every registered skill.

        Despite the name, this returns configuration objects (callers read
        ``.name`` / ``.description`` off them), not name strings.
        """
        return [skill_cls.get_cfg() for skill_cls in cls._skills.values()]


def register_skill(name: str, description: str, cfg_type: type):
    """Decorator factory: attach ``cfg_type(name=..., description=...)`` as the
    class cfg and register the class in the SkillRegistry.

    Returns a decorator that returns the class unchanged.
    """

    def decorator(cls: type) -> type:
        cls.cfg = cfg_type(name=name, description=description)
        SkillRegistry.register(cls)
        return cls

    return decorator
from abc import ABC, abstractmethod

from isaaclab.envs import ManagerBasedEnv
from isaaclab.utils import configclass

from .types import (
    EnvExtraInfo,
    SkillGoal,
    SkillInfo,
    SkillOutput,
    SkillStatus,
    WorldState,
)


@configclass
class SkillExtraCfg:
    """Extra configuration for the skill."""


@configclass
class SkillCfg:
    """Configuration for the skill."""

    name: str = "base_skill"
    """The name of the skill."""
    description: str = "Base skill class."
    """The description of the skill (for prompt generation)."""
    extra_cfg: SkillExtraCfg = SkillExtraCfg()
    """The extra configuration for the skill (used in specific skill classes)."""


class Skill(ABC):
    """Base class for all skills."""

    cfg: SkillCfg
    """The configuration of the skill."""

    def __init__(self, extra_cfg: SkillExtraCfg) -> None:
        self._status: SkillStatus = SkillStatus.IDLE
        # NOTE(review): `cfg` is a class-level attribute, so this assignment
        # mutates the configuration shared by every instance of the skill
        # class -- confirm per-instance configs are not required.
        self.cfg.extra_cfg = extra_cfg

    @classmethod
    def get_cfg(cls) -> SkillCfg:
        """Get the configuration of the skill."""
        return cls.cfg

    @abstractmethod
    def extract_goal_from_info(
        self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo
    ) -> SkillGoal:
        """Extract the goal from the skill information.

        Args:
            skill_info: The skill information.
            env: The environment.
            env_extra_info: The extra information of the environment.

        Returns:
            The goal of the skill.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.extract_goal_from_info() must be implemented.")

    def plan(self, state: WorldState, goal: SkillGoal) -> bool:
        """Plan the skill and update its status accordingly.

        Args:
            state: The current state of the world.
            goal: The goal of the skill.

        Returns:
            True if the skill is planned successfully, False otherwise.
        """
        self._status = SkillStatus.PLANNING
        success = self.execute_plan(state, goal)
        self._status = SkillStatus.EXECUTING if success else SkillStatus.FAILED
        return success

    @abstractmethod
    def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool:
        """Execute the plan of the skill.

        Args:
            state: The current state of the world.
            goal: The goal of the skill.

        Returns:
            True if the skill is planned successfully, False otherwise.
        """
        # Fixed: the message previously named `.plan()` instead of `.execute_plan()`.
        raise NotImplementedError(f"{self.__class__.__name__}.execute_plan() must be implemented.")

    @abstractmethod
    def step(self, state: WorldState) -> SkillOutput:
        """Execute one step of the skill.

        Args:
            state: The current state of the world.

        Returns:
            The output of the skill, containing the action, done, success, and info.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.step() must be implemented.")

    def reset(self) -> None:
        """Reset the skill to its idle state."""
        self._status = SkillStatus.IDLE

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(status={self._status.value})"


from collections.abc import Iterator
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

import torch

"""PIPELINE RELATED TYPES"""


@dataclass
class PipelineOutput:
    """Output of the pipeline execution."""

    success: bool
    """Whether the pipeline execution was successful."""
    generated_actions: list[torch.Tensor]
    """The generated actions of the pipeline."""


"""SKILL RELATED TYPES"""


class SkillStatus(Enum):
    """Status of the skill execution."""

    IDLE = "idle"
    """The skill is idle."""
    PLANNING = "planning"
    """The skill is planning."""
    EXECUTING = "executing"
    """The skill is executing."""
    SUCCESS = "success"
    """The skill execution was successful."""
    FAILED = "failed"
    """The skill execution failed."""
executing.""" + SUCCESS = "success" + """The skill execution was successful.""" + FAILED = "failed" + """The skill execution failed.""" + + +@dataclass +class SkillGoal: + """Goal of the skill.""" + + target_object: str | None = None + """The target object of the skill.""" + target_pose: torch.Tensor | None = None + """The target pose of the skill.""" + extra_target_poses: dict[str, torch.Tensor] | None = None + """The target poses of the extra end-effectors. dict[link_name, target_pose].""" + + +@dataclass +class SkillOutput: + """Output of the skill execution.""" + + action: torch.Tensor + """The action of the skill. shape: [action_dim]""" + done: bool + """Whether the skill execution is done.""" + success: bool + """Whether the skill execution was successful.""" + info: dict[str, Any] = field(default_factory=dict) + """The information of the skill execution.""" + + +"""ENVIRONMENT RELATED TYPES""" + + +@dataclass +class EnvExtraInfo: + """Extra information from the environment.""" + + task_name: str + """The name of the task, need to be the same as the task name in the gymnasium registry.""" + objects: list[str] | None = None + """The objects in the environment.""" + additional_prompt_contents: str | None = None + """The additional prompt contents for the task decomposition.""" + + robot_name: str = "robot" + """The name of the robot in the scene.""" + robot_base_link_name: str = "base_link" + """The name of the base link of the robot (it is not necessarily the root link of the robot).""" + ee_link_name: str = "ee_link" + """The name of the end-effector link.""" + + object_reach_target_poses: dict[str, list[torch.Tensor]] = field(default_factory=dict) + """The reach target poses in the objects frame. 
each object can have a list of reach target poses [x, y, z, qw, qx, qy, qz] in the order of execution.""" + object_extra_reach_target_poses: dict[str, dict[str, list[torch.Tensor]]] = field(default_factory=dict) + """The extra reach target poses in the objects frame. each object can have a list of extra reach target poses [x, y, z, qw, qx, qy, qz] with ee_name as the key in the order of execution.""" + + def __post_init__(self): + self._object_reach_target_poses_iterator_dict = { + object_name: self._build_iterator(reach_target_poses) + for object_name, reach_target_poses in self.object_reach_target_poses.items() + } + self._object_extra_reach_target_poses_iterator_dict = { + object_name: { + ee_name: self._build_iterator(extra_reach_target_poses) + for ee_name, extra_reach_target_poses in extra_reach_target_poses.items() + } + for object_name, extra_reach_target_poses in self.object_extra_reach_target_poses.items() + } + + def _build_iterator(self, value_list: list[torch.Tensor]) -> Iterator[torch.Tensor]: + yield from value_list + + def get_next_reach_target_pose(self, object_name: str) -> torch.Tensor: + return next(self._object_reach_target_poses_iterator_dict[object_name]) + + def get_next_extra_reach_target_pose(self, object_name: str, ee_name: str) -> torch.Tensor: + return next(self._object_extra_reach_target_poses_iterator_dict[object_name][ee_name]) + + +@dataclass +class WorldState: + """The unified state representation of the world.""" + + robot_joint_pos: torch.Tensor + """The joint positions of the robot.""" + robot_joint_vel: torch.Tensor + """The joint velocities of the robot.""" + robot_ee_pose: torch.Tensor + """The end - effector pose of the robot in the world frame. [x, y, z, qw, qx, qy, qz]""" + robot_base_pose: torch.Tensor + """The base pose of the robot in the world frame. 
[x, y, yaw]""" + sim_joint_names: list[str] + """The joint names of the robot.""" + objects: dict[str, torch.Tensor] = field(default_factory=dict) + """The state of the objects in the world.""" + metadata: dict[str, Any] = field(default_factory=dict) + """The metadata of the world state.""" + + @property + def device(self): + return self.robot_joint_pos.device + + def to(self, device): + """Move all tensors to device""" + self.robot_joint_pos = self.robot_joint_pos.to(device) + self.robot_joint_vel = self.robot_joint_vel.to(device) + self.robot_ee_pose = self.robot_ee_pose.to(device) + self.objects = {k: v.to(device) for k, v in self.objects.items()} + return self + + +"""DECOMPOSER RELATED TYPES""" + + +@dataclass +class ObjectInfo: + """Information of the object.""" + + name: str + """The name of the object.""" + type: str + """The type of the object.""" + graspable: bool + """Whether the object is graspable.""" + initial_location: str + """The initial location of the object.""" + target_location: str + """The target location of the object.""" + role: str + """The role of the object. "manipulated" (needs operation) or "static" (no operation needed)""" + + +@dataclass +class FixtureInfo: + """Information of the fixture.""" + + name: str + """The name of the fixture.""" + type: str + """The type of the fixture.""" + interactive: bool | None = None + """Whether the fixture is interactive.""" + interaction_type: str | None = None + """The type of interaction with the fixture.""" + + +@dataclass +class SkillInfo: + """Information of the skill.""" + + step: int + """The step of the skill, globally sequential across all subtasks""" + skill_type: str + """The type of the skill, must be one of the atomic skills""" + target_object: str + """The target object of the skill.""" + target_type: str + """The type of the target. 
"object", "fixture", "interactive_element", or "position".""" + description: str + """The description of the skill.""" + + +@dataclass +class SubtaskResult: + """Result of the subtask.""" + + subtask_id: int + """The ID of the subtask.""" + subtask_name: str + """The name of the subtask.""" + description: str + """The description of the subtask.""" + skills: list[SkillInfo] + """The skills of the subtask.""" + + +@dataclass +class DecomposeResult: + """Result of the task decomposition.""" + + task_name: str + """The name of the task.""" + task_description: str + """The description of the task.""" + parent_classes: list[str] + """The parent classes of the task.""" + objects: list[ObjectInfo] + """The objects of the task.""" + fixtures: list[FixtureInfo] + """The fixtures of the task.""" + interactive_elements: list[str] + """The interactive elements of the task.""" + subtasks: list[SubtaskResult] + """The subtasks of the task.""" + success_conditions: list[str] + """The success conditions of the task.""" + total_steps: int + """The total number of steps in the task.""" + skill_sequence: list[str] + """The sequence of skills in the task.""" + + +"""NAVIGATION RELATED TYPES""" + + +@dataclass +class MapBounds: + """Bounds of the map. [min_x, max_x, min_y, max_y]""" + + min_x: float + """The minimum x - coordinate of the map.""" + max_x: float + """The maximum x - coordinate of the map.""" + min_y: float + """The minimum y - coordinate of the map.""" + max_y: float + """The maximum y - coordinate of the map.""" + + +@dataclass +class OccupancyMap: + """Occupancy map of the environment.""" + + occupancy_map: torch.Tensor + """The occupancy map of the environment. 
# --- autosim/decomposers/__init__.py ---
from .llm_decomposer.llm_decomposer import LLMDecomposer
from .llm_decomposer.llm_decomposer_cfg import LLMDecomposerCfg

__all__ = [
    "LLMDecomposer",
    "LLMDecomposerCfg",
]


# --- autosim/decomposers/llm_decomposer/llm_decomposer.py ---
import contextlib
import importlib
import inspect
import json
import re
from pathlib import Path
from typing import TYPE_CHECKING

from dacite import from_dict
from jinja2 import Environment, FileSystemLoader
from openai import OpenAI

from autosim import SkillRegistry
from autosim.core.decomposer import Decomposer
from autosim.core.types import DecomposeResult, EnvExtraInfo

if TYPE_CHECKING:
    from .llm_decomposer_cfg import LLMDecomposerCfg


class LLMBackend:
    """LLM backend using an OpenAI-compatible interface (GPT, DeepSeek, Claude, ...)."""

    def __init__(self, api_key: str, base_url: str, model: str):
        """Initialize the LLM backend.

        Args:
            api_key: API key.
            base_url: API endpoint URL.
            model: Model name, e.g. gpt-4o, gpt-4o-mini, deepseek-chat,
                claude-3-5-sonnet-20241022.
        """
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model = model

    def generate(self, prompt: str, temperature: float, max_tokens: int) -> str:
        """Generate a single-turn response from the LLM.

        Args:
            prompt: Input prompt.
            temperature: Sampling temperature (0.0-2.0).
            max_tokens: Maximum tokens to generate.

        Returns:
            Generated text.
        """
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_tokens,
        )
        return response.choices[0].message.content


class LLMDecomposer(Decomposer):
    """Decomposer that asks an LLM to split a task into atomic skill sequences."""

    def __init__(self, cfg: "LLMDecomposerCfg") -> None:
        super().__init__(cfg)

        self._llm_backend = LLMBackend(api_key=self.cfg.api_key, base_url=self.cfg.base_url, model=self.cfg.model)

        self._atomic_skills = [skill_cfg.name for skill_cfg in SkillRegistry.list_skills()]

        jinja_env = Environment(
            loader=FileSystemLoader(str(Path(__file__).parent / "prompts")),
            autoescape=False,
        )
        self._prompt_template = jinja_env.get_template("task_decompose.jinja")

    def decompose(self, extra_info: EnvExtraInfo) -> DecomposeResult:
        """Decompose the task named in ``extra_info`` into skill sequences.

        Raises:
            ValueError: If the LLM response has no valid JSON or is missing
                required fields / uses unknown skill types.
        """
        task_code = self._load_task_code(extra_info.task_name)
        prompt = self._build_prompt(task_code, extra_info)
        self._logger.debug(f"prompt for llm composer: \n{prompt}")

        self._logger.info("generate response from llm...")
        response = self._llm_backend.generate(
            prompt=prompt, temperature=self.cfg.temperature, max_tokens=self.cfg.max_tokens
        )

        # parse json response
        try:
            results = self._extract_json(response)
            self._validate_result(results)
            return from_dict(DecomposeResult, results)
        except json.JSONDecodeError as e:
            # Chain the decode error so the original failure is preserved.
            raise ValueError(f"Failed to parse JSON response: {e}\nResponse: {response}") from e

    def _load_task_code(self, task_name: str) -> str:
        """Load the task's source code via the gymnasium registry.

        Args:
            task_name: The name of the task.

        Returns:
            The task class source, prefixed with the module's import lines.
        """
        module_path, class_name = self._find_task_in_gym_registry(task_name)
        if module_path is None or class_name is None:
            raise ValueError(f"Task {task_name} not found in gymnasium registry")
        module = importlib.import_module(module_path)
        task_cls = getattr(module, class_name)

        cls_source_code = inspect.getsource(task_cls)
        module_source_code = inspect.getsource(module)

        # Extract the leading import statements of the module.
        import_lines = []
        for line in module_source_code.split("\n"):
            line = line.strip()
            if line.startswith(("import ", "from ")):
                import_lines.append(line)
            elif line and not line.startswith("#"):
                # stop at the first non-import, non-comment line
                break
        return "\n".join(import_lines) + f"\n\n{cls_source_code}"

    def _find_task_in_gym_registry(self, task_name: str) -> tuple[str | None, str | None]:
        """Find a task in the gymnasium registry.

        Args:
            task_name: Task id in the gymnasium registry.

        Returns:
            Tuple of (module_path, class_name), or (None, None) if not found.
        """
        import gymnasium as gym  # heavy dependency; imported lazily

        for task_spec in gym.registry.values():
            if task_spec.id == task_name and task_spec.kwargs:
                env_cfg_entry_point = task_spec.kwargs.get("env_cfg_entry_point")
                if env_cfg_entry_point:
                    module_path, class_name = env_cfg_entry_point.split(":")
                    return module_path, class_name
        return None, None

    def _build_prompt(self, task_code: str, extra_info: EnvExtraInfo) -> str:
        """Render the decomposition prompt from the Jinja template.

        Args:
            task_code: The code of the task.
            extra_info: The extra information of the environment.

        Returns:
            The prompt for the LLM decomposer.
        """
        skills = {skill_cfg.name: skill_cfg.description for skill_cfg in SkillRegistry.list_skills()}

        return self._prompt_template.render(
            task_code=task_code,
            task_name=extra_info.task_name,
            skills=skills,
            objects=extra_info.objects,
            additional_prompt_contents=extra_info.additional_prompt_contents,
        )

    def _extract_json(self, response: str) -> dict:
        """Extract JSON from an LLM response (handles markdown code blocks).

        Raises:
            json.JSONDecodeError: If no valid JSON is found.
        """
        # Try direct parsing
        with contextlib.suppress(json.JSONDecodeError):
            return json.loads(response)

        # Try markdown code blocks
        matches = re.findall(r"```(?:json)?\s*(\{.*?\})\s*```", response, re.DOTALL)
        if matches:
            with contextlib.suppress(json.JSONDecodeError):
                return json.loads(matches[0])

        # Try the largest brace-delimited span that parses
        matches = re.findall(r"\{.*\}", response, re.DOTALL)
        for match in sorted(matches, key=len, reverse=True):
            with contextlib.suppress(json.JSONDecodeError):
                return json.loads(match)

        raise json.JSONDecodeError("No valid JSON found in response", response, 0)

    def _validate_result(self, result: dict) -> None:
        """Validate the structure of a decomposition result.

        Raises:
            ValueError: If a required field is missing or a skill type is unknown.
        """
        required_fields = [
            "task_name",
            "task_description",
            "parent_classes",
            "objects",
            "fixtures",
            "subtasks",
            "success_conditions",
            "total_steps",
            "skill_sequence",
        ]

        for field_name in required_fields:
            if field_name not in result:
                raise ValueError(f"Missing required field: {field_name}")

        # Validate skill types against the registered atomic skills.
        for subtask in result["subtasks"]:
            for skill in subtask["skills"]:
                if skill["skill_type"] not in self._atomic_skills:
                    raise ValueError(f"Invalid skill type: {skill['skill_type']}. Must be one of {self._atomic_skills}")


# --- autosim/decomposers/llm_decomposer/llm_decomposer_cfg.py ---
import os
from dataclasses import MISSING

from isaaclab.utils import configclass

from autosim.core.decomposer import DecomposerCfg

from .llm_decomposer import LLMDecomposer


@configclass
class LLMDecomposerCfg(DecomposerCfg):
    """Configuration for the LLM decomposer."""

    class_type: type = LLMDecomposer
    """The class type of the LLM decomposer."""

    api_key: str = MISSING
    """The API key for the LLM (populated from AUTOSIM_LLM_API_KEY in __post_init__)."""

    base_url: str = "https://api.chatanywhere.org/v1"  # TODO: change here.
    """The base URL for the LLM API."""

    model: str = "gpt-3.5-turbo"
    """The model name for the LLM."""

    temperature: float = 0.3
    """The temperature for the LLM."""

    max_tokens: int = 4000
    """The maximum number of tokens to generate."""

    def __post_init__(self) -> None:
        super().__post_init__()
        # NOTE(review): this always overwrites `api_key` from the environment,
        # silently ignoring an explicitly configured value — confirm that is
        # intentional (it does keep keys out of source/config files).
        api_key = os.environ.get("AUTOSIM_LLM_API_KEY")
        if api_key is None:
            raise ValueError(
                "Please set the AUTOSIM_LLM_API_KEY environment variable when using the LLMDecomposer, e.g. export"
                " AUTOSIM_LLM_API_KEY=your_api_key"
            )
        self.api_key = api_key
+ +{% if objects %} +## CRITICAL: Available Objects in Scene +**You MUST only use the following object names as target_object in your skill sequence:** +[{{ objects }}] + +**DO NOT invent or use any object names that are not in this list!** +For example, if you need to refer to a coffee machine, use the exact name from the list above (e.g., +"coffee_machine_main_group"), NOT made-up names like "coffee_machine_dispenser". +{% endif %} + +## Available Atomic Skills +You have {{ skills | length }} atomic skills available: +{% for name, desc in skills.items() %} +{{ loop.index }}. **{{ name }}** - {{ desc }} +{% endfor %} + +## Task Code +```python +{{ task_code }} +``` + +## Target Task +Analyze the task class `{{ task_name }}` and its implementation. + +## Output Requirements +Output the task decomposition result in JSON format with the following fields: + +1. **task_name**: Task name (string) +2. **task_description**: Task description in English (string) +3. **parent_classes**: List of parent class names (list of strings) +4. **objects**: List of objects in the scene, each containing: + - name: Object name (string) + - type: Object type (string) + - graspable: Whether graspable (boolean) + - initial_location: Initial location (string) + - target_location: Target location (string) + - role: Role - "manipulated" (needs operation) or "static" (no operation needed) +5. **fixtures**: List of fixtures in the scene, each containing: + - name: Fixture name (string) + - type: Fixture type (string) + - interactive: Whether interactive (boolean, optional) + - interaction_type: Type of interaction (string, optional) +6. **interactive_elements**: List of interactive elements like buttons, knobs (list, can be empty) +7. 
**subtasks**: List of subtasks, each containing: + - subtask_id: Subtask ID (integer, starting from 1) + - subtask_name: Subtask name (string) + - description: Description (string) + - skills: Skill sequence, each skill containing: + - step: Step number (integer, globally incrementing) + - skill_type: Skill type (must be one of the 6 atomic skills) + - target_object: Target object name (string) + - target_type: Target type - "object", "fixture", "interactive_element", or "position" + - description: Step description (string) +8. **success_conditions**: List of success conditions +9. **total_steps**: Total number of steps (integer) +10. **skill_sequence**: Flattened list of skill types in order (list of strings) + +## Decomposition Patterns +1. **Lift Object Pattern** (for lifting objects off surface): + ``` + moveto(object) → reach(object) → grasp(object) → lift(up) + ``` + +2. **Pick-and-Place Pattern** (for moving objects to specific location): + ``` + moveto(object) → reach(object) → grasp(object) → [moveto(target)](optional) → reach(target) → ungrasp(object) + ``` + +3. **Press Pattern** (for pressing buttons): + ``` + moveto(button) → reach(button) → press(button) + ``` + +4. **Rotate Pattern** (for rotating knobs/objects): + ``` + moveto(knob) → reach(knob) → grasp(knob) → rotate(knob) → ungrasp(knob) + ``` + +5. **Push/Pull Pattern** (for pushing/pulling objects): + ``` + moveto(object) → reach(object) → grasp(object) → push(forward) # or pull(back) + ``` + +6. **Combined Patterns**: For complex tasks, combine multiple patterns in sequence. + +## Planning Strategy +1. **Start with success conditions**: Check the success conditions first to understand what needs to be achieved +2. **Minimize steps**: Only include skills that are necessary to satisfy the success conditions +3. 
from isaaclab.utils import configclass

from .base_skill import CuroboSkillExtraCfg
from .gripper import GraspSkill, GraspSkillCfg, UngraspSkill, UngraspSkillCfg
from .navigate import NavigateSkill, NavigateSkillCfg, NavigateSkillExtraCfg
from .reach import ReachSkill, ReachSkillCfg
from .relative_reach import (
    LiftSkill,
    LiftSkillCfg,
    PullSkill,
    PullSkillCfg,
    PushSkill,
    PushSkillCfg,
)


@configclass
class AutoSimSkillsExtraCfg:
    """Aggregated per-skill configurations for the AutoSim skills."""

    grasp: GraspSkillCfg = GraspSkillCfg()
    ungrasp: UngraspSkillCfg = UngraspSkillCfg()
    lift: LiftSkillCfg = LiftSkillCfg()
    moveto: NavigateSkillCfg = NavigateSkillCfg()
    pull: PullSkillCfg = PullSkillCfg()
    push: PushSkillCfg = PushSkillCfg()
    reach: ReachSkillCfg = ReachSkillCfg()

    def get(self, skill_name: str):
        """Get the skill configuration attribute named ``skill_name``.

        Fixed: the first parameter was misleadingly named ``cls`` although
        this is an instance method (callers invoke it on an instance).
        """
        return getattr(self, skill_name)
autosim.capabilities.motion_planning import CuroboPlanner +from autosim.core.skill import Skill, SkillExtraCfg +from autosim.core.types import ( + EnvExtraInfo, + SkillGoal, + SkillInfo, + SkillOutput, + WorldState, +) + + +@configclass +class GripperSkillExtraCfg(SkillExtraCfg): + """Extra configuration for the gripper skill.""" + + gripper_value: float = 0.0 + """The value of the gripper.""" + duration: int = 20 + """The duration of the gripper.""" + + +class GripperSkillBase(Skill): + """Base class for gripper skills open/close skills.""" + + def __init__(self, extra_cfg: GripperSkillExtraCfg) -> None: + super().__init__(extra_cfg) + + self._gripper_value = extra_cfg.gripper_value + self._duration = extra_cfg.duration + self._step_count = 0 + self._target_object_name = None + + def extract_goal_from_info( + self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo + ) -> SkillGoal: + """Return the target object name.""" + + return SkillGoal(target_object=skill_info.target_object) + + def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: + """Execute the plan of the gripper skill.""" + + self._target_object_name = goal.target_object + self._step_count = 0 + return True + + def step(self, state: WorldState) -> SkillOutput: + """Step the gripper skill. + + Args: + state: The current state of the world. + + Returns: + The output of the skill execution. + action: The action to be applied to the environment. 
[gripper_value] + """ + + done = self._step_count >= self._duration + self._step_count += 1 + + return SkillOutput( + action=torch.tensor([self._gripper_value], device=state.device), + done=done, + success=done, + info={"step": self._step_count, "target_object": self._target_object_name}, + ) + + def reset(self) -> None: + """Reset the gripper skill.""" + + super().reset() + self._step_count = 0 + self._target_object_name = None + + +@configclass +class CuroboSkillExtraCfg(SkillExtraCfg): + """Extra configuration for the curobo skill.""" + + curobo_planner: CuroboPlanner | None = None + """The curobo planner for the skill.""" + + +class CuroboSkillBase(Skill): + """Base class for skills dependent on curobo.""" + + def __init__(self, extra_cfg: CuroboSkillExtraCfg) -> None: + super().__init__(extra_cfg) + self._planner = extra_cfg.curobo_planner diff --git a/source/autosim/autosim/skills/gripper.py b/source/autosim/autosim/skills/gripper.py new file mode 100644 index 0000000..8395497 --- /dev/null +++ b/source/autosim/autosim/skills/gripper.py @@ -0,0 +1,38 @@ +from isaaclab.utils import configclass + +from autosim import register_skill +from autosim.core.skill import SkillCfg + +from .base_skill import GripperSkillBase, GripperSkillExtraCfg + + +@configclass +class GraspSkillCfg(SkillCfg): + """Configuration for the grasp skill.""" + + extra_cfg: GripperSkillExtraCfg = GripperSkillExtraCfg(gripper_value=-1.0) + """default configuration: close gripper[-1.0] for 10 steps""" + + +@register_skill(name="grasp", cfg_type=GraspSkillCfg, description="Grasp object (close gripper)") +class GraspSkill(GripperSkillBase): + """Skill to grasp an object""" + + def __init__(self, extra_cfg: GripperSkillExtraCfg) -> None: + super().__init__(extra_cfg) + + +@configclass +class UngraspSkillCfg(SkillCfg): + """Configuration for the ungrasp skill.""" + + extra_cfg: GripperSkillExtraCfg = GripperSkillExtraCfg(gripper_value=1.0) + """default configuration: open gripper[1.0] for 10 
steps""" + + +@register_skill(name="ungrasp", cfg_type=UngraspSkillCfg, description="Release object (open gripper)") +class UngraspSkill(GripperSkillBase): + """Skill to release an object""" + + def __init__(self, extra_cfg: GripperSkillExtraCfg) -> None: + super().__init__(extra_cfg) diff --git a/source/autosim/autosim/skills/navigate.py b/source/autosim/autosim/skills/navigate.py new file mode 100644 index 0000000..5aa2989 --- /dev/null +++ b/source/autosim/autosim/skills/navigate.py @@ -0,0 +1,292 @@ +import numpy as np +import torch +from isaaclab.envs import ManagerBasedEnv +from isaaclab.utils import configclass +from scipy.ndimage import distance_transform_edt + +from autosim import register_skill +from autosim.capabilities.navigation import AStarPlannerCfg, DWAPlannerCfg +from autosim.core.logger import AutoSimLogger +from autosim.core.skill import Skill, SkillCfg, SkillExtraCfg +from autosim.core.types import ( + EnvExtraInfo, + OccupancyMap, + SkillGoal, + SkillInfo, + SkillOutput, + WorldState, +) + + +@configclass +class NavigateSkillExtraCfg(SkillExtraCfg): + """Extra configuration for the navigate skill.""" + + global_planner: AStarPlannerCfg = AStarPlannerCfg() + """The configuration for the A* motion planner.""" + local_planner: DWAPlannerCfg = DWAPlannerCfg() + """The configuration for the DWA motion planner.""" + use_dwa: bool = False + """Whether to use DWA motion planner.""" + waypoint_tolerance: float = 0.5 + """The tolerance of the waypoint.""" + goal_tolerance: float = 0.25 + """The tolerance of the goal.""" + yaw_tolerance: float = 0.01 + """The tolerance of the yaw (radians).""" + sampling_radius: float = 0.8 + """The sampling radius for the target position, in meters.""" + num_samples: int = 4 + """The number of samples for the target position.""" + + occupancy_map: OccupancyMap | None = None + """The occupancy map of the environment.""" + + +@configclass +class NavigateSkillCfg(SkillCfg): + """Configuration for the navigate skill.""" + + 
extra_cfg: NavigateSkillExtraCfg = NavigateSkillExtraCfg() + """Extra configuration for the navigate skill.""" + + +@register_skill( + name="moveto", cfg_type=NavigateSkillCfg, description="Move robot base to near the target object or location." +) +class NavigateSkill(Skill): + """Skill to navigate to a target position using A* + DWA motion planner.""" + + def __init__(self, extra_cfg: NavigateSkillExtraCfg) -> None: + super().__init__(extra_cfg) + + self._logger = AutoSimLogger("NavigateSkill") + + self._occupancy_map = extra_cfg.occupancy_map + self._global_planner = extra_cfg.global_planner.class_type(extra_cfg.global_planner, self._occupancy_map) + self._local_planner = extra_cfg.local_planner.class_type(extra_cfg.local_planner, self._occupancy_map) + + # variables for the skill execution + self._target_object_name = None + self._target_yaw = None + self._target_pos = None + self._global_path = None + self._current_waypoint_idx = 0 + + def extract_goal_from_info( + self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo + ) -> SkillGoal: + """Return the target pose[x, y, yaw] in the world frame.""" + + target_object_name = skill_info.target_object + if target_object_name not in env.scene.keys(): + raise ValueError(f"Object {target_object_name} not found in scene") + target_object = env.scene[target_object_name] + + obj_pos_w = target_object.data.root_pos_w[0].cpu().numpy() + + is_free = (self._occupancy_map.occupancy_map == 0).cpu().numpy() + if np.any(is_free): + dist_field = distance_transform_edt(is_free) + else: + dist_field = np.zeros_like(is_free, dtype=np.float32) + + best_score = -1.0 + + angles = np.linspace(0, 2 * np.pi, self.cfg.extra_cfg.num_samples, endpoint=False) + + for angle in angles: + # calculate the sample point coordinates in the world frame + cx = obj_pos_w[0] + self.cfg.extra_cfg.sampling_radius * np.cos(angle) + cy = obj_pos_w[1] + self.cfg.extra_cfg.sampling_radius * np.sin(angle) + + # convert to the grid 
coordinates in the occupancy map + gx = int((cx - self._occupancy_map.origin[0]) / self._occupancy_map.resolution) + gy = int((cy - self._occupancy_map.origin[1]) / self._occupancy_map.resolution) + + # check the boundary + if ( + 0 <= gy < self._occupancy_map.occupancy_map.shape[0] + and 0 <= gx < self._occupancy_map.occupancy_map.shape[1] + ): + # check the collision (must be free space) + if self._occupancy_map.occupancy_map[gy, gx] == 0: + # get the safety score (the farther from the obstacle, the better) + score = dist_field[gy, gx] + if score > best_score: + best_score = score + target_pos_candidate = np.array([cx, cy]) + # calculate the yaw (facing the object) + dx = obj_pos_w[0] - cx + dy = obj_pos_w[1] - cy + target_yaw = np.arctan2(dy, dx) + + # if no target position is found, use the default fallback position + if target_pos_candidate is None: + self._logger.warning(f"Map sampling failed for {target_object_name}. Using default offset.") + + target_x = obj_pos_w[0] + target_y = obj_pos_w[1] - 1.0 + target_pos_candidate = np.array([target_x, target_y]) + + dx = obj_pos_w[0] - target_x + dy = obj_pos_w[1] - target_y + target_yaw = np.arctan2(dy, dx) + + target_pose = torch.tensor( + [target_pos_candidate[0], target_pos_candidate[1], target_yaw], device=env.device, dtype=torch.float32 + ) + + return SkillGoal(target_object=target_object_name, target_pose=target_pose) + + def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: + """Global path planning using A*""" + + # Extract start position from metadata (robot root pose in world frame) + robot_base_pose = state.robot_base_pose + + start_pos = robot_base_pose[:2] # [x, y] + goal_pos = goal.target_pose[:2] # [x, y] + + target_yaw = goal.target_pose[2].item() + + self._target_object_name = goal.target_object + self._target_yaw = target_yaw + self._target_pos = goal_pos + + self._logger.info( + f"Planning from ({start_pos[0]:.2f}, {start_pos[1]:.2f}) to ({goal_pos[0]:.2f}, {goal_pos[1]:.2f})," + f" 
target_yaw={target_yaw:.2f}." + ) + + self._global_path = self._global_planner.plan(start_pos, goal_pos) + + if self._global_path is None: + self._logger.error("Global planning failed.") + return False + + self._logger.info(f"Global path planned: {len(self._global_path)} waypoints.") + return True + + def step(self, state: WorldState) -> SkillOutput: + """Step the skill execution. + + Args: + state: The current state of the world. + + Returns: + The output of the skill execution. + action: The action to be applied to the environment. [vx, vy, vyaw] in the world frame. + """ + + current_pose = state.robot_base_pose # [x, y, yaw] + + # Check if reached goal + goal_pos = self._global_path[-1] + dist_to_goal = float(torch.linalg.norm(current_pose[:2] - goal_pos)) + + desired_yaw = self._target_yaw + is_final_approach = dist_to_goal < self.cfg.extra_cfg.goal_tolerance + + # If we are not in the final approach, try to face the object + if not is_final_approach: + obj_tensor = state.objects[self._target_object_name] # [x, y, z, qw, qx, qy, qz] + obj_pos = obj_tensor[:2] # [x, y] in world frame + + dx_obj = obj_pos[0] - current_pose[0] + dy_obj = obj_pos[1] - current_pose[1] + yaw_to_obj = float(torch.arctan2(dy_obj, dx_obj).item()) + desired_yaw = yaw_to_obj + + # Calculate Yaw Error + yaw_error = self._normalize_angle(desired_yaw - current_pose[2]) + + # Check success condition + if is_final_approach and abs(yaw_error) < self.cfg.extra_cfg.yaw_tolerance: + # successfully reached the goal + return SkillOutput( + action=torch.zeros(3), + done=True, + success=True, + info={"distance_to_goal": dist_to_goal, "yaw_error": yaw_error}, + ) + + # Get current target waypoint (go ahead if close enough) + while self._current_waypoint_idx < len(self._global_path) - 1: + waypoint = self._global_path[self._current_waypoint_idx] + dist = float(torch.linalg.norm(current_pose[:2] - waypoint)) + if dist < self.cfg.extra_cfg.waypoint_tolerance: + self._current_waypoint_idx += 1 + else: + 
break + + target_waypoint = self._global_path[self._current_waypoint_idx] + + # Simple proportional control or DWA-based local planning + # ------------------------------------------------------------------ + # 1) Waypoint-based P controller (original implementation, kept for backward compatibility) + dx = target_waypoint[0] - current_pose[0] + dy = target_waypoint[1] - current_pose[1] + dist_to_waypoint = float(torch.sqrt(dx * dx + dy * dy)) + + vx, vy, vyaw = 0.0, 0.0, 0.0 + + if not self.cfg.extra_cfg.use_dwa: + # ---------- Original local planner: P controller ---------- + if dist_to_waypoint > 0.01: + # Normalize and scale by max velocity + speed = min( + self.cfg.extra_cfg.local_planner.max_linear_velocity, dist_to_waypoint * 2.0 + ) # Proportional gain + vx = speed * dx / dist_to_waypoint + vy = speed * dy / dist_to_waypoint + + # Always rotate towards desired_yaw (either object or goal) + max_w = self.cfg.extra_cfg.local_planner.max_angular_velocity + vyaw = np.clip(yaw_error * 2.0, -max_w, max_w) + + # If we are far from goal but not facing the object yet, stop linear movement. + if not is_final_approach and abs(yaw_error) > self.cfg.extra_cfg.local_planner.yaw_facing_threshold: + vx = 0.0 + vy = 0.0 + else: + # ---------- Use DWA as local planner ---------- + # Note: DWA logic typically handles collision and velocity profiles better, + # but getting it to "Face Object" while moving requires modifying the DWA cost function + # or pre-rotating input. For now, sticking to the requested modification on standard behavior. + # If DWA is enabled, we assume it handles the movement. + # However, enforcing "Face Object" in DWA requires tricking the DWA or overriding yaw. 
+ + # Use current waypoint position as DWA target + dwa_target = target_waypoint[:2].cpu().numpy() + v_lin, v_yaw = self._local_planner.compute_velocity( + current_pose=current_pose.cpu().numpy(), # [x, y, yaw] + target=dwa_target, # [x, y] + ) + # Project body-frame forward speed v_lin into world-frame (vx, vy) + yaw = current_pose[2].item() + vx = v_lin * np.cos(yaw) + vy = v_lin * np.sin(yaw) + vyaw = v_yaw + + # Create action [vx, vy, vyaw] in world frame (can't be applied to the environment directly yet) + action = torch.tensor([vx, vy, vyaw]) + + return SkillOutput( + action=action, + done=False, + success=None, + info={ + "waypoint_idx": self._current_waypoint_idx, + "total_waypoints": len(self._global_path), + "distance_to_goal": dist_to_goal, + "yaw_error": yaw_error, + "velocity_world": [vx, vy, vyaw], + }, + ) + + def _normalize_angle(self, angle: float) -> float: + """Normalize angle to [-pi, pi]""" + + angle_tensor = torch.as_tensor(angle) + return float(torch.remainder(angle_tensor + np.pi, 2 * np.pi) - np.pi) diff --git a/source/autosim/autosim/skills/reach.py b/source/autosim/autosim/skills/reach.py new file mode 100644 index 0000000..3c260bd --- /dev/null +++ b/source/autosim/autosim/skills/reach.py @@ -0,0 +1,177 @@ +import isaaclab.utils.math as PoseUtils +import torch +from isaaclab.envs import ManagerBasedEnv +from isaaclab.utils import configclass + +from autosim import register_skill +from autosim.core.logger import AutoSimLogger +from autosim.core.skill import SkillCfg +from autosim.core.types import ( + EnvExtraInfo, + SkillGoal, + SkillInfo, + SkillOutput, + WorldState, +) + +from .base_skill import CuroboSkillBase, CuroboSkillExtraCfg + + +@configclass +class ReachSkillCfg(SkillCfg): + """Configuration for the reach skill.""" + + extra_cfg: CuroboSkillExtraCfg = CuroboSkillExtraCfg() + """Extra configuration for the reach skill.""" + + +@register_skill( + name="reach", + cfg_type=ReachSkillCfg, + description="Extend robot arm to target 
position (for approaching objects or placement locations)", +) +class ReachSkill(CuroboSkillBase): + """Skill to reach to a target object or location""" + + def __init__(self, extra_cfg: CuroboSkillExtraCfg) -> None: + super().__init__(extra_cfg) + + self._logger = AutoSimLogger("ReachSkill") + + # variables for the skill execution + self._trajectory = None + self._step_idx = 0 + + def extract_goal_from_info( + self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo + ) -> SkillGoal: + """Return the target pose[x, y, z, qw, qx, qy, qz] in the robot root frame. + IMPORTANT: the robot root frame is not the same as the robot base frame. + """ + + target_object = skill_info.target_object + robot = env.scene[env_extra_info.robot_name] + + object_pose_in_env = env.scene[target_object].data.root_pose_w + object_pos_in_env, object_quat_in_env = object_pose_in_env[:, :3], object_pose_in_env[:, 3:] + + reach_target_pose = env_extra_info.get_next_reach_target_pose(target_object) + reach_target_pose_in_object = reach_target_pose.unsqueeze(0) + reach_target_pos_in_object, reach_target_quat_in_object = ( + reach_target_pose_in_object[:, :3], + reach_target_pose_in_object[:, 3:], + ) + + reach_target_pos_in_env, reach_target_quat_in_env = PoseUtils.combine_frame_transforms( + object_pos_in_env, object_quat_in_env, reach_target_pos_in_object, reach_target_quat_in_object + ) + self._logger.info(f"Reach target position in environment: {reach_target_pos_in_env}") + self._logger.info(f"Reach target quaternion in environment: {reach_target_quat_in_env}") + + robot_root_pose_in_env = robot.data.root_pose_w + robot_root_pos_in_env, robot_root_quat_in_env = robot_root_pose_in_env[:, :3], robot_root_pose_in_env[:, 3:] + + reach_target_pos_in_robot_root, reach_target_quat_in_robot_root = PoseUtils.subtract_frame_transforms( + robot_root_pos_in_env, robot_root_quat_in_env, reach_target_pos_in_env, reach_target_quat_in_env + ) + + target_pose = 
torch.cat((reach_target_pos_in_robot_root, reach_target_quat_in_robot_root), dim=-1).squeeze(0) + + if target_object in env_extra_info.object_extra_reach_target_poses.keys(): + extra_target_poses = {} + for ee_name in env_extra_info.object_extra_reach_target_poses[target_object].keys(): + ee_target_pose = env_extra_info.get_next_extra_reach_target_pose(target_object, ee_name) + ee_target_pose = torch.as_tensor(ee_target_pose, device=env.device) + extra_target_pos_in_obj, extra_target_quat_in_obj = ee_target_pose[:3].unsqueeze(0), ee_target_pose[ + 3: + ].unsqueeze(0) + extra_target_pos_in_env, extra_target_quat_in_env = PoseUtils.combine_frame_transforms( + object_pos_in_env, object_quat_in_env, extra_target_pos_in_obj, extra_target_quat_in_obj + ) + self._logger.info(f"Extra target position for {ee_name} in environment: {extra_target_pos_in_env}") + self._logger.info(f"Extra target quaternion for {ee_name} in environment: {extra_target_quat_in_env}") + extra_target_pos_in_robot_root, extra_target_quat_in_robot_root = PoseUtils.subtract_frame_transforms( + robot_root_pos_in_env, robot_root_quat_in_env, extra_target_pos_in_env, extra_target_quat_in_env + ) + extra_target_poses[ee_name] = torch.cat( + (extra_target_pos_in_robot_root, extra_target_quat_in_robot_root), dim=-1 + ).squeeze(0) + else: + extra_target_poses = None + + return SkillGoal(target_object=target_object, target_pose=target_pose, extra_target_poses=extra_target_poses) + + def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: + """Execute the plan of the reach skill.""" + + self._logger.info(f"Reach from pose in environment: {state.robot_ee_pose}") + + target_pose = goal.target_pose # target pose in the robot root frame + target_pos, target_quat = target_pose[:3], target_pose[3:] + + full_sim_joint_names = state.sim_joint_names + full_sim_q = state.robot_joint_pos + full_sim_qd = state.robot_joint_vel + planner_activate_joints = self._planner.target_joint_names + + activate_q, 
activate_qd = [], [] + for joint_name in planner_activate_joints: + if joint_name in full_sim_joint_names: + activate_q.append(full_sim_q[full_sim_joint_names.index(joint_name)]) + activate_qd.append(full_sim_qd[full_sim_joint_names.index(joint_name)]) + else: + raise ValueError( + f"Joint {joint_name} in planner activate joints is not in the full simulation joint names." + ) + activate_q = torch.stack(activate_q, dim=0) + activate_qd = torch.stack(activate_qd, dim=0) + self._trajectory = self._planner.plan_motion( + target_pos, + target_quat, + activate_q, + activate_qd, + link_goals=goal.extra_target_poses, + ) + + return self._trajectory is not None + + def step(self, state: WorldState) -> SkillOutput: + """Step the reach skill. + + Args: + state: The current state of the world. + + Returns: + The output of the skill execution. + action: The action to be applied to the environment. [joint_positions with isaaclab joint order] + """ + + traj_positions = self._trajectory.position + if self._step_idx >= len(self._trajectory.position): + traj_pos = traj_positions[-1] + done = True + else: + traj_pos = traj_positions[self._step_idx] + done = False + self._step_idx += 1 + + curobo_joint_names = self._trajectory.joint_names + sim_joint_names = state.sim_joint_names + joint_pos = state.robot_joint_pos.clone() + for curobo_idx, curobo_joint_name in enumerate(curobo_joint_names): + sim_idx = sim_joint_names.index(curobo_joint_name) + joint_pos[sim_idx] = traj_pos[curobo_idx] + + return SkillOutput( + action=joint_pos, + done=done, + success=True, + info={}, + ) + + def reset(self) -> None: + """Reset the reach skill.""" + + super().reset() + self._step_idx = 0 + self._trajectory = None diff --git a/source/autosim/autosim/skills/relative_reach.py b/source/autosim/autosim/skills/relative_reach.py new file mode 100644 index 0000000..f05493d --- /dev/null +++ b/source/autosim/autosim/skills/relative_reach.py @@ -0,0 +1,210 @@ +import isaaclab.utils.math as PoseUtils +import 
torch +from isaaclab.envs import ManagerBasedEnv +from isaaclab.utils import configclass + +from autosim import register_skill +from autosim.core.logger import AutoSimLogger +from autosim.core.skill import SkillCfg +from autosim.core.types import ( + EnvExtraInfo, + SkillGoal, + SkillInfo, + SkillOutput, + WorldState, +) + +from .base_skill import CuroboSkillExtraCfg +from .reach import ReachSkill + + +@configclass +class RelativeReachSkillExtraCfg(CuroboSkillExtraCfg): + """Extra configuration for the relative reach skill.""" + + move_offset: float = 0.1 + """The offset to move the end-effector.""" + move_axis: str = "+z" + """The axis to move the end-effector, which is in the eef frame. e.g. "+x", "+y", "+z", "-x", "-y", "-z".""" + + def __post_init__(self): + """Post-initialize the relative reach skill extra configuration.""" + + super().__post_init__() + assert self.move_axis in ["+x", "+y", "+z", "-x", "-y", "-z"], f"Invalid move axis: {self.move_axis}." + self._axis_map = { + "+x": torch.tensor([1.0, 0.0, 0.0]), + "+y": torch.tensor([0.0, 1.0, 0.0]), + "+z": torch.tensor([0.0, 0.0, 1.0]), + "-x": torch.tensor([-1.0, 0.0, 0.0]), + "-y": torch.tensor([0.0, -1.0, 0.0]), + "-z": torch.tensor([0.0, 0.0, -1.0]), + } + + +@configclass +class RelativeReachSkillCfg(SkillCfg): + """Configuration for the relative reach skill.""" + + extra_cfg: RelativeReachSkillExtraCfg = RelativeReachSkillExtraCfg() + """Extra configuration for the relative reach skill.""" + + +class RelativeReachSkill(ReachSkill): + """Skill to move the end-effector along a specific axis""" + + def __init__(self, extra_cfg: RelativeReachSkillCfg) -> None: + super().__init__(extra_cfg) + self._logger = AutoSimLogger("RelativeReachSkill") + + def extract_goal_from_info( + self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo + ) -> SkillGoal: + """Return the target object of the relative reach skill.""" + + return SkillGoal(target_object=skill_info.target_object) + + def 
execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: + """Execute the plan of the relative reach skill.""" + + full_sim_joint_names = state.sim_joint_names + full_sim_q = state.robot_joint_pos + full_sim_qd = state.robot_joint_vel + planner_activate_joints = self._planner.target_joint_names + + activate_q, activate_qd = [], [] + for joint_name in planner_activate_joints: + if joint_name in full_sim_joint_names: + activate_q.append(full_sim_q[full_sim_joint_names.index(joint_name)]) + activate_qd.append(full_sim_qd[full_sim_joint_names.index(joint_name)]) + else: + raise ValueError( + f"Joint {joint_name} in planner activate joints is not in the full simulation joint names." + ) + activate_q = torch.stack(activate_q, dim=0) + activate_qd = torch.stack(activate_qd, dim=0) + + ee_pose = self._planner.get_ee_pose(activate_q) + target_pos, target_quat = ee_pose.position.clone(), ee_pose.quaternion.clone() + self._logger.info(f"ee pos in robot root frame: {target_pos}, ee quat in robot root frame: {target_quat}") + + # move the eef along the move axis by the move offset based on eef frame, and convert to robot root frame to get target pose + isaaclab_device = state.device + move_offset_vector = self.cfg.extra_cfg._axis_map[self.cfg.extra_cfg.move_axis] * self.cfg.extra_cfg.move_offset + offset_pos_in_ee = move_offset_vector.to(isaaclab_device).unsqueeze(0) + offset_quat_in_ee = torch.tensor([1.0, 0.0, 0.0, 0.0], device=isaaclab_device).unsqueeze(0) + ee_pos_in_robot_root, ee_quat_in_robot_root = target_pos.to(isaaclab_device), target_quat.to(isaaclab_device) + + offset_pos_in_robot_root, offset_quat_in_robot_root = PoseUtils.combine_frame_transforms( + ee_pos_in_robot_root, ee_quat_in_robot_root, offset_pos_in_ee, offset_quat_in_ee + ) + + planner_device = self._planner.tensor_args.device + target_pos = offset_pos_in_robot_root.to(planner_device).squeeze(0) + target_quat = offset_quat_in_robot_root.to(planner_device).squeeze(0) + self._logger.info( + f"target 
pos in robot root frame: {target_pos}, target quat in robot root frame: {target_quat}" + ) + + self._trajectory = self._planner.plan_motion( + target_pos, + target_quat, + activate_q, + activate_qd, + ) + + return self._trajectory is not None + + def step(self, state: WorldState) -> SkillOutput: + """Step the relative reach skill. + + Args: + state: The current state of the world. + + Returns: + The output of the skill execution. + action: The action to be applied to the environment. [joint_positions with isaaclab joint order] + """ + + return super().step(state) + + +"""LIFT SKILL""" + + +@configclass +class LiftSkillExtraCfg(RelativeReachSkillExtraCfg): + """Extra configuration for the lift skill.""" + + move_axis: str = "+z" + """lift the end-effector upward""" + + +@configclass +class LiftSkillCfg(RelativeReachSkillCfg): + """Configuration for the lift skill.""" + + extra_cfg: LiftSkillExtraCfg = LiftSkillExtraCfg() + """Extra configuration for the lift skill.""" + + +@register_skill(name="lift", cfg_type=LiftSkillCfg, description="Lift end-effector upward (target: 'up')") +class LiftSkill(RelativeReachSkill): + """Skill to lift end-effector upward""" + + def __init__(self, extra_cfg: LiftSkillExtraCfg) -> None: + super().__init__(extra_cfg) + + +"""PULL SKILL""" + + +@configclass +class PullSkillExtraCfg(RelativeReachSkillExtraCfg): + """Extra configuration for the pull skill.""" + + move_axis: str = "-x" + """pull the end-effector backward""" + + +@configclass +class PullSkillCfg(RelativeReachSkillCfg): + """Configuration for the pull skill.""" + + extra_cfg: PullSkillExtraCfg = PullSkillExtraCfg() + """Extra configuration for the pull skill.""" + + +@register_skill(name="pull", cfg_type=PullSkillCfg, description="Pull end-effector backward (target: 'backward')") +class PullSkill(RelativeReachSkill): + """Skill to pull end-effector backward""" + + def __init__(self, extra_cfg: PullSkillExtraCfg) -> None: + super().__init__(extra_cfg) + + +"""PUSH SKILL""" + + 
+@configclass +class PushSkillExtraCfg(RelativeReachSkillExtraCfg): + """Extra configuration for the push skill.""" + + move_axis: str = "+x" + """push the end-effector forward""" + + +@configclass +class PushSkillCfg(RelativeReachSkillCfg): + """Configuration for the push skill.""" + + extra_cfg: PushSkillExtraCfg = PushSkillExtraCfg() + """Extra configuration for the push skill.""" + + +@register_skill(name="push", cfg_type=PushSkillCfg, description="Push end-effector forward (target: 'forward')") +class PushSkill(RelativeReachSkill): + """Skill to push end-effector forward""" + + def __init__(self, extra_cfg: PushSkillExtraCfg) -> None: + super().__init__(extra_cfg) diff --git a/source/autosim/autosim/utils/debug_util.py b/source/autosim/autosim/utils/debug_util.py new file mode 100644 index 0000000..31c2191 --- /dev/null +++ b/source/autosim/autosim/utils/debug_util.py @@ -0,0 +1,17 @@ +import torch +from isaaclab.markers import VisualizationMarkers +from isaaclab.markers.config import FRAME_MARKER_CFG + +markers: list[VisualizationMarkers] = [] + + +def create_replay_marker(marker_id: str | int = 0): + frame_marker_cfg = FRAME_MARKER_CFG.copy() + frame_marker_cfg.markers["frame"].scale = (0.1, 0.1, 0.1) + marker_cfg = frame_marker_cfg.replace(prim_path=f"/World/Visuals/replay_marker_{marker_id}") + marker = VisualizationMarkers(marker_cfg) + markers.append(marker) + + +def marker_visualize(marker_idx: int, pos: torch.Tensor, quat: torch.Tensor): + markers[marker_idx].visualize(translations=pos, orientations=quat, marker_indices=[0]) diff --git a/source/autosim/pyproject.toml b/source/autosim/pyproject.toml new file mode 100644 index 0000000..9326747 --- /dev/null +++ b/source/autosim/pyproject.toml @@ -0,0 +1,33 @@ +[build-system] +requires = ["setuptools", "wheel", "toml"] +build-backend = "setuptools.build_meta" + +[project] +name = "autosim" +version = "0.0.0" +description = "autosim: An automated data generation pipeline based on Isaac Lab" +readme = 
"README.md" +license = {text = "Apache-2.0"} +authors = [ + {name = "Yinghao Shuai", email = "yinghao.shuai@lightwheel.ai"} +] +maintainers = [] +keywords = ["autosim", "data generation", "isaaclab"] +classifiers = [ + "Natural Language :: English", + "Programming Language :: Python :: 3.11", +] +requires-python = ">=3.10" +dependencies = [ + "openai", + "dacite" +] + +[project.urls] +Homepage = "https://github.com/LightwheelAI/autosim" +Repository = "https://github.com/LightwheelAI/autosim" + +[tool.setuptools] +packages = {find = {}} +include-package-data = true +zip-safe = false diff --git a/source/autosim_examples/autosim_examples/__init__.py b/source/autosim_examples/autosim_examples/__init__.py new file mode 100644 index 0000000..5972e62 --- /dev/null +++ b/source/autosim_examples/autosim_examples/__init__.py @@ -0,0 +1,2 @@ +from .autosim import * +from .isaaclab_task import * diff --git a/source/autosim_examples/autosim_examples/autosim/__init__.py b/source/autosim_examples/autosim_examples/autosim/__init__.py new file mode 100644 index 0000000..a52293d --- /dev/null +++ b/source/autosim_examples/autosim_examples/autosim/__init__.py @@ -0,0 +1,7 @@ +from autosim import register_pipeline + +register_pipeline( + id="AutoSimPipeline-FrankaCubeLift-v0", + entry_point=f"{__name__}.pipelines.franka_lift_cube:FrankaCubeLiftPipeline", + cfg_entry_point=f"{__name__}.pipelines.franka_lift_cube:FrankaCubeLiftPipelineCfg", +) diff --git a/source/autosim_examples/autosim_examples/autosim/action_adapters/franka_adapter.py b/source/autosim_examples/autosim_examples/autosim/action_adapters/franka_adapter.py new file mode 100644 index 0000000..15d36d2 --- /dev/null +++ b/source/autosim_examples/autosim_examples/autosim/action_adapters/franka_adapter.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import torch +from isaaclab.envs import ManagerBasedEnv + +from autosim import ActionAdapterBase +from autosim.core.types import 
SkillOutput + +if TYPE_CHECKING: + from .franka_adapter_cfg import FrankaAbsAdapterCfg + + +class FrankaAbsAdapter(ActionAdapterBase): + def __init__(self, cfg: FrankaAbsAdapterCfg): + super().__init__(cfg) + + self.register_apply_method("reach", self._apply_reach) + self.register_apply_method("grasp", self._apply_grasp) + self.register_apply_method("lift", self._apply_reach) + + def _apply_reach(self, skill_output: SkillOutput, env: ManagerBasedEnv) -> torch.Tensor: + target_joint_pos = skill_output.action # [joint_positions with isaaclab joint order] + + robot = env.scene["robot"] + default_joint_pos = robot.data.default_joint_pos[0, :] + + last_action = env.action_manager.action + action = last_action[0, :].clone() + + arm_action_cfg = env.action_manager.get_term("arm_action").cfg + + arm_action_ids, _ = robot.find_joints(arm_action_cfg.joint_names) + + arm_target_joint_pos = target_joint_pos[arm_action_ids] + arm_action = arm_target_joint_pos + if arm_action_cfg.use_default_offset: + arm_action = arm_action - default_joint_pos[arm_action_ids] + arm_action = arm_action / arm_action_cfg.scale + + action[0:7] = arm_action + + return action + + def _apply_grasp(self, skill_output: SkillOutput, env: ManagerBasedEnv) -> torch.Tensor: + last_action = env.action_manager.action + action = last_action[0, :].clone() + action[-1] = -1.0 + return action diff --git a/source/autosim_examples/autosim_examples/autosim/action_adapters/franka_adapter_cfg.py b/source/autosim_examples/autosim_examples/autosim/action_adapters/franka_adapter_cfg.py new file mode 100644 index 0000000..11b7688 --- /dev/null +++ b/source/autosim_examples/autosim_examples/autosim/action_adapters/franka_adapter_cfg.py @@ -0,0 +1,14 @@ +from isaaclab.utils import configclass + +from autosim import ActionAdapterCfg + +from .franka_adapter import FrankaAbsAdapter + + +@configclass +class FrankaAbsAdapterCfg(ActionAdapterCfg): + """Configuration for the Franka adapter.""" + + class_type: type = 
FrankaAbsAdapter + + skip_apply_skills: list[str] = ["moveto"] diff --git a/source/autosim_examples/autosim_examples/autosim/pipelines/franka_lift_cube.py b/source/autosim_examples/autosim_examples/autosim/pipelines/franka_lift_cube.py new file mode 100644 index 0000000..683d39b --- /dev/null +++ b/source/autosim_examples/autosim_examples/autosim/pipelines/franka_lift_cube.py @@ -0,0 +1,61 @@ +import torch +from isaaclab.envs import ManagerBasedEnv +from isaaclab.utils import configclass + +from autosim.core.pipeline import AutoSimPipeline, AutoSimPipelineCfg +from autosim.core.types import EnvExtraInfo +from autosim.decomposers import LLMDecomposerCfg + +from ..action_adapters.franka_adapter_cfg import FrankaAbsAdapterCfg + + +@configclass +class FrankaCubeLiftPipelineCfg(AutoSimPipelineCfg): + """Configuration for the Franka cube lift pipeline.""" + + decomposer: LLMDecomposerCfg = LLMDecomposerCfg() + + action_adapter: FrankaAbsAdapterCfg = FrankaAbsAdapterCfg() + + def __post_init__(self): + self.skills.lift.extra_cfg.lift_offset = 0.20 + + self.occupancy_map.floor_prim_suffix = "Table" + + self.motion_planner.robot_config_file = "franka.yml" + self.motion_planner.world_ignore_subffixes = [] + self.motion_planner.world_only_subffixes = [] + + +class FrankaCubeLiftPipeline(AutoSimPipeline): + def __init__(self, cfg: AutoSimPipelineCfg): + self._task_name = "AutoSimExamples-IsaacLab-FrankaCubeLift-v0" + + super().__init__(cfg) + + def load_env(self) -> ManagerBasedEnv: + import gymnasium as gym + from isaaclab_tasks.utils import parse_env_cfg + + env_cfg = parse_env_cfg(self._task_name, device="cuda:0", num_envs=1, use_fabric=True) + env_cfg.terminations.time_out = None + + env = gym.make(self._task_name, cfg=env_cfg).unwrapped + return env + + def get_env_extra_info(self) -> EnvExtraInfo: + available_objects = self._env.scene.keys() + return EnvExtraInfo( + task_name=self._task_name, + objects=available_objects, + additional_prompt_contents=None, + 
robot_name="robot", + robot_base_link_name="panda_link0", + ee_link_name="panda_hand", + object_reach_target_poses={ + "cube": [ + # torch.tensor([0.0, 0.0, 0.1, 0.0, 1.0, 0.0, 0.0]), + torch.tensor([0.0, 0.0, 0.15, 0.0, 1.0, 0.0, 0.0]), + ], + }, + ) diff --git a/source/autosim_examples/autosim_examples/isaaclab_task/__init__.py b/source/autosim_examples/autosim_examples/isaaclab_task/__init__.py new file mode 100644 index 0000000..53f74b1 --- /dev/null +++ b/source/autosim_examples/autosim_examples/isaaclab_task/__init__.py @@ -0,0 +1,10 @@ +import gymnasium as gym + +gym.register( + id="AutoSimExamples-IsaacLab-FrankaCubeLift-v0", + entry_point="isaaclab.envs:ManagerBasedRLEnv", + kwargs={ + "env_cfg_entry_point": f"{__name__}.franka_lift_cube_cfg:FrankaCubeLiftEnvCfg", + }, + disable_env_checker=True, +) diff --git a/source/autosim_examples/autosim_examples/isaaclab_task/franka_lift_cube_cfg.py b/source/autosim_examples/autosim_examples/isaaclab_task/franka_lift_cube_cfg.py new file mode 100644 index 0000000..7293504 --- /dev/null +++ b/source/autosim_examples/autosim_examples/isaaclab_task/franka_lift_cube_cfg.py @@ -0,0 +1,181 @@ +import isaaclab.sim as sim_utils +import torch +from isaaclab.assets import ArticulationCfg, AssetBaseCfg, RigidObjectCfg +from isaaclab.envs import ManagerBasedRLEnv, ManagerBasedRLEnvCfg, mdp +from isaaclab.managers import EventTermCfg as EventTerm +from isaaclab.managers import ObservationGroupCfg as ObsGroup +from isaaclab.managers import ObservationTermCfg as ObsTerm +from isaaclab.managers import SceneEntityCfg +from isaaclab.managers import TerminationTermCfg as DoneTerm +from isaaclab.scene import InteractiveSceneCfg +from isaaclab.sensors import FrameTransformerCfg +from isaaclab.sim.schemas.schemas_cfg import RigidBodyPropertiesCfg +from isaaclab.sim.spawners.from_files.from_files_cfg import GroundPlaneCfg, UsdFileCfg +from isaaclab.utils import configclass +from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR +from 
isaaclab_tasks.manager_based.manipulation.stack.mdp import franka_stack_events + +from isaaclab_assets.robots.franka import FRANKA_PANDA_CFG # isort: skip + + +@configclass +class FrankaCubeLiftSceneCfg(InteractiveSceneCfg): + """Configuration for the Franka cube lift scene.""" + + robot: ArticulationCfg = FRANKA_PANDA_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot") + + ee_frame: FrameTransformerCfg = FrameTransformerCfg( + prim_path="{ENV_REGEX_NS}/Robot/panda_link0", + debug_vis=False, + target_frames=[ + FrameTransformerCfg.FrameCfg(prim_path="{ENV_REGEX_NS}/Robot/panda_hand", name="panda_hand"), + ], + ) + + # table + table = AssetBaseCfg( + prim_path="{ENV_REGEX_NS}/Table", + init_state=AssetBaseCfg.InitialStateCfg(pos=[0.5, 0, 0], rot=[0.707, 0, 0, 0.707]), + spawn=UsdFileCfg(usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Mounts/SeattleLabTable/table_instanceable.usd"), + ) + + # cube + cube: RigidObjectCfg = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/Cube", + init_state=RigidObjectCfg.InitialStateCfg(pos=[0.4, 0.0, 0.0203], rot=[1, 0, 0, 0]), + spawn=UsdFileCfg( + usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/blue_block.usd", + scale=(1.0, 1.0, 1.0), + rigid_props=RigidBodyPropertiesCfg( + solver_position_iteration_count=16, + solver_velocity_iteration_count=1, + max_angular_velocity=1000.0, + max_linear_velocity=1000.0, + max_depenetration_velocity=5.0, + disable_gravity=False, + ), + ), + ) + + # plane + plane = AssetBaseCfg( + prim_path="/World/GroundPlane", + init_state=AssetBaseCfg.InitialStateCfg(pos=[0, 0, -1.05]), + spawn=GroundPlaneCfg(), + ) + + # lights + light = AssetBaseCfg( + prim_path="/World/light", + spawn=sim_utils.DomeLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0), + ) + + def __post_init__(self): + self.ee_frame.visualizer_cfg.markers["frame"].scale = (0.10, 0.10, 0.10) + + +@configclass +class ActionsCfg: + """Action specifications for the MDP.""" + + arm_action: mdp.JointPositionActionCfg = mdp.JointPositionActionCfg( + asset_name="robot", 
joint_names=["panda_joint.*"], scale=0.5, use_default_offset=True + ) + gripper_action: mdp.BinaryJointPositionActionCfg = mdp.BinaryJointPositionActionCfg( + asset_name="robot", + joint_names=["panda_finger.*"], + open_command_expr={"panda_finger_.*": 0.04}, + close_command_expr={"panda_finger_.*": 0.0}, + ) + + +@configclass +class ObservationsCfg: + """Observation specifications for the MDP.""" + + @configclass + class PolicyCfg(ObsGroup): + """Observations for policy group with state values.""" + + actions = ObsTerm(func=mdp.last_action) + joint_pos = ObsTerm(func=mdp.joint_pos_rel) + joint_vel = ObsTerm(func=mdp.joint_vel_rel) + + def __post_init__(self): + self.enable_corruption = False + self.concatenate_terms = False + + # observation groups + policy: PolicyCfg = PolicyCfg() + + +def cube_lifted(env: ManagerBasedRLEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("cube")) -> torch.Tensor: + """Terminate when the cube is lifted.""" + + return env.scene[asset_cfg.name].data.root_pos_w[:, 2] > 0.10 + + +@configclass +class TerminationsCfg: + """Termination terms for the MDP.""" + + time_out = DoneTerm(func=mdp.time_out, time_out=True) + + success = DoneTerm(func=cube_lifted) + + +@configclass +class EventCfg: + """Configuration for events.""" + + randomize_franka_joint_state = EventTerm( + func=franka_stack_events.randomize_joint_by_gaussian_offset, + mode="reset", + params={ + "mean": 0.0, + "std": 0.02, + "asset_cfg": SceneEntityCfg("robot"), + }, + ) + + randomize_cube_positions = EventTerm( + func=franka_stack_events.randomize_object_pose, + mode="reset", + params={ + # "pose_range": {"x": (0.4, 0.6), "y": (-0.10, 0.10), "z": (0.0203, 0.0203), "yaw": (-1.0, 1.0)}, + "pose_range": {"x": (0.5, 0.5), "y": (0.0, 0.0), "z": (0.0203, 0.0203), "yaw": (0.0, 0.0)}, + "asset_cfgs": [SceneEntityCfg("cube")], + }, + ) + + +@configclass +class FrankaCubeLiftEnvCfg(ManagerBasedRLEnvCfg): + """Configuration for the Franka cube lift environment.""" + + # Scene settings + 
scene: FrankaCubeLiftSceneCfg = FrankaCubeLiftSceneCfg(num_envs=1, env_spacing=2.5, replicate_physics=False) + # Basic settings + observations: ObservationsCfg = ObservationsCfg() + actions: ActionsCfg = ActionsCfg() + # MDP settings + terminations: TerminationsCfg = TerminationsCfg() + # Events settings + events: EventCfg = EventCfg() + + rewards = None + + def __post_init__(self): + """Post initialization.""" + # general settings + self.decimation = 2 + self.episode_length_s = 30.0 + # simulation settings + self.sim.dt = 0.01 # 100Hz + self.sim.render_interval = 2 + + self.sim.physx.bounce_threshold_velocity = 0.2 + self.sim.physx.bounce_threshold_velocity = 0.01 + self.sim.physx.gpu_found_lost_aggregate_pairs_capacity = 1024 * 1024 * 4 + self.sim.physx.gpu_total_aggregate_pairs_capacity = 16 * 1024 + self.sim.physx.friction_correlation_distance = 0.00625 diff --git a/source/autosim_examples/pyproject.toml b/source/autosim_examples/pyproject.toml new file mode 100644 index 0000000..aa2708c --- /dev/null +++ b/source/autosim_examples/pyproject.toml @@ -0,0 +1,31 @@ +[build-system] +requires = ["setuptools", "wheel", "toml"] +build-backend = "setuptools.build_meta" + +[project] +name = "autosim_examples" +version = "0.0.0" +description = "autosim_examples: A minimal example package for data generation using autosim" +readme = "README.md" +license = {text = "Apache-2.0"} +authors = [ + {name = "Yinghao Shuai", email = "yinghao.shuai@lightwheel.ai"} +] +maintainers = [] +keywords = ["autosim", "data generation", "isaaclab"] +classifiers = [ + "Natural Language :: English", + "Programming Language :: Python :: 3.11", +] +requires-python = ">=3.10" +dependencies = [ +] + +[project.urls] +Homepage = "https://github.com/LightwheelAI/autosim" +Repository = "https://github.com/LightwheelAI/autosim" + +[tool.setuptools] +packages = {find = {}} +include-package-data = true +zip-safe = false