from DQN import DuelingLinearDeepQNetwork
from ReplayBuffer import ReplayBuffer
import numpy as np
import torch as T
import os


class Agent:
    """Dueling Double-DQN agent with an epsilon-greedy policy and a
    periodically synchronised target network."""

    def __init__(self, id_, gamma, epsilon, lr, n_actions, input_dims,
                 mem_size, batch_size, eps_min=0.01, eps_dec=5e-7,
                 replace=1000, chkpt_dir='./models', view_reduced=False):
        self.id = id_
        self.gamma = gamma
        self.epsilon = epsilon
        self.batch_size = batch_size
        self.eps_min = eps_min
        self.eps_dec = eps_dec
        self.replace_target_cnt = replace
        os.makedirs(chkpt_dir, exist_ok=True)
        self.chkpt_dir = chkpt_dir
        self.action_space = [i for i in range(n_actions)]
        self.learn_step_counter = 0

        self.memory = ReplayBuffer(mem_size, input_dims)

        # Online network (updated every learn step) and target network
        # (synchronised every `replace` learn steps).
        self.q_eval = DuelingLinearDeepQNetwork(lr, n_actions,
                                                input_dims=input_dims,
                                                view_reduced=view_reduced)
        self.q_next = DuelingLinearDeepQNetwork(lr, n_actions,
                                                input_dims=input_dims,
                                                view_reduced=view_reduced)

    def choose_action(self, observation):
        # Epsilon-greedy: exploit the advantage stream with probability
        # 1 - epsilon, otherwise sample a random action.
        if np.random.random() > self.epsilon:
            state = T.tensor([observation], dtype=T.float).to(self.q_eval.device)
            _, advantage = self.q_eval.forward(state)
            action = T.argmax(advantage).item()
        else:
            action = np.random.choice(self.action_space)
        return action

    def store_transition(self, state, action, reward, state_, done):
        self.memory.store_transition(state, action, reward, state_, done)

    def replace_target_network(self):
        if self.learn_step_counter % self.replace_target_cnt == 0:
            self.q_next.load_state_dict(self.q_eval.state_dict())

    def decrement_epsilon(self):
        self.epsilon = self.epsilon - self.eps_dec \
            if self.epsilon > self.eps_min else self.eps_min

    def save_models(self):
        self.q_eval.save_checkpoint(os.path.join(self.chkpt_dir, f"{self.id}_dqn_model"))
        self.q_next.save_checkpoint(os.path.join(self.chkpt_dir, f"{self.id}_dqn_model_next"))

    def load_models(self):
        self.q_eval.load_checkpoint(os.path.join(self.chkpt_dir, f"{self.id}_dqn_model"))
        self.q_next.load_checkpoint(os.path.join(self.chkpt_dir, f"{self.id}_dqn_model_next"))

    def learn(self):
        # Skip learning until the replay buffer holds at least one batch.
        if self.memory.mem_counter < self.batch_size:
            return

        self.q_eval.optimizer.zero_grad()
        self.replace_target_network()

        state, action, reward, new_state, done = self.memory.sample_buffer(self.batch_size)

        states = T.tensor(state).to(self.q_eval.device)
        rewards = T.tensor(reward).to(self.q_eval.device)
        dones = T.tensor(done).to(self.q_eval.device)
        actions = T.tensor(action).to(self.q_eval.device)
        states_ = T.tensor(new_state).to(self.q_eval.device)

        indices = np.arange(self.batch_size)

        V_s, A_s = self.q_eval.forward(states)
        V_s_, A_s_ = self.q_next.forward(states_)
        V_s_eval, A_s_eval = self.q_eval.forward(states_)

        # Dueling aggregation: Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).
        q_pred = T.add(V_s, (A_s - A_s.mean(dim=1, keepdim=True)))[indices, actions]
        q_next = T.add(V_s_, (A_s_ - A_s_.mean(dim=1, keepdim=True)))
        q_eval = T.add(V_s_eval, (A_s_eval - A_s_eval.mean(dim=1, keepdim=True)))

        # Double DQN: the online network selects the next action, the target
        # network evaluates it; terminal states contribute no future value.
        max_actions = T.argmax(q_eval, dim=1)
        q_next[dones] = 0.0
        q_target = rewards + self.gamma * q_next[indices, max_actions]

        loss = self.q_eval.loss(q_target, q_pred).to(self.q_eval.device)
        loss.backward()
        self.q_eval.optimizer.step()

        self.learn_step_counter += 1
        self.decrement_epsilon()
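

# --- Usage sketch (illustrative, not part of the original agent code) ---
# A minimal smoke test showing how the Agent's public methods fit together.
# The hyperparameters, observation shape, and synthetic observations below are
# assumptions for illustration only; in the real project they would come from
# whatever environment ReplayBuffer and DuelingLinearDeepQNetwork expect.
if __name__ == "__main__":
    agent = Agent(id_=0, gamma=0.99, epsilon=1.0, lr=1e-4, n_actions=4,
                  input_dims=(8,), mem_size=1000, batch_size=32)

    obs = np.zeros(8, dtype=np.float32)
    for _ in range(64):
        action = agent.choose_action(obs)              # epsilon-greedy action
        obs_ = np.random.randn(8).astype(np.float32)   # stand-in next observation
        agent.store_transition(obs, action, 0.0, obs_, False)
        obs = obs_

    agent.learn()        # one gradient step once the buffer holds a full batch
    agent.save_models()  # writes checkpoints under ./models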