Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ jobs:
matrix:
include:
- os: ubuntu-latest
python-version: '3.7.17'
python-version: '3.8'
- os: ubuntu-latest
python-version: '3.10.15'
python-version: '3.10'
- os: ubuntu-latest
python-version: '3.12.6'
python-version: '3.12'
# - os: macos-latest
# python-version: '3.12.6'
# python-version: '3.12'
- os: windows-latest
python-version: '3.12.6'
python-version: '3.12'

steps:
- name: Check out repository
Expand Down
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,69 @@ Explore the examples to learn how to use the package in various scenarios:

- [examples/webots](./examples/webots): Teleoperation of a UR5e robot arm using [ikpy](https://github.com/Phylliade/ikpy) in the [Webots](https://github.com/cyberbotics/webots/) simulator.

## Utils

The package includes several utility classes to simplify robot arm integration:

> [!NOTE]
> To use the utility classes, install the package with the additional dependencies:
> ```bash
> pip install teleop[pin]
> ```

### JacobiRobot

A Pinocchio-based servoing and kinematics for robotic manipulators.

**Key Features:**
- Forward/inverse kinematics using Pinocchio
- Pose-based servo control with velocity/acceleration limits
- Real-time 3D visualization
- Joint-level control and monitoring

**Usage:**
```python
from teleop.utils.jacobi_robot import JacobiRobot

robot = JacobiRobot("robot.urdf", ee_frame_name="end_effector")
target_pose = np.eye(4) # 4x4 transformation matrix
reached = robot.servo_to_pose(target_pose, dt=0.01)
```

### JacobiRobotROS

ROS 2 wrapper for JacobiRobot that integrates with standard ROS 2 topics and messages.

**Key Features:**
- Automatic URDF loading from `/robot_description` topic
- Joint state subscription and trajectory publishing
- Compatible with `joint_trajectory_controller`
- Seamless integration with existing ROS 2 control stacks

**Usage:**
```python
from teleop.utils.jacobi_robot_ros import JacobiRobotROS
import rclpy

rclpy.init()
node = rclpy.create_node("robot_control")

robot = JacobiRobotROS(
node=node,
ee_frame_name="end_effector",
joint_names=["joint1", "joint2", "joint3"]
)

robot.reset_joint_states() # Wait for initial joint states
reached = robot.servo_to_pose(target_pose, dt=0.03)
```

| **Topic** | **Type** | **Message Type** | **Description** |
|-----------|----------|------------------|-----------------|
| `/joint_states` | Subscribed | [sensor_msgs/JointState](https://docs.ros2.org/latest/api/sensor_msgs/msg/JointState.html) | Current joint positions and velocities |
| `/robot_description` | Subscribed | [std_msgs/String](https://docs.ros2.org/latest/api/std_msgs/msg/String.html) | URDF robot description |
| `/joint_trajectory_controller/joint_trajectory` | Published | [trajectory_msgs/JointTrajectory](https://docs.ros2.org/latest/api/trajectory_msgs/msg/JointTrajectory.html) | Joint trajectory commands for robot control |

## Development

If you’d like to contribute, install the package in editable mode:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,83 +1,64 @@
import sys
import tempfile
import threading

try:
import ikpy
from ikpy.chain import Chain
except ImportError:
sys.exit(
'The "ikpy" Python module is not installed. '
'To run this sample, please upgrade "pip" and install ikpy with this command: "pip install ikpy"'
)

from controller import Robot
from teleop import Teleop
from teleop.utils.jacobi_robot import JacobiRobot


class RobotArm(Robot):
def __init__(self):
super().__init__()
self.__arm_chain = None
self.__motors = None
self.__motors = {}

# Create the arm chain from the URDF
filename = None
self.__jacobi = None
with tempfile.NamedTemporaryFile(suffix=".urdf", delete=False) as file:
filename = file.name
file.write(self.getUrdf().encode("utf-8"))
self.__arm_chain = Chain.from_urdf_file(
filename,
active_links_mask=[False, True, True, True, True, True, True, False],
symbolic=False,
)
self.__jacobi = JacobiRobot(file.name, ee_frame_name="wrist_3_link")

# Initialize the arm motors and encoders.
timestep = int(self.getBasicTimeStep())
self.__motors = []
for link in self.__arm_chain.links:
if "joint" in link.name and "pen" not in link.name:
motor = self.getDevice(link.name)
motor.setVelocity(1.0)
position_sensor = motor.getPositionSensor()
position_sensor.enable(timestep)
self.__motors.append(motor)
for joint_name in self.__jacobi.get_joint_names():
motor = self.getDevice(joint_name)
motor.setVelocity(1.0)
position_sensor = motor.getPositionSensor()
position_sensor.enable(timestep)
self.__motors[joint_name] = motor

def move_to_position(self, pose):
position = pose[:3, 3]
orientation = pose[:3, :3]
initial_position = (
[0] + [m.getPositionSensor().getValue() for m in self.__motors] + [0]
)
ik_results = self.__arm_chain.inverse_kinematics(
position, initial_position=initial_position, target_orientation=orientation, orientation_mode="all"
)
for i in range(len(self.__motors)):
self.__motors[i].setPosition(ik_results[i + 1])
self.__jacobi.servo_to_pose(pose)
for name, motor in self.__motors.items():
motor.setPosition(self.__jacobi.get_joint_position(name))

def move_joints(self, positions):
for i in range(len(self.__motors)):
self.__motors[i].setPosition(positions[i])
for name, motor in self.__motors.items():
motor.setPosition(positions[name])
self.__jacobi.set_joint_position(name, positions[name])

def get_current_pose(self):
positions = (
[0] + [m.getPositionSensor().getValue() for m in self.__motors] + [0]
)
return self.__arm_chain.forward_kinematics(positions)
return self.__jacobi.get_ee_pose()


def main():
target_pose = None
robot = RobotArm()
teleop = Teleop()
target_pose = None

def on_teleop_callback(pose, message):
nonlocal target_pose

if message["move"]:
target_pose = pose

robot.move_joints([0, -1.0, 1.8, -2.0, -1.3, 0.4])
else:
target_pose = None

robot.move_joints({
"shoulder_pan_joint": 0.0,
"shoulder_lift_joint": -1.0,
"elbow_joint": 1.8,
"wrist_1_joint": -2.0,
"wrist_2_joint": -1.3,
"wrist_3_joint": 0.4
})
timestep = int(robot.getBasicTimeStep())
for _ in range(100):
robot.step(timestep) != -1
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
setup(
name="teleop",
version="0.0.8",
packages=["teleop", "teleop.basic", "teleop.ros2"],
packages=["teleop", "teleop.basic", "teleop.ros2", "teleop.utils"],
long_description=long_description,
long_description_content_type="text/markdown",
description="Turns your phone into a robot arm teleoperation device by leveraging the WebXR API",
Expand All @@ -29,6 +29,9 @@
"pytest",
"requests",
],
extras_require={
"pin": ["pin"],
},
package_data={
"teleop": ["cert.pem", "key.pem", "index.html"],
},
Expand Down
19 changes: 18 additions & 1 deletion teleop/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import ssl
import os
import math
import socket
import logging
from werkzeug.serving import ThreadedWSGIServer
from typing import Callable
Expand All @@ -13,6 +14,18 @@
THIS_DIR = os.path.dirname(os.path.realpath(__file__))


def get_local_ip():
try:
# Connect to an external address (doesn't actually send data)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80)) # Google DNS as a dummy target
local_ip = s.getsockname()[0]
s.close()
return local_ip
except Exception as e:
return f"Error: {e}"


def are_close(a, b=None, lin_tol=1e-9, ang_tol=1e-9):
"""
Check if two transformation matrices are close to each other within specified tolerances.
Expand Down Expand Up @@ -207,7 +220,11 @@ def run(self) -> None:
"""
Runs the teleop server. This method is blocking.
"""
self.__logger.info(f"Server started at https://{self.__host}:{self.__port}")
self.__logger.info(f"Server started at {self.__host}:{self.__port}")
self.__logger.info(
f"The phone web app should be available at https://{get_local_ip()}:{self.__port}"
)

self.__server = ThreadedWSGIServer(
app=self.__app,
host=self.__host,
Expand Down
2 changes: 1 addition & 1 deletion teleop/ros2/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def main():
current_robot_pose_message = None
if args.omit_current_pose:
current_robot_pose_message = PoseStamped()
current_robot_pose_message.pose.orientation.w = 1
current_robot_pose_message.pose.orientation.w = 1.0
pose_initiated = False
node = rclpy.create_node("ros2_teleop")
pose_publisher = node.create_publisher(PoseStamped, "target_frame", 1)
Expand Down
Empty file added teleop/utils/__init__.py
Empty file.
Loading