forked from BiaoWang-Robo/ESE-650-Final-Project
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathf110_env_rl.py
More file actions
211 lines (177 loc) · 10.8 KB
/
f110_env_rl.py
File metadata and controls
211 lines (177 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# -- coding: utf-8 --
"""
F1TENTH gym environment of the RL planner
Author: Derek Zhou, Biao Wang, Tian Tan
"""
import os
import numpy as np
import yaml
from controllers.pure_pursuit import PurePursuit, get_lookahead_point
from f110_gym.envs.base_classes import RaceCar
from f110_gym.envs.f110_env import F110Env
from gym import spaces
from utils.lidar_utils import downsample_lidar_scan
from utils.render import Renderer, fix_gui
from utils.traj_utils import get_front_traj, get_interpolated_traj_with_horizon, densify_offset_traj, get_offset_traj, \
global_to_local, local_to_global, bresenham_line_index
from utils.waypoint_loader import WaypointLoader
class F110RLEnv(F110Env):
    """F1TENTH gym environment wrapped for an RL offset-planner.

    The agent's action is a vector of lateral offsets (one per horizon point)
    applied to a reference raceline trajectory; a pure-pursuit controller then
    tracks the offset trajectory at a fixed speed.  Observations are a
    downsampled lidar scan, the local horizon trajectory, and the car pose.
    """

    def __init__(self, **kwargs):
        """Build the env from keyword args.

        Required kwargs: 'render' (bool), 'map_name' (str),
        'num_obstacles' (int), 'num_lidar_scan' (int).
        Optional kwarg: 'obt_poses' — explicit obstacle poses (N, 3);
        if absent, obstacles are sampled randomly along the raceline.
        """
        # load keyword arguments
        self.render_flag = kwargs['render']
        map_name = kwargs['map_name']  # levine_2nd, skir
        self.num_obstacles = kwargs['num_obstacles']
        self.num_lidar_scan = kwargs['num_lidar_scan']
        # load map, waypoints, controller, and renderer
        map_path = os.path.abspath(os.path.join('maps', map_name))
        # raceline CSV is semicolon-delimited: per-waypoint pose/speed data
        csv_data = np.loadtxt(map_path + '/' + map_name + '_raceline.csv', delimiter=';', skiprows=0)
        with open(map_path + '/' + map_name + '_map' + '.yaml', 'r') as yaml_stream:
            try:
                map_metadata = yaml.safe_load(yaml_stream)
                self.map_resolution = map_metadata['resolution']  # meters per occupancy-grid cell
                self.origin = map_metadata['origin']  # world coords of grid cell (0, 0)
            except yaml.YAMLError as ex:
                # NOTE(review): on YAML failure this only prints; the next two
                # lines would then raise AttributeError since self.origin is unset.
                print(ex)
        self.orig_x = self.origin[0]
        self.orig_y = self.origin[1]
        self.waypoints = WaypointLoader(map_name, csv_data)
        self.controller = PurePursuit(self.waypoints)
        self.renderer = Renderer(self.waypoints)
        if self.render_flag:
            # register drawing callbacks so the viewer overlays trajectories
            super().add_render_callback(self.renderer.render_waypoints)
            super().add_render_callback(self.renderer.render_front_traj)
            super().add_render_callback(self.renderer.render_horizon_traj)
            super().add_render_callback(self.renderer.render_lookahead_point)
            super().add_render_callback(self.renderer.render_offset_traj)
            super().add_render_callback(fix_gui)
        if 'obt_poses' in kwargs:
            # get the obstacle poses input
            obt_pose = kwargs['obt_poses']
        else:
            # randomly generate obstacles: pick interior waypoint indices, then
            # orient each obstacle along the local raceline direction
            obt_index = np.random.uniform(1, self.waypoints.x.shape[0] - 1, size=(self.num_obstacles,)).astype(int)
            # NOTE(review): arctan2 is given (Δx, Δy) rather than the usual
            # (Δy, Δx) — confirm this matches the map's heading convention
            thetas = np.arctan2(self.waypoints.x[obt_index + 1] - self.waypoints.x[obt_index - 1],
                                self.waypoints.y[obt_index + 1] - self.waypoints.y[obt_index - 1])
            obt_pose = np.array([self.waypoints.x[obt_index],
                                 self.waypoints.y[obt_index], thetas]).transpose().reshape((-1, 3))
        # load the super class - F110Env
        super(F110RLEnv, self).__init__(map=map_path + '/' + map_name + '_map',
                                        map_ext='.pgm' if map_name == 'levine_2nd' or map_name == 'skir' or map_name == 'skir_blocked' else '.png',
                                        seed=0, num_agents=1, obt_poses=obt_pose)
        # init params
        self.horizon = int(10)  # number of points in the planned trajectory
        self.predict_time = 2.0  # get waypoints in coming seconds
        self.fixed_speed = 2.0  # commanded speed (m/s); planner only controls steering
        self.lookahead_dist = 0.8  # pure-pursuit lookahead distance
        self.offset = [0.5] * self.horizon  # initial lateral offsets, one per horizon point
        # occupancy-grid bounds, used to clip collision-check indices in step()
        self.map_max_rows = RaceCar.scan_simulator.map_img.shape[0]
        self.map_max_cols = RaceCar.scan_simulator.map_img.shape[1]
        # set up the bounding boxes
        self.max_lidar_range = 30
        self.max_pose = 1e3
        self.min_pose = -self.max_pose
        self.max_offset = 1
        self.min_offset = -self.max_offset
        # observation = [lidar (num_lidar_scan) | local traj x,y (horizon*2) | pose x,y (2)]
        low_lidar = 0 * np.ones((self.num_lidar_scan,), dtype=np.float32)
        high_lidar = self.max_lidar_range * np.ones((self.num_lidar_scan,), dtype=np.float32)
        low_traj = self.min_pose * np.ones((self.horizon * 2,), dtype=np.float32)
        high_traj = self.max_pose * np.ones((self.horizon * 2,), dtype=np.float32)
        low_pose = self.min_pose * np.ones((2,), dtype=np.float32)
        high_pose = self.max_pose * np.ones((2,), dtype=np.float32)
        obs_low_bound = np.hstack((low_lidar, low_traj, low_pose))
        obs_high_bound = np.hstack((high_lidar, high_traj, high_pose))
        self.observation_space = spaces.Box(low=obs_low_bound, high=obs_high_bound,
                                            shape=(obs_high_bound.shape[0],), dtype=np.float32)
        # single_* spaces duplicate the above for vectorized-env style trainers
        self.single_observation_space = spaces.Box(low=obs_low_bound, high=obs_high_bound,
                                                   shape=(obs_high_bound.shape[0],), dtype=np.float32)
        # print("observation space shape", self.single_observation_space.shape)
        self.action_space = spaces.Box(low=self.min_offset, high=self.max_offset,
                                       shape=(self.horizon,), dtype=np.float32)  # action: offsets in n horizons
        self.single_action_space = spaces.Box(low=self.min_offset, high=self.max_offset,
                                              shape=(self.horizon,), dtype=np.float32)
        # print("action space shape", self.single_action_space.shape)

    def get_network_obs(self):
        """Assemble the flat network observation from the cached sim state.

        Returns a 1-D array: [downsampled lidar | local horizon traj x,y | car pose x,y].
        Relies on self.obs and self.local_horizon_traj being up to date.
        """
        lidar_obs = downsample_lidar_scan(self.obs['scans'][0].flatten(), self.num_lidar_scan)
        traj_obs = self.local_horizon_traj[:, :2].flatten()
        pose_obs = np.array([self.obs['poses_x'][0], self.obs['poses_y'][0]]).reshape((-1,))
        network_obs = np.hstack((lidar_obs, traj_obs, pose_obs))
        return network_obs

    def reset(self, seed=1):
        """Reset the sim at a random raceline waypoint and return the network obs.

        NOTE(review): the `seed` parameter is accepted but never used; the
        random init index comes from the global numpy RNG.
        """
        # initialization: spawn the car on a random waypoint with its heading
        init_index = np.random.randint(0, self.waypoints.x.shape[0])
        # NOTE(review): `θ` (Greek theta) is the heading attribute exposed by
        # WaypointLoader — confirm the identifier survives the toolchain
        init_pos = np.array([self.waypoints.x[init_index], self.waypoints.y[init_index],
                             self.waypoints.θ[init_index]]).reshape((1, -1))
        # init_pos = np.array([[0.0, 0.0, 0.0]])  # !!!! fixed init or not
        self.obs, _, self.done, _ = super().reset(init_pos)  # self.obs, _, self.done, _ = F110Env.reset(self,init_pos)
        self.lap_time = 0.0
        # get init horizon traj: reference points ahead of the car, resampled
        # to `horizon` points, then expressed in the car frame
        self.front_traj = get_front_traj(self.obs, self.waypoints, predict_time=self.predict_time)  # [i, x, y, v]
        self.horizon_traj = get_interpolated_traj_with_horizon(self.front_traj, self.horizon)  # [x, y, v]
        self.local_horizon_traj = global_to_local(self.obs, self.horizon_traj)
        self.local_offset_traj = get_offset_traj(self.local_horizon_traj, self.offset)
        self.offset_traj = local_to_global(self.obs, self.local_offset_traj)
        self.offset_traj = np.vstack((np.array([[self.obs['poses_x'][0], self.obs['poses_y'][0], self.fixed_speed]]),
                                      self.offset_traj))  # add car pose as the first point
        if self.render_flag:
            self.renderer.front_traj = self.front_traj
            self.renderer.horizon_traj = self.horizon_traj
            self.renderer.offset_traj = self.offset_traj
        network_obs = self.get_network_obs()
        return network_obs

    def step(self, offset=None):
        """Apply the offset action, advance the sim one step, compute reward.

        offset: array of `horizon` lateral offsets (the RL action).
        Returns (network_obs, reward, done, info).
        NOTE(review): the default `offset=None` would crash below
        (`offset[1:]`); callers are expected to always pass an action.
        """
        self.offset = offset
        # add offsets on horizon traj & densify offset traj to 80 points & get lookahead point & pure pursuit
        self.local_offset_traj = get_offset_traj(self.local_horizon_traj, self.offset)
        self.offset_traj = local_to_global(self.obs, self.local_offset_traj)
        self.offset_traj = np.vstack((np.array([[self.obs['poses_x'][0], self.obs['poses_y'][0], self.fixed_speed]]),
                                      self.offset_traj))  # add car pose as the first point
        dense_offset_traj = densify_offset_traj(self.offset_traj)  # [x, y, v]
        # dense_offset_traj = densify_offset_traj(self.horizon_traj)  # !!!! for bootstrap only! -> Behavioral Cloning
        lookahead_point_profile = get_lookahead_point(self.obs, dense_offset_traj, lookahead_dist=self.lookahead_dist)
        steering, speed = self.controller.rl_control(self.obs, lookahead_point_profile, max_speed=self.fixed_speed)
        # step function in race car, time step is k+1 now
        # (note: commanded speed is self.fixed_speed, not the controller's `speed`)
        self.obs, step_time, self.done, info = super().step(np.array([[steering, self.fixed_speed]]))
        self.lap_time += step_time
        # extract waypoints in predicted time & interpolate the front traj to get a 10-point-traj
        self.front_traj = get_front_traj(self.obs, self.waypoints, predict_time=self.predict_time)  # [i, x, y, v]
        self.horizon_traj = get_interpolated_traj_with_horizon(self.front_traj, self.horizon)  # [x, y, v]
        self.local_horizon_traj = global_to_local(self.obs, self.horizon_traj)
        # get agent observation [lidar, front traj, pose]
        network_obs = self.get_network_obs()
        # offset trajectory collision predictions: rasterize the planned path
        # into occupancy-grid indices and count cells that hit obstacles
        # NOTE(review): np.ceil (not floor/round) for world->grid indexing — confirm
        offset_x_index = np.ceil((self.offset_traj[:, 0] - self.orig_x) / self.map_resolution).astype(int)
        offset_y_index = np.ceil((self.offset_traj[:, 1] - self.orig_y) / self.map_resolution).astype(int)
        offset_traj_indices = np.vstack((offset_x_index, offset_y_index)).T
        all_indices = []
        for i in range(offset_traj_indices.shape[0] - 1):
            # Bresenham rasterization of each consecutive segment of the plan
            line_indices = bresenham_line_index(offset_traj_indices[i, :], offset_traj_indices[i + 1, :])
            all_indices.append(line_indices)
        all_indices = np.concatenate(all_indices).reshape(-1, 2)
        # drop cells outside the map (column 0 is x/col, column 1 is y/row)
        filtered_traj_indices = all_indices[
            (all_indices[:, 1] < self.map_max_rows) & (all_indices[:, 0] < self.map_max_cols) &
            (all_indices[:, 1] >= 0) & (all_indices[:, 0] >= 0)]
        # !!!! modify your reward
        # derek's reward for bootstrapping
        # reward = 100 * step_time
        # reward -= 1 * np.linalg.norm(offset, ord=2)
        # if super().current_obs['collisions'][0] == 1:
        #     reward -= 1000
        # !!!! modify your reward
        # derek's reward for obstacle avoidance:
        # + survival time, - offset roughness (1st/2nd differences),
        # - predicted-path cells on obstacles, - actual collision
        reward = 100 * step_time
        # reward -= 0.1 * np.linalg.norm(offset, ord=2)
        first_diff = (offset[1:] - offset[:-1])
        second_diff = first_diff[1:] - first_diff[:-1]
        reward -= 0.2 * np.linalg.norm(first_diff, ord=2)  # < 0.2
        reward -= 0.1 * np.linalg.norm(second_diff, ord=2)  # < 0.2
        # map_img == 0 marks occupied cells along the planned path
        reward -= 0.05 * np.count_nonzero(
            RaceCar.scan_simulator.map_img[filtered_traj_indices[:, 1], filtered_traj_indices[:, 0]] == 0)  # < 0.5
        if super().current_obs['collisions'][0] == 1:
            reward -= 10
        if self.render_flag:  # render update
            self.renderer.offset_traj = self.offset_traj
            self.renderer.ahead_point = lookahead_point_profile[:2]  # [x, y]
            self.renderer.front_traj = self.front_traj
            self.renderer.horizon_traj = self.horizon_traj
            super().render('human')
        return network_obs, reward, self.done, info