# Reinforcement Learning (DQN) with the OpenAI Gym environment
!pip3 install gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
# Hyperparameters
BATCH_SIZE = 32
LR = 0.01                  # learning rate
EPSILON = 0.9              # epsilon-greedy: probability of taking the greedy action
GAMMA = 0.9                # reward decay factor
TARGET_REPLACE_ITER = 10   # how often the target network is refreshed
MEMORY_CAPACITY = 2000     # replay-memory size

env = gym.make('CartPole-v0')              # the pole-balancing task
env = env.unwrapped
N_ACTIONS = env.action_space.n             # number of actions the cart can take
N_STATES = env.observation_space.shape[0]  # number of state features the cart observes
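# For CartPole-v0, N_ACTIONS is 2 (push the cart left or right) and N_STATES
# is 4 (cart position, cart velocity, pole angle, pole angular velocity).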
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(N_STATES, 10)    # hidden layer
        self.fc1.weight.data.normal_(0, 0.1)  # initialize weights
        self.out = nn.Linear(10, N_ACTIONS)   # output layer: one Q-value per action
        self.out.weight.data.normal_(0, 0.1)  # initialize weights

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        actions_value = self.out(x)
        return actions_value
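# Quick shape check (a sketch, not part of the original script): a batch of
# random states maps to one Q-value per action.
#   net = Net()
#   net(torch.randn(3, N_STATES)).shape  # torch.Size([3, N_ACTIONS])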
class DQN(object):
    def __init__(self):
        self.eval_net, self.target_net = Net(), Net()
        self.learn_step_counter = 0  # counts learning steps, for timing target updates
        self.memory_counter = 0      # counts stored transitions
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))  # initialize the replay memory
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)  # torch optimizer
        self.loss_func = nn.MSELoss()  # loss function
    def choose_action(self, x):
        # epsilon-greedy action selection
        x = torch.unsqueeze(torch.FloatTensor(x), 0)  # add a batch dimension: a single sample
        if np.random.uniform() < EPSILON:
            # exploit: take the action with the highest predicted Q-value
            actions_value = self.eval_net.forward(x)
            action = torch.max(actions_value, 1)[1].data.numpy()[0]  # argmax over actions
        else:
            # explore: take a random action
            action = np.random.randint(0, N_ACTIONS)
        return action
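    # Usage sketch (hypothetical values, not in the original script): given a
    # raw 4-element state from env.reset(), choose_action returns an int id.
    #   a = dqn.choose_action(np.array([0.0, 0.0, 0.05, 0.0]))  # 0 or 1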
    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, [a, r], s_))
        # once the memory is full, overwrite the oldest data
        index = self.memory_counter % MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1
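    # Each memory row is laid out as [s (N_STATES), a, r, s_ (N_STATES)],
    # which is why the replay buffer has N_STATES * 2 + 2 columns.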
    def learn(self):
        # refresh the target network's parameters periodically
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # sample a batch of transitions from the replay memory
        # (the memory is already full by the time learn() is first called)
        sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :N_STATES])
        b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES + 1].astype(int))
        b_r = torch.FloatTensor(b_memory[:, N_STATES + 1:N_STATES + 2])
        b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:])

        # q_eval holds Q-values for every action; gather the ones for the actions actually taken (b_a)
        q_eval = self.eval_net(b_s).gather(1, b_a)  # shape (batch, 1)
        q_next = self.target_net(b_s_).detach()     # detach: no error backpropagation through the target net
        q_target = b_r + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1)  # shape (batch, 1)
        loss = self.loss_func(q_eval, q_target)
        print(loss)
        # compute gradients and update eval_net
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
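# A note on the update rule in learn() above: for each sampled transition
# (s, a, r, s_), the target is the standard Q-learning bootstrap
#     y = r + GAMMA * max_a' Q_target(s_, a')
# and the MSE loss pulls Q_eval(s, a) toward y. detach() stops gradients from
# flowing into the target network, which is only refreshed every
# TARGET_REPLACE_ITER learning steps.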
dqn = DQN()
for i_episode in range(400):
    s = env.reset()  # reset the environment for a new episode
    while True:
        a = dqn.choose_action(s)         # choose an action
        s_, r, done, info = env.step(a)  # act and observe the environment's feedback

        # reshape the reward so the DQN learns faster: r1 rewards keeping the
        # cart near the center of the track, r2 rewards keeping the pole upright
        x, x_dot, theta, theta_dot = s_
        r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        r = r1 + r2

        dqn.store_transition(s, a, r, s_)  # store the transition
        if dqn.memory_counter > MEMORY_CAPACITY:
            dqn.learn()  # start learning once the memory is full
        if done:  # episode finished; move on to the next one
            break
        s = s_
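# To watch the learned policy after training (a minimal sketch, not in the
# original script; it reuses the classic gym reset/step API from above, and
# choose_action still explores with probability 1 - EPSILON):
s = env.reset()
done = False
while not done:
    env.render()                    # requires a display; skip when headless
    a = dqn.choose_action(s)
    s, r, done, info = env.step(a)
env.close()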