I am trying to deploy NaVILA on a Unitree GO2 robot, but the result doesn't look good. Actually, the VLM doesn't seem to behave according to the instruction at all. Is there anything I've done wrong? Here's the video link of my deployment result.
import socket
import json
import os
import base64
from PIL import Image as PILImage
from io import BytesIO
import cv2
import numpy as np
import time
import re
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
from unitree_sdk2py.go2.sport.sport_client import SportClient
from unitree_sdk2py.go2.video.video_client import VideoClient
# Configuration
VLM_HOST = "192.168.11.63"  # IP of the machine running the NaVILA VLM server
VLM_PORT = 54321            # TCP port the VLM server listens on
# High-level navigation instruction sent verbatim with every request.
# NOTE(review): NaVILA is trained with a specific prompt template; confirm the
# server-side script (vlm_server.py) wraps this raw instruction into that
# template, otherwise the model may ignore or misread the instruction.
NAVIGATION_QUERY = "walk 2.5 meters forward and then turn right. And then keep going right until you are very close to the wall."
IMAGE_SAVE_DIR = "./captured_images"  # directory where captured frames are dumped
BLACK_IMAGE_PATH = os.path.join(IMAGE_SAVE_DIR, "black.jpg")  # fallback placeholder frame
image_list = []   # running history of captured frames (OpenCV BGR ndarrays)
NUM_FRAMES = 16   # number of frames sent to the VLM per request
os.makedirs(IMAGE_SAVE_DIR, exist_ok=True)
def create_black_image():
    """Write an all-black 640x480 placeholder JPEG to BLACK_IMAGE_PATH.

    Returns:
        The path the placeholder image was written to.
    """
    frame = np.zeros((480, 640, 3), dtype=np.uint8)
    cv2.imwrite(BLACK_IMAGE_PATH, frame)
    return BLACK_IMAGE_PATH


# Create the placeholder once at import time so capture fallbacks can load it.
create_black_image()
def encode_image(image):
    """Return the base64 (utf-8) string of *image* as JPEG bytes.

    Accepts a file path (read raw), an OpenCV BGR ndarray (converted to
    RGB first), or a PIL image. Raises ValueError for anything else.
    """
    if isinstance(image, str):
        # Already a file on disk: encode the raw bytes as-is.
        with open(image, "rb") as fh:
            return base64.b64encode(fh.read()).decode('utf-8')

    if isinstance(image, np.ndarray):
        # OpenCV frames are BGR; PIL expects RGB.
        pil_img = PILImage.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    elif isinstance(image, PILImage.Image):
        pil_img = image
    else:
        raise ValueError("Unsupported image type")

    buffer = BytesIO()
    pil_img.save(buffer, format="JPEG")
    return base64.b64encode(buffer.getvalue()).decode('utf-8')
def send_request(host, port, images, query):
    """Send frames + query to the VLM server and return its JSON response.

    Wire protocol (both directions): an 8-byte big-endian length header
    followed by a UTF-8 JSON payload of exactly that many bytes.

    Args:
        host: VLM server hostname or IP.
        port: VLM server TCP port.
        images: sequence of images accepted by encode_image().
        query: navigation instruction string.

    Returns:
        The decoded JSON response object.

    Raises:
        ConnectionError: if the peer closes before a full message arrives.
    """
    def _recv_exact(sock, n):
        # recv() may return fewer bytes than requested; loop until we have n.
        buf = b''
        while len(buf) < n:
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("socket closed before full message was received")
            buf += chunk
        return buf

    payload = {
        'images': [encode_image(image) for image in images],
        'query': query
    }
    data = json.dumps(payload).encode()

    # Context manager guarantees the socket is closed even on error
    # (the original leaked the socket if send/recv raised).
    with socket.create_connection((host, port)) as client_socket:
        client_socket.sendall(len(data).to_bytes(8, 'big'))
        client_socket.sendall(data)
        # The original read the 8-byte header with a single recv(8), which may
        # legally return fewer bytes and corrupt the decoded size.
        size = int.from_bytes(_recv_exact(client_socket, 8), 'big')
        response_data = _recv_exact(client_socket, size)
    return json.loads(response_data.decode())
def save_image(image, directory=IMAGE_SAVE_DIR):
    """Write *image* into *directory*, numbered by the current history length.

    Returns:
        The path the image was written to.
    """
    os.makedirs(directory, exist_ok=True)
    filepath = os.path.join(directory, f"image_{len(image_list)}.jpg")
    cv2.imwrite(filepath, image)
    print(f"[INFO] Image saved as {filepath}")
    return filepath
def capture_single_image(video_client):
    """Grab one frame from the Go2 camera and append it to image_list.

    Any failure (SDK error code, empty payload, undecodable JPEG) falls back
    to the pre-generated black placeholder so the history never has holes.
    """
    code, data = video_client.GetImageSample()
    if code == 0 and data:
        raw = np.frombuffer(bytes(data), dtype=np.uint8)
        image = cv2.imdecode(raw, cv2.IMREAD_COLOR)
        if image is None:
            print("[WARN] Decoded image is invalid, using black placeholder")
            image = cv2.imread(BLACK_IMAGE_PATH)
    else:
        print("[WARN] Failed to get image, using black placeholder")
        image = cv2.imread(BLACK_IMAGE_PATH)
    save_image(image)
    image_list.append(image)
def select_images_for_request():
    """Pick NUM_FRAMES frames from the capture history for one VLM request.

    With fewer than NUM_FRAMES frames, the newest frame is repeated to pad
    the sequence. Otherwise NUM_FRAMES - 1 frames are sampled uniformly over
    the whole history and the newest frame is appended as the final entry.
    """
    num_images = len(image_list)
    print("num images", num_images)
    if num_images < NUM_FRAMES:
        # Not enough history yet: pad with copies of the most recent frame.
        return image_list + [image_list[-1]] * (NUM_FRAMES - num_images)
    indices = [int(i * (num_images - 1) / (NUM_FRAMES - 1)) for i in range(NUM_FRAMES - 1)]
    print("indicies: ", indices)
    return [image_list[i] for i in indices] + [image_list[-1]]
# -------------------------------
# Action Parser & Executor
# -------------------------------
def parse_action(response_text):
    """Parse a VLM response string into a discrete robot action.

    Recognized phrases (case-insensitive):
      - "move forward <value> [cm|m]"  -> ('forward', meters); a bare number
        defaults to centimeters, the unit NaVILA-style responses commonly use.
      - "turn left|right <angle> degree(s)" -> ('left'|'right', degrees).

    Args:
        response_text: free-text response from the VLM server.

    Returns:
        (action, value) where action is 'forward'/'left'/'right' and value is
        meters or degrees, or (None, None) when no actionable phrase matched.
    """
    # Removed: an unused `patterns` list duplicated these regexes but was
    # never read (dead code in the original).
    response_text = response_text.lower().strip()

    # Forward motion, e.g. "move forward 75 cm" or "move forward 0.5 m".
    match = re.search(r'move\s+forward\s+([+-]?\d*\.?\d+)\s*(cm|m)?', response_text)
    if match:
        value = float(match.group(1))
        unit = match.group(2) or 'cm'  # default to cm if unspecified (common in VLMs)
        if unit == 'cm':
            value = value / 100.0  # convert cm -> m
        return 'forward', value

    # Turns, e.g. "turn left 30 degrees" ("degree" prefix also matches "degrees").
    match = re.search(r'turn\s+(left|right)\s+([+-]?\d*\.?\d+)\s*degree', response_text)
    if match:
        return match.group(1), float(match.group(2))

    return None, None
def execute_action(sport_client, action_type, value):
    """Execute one parsed action on the robot via the Unitree sport client.

    Motion is open-loop: a single velocity command is issued, the program
    sleeps for the estimated duration, then StopMove() is called. There is
    no odometry feedback, so actual distance/angle is approximate.

    Args:
        sport_client: initialized unitree_sdk2py SportClient.
        action_type: 'forward', 'left', or 'right' (from parse_action).
        value: meters for 'forward', degrees for turns.

    NOTE(review): some SDK firmwares require Move() to be re-sent
    periodically to sustain motion — confirm a single Move() call keeps the
    robot moving for the full sleep duration on this firmware.
    """
    print(f"[EXECUTING] {action_type} with value: {value}")
    try:
        if action_type == 'forward':
            distance_m = value  # already in meters
            if distance_m <= 0:
                print("[WARN] Non-positive distance, skipping.")
                return
            # Use safe velocity: clamp to a reasonable speed range [0.4, 0.8] m/s.
            vx = min(0.8, max(0.4, distance_m))
            duration = distance_m / vx
            print(f" → Moving at {vx:.2f} m/s for {duration:.2f} sec")
            #sport_client.StandUp()
            sport_client.BalanceStand()
            time.sleep(1)  # give the robot enough time to settle into a stable stand
            ret = sport_client.Move(vx, 0.0, 0.0)
            print("ret:" , ret)
            time.sleep(duration + 0.5)  # extra margin to make sure the motion completes
            sport_client.StopMove()
        elif action_type in ('left', 'right'):
            # Turn at ~30 deg/s => time = angle / 30
            angle_deg = value
            if abs(angle_deg) < 1:
                return  # ignore negligible turns
            wz = 0.5 if action_type == 'left' else -0.5  # rad/s (~28.6 deg/s); +wz is left/CCW
            duration = abs(angle_deg) / 28.6  # approx
            #sport_client.StandUp()
            sport_client.BalanceStand()
            time.sleep(1)  # give the robot enough time to settle into a stable stand
            ret = sport_client.Move(0.0, 0.0, wz)
            print("ret:" , ret)
            time.sleep(max(0.2, duration))
            sport_client.StopMove()
        else:
            print("[INFO] Unknown action type.")
    except Exception as e:
        # Best-effort: log and continue the navigation loop rather than crash.
        print(f"[ERROR] Motion failed: {e}")
def main():
    """Initialize the Go2 SDK clients and run the capture → VLM → act loop.

    Flow: stand the robot up, then repeatedly capture a camera frame; once
    at least 8 frames exist, send a NUM_FRAMES-long frame sequence plus the
    fixed NAVIGATION_QUERY to the VLM server, parse the returned text into
    a discrete action, and execute it open-loop.
    """
    print("Initializing Unitree Go2...")
    ChannelFactoryInitialize(0)  # DDS domain 0, default network interface
    sport_client = SportClient()
    sport_client.SetTimeout(10.0)
    sport_client.Init()
    video_client = VideoClient()
    video_client.SetTimeout(3.0)
    video_client.Init()
    print("Standing up robot...")
    sport_client.StandUp()
    time.sleep(2)
    print("WARNING: Ensure robot has clearance to move!")
    input("Press Enter to start autonomous navigation loop...")
    try:
        while True:
            print("\n[STEP] Capturing single image...")
            capture_single_image(video_client)
            # Query once at least 8 frames exist. NOTE(review): the request
            # still contains NUM_FRAMES (16) images — select_images_for_request
            # pads by repeating the newest frame; confirm the server expects
            # padded sequences rather than a shorter history.
            if len(image_list) >= 8:
                print("[STEP] Selecting images for VLM server...")
                selected_images = select_images_for_request()
                print("[STEP] Sending to VLM server...")
                print(NAVIGATION_QUERY)
                vlm_response = send_request(VLM_HOST, VLM_PORT, selected_images, NAVIGATION_QUERY)
                print(f"[VLM RESPONSE] {vlm_response}")
                # Parse the free-text response into an action and execute it.
                # NOTE(review): parse_action assumes vlm_response is a plain
                # string; verify the server doesn't return a dict/JSON object.
                action_type, value = parse_action(vlm_response)
                print(f"Parsed Action: {action_type}, Value: {value}")  # Debug output
                if action_type and value is not None:
                    execute_action(sport_client, action_type, value)
                else:
                    print("[INFO] No actionable instruction received. Pausing...")
                    time.sleep(2)
            else:
                print("[INFO] Not enough images collected yet. Continuing loop...")
                time.sleep(1)  # Adjust delay as necessary
    except KeyboardInterrupt:
        # Ctrl-C: halt motion and relax the joints before exiting.
        print("\n[INFO] Stopping robot and exiting...")
        sport_client.StopMove()
        sport_client.Damp()


if __name__ == "__main__":
    main()
I am trying to deploy NaVILA on a Unitree GO2 robot, but the result doesn't look good. Actually, the VLM doesn't seem to behave according to the instruction at all. Is there anything I've done wrong? Here's the video link of my deployment result.
https://www.bilibili.com/video/BV1tw6UBmEts/?spm_id_from=333.337.search-card.all.click
I run the VLM server script (https://github.com/yang-zj1026/NaVILA-Bench/blob/main/scripts/vlm_server.py) to start the server.
The robot sends a request to the server and gets a response from it. Then the robot executes the corresponding action given in the response text by calling the unitree_sdk2_python API. The code is given below.
However, when I give it a command to go right, the VLM keeps generating responses telling the robot to go left! I'm new to this domain and I'm wondering whether there is anything I've done wrong. Can anyone please help me?