diff --git a/docs/source/overview/sim/sim_articulation.md b/docs/source/overview/sim/sim_articulation.md index 1cde346f..1fe6bac6 100644 --- a/docs/source/overview/sim/sim_articulation.md +++ b/docs/source/overview/sim/sim_articulation.md @@ -14,7 +14,12 @@ Articulations are configured using the {class}`~cfg.ArticulationCfg` dataclass. | `init_pos` | `tuple` | `(0,0,0)` | Initial root position `(x, y, z)`. | | `init_rot` | `tuple` | `(0,0,0)` | Initial root rotation `(r, p, y)` in degrees. | | `fix_base` | `bool` | `True` | Whether to fix the base of the articulation. | +| `init_qpos` | `List[float]` | `None` | Initial joint positions. | +| `body_scale` | `List[float]` | `[1.0, 1.0, 1.0]` | Scaling factors for the articulation links. | +| `disable_self_collisions` | `bool` | `True` | Whether to disable self-collisions. | | `drive_props` | `JointDrivePropertiesCfg` | `...` | Default drive properties. | +| `attrs` | `RigidBodyAttributesCfg` | `...` | Rigid body attributes configuration. | + ### Drive Configuration @@ -52,11 +57,12 @@ art_cfg = ArticulationCfg( # Note: The method is 'add_articulation' articulation: Articulation = sim.add_articulation(cfg=art_cfg) -# 4. Initialize Physics +# 4. Reset Simulation +# This performs a global reset of the simulation state sim.reset_objects_state() ``` ## Articulation Class -State Data (Observation) + State data is accessed via getter methods that return batched tensors. | Property | Shape | Description | @@ -91,6 +97,7 @@ articulation.set_qpos(target_qpos, target=True) # Important: Step simulation to apply control sim.update() ``` + ### Drive Configuration Dynamically adjust drive properties. @@ -101,6 +108,7 @@ articulation.set_drive( damping=torch.tensor([10.0], device=device) ) ``` + ### Kinematics Supports differentiable Forward Kinematics (FK) and Jacobian computation. ```python diff --git a/embodichain/lab/sim/objects/articulation.py b/embodichain/lab/sim/objects/articulation.py index c23e187b..760f32c6 100644 --- a/embodichain/lab/sim/objects/articulation.py +++ b/embodichain/lab/sim/objects/articulation.py @@ -128,9 +128,15 @@ def __init__( if self.device.type == "cuda" else self.dof ) + self._target_qpos = torch.zeros( + (self.num_instances, max_dof), dtype=torch.float32, device=self.device + ) self._qpos = torch.zeros( (self.num_instances, max_dof), dtype=torch.float32, device=self.device ) + self._target_qvel = torch.zeros( + (self.num_instances, max_dof), dtype=torch.float32, device=self.device + ) self._qvel = torch.zeros( (self.num_instances, max_dof), dtype=torch.float32, device=self.device ) @@ -248,6 +254,33 @@ def qpos(self) -> torch.Tensor: ) return self._qpos[:, : self.dof].clone() + @property + def target_qpos(self) -> torch.Tensor: + """Get the target positions (target_qpos) of the articulation. + + Returns: + torch.Tensor: The target positions of the articulation with shape of (num_instances, dof). + """ + if self.device.type == "cpu": + # Fetch target_qpos from CPU entities + return torch.as_tensor( + np.array( + [ + entity.get_current_qpos(is_target=True) + for entity in self.entities + ], + ), + dtype=torch.float32, + device=self.device, + ) + else: + self.ps.gpu_fetch_joint_data( + data=self._target_qpos, + gpu_indices=self.gpu_indices, + data_type=ArticulationGPUAPIReadType.JOINT_TARGET_POSITION, + ) + return self._target_qpos[:, : self.dof].clone() + @property def qvel(self) -> torch.Tensor: """Get the current velocities (qvel) of the articulation. @@ -270,6 +303,32 @@ def qvel(self) -> torch.Tensor: ) return self._qvel[:, : self.dof].clone() + @property + def target_qvel(self) -> torch.Tensor: + """Get the target velocities (target_qvel) of the articulation. + Returns: + torch.Tensor: The target velocities of the articulation with shape of (num_instances, dof). + """ + if self.device.type == "cpu": + # Fetch target_qvel from CPU entities + return torch.as_tensor( + np.array( + [ + entity.get_current_qvel(is_target=True) + for entity in self.entities + ], + ), + dtype=torch.float32, + device=self.device, + ) + else: + self.ps.gpu_fetch_joint_data( + data=self._target_qvel, + gpu_indices=self.gpu_indices, + data_type=ArticulationGPUAPIReadType.JOINT_TARGET_VELOCITY, + ) + return self._target_qvel[:, : self.dof].clone() + @property def qacc(self) -> torch.Tensor: """Get the current accelerations (qacc) of the articulation. @@ -512,7 +571,7 @@ def __init__( self.device = device # Store all indices for batch operations - self._all_indices = torch.arange(len(entities), dtype=torch.int32).tolist() + self._all_indices = torch.arange(len(entities), dtype=torch.int32) if device.type == "cuda": self._world.update(0.001) @@ -890,13 +949,16 @@ def get_link_pose( link_pose[:, :3, :3] = mat return link_pose - def get_qpos(self) -> torch.Tensor: - """Get the current positions (qpos) of the articulation. + def get_qpos(self, target: bool = False) -> torch.Tensor: + """Get the current positions (qpos) or target positions (target_qpos) of the articulation. + + Args: + target (bool): If True, gets target positions for simulation. If False, gets current positions. Returns: torch.Tensor: Joint positions with shape (N, dof), where N is the number of environments. """ - return self.body_data.qpos + return self.body_data.qpos if not target else self.body_data.target_qpos def set_qpos( self, @@ -916,6 +978,11 @@ def set_qpos( Raises: ValueError: If the length of `env_ids` does not match the length of `qpos`. """ + # TODO: Refactor this part to use a more generic and extensible approach, + # such as a class decorator that can automatically convert ndarray to torch.Tensor + # and handle dimension padding for specified member functions. + # This will make the codebase cleaner and reduce repetitive type checks/conversions. + # (e.g., support specifying which methods should be decorated for auto-conversion.) if not isinstance(qpos, torch.Tensor): qpos = torch.as_tensor(qpos, dtype=torch.float32, device=self.device) @@ -932,26 +999,9 @@ def set_qpos( local_env_ids = self._all_indices if env_ids is None else env_ids - # TODO: Refactor this part to use a more generic and extensible approach, - # such as a class decorator that can automatically convert ndarray to torch.Tensor - # and handle dimension padding for specified member functions. - # This will make the codebase cleaner and reduce repetitive type checks/conversions. - # (e.g., support specifying which methods should be decorated for auto-conversion.) - qpos = torch.as_tensor(qpos, dtype=torch.float32, device=self.device) - - if self.device.type == "cuda": - limits = self.body_data.qpos_limits[0].T - # clamp qpos to limits - lower_limits = limits[0][local_joint_ids] - upper_limits = limits[1][local_joint_ids] - qpos = qpos.clamp(lower_limits, upper_limits) - # Make sure qpos is 2D tensor if qpos.dim() == 1: qpos = qpos.unsqueeze(0) - # If only one qpos is provided, repeat it for all envs - if len(qpos) == 1 and len(local_env_ids) > 1: - qpos = qpos.repeat(len(local_env_ids), 1) if len(local_env_ids) != len(qpos): logger.log_error( @@ -959,12 +1009,6 @@ def set_qpos( f"env_ids: {local_env_ids}, qpos.shape: {qpos.shape}" ) - data_type = ( - ArticulationGPUAPIWriteType.JOINT_TARGET_POSITION - if target - else ArticulationGPUAPIWriteType.JOINT_POSITION - ) - if self.device.type == "cpu": for i, env_idx in enumerate(local_env_ids): setter = ( @@ -974,26 +1018,42 @@ def set_qpos( ) setter(qpos[i].numpy(), local_joint_ids.numpy()) else: - # TODO: trigger qpos getter to sync data, otherwise crash - if joint_ids is not None: - self.body_data.qpos + limits = self.body_data.qpos_limits[0].T + # clamp qpos to limits + lower_limits = limits[0][local_joint_ids] + upper_limits = limits[1][local_joint_ids] + qpos = qpos.clamp(lower_limits, upper_limits) + + data_type = ( + ArticulationGPUAPIWriteType.JOINT_TARGET_POSITION + if target + else ArticulationGPUAPIWriteType.JOINT_POSITION + ) + + # Always fetch the latest data to avoid stale values + if target: + qpos_set = self.body_data._target_qpos + else: + qpos_set = self.body_data._qpos indices = self.body_data.gpu_indices[local_env_ids] - qpos_set = self.body_data._qpos[local_env_ids] - qpos_set[:, local_joint_ids] = qpos + qpos_set[local_env_ids[:, None], local_joint_ids] = qpos self._ps.gpu_apply_joint_data( data=qpos_set, gpu_indices=indices, data_type=data_type, ) - def get_qvel(self) -> torch.Tensor: - """Get the current velocities (qvel) of the articulation. + def get_qvel(self, target: bool = False) -> torch.Tensor: + """Get the current velocities (qvel) or target velocities (target_qvel) of the articulation. + + Args: + target (bool): If True, gets target velocities for simulation. If False, gets current velocities. The default is False. Returns: torch.Tensor: The current velocities of the articulation. """ - return self.body_data.qvel + return self.body_data.qvel if not target else self.body_data.target_qvel def set_qvel( self, @@ -1020,14 +1080,18 @@ def set_qvel( f"Length of env_ids {len(local_env_ids)} does not match qvel length {len(qvel)}." ) - data_type = ( - ArticulationGPUAPIWriteType.JOINT_TARGET_VELOCITY - if target - else ArticulationGPUAPIWriteType.JOINT_VELOCITY - ) + if joint_ids is None: + local_joint_ids = torch.arange( + self.dof, device=self.device, dtype=torch.int32 + ) + elif not isinstance(joint_ids, torch.Tensor): + local_joint_ids = torch.as_tensor( + joint_ids, dtype=torch.int32, device=self.device + ) + else: + local_joint_ids = joint_ids if self.device.type == "cpu": - local_joint_ids = np.arange(self.dof) if joint_ids is None else joint_ids for i, env_idx in enumerate(local_env_ids): setter = ( self._entities[env_idx].set_current_qvel @@ -1036,14 +1100,20 @@ def set_qvel( ) setter(qvel[i].numpy(), local_joint_ids) else: - indices = self.body_data.gpu_indices[local_env_ids] - if joint_ids is None: - qvel_set = self.body_data._qvel[local_env_ids] - qvel_set[:, : self.dof] = qvel + data_type = ( + ArticulationGPUAPIWriteType.JOINT_TARGET_VELOCITY + if target + else ArticulationGPUAPIWriteType.JOINT_VELOCITY + ) + + # Always fetch the latest data to avoid stale values + if target: + qvel_set = self.body_data._target_qvel else: - self.body_data.qvel - qvel_set = self.body_data._qvel[local_env_ids] - qvel_set[:, joint_ids] = qvel + qvel_set = self.body_data._qvel + + indices = self.body_data.gpu_indices[local_env_ids] + qvel_set[local_env_ids[:, None], local_joint_ids] = qvel self._ps.gpu_apply_joint_data( data=qvel_set, gpu_indices=indices, @@ -1161,6 +1231,7 @@ def clear_dynamics(self, env_ids: Sequence[int] | None = None) -> None: zero_joint_data = np.zeros((len(local_env_ids), self.dof), dtype=np.float32) for i, env_idx in enumerate(local_env_ids): self._entities[env_idx].set_qvel(zero_joint_data[i]) + self._entities[env_idx].set_current_qvel(zero_joint_data[i]) self._entities[env_idx].set_current_qf(zero_joint_data[i]) else: zeros = torch.zeros( @@ -1172,6 +1243,11 @@ def clear_dynamics(self, env_ids: Sequence[int] | None = None) -> None: gpu_indices=indices, data_type=ArticulationGPUAPIWriteType.JOINT_VELOCITY, ) + self._ps.gpu_apply_joint_data( + data=zeros, + gpu_indices=indices, + data_type=ArticulationGPUAPIWriteType.JOINT_TARGET_VELOCITY, + ) self._ps.gpu_apply_joint_data( data=zeros, gpu_indices=indices, @@ -1189,9 +1265,15 @@ def reallocate_body_data(self) -> None: self._data._qpos = torch.zeros( (self.num_instances, max_dof), dtype=torch.float32, device=self.device ) + self._data._target_qpos = torch.zeros( + (self.num_instances, max_dof), dtype=torch.float32, device=self.device + ) self._data._qvel = torch.zeros( (self.num_instances, max_dof), dtype=torch.float32, device=self.device ) + self._data._target_qvel = torch.zeros( + (self.num_instances, max_dof), dtype=torch.float32, device=self.device + ) self._data._qacc = torch.zeros( (self.num_instances, max_dof), dtype=torch.float32, device=self.device ) @@ -1219,6 +1301,7 @@ def reallocate_body_data(self) -> None: dtype=torch.float32, device=self.device, ) + self.reset() def reset(self, env_ids: Sequence[int] | None = None) -> None: local_env_ids = self._all_indices if env_ids is None else env_ids diff --git a/embodichain/lab/sim/objects/robot.py b/embodichain/lab/sim/objects/robot.py index 33ce4351..09f8c3f1 100644 --- a/embodichain/lab/sim/objects/robot.py +++ b/embodichain/lab/sim/objects/robot.py @@ -287,16 +287,18 @@ def set_qpos( target=target, ) - def get_qpos(self, name: str | None = None) -> torch.Tensor: + def get_qpos(self, name: str | None = None, target: bool = False) -> torch.Tensor: """Get the joint positions (qpos) of the robot. Args: name (str | None): The name of the control part to get the qpos for. If None, the default part is used. + target (bool): If True, gets target positions for simulation. If False, gets current positions. The default is False. + Returns: torch.Tensor: Joint positions with shape (N, dof), where N is the number of environments. """ - qpos = super().get_qpos() + qpos = super().get_qpos(target=target) if name is None: return qpos else: @@ -350,16 +352,18 @@ def set_qvel( target=target, ) - def get_qvel(self, name: str | None = None) -> torch.Tensor: + def get_qvel(self, name: str | None = None, target: bool = False) -> torch.Tensor: """Get the joint velocities (qvel) of the robot. Args: name (str | None): The name of the control part to get the qvel for. If None, the default part is used. + target (bool): If True, gets target velocities for simulation. If False, gets current velocities. The default is False. + Returns: torch.Tensor: Joint velocities with shape (N, dof), where N is the number of environments. """ - qvel = super().get_qvel() + qvel = super().get_qvel(target=target) if name is None: return qvel else: