Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/source/overview/sim/sim_articulation.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ Articulations are configured using the {class}`~cfg.ArticulationCfg` dataclass.
| `init_pos` | `tuple` | `(0,0,0)` | Initial root position `(x, y, z)`. |
| `init_rot` | `tuple` | `(0,0,0)` | Initial root rotation `(r, p, y)` in degrees. |
| `fix_base` | `bool` | `True` | Whether to fix the base of the articulation. |
| `init_qpos` | `List[float]` | `None` | Initial joint positions. |
| `body_scale` | `List[float]` | `[1.0, 1.0, 1.0]` | Scaling factors for the articulation links. |
| `disable_self_collisions` | `bool` | `True` | Whether to disable self-collisions. |
| `drive_props` | `JointDrivePropertiesCfg` | `...` | Default drive properties. |
| `attrs` | `RigidBodyAttributesCfg` | `...` | Rigid body attributes configuration. |


### Drive Configuration

Expand Down Expand Up @@ -52,11 +57,12 @@ art_cfg = ArticulationCfg(
# Note: The method is 'add_articulation'
articulation: Articulation = sim.add_articulation(cfg=art_cfg)

# 4. Initialize Physics
# 4. Reset Simulation
# This performs a global reset of the simulation state
sim.reset_objects_state()
```
## Articulation Class
State Data (Observation)

State data is accessed via getter methods that return batched tensors.

| Property | Shape | Description |
Expand Down Expand Up @@ -91,6 +97,7 @@ articulation.set_qpos(target_qpos, target=True)
# Important: Step simulation to apply control
sim.update()
```

### Drive Configuration
Dynamically adjust drive properties.

Expand All @@ -101,6 +108,7 @@ articulation.set_drive(
damping=torch.tensor([10.0], device=device)
)
```

### Kinematics
Supports differentiable Forward Kinematics (FK) and Jacobian computation.
```python
Expand Down
179 changes: 131 additions & 48 deletions embodichain/lab/sim/objects/articulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,15 @@ def __init__(
if self.device.type == "cuda"
else self.dof
)
self._target_qpos = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
self._qpos = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
self._target_qvel = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
self._qvel = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
Expand Down Expand Up @@ -248,6 +254,33 @@ def qpos(self) -> torch.Tensor:
)
return self._qpos[:, : self.dof].clone()

@property
def target_qpos(self) -> torch.Tensor:
"""Get the target positions (target_qpos) of the articulation.

Returns:
torch.Tensor: The target positions of the articulation with shape of (num_instances, dof).
"""
if self.device.type == "cpu":
# Fetch target_qpos from CPU entities
return torch.as_tensor(
np.array(
[
entity.get_current_qpos(is_target=True)
for entity in self.entities
],
),
dtype=torch.float32,
device=self.device,
)
else:
self.ps.gpu_fetch_joint_data(
data=self._target_qpos,
gpu_indices=self.gpu_indices,
data_type=ArticulationGPUAPIReadType.JOINT_TARGET_POSITION,
)
return self._target_qpos[:, : self.dof].clone()

@property
def qvel(self) -> torch.Tensor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also add target_qvel

"""Get the current velocities (qvel) of the articulation.
Expand All @@ -270,6 +303,32 @@ def qvel(self) -> torch.Tensor:
)
return self._qvel[:, : self.dof].clone()

@property
def target_qvel(self) -> torch.Tensor:
"""Get the target velocities (target_qvel) of the articulation.
Returns:
torch.Tensor: The target velocities of the articulation with shape of (num_instances, dof).
"""
if self.device.type == "cpu":
# Fetch target_qvel from CPU entities
return torch.as_tensor(
np.array(
[
entity.get_current_qvel(is_target=True)
for entity in self.entities
],
),
dtype=torch.float32,
device=self.device,
)
else:
self.ps.gpu_fetch_joint_data(
data=self._target_qvel,
gpu_indices=self.gpu_indices,
data_type=ArticulationGPUAPIReadType.JOINT_TARGET_VELOCITY,
)
return self._target_qvel[:, : self.dof].clone()

@property
def qacc(self) -> torch.Tensor:
"""Get the current accelerations (qacc) of the articulation.
Expand Down Expand Up @@ -512,7 +571,7 @@ def __init__(
self.device = device

# Store all indices for batch operations
self._all_indices = torch.arange(len(entities), dtype=torch.int32).tolist()
self._all_indices = torch.arange(len(entities), dtype=torch.int32)

if device.type == "cuda":
self._world.update(0.001)
Expand Down Expand Up @@ -890,13 +949,16 @@ def get_link_pose(
link_pose[:, :3, :3] = mat
return link_pose

def get_qpos(self) -> torch.Tensor:
"""Get the current positions (qpos) of the articulation.
def get_qpos(self, target: bool = False) -> torch.Tensor:
"""Get the current positions (qpos) or target positions (target_qpos) of the articulation.

Args:
target (bool): If True, gets target positions for simulation. If False, gets current positions.

Returns:
torch.Tensor: Joint positions with shape (N, dof), where N is the number of environments.
"""
return self.body_data.qpos
return self.body_data.qpos if not target else self.body_data.target_qpos

def set_qpos(
self,
Expand All @@ -916,6 +978,11 @@ def set_qpos(
Raises:
ValueError: If the length of `env_ids` does not match the length of `qpos`.
"""
# TODO: Refactor this part to use a more generic and extensible approach,
# such as a class decorator that can automatically convert ndarray to torch.Tensor
# and handle dimension padding for specified member functions.
# This will make the codebase cleaner and reduce repetitive type checks/conversions.
# (e.g., support specifying which methods should be decorated for auto-conversion.)
if not isinstance(qpos, torch.Tensor):
qpos = torch.as_tensor(qpos, dtype=torch.float32, device=self.device)

Expand All @@ -932,39 +999,16 @@ def set_qpos(

local_env_ids = self._all_indices if env_ids is None else env_ids

# TODO: Refactor this part to use a more generic and extensible approach,
# such as a class decorator that can automatically convert ndarray to torch.Tensor
# and handle dimension padding for specified member functions.
# This will make the codebase cleaner and reduce repetitive type checks/conversions.
# (e.g., support specifying which methods should be decorated for auto-conversion.)
qpos = torch.as_tensor(qpos, dtype=torch.float32, device=self.device)

if self.device.type == "cuda":
limits = self.body_data.qpos_limits[0].T
# clamp qpos to limits
lower_limits = limits[0][local_joint_ids]
upper_limits = limits[1][local_joint_ids]
qpos = qpos.clamp(lower_limits, upper_limits)

# Make sure qpos is 2D tensor
if qpos.dim() == 1:
qpos = qpos.unsqueeze(0)
# If only one qpos is provided, repeat it for all envs
if len(qpos) == 1 and len(local_env_ids) > 1:
qpos = qpos.repeat(len(local_env_ids), 1)

if len(local_env_ids) != len(qpos):
logger.log_error(
f"Length of env_ids {len(local_env_ids)} does not match qpos length {len(qpos)}. "
f"env_ids: {local_env_ids}, qpos.shape: {qpos.shape}"
)

data_type = (
ArticulationGPUAPIWriteType.JOINT_TARGET_POSITION
if target
else ArticulationGPUAPIWriteType.JOINT_POSITION
)

if self.device.type == "cpu":
for i, env_idx in enumerate(local_env_ids):
setter = (
Expand All @@ -974,26 +1018,42 @@ def set_qpos(
)
setter(qpos[i].numpy(), local_joint_ids.numpy())
else:
# TODO: trigger qpos getter to sync data, otherwise crash
if joint_ids is not None:
self.body_data.qpos
limits = self.body_data.qpos_limits[0].T
# clamp qpos to limits
lower_limits = limits[0][local_joint_ids]
upper_limits = limits[1][local_joint_ids]
qpos = qpos.clamp(lower_limits, upper_limits)

data_type = (
ArticulationGPUAPIWriteType.JOINT_TARGET_POSITION
if target
else ArticulationGPUAPIWriteType.JOINT_POSITION
)

# Always fetch the latest data to avoid stale values
if target:
qpos_set = self.body_data._target_qpos
else:
qpos_set = self.body_data._qpos

indices = self.body_data.gpu_indices[local_env_ids]
qpos_set = self.body_data._qpos[local_env_ids]
qpos_set[:, local_joint_ids] = qpos
qpos_set[local_env_ids[:, None], local_joint_ids] = qpos
self._ps.gpu_apply_joint_data(
data=qpos_set,
gpu_indices=indices,
data_type=data_type,
)

def get_qvel(self) -> torch.Tensor:
"""Get the current velocities (qvel) of the articulation.
def get_qvel(self, target: bool = False) -> torch.Tensor:
"""Get the current velocities (qvel) or target velocities (target_qvel) of the articulation.

Args:
target (bool): If True, gets target velocities for simulation. If False, gets current velocities. The default is False.

Returns:
torch.Tensor: The current velocities of the articulation.
"""
return self.body_data.qvel
return self.body_data.qvel if not target else self.body_data.target_qvel

def set_qvel(
self,
Expand All @@ -1020,14 +1080,18 @@ def set_qvel(
f"Length of env_ids {len(local_env_ids)} does not match qvel length {len(qvel)}."
)

data_type = (
ArticulationGPUAPIWriteType.JOINT_TARGET_VELOCITY
if target
else ArticulationGPUAPIWriteType.JOINT_VELOCITY
)
if joint_ids is None:
local_joint_ids = torch.arange(
self.dof, device=self.device, dtype=torch.int32
)
elif not isinstance(joint_ids, torch.Tensor):
local_joint_ids = torch.as_tensor(
joint_ids, dtype=torch.int32, device=self.device
)
else:
local_joint_ids = joint_ids

if self.device.type == "cpu":
local_joint_ids = np.arange(self.dof) if joint_ids is None else joint_ids
for i, env_idx in enumerate(local_env_ids):
setter = (
self._entities[env_idx].set_current_qvel
Expand All @@ -1036,14 +1100,20 @@ def set_qvel(
)
setter(qvel[i].numpy(), local_joint_ids)
else:
indices = self.body_data.gpu_indices[local_env_ids]
if joint_ids is None:
qvel_set = self.body_data._qvel[local_env_ids]
qvel_set[:, : self.dof] = qvel
data_type = (
ArticulationGPUAPIWriteType.JOINT_TARGET_VELOCITY
if target
else ArticulationGPUAPIWriteType.JOINT_VELOCITY
)

# Always fetch the latest data to avoid stale values
if target:
qvel_set = self.body_data._target_qvel
else:
self.body_data.qvel
qvel_set = self.body_data._qvel[local_env_ids]
qvel_set[:, joint_ids] = qvel
qvel_set = self.body_data._qvel

indices = self.body_data.gpu_indices[local_env_ids]
qvel_set[local_env_ids[:, None], local_joint_ids] = qvel
self._ps.gpu_apply_joint_data(
data=qvel_set,
gpu_indices=indices,
Expand Down Expand Up @@ -1161,6 +1231,7 @@ def clear_dynamics(self, env_ids: Sequence[int] | None = None) -> None:
zero_joint_data = np.zeros((len(local_env_ids), self.dof), dtype=np.float32)
for i, env_idx in enumerate(local_env_ids):
self._entities[env_idx].set_qvel(zero_joint_data[i])
self._entities[env_idx].set_current_qvel(zero_joint_data[i])
self._entities[env_idx].set_current_qf(zero_joint_data[i])
else:
zeros = torch.zeros(
Expand All @@ -1172,6 +1243,11 @@ def clear_dynamics(self, env_ids: Sequence[int] | None = None) -> None:
gpu_indices=indices,
data_type=ArticulationGPUAPIWriteType.JOINT_VELOCITY,
)
self._ps.gpu_apply_joint_data(
data=zeros,
gpu_indices=indices,
data_type=ArticulationGPUAPIWriteType.JOINT_TARGET_VELOCITY,
)
self._ps.gpu_apply_joint_data(
data=zeros,
gpu_indices=indices,
Expand All @@ -1189,9 +1265,15 @@ def reallocate_body_data(self) -> None:
self._data._qpos = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
self._data._target_qpos = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
self._data._qvel = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
self._data._target_qvel = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
self._data._qacc = torch.zeros(
(self.num_instances, max_dof), dtype=torch.float32, device=self.device
)
Expand Down Expand Up @@ -1219,6 +1301,7 @@ def reallocate_body_data(self) -> None:
dtype=torch.float32,
device=self.device,
)
self.reset()

def reset(self, env_ids: Sequence[int] | None = None) -> None:
local_env_ids = self._all_indices if env_ids is None else env_ids
Expand Down
12 changes: 8 additions & 4 deletions embodichain/lab/sim/objects/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,18 @@ def set_qpos(
target=target,
)

def get_qpos(self, name: str | None = None) -> torch.Tensor:
def get_qpos(self, name: str | None = None, target: bool = False) -> torch.Tensor:
"""Get the joint positions (qpos) of the robot.

Args:
name (str | None): The name of the control part to get the qpos for. If None, the default part is used.
target (bool): If True, gets target positions for simulation. If False, gets current positions. The default is False.

Returns:
torch.Tensor: Joint positions with shape (N, dof), where N is the number of environments.
"""

qpos = super().get_qpos()
qpos = super().get_qpos(target=target)
if name is None:
return qpos
else:
Expand Down Expand Up @@ -350,16 +352,18 @@ def set_qvel(
target=target,
)

def get_qvel(self, name: str | None = None) -> torch.Tensor:
def get_qvel(self, name: str | None = None, target: bool = False) -> torch.Tensor:
"""Get the joint velocities (qvel) of the robot.

Args:
name (str | None): The name of the control part to get the qvel for. If None, the default part is used.
target (bool): If True, gets target velocities for simulation. If False, gets current velocities. The default is False.

Returns:
torch.Tensor: Joint velocities with shape (N, dof), where N is the number of environments.
"""

qvel = super().get_qvel()
qvel = super().get_qvel(target=target)
if name is None:
return qvel
else:
Expand Down
Loading