-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
165 lines (129 loc) · 5.48 KB
/
agent.py
File metadata and controls
165 lines (129 loc) · 5.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from matplotlib.pyplot import plot
import torch
import random
import numpy as np
from collections import deque # Double End Queue
from snake_game import SnakeGameAI, Direction, Point
from model import Linear_QNet, QTrainer
from plot import plot
## Game Settings
BLOCK_SIZE = 20
## NN Settings
MAX_MEMORY = 100000
BATCH_SIZE = 1000
LR = 0.001
I,H,O = 11, 256, 3 ## 11 States, 3 Moves(Straight, Right, Left)
class Agent:
def __init__(self):
self.n_games = 0
self.epsilon = 0 # randomness
self.gamma = 0.9 # discount rate (Smaller than 1)
self.memory = deque(maxlen=MAX_MEMORY) # If memory exceed, popleft()
self.model = Linear_QNet(I,H,O)
self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)
def get_state(self, game):
head = game.snake[0]
point_left = Point(head.x-BLOCK_SIZE, head.y)
point_right = Point(head.x+BLOCK_SIZE, head.y)
point_up = Point(head.x, head.y-BLOCK_SIZE)
point_down = Point(head.x, head.y+BLOCK_SIZE)
direction_left = game.direction == Direction.LEFT
direction_right = game.direction == Direction.RIGHT
direction_up = game.direction == Direction.UP
direction_down = game.direction == Direction.DOWN
state = [
# Danger Straight
(direction_right and game._is_collision(point_right)) or
(direction_left and game._is_collision(point_left)) or
(direction_up and game._is_collision(point_up)) or
(direction_down and game._is_collision(point_down)),
# Danger Right
(direction_up and game._is_collision(point_right)) or
(direction_down and game._is_collision(point_left)) or
(direction_left and game._is_collision(point_up)) or
(direction_right and game._is_collision(point_down)),
# Danger Left
(direction_up and game._is_collision(point_left)) or
(direction_down and game._is_collision(point_right)) or
(direction_right and game._is_collision(point_up)) or
(direction_left and game._is_collision(point_down)),
# Move Direction
direction_left,
direction_right,
direction_up,
direction_down,
# Food Location
game.food.x < game.head.x, # Food Left
game.food.x > game.head.x, # Food Right
game.food.y < game.head.y, # Food Up
game.food.y > game.head.y, # Food Down
]
return np.array(state, dtype=int)
def cache(self, state, action, reward, next_state, done):
# popleft is MAX_MEMORY is reached
self.memory.append((state, action, reward, next_state, done)) # Append only one tuple
def train_long_memory(self):
if len(self.memory) > BATCH_SIZE:
mini_sample = random.sample(self.memory, BATCH_SIZE) # BATCH SIZE number of tuples
else:
mini_sample = self.memory
states, actions, rewards, next_states, dones = zip(*mini_sample)
self.trainer.train_step(states, actions, rewards, next_states, dones)
#for state, action, reward, next_state, done in mini_sample:
# self.train_short_memory(state, action, reward, next_state, done)
def train_short_memory(self, state, action, reward, next_state, done):
self.trainer.train_step(state, action, reward, next_state, done)
def get_action(self, state):
# Random Moves: Tradeoff exploration / exploitation
self.epsilon = 80 - self.n_games # The more games, the smaller the epsilon (Randomness)
final_move = [0,0,0]
if random.randint(0,200) < self.epsilon:
move = random.randint(0,2)
final_move[move] = 1
else:
state0 = torch.tensor(state, dtype=torch.float32)
prediction = self.model(state0)
move = torch.argmax(prediction).item() # EG. [3.23, 2.34, 1.45] -> [1, 0, 0]
final_move[move] = 1
return final_move
def train():
plot_scores = []
plot_mean_scores = []
total_score = 0
best_score = 0
agent = Agent()
game = SnakeGameAI()
while True:
# Get old state
state_old = agent.get_state(game)
# Get Move
final_move = agent.get_action(state_old)
# Apply Move and Get new state
reward, done, score = game.play_step(final_move)
state_new = agent.get_state(game)
# Train Short memory (Just for previous last move)
agent.train_short_memory(state_old, final_move, reward, state_new, done)
# Memory
agent.cache(state_old, final_move, reward, state_new, done)
if done:
# Reset the game
game.reset()
agent.n_games += 1
# Train Long Memory (All moves)
agent.train_long_memory()
if score > best_score:
best_score = score
# Save Best Score
agent.model.save()
print('Game: {} Score: {} Best Score: {}'.format(agent.n_games, score, best_score))
total_score += score
mean_score = total_score / agent.n_games
# Plot on matplotlib
plot_on_graph(plot_scores, plot_mean_scores, score, mean_score)
def plot_on_graph(plot_scores, plot_mean_scores, score=0, mean_score=0):
# Plot on matplotlib
plot_scores.append(score)
plot_mean_scores.append(mean_score)
plot(plot_scores, plot_mean_scores)
if __name__ == '__main__':
train()