强化学习-6-DQN算法

强化学习的笔记、理解、感悟及代码实现,仅按个人思维进行精华总结和记录,使用的教程:动手学强化学习

环境准备:

1
2
3
4
5
6
7
8
import random
import gym
import numpy as np
import collections
from tqdm import tqdm
import torch
import torch.nn.functional as F
import imageio
深度 Q 网络(deep Q network,DQN)算法是从Q-learning 扩展而来。在之前的Q-learning算法中,动作价值函数q是使用数组表格来表示的,适用于离散或有限的状态和动作空间,而DQN算法则是使用神经网络来表示动作价值函数q,可以适用于更大或连续的状态和动作空间。

使用Q网络来表示价值函数时,若动作是连续的,可以将(s,a)作为输入,输出对应价值标量;若动作是离散的,可以采取动作连续的做法,也可以将s作为输入,输出每一个动作的Q值。

损失函数

Q-learning算法的更新规则是基于TD误差的,即:

收敛时TD误差接近于0,不难想到对于每一组数据,要使接近于TD目标,故可以定义损失函数为均方误差:

经验回放

Q-learning算法中,每一个数据只用来更新一次Q值,而在有监督学习中,每一个训练数据会被使用多次。为了将Q-learning和深度神经网络更好的结合,DQN采用了经验回放的方法:维护一个回放缓冲区,每次都将采样数据存入其中,训练Q网络时从缓冲区中随机抽取一批数据进行训练。作用如下:
  • 使样本满足独立假设:直接采样的数据不满足独立假设,总是一个序列的数据,会使神经网络拟合到最近训练的数据上。采取经验回放,就可以打破样本的相关性。
  • 提高效率:类似于Dyna-Q算法,每个样本可以使用多次,十分适合深度神经网络的梯度学习。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ReplayBuffer:
''' 经验回放池 '''
def __init__(self, capacity):
self.buffer = collections.deque(maxlen=capacity) # 队列,先进先出

def add(self, state, action, reward, next_state, done): # 将数据加入buffer
self.buffer.append((state, action, reward, next_state, done))

def sample(self, batch_size): # 从buffer中采样数据,数量为batch_size
transitions = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done = zip(*transitions)
return np.array(state), action, reward, np.array(next_state), done

def size(self): # 目前buffer中数据的数量
return len(self.buffer)

目标网络

DQN算法中,Q网络的损失函数本身就包含目标网络的输出,Q网络一边训练更新,目标网络也一直在改变,很容易造成训练的不稳定性。所以需要分为训练网络和目标网络两套Q网络,训练网络用于更新参数,每一步都更新,目标网络用于估计目标值暂时固定,每隔一定步数再和训练网络同步。

训练网络的流程如下: alt text

实验

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class Qnet(torch.nn.Module):
''' 只有一层隐藏层的Q网络 '''
def __init__(self, state_dim, hidden_dim, action_dim):
super(Qnet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

def forward(self, x):
x = F.relu(self.fc1(x)) # 隐藏层使用ReLU激活函数
return self.fc2(x)

class DQN:
''' DQN算法 '''
def __init__(self,learning_rate=2e-3, gamma=0.98,epsilon=0.01,target_update_step=10,ReplayBuffer_size=10000,device=torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")):
self.train_env = gym.make('CartPole-v1',new_step_api=True)
self.eval_env = gym.make('CartPole-v1',new_step_api=True)
self.env_seed = 0
#经验回放缓冲区
self.replay_buffer = ReplayBuffer(ReplayBuffer_size)
#Q网络
self.state_dim = self.train_env.observation_space.shape[0]
self.action_dim = self.train_env.action_space.n
self.q_net = Qnet(self.state_dim, 128,self.action_dim).to(device) #训练Q网络
self.target_q_net = Qnet(self.state_dim, 128,self.action_dim).to(device)# 目标Q网络
self.target_update_step = target_update_step # 目标网络更新间隔步长
#优化器
self.optimizer = torch.optim.Adam(self.q_net.parameters(),lr=learning_rate)# 使用Adam优化器
# 超参数
self.gamma = gamma # 折扣因子
self.epsilon = epsilon # epsilon-贪婪策略
self.device = device
def set_env_seed(self,seed):
self.env_seed = seed

def take_action(self, state): # epsilon-贪婪策略采取动作
if np.random.random() < self.epsilon:
action = np.random.randint(self.action_dim)
else:
state = torch.tensor([state], dtype=torch.float).to(self.device)
action = self.q_net(state).argmax().item()
return action

def evaluate_model(self,num_episodes=10):
total_reward = 0
for i_episode in range(num_episodes):
state = self.eval_env.reset(seed=self.env_seed)
done = False
while not done:
# 使用 numpy.array 将状态转换为张量
state_tensor = torch.tensor(np.array([state]), dtype=torch.float).to(self.device) # 转换为 numpy 数组
q_values = self.q_net(state_tensor) # 获取 Q 值
action = np.argmax(q_values.detach().cpu().numpy()) # 选择具有最高 Q 值的动作
# 采取动作并观察新的状态和奖励
next_state, reward, done, _,_ = self.eval_env.step(action)
state = next_state
total_reward += reward
return total_reward/num_episodes

def update_q_net(self, transition_dict):
states = torch.tensor(transition_dict['states'],dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
rewards = torch.tensor(transition_dict['rewards'],dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'],dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'],dtype=torch.float).view(-1, 1).to(self.device)

q_values = self.q_net(states).gather(1, actions) # Q值
max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1) # 下个状态的最大Q值
q_targets = rewards + self.gamma * max_next_q_values * (1 - dones) # TD误差目标
dqn_loss = torch.mean(F.mse_loss(q_values, q_targets)) # 均方误差损失函数
self.optimizer.zero_grad() # PyTorch中默认梯度会累积,这里需要显式将梯度置为0
dqn_loss.backward() # 反向传播计算梯度
self.optimizer.step()# 更新参数

def run(self,num_episodes=500,batch_size=64):
max_return = 0 # 记录最大奖励
q_net_update_times = 0 # Q网络更新次数
episode_return_list = []
pbar = tqdm(total=num_episodes, desc="Training")
for i_episode in range(num_episodes):
episode_return = 0
state = self.train_env.reset(seed=self.env_seed)
done = False
while not done:
action = self.take_action(state)
next_state, reward, done, _,_ = self.train_env.step(action)
episode_return += reward
self.replay_buffer.add(state, action, reward, next_state, done)
state = next_state
if self.replay_buffer.size() > 500:#数据大于500条时开始训练
b_s, b_a, b_r, b_ns, b_d = self.replay_buffer.sample(batch_size)
transition_dict = {'states': b_s,'actions': b_a,'next_states': b_ns,'rewards': b_r,'dones': b_d}
self.update_q_net(transition_dict)
if q_net_update_times % self.target_update_step == 0:#训练网络更新target_update_step次,目标网络同步一次
self.target_q_net.load_state_dict(self.q_net.state_dict())
q_net_update_times += 1
episode_return_list.append(episode_return)
if (i_episode+1) % 10 == 0:#每10轮更新一次进度条、评估模型、保存模型
pbar.set_postfix({"Episode": i_episode + 1, "Return": f"{np.mean(episode_return_list[-10:]):.3f}"})
current_return = self.evaluate_model(10)
if current_return > max_return:
max_return = current_return
torch.save(self.q_net.state_dict(), 'q_net.pth')
pbar.update(1)

def show_result(self):
env_name = 'CartPole-v1'
# env = gym.make(env_name, render_mode='human',new_step_api=True)
env = gym.make(env_name,new_step_api=True)
self.q_net.load_state_dict(torch.load('q_net.pth'))
self.q_net.eval() # 将模型设置为评估模式
# 准备捕获的帧
frames = []
total_reward = 0
for i_episode in range(10):
state = env.reset(seed=self.env_seed) # 重置环境
done = False
while not done:
# # 转换状态为张量并进行 Q 值预测
# state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device) # 增加批次维度
# 使用 numpy.array 将状态转换为张量
state_tensor = torch.tensor(np.array([state]), dtype=torch.float).to(self.device) # 转换为 numpy 数组
q_values = self.q_net(state_tensor) # 获取 Q 值
action = np.argmax(q_values.detach().cpu().numpy()) # 选择具有最高 Q 值的动作
# 采取动作并观察新的状态和奖励
next_state, reward, done, _,_ = env.step(action)
state = next_state
total_reward += reward
# 渲染环境并捕获当前帧
frame = env.render(mode='rgb_array')
frames.append(frame)
print(f"Avarage reward: {total_reward/10:.3f}")
# 保存为 GIF
imageio.mimsave('cartpole.gif', frames, duration=0.1)
if __name__ == '__main__':
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
agent = DQN(learning_rate=2e-3,gamma=0.98,epsilon=0.01,target_update_step=10,ReplayBuffer_size=10000)
agent.set_env_seed(0)
agent.run(num_episodes=500,batch_size=64)
agent.show_result()

结果

在车杆环境中,有一辆小车,智能体的任务是通过左右移动保持车上的杆竖直,若杆的倾斜度数过大,或者车子离初始位置左右的偏离程度过大,或者坚持时间到达 200 帧,则游戏结束。智能体的状态是一个维数为 4 的向量,每一维都是连续的,其动作是离散的,动作空间大小为 2。在游戏中每坚持一帧,智能体能获得分数为 1 的奖励,坚持时间越长,则最后的分数越高,坚持 200 帧即可获得最高的分数。

alt text

采样500次数据同时更新模型参数,每10次进行一次模型评估,保存结果最好的模型,其平均回报Avarage reward: 218.000。

alt text