强化学习-3-动态规划

强化学习的笔记、理解、感悟及代码实现,仅按个人思维进行精华总结和记录,使用的教程:动手学强化学习

动态规划依赖于子问题分解和递归方程,基于动态规划的强化学习算法主要有策略迭代和价值迭代两种,分别使用贝尔曼期望方程和贝尔曼最优方程,需要事先知道环境的状态转移和奖励函数,适用于有限的状态和动作空间。

策略迭代

策略迭代分策略评估和策略提升两部分,策略评估计算出每个状态的价值函数,策略提升根据价值函数改进策略,不断迭代直到收敛。

策略评估

贝尔曼期望方程为状态价值函数提供了一个递归关系。假设在某个状态s下,策略π(a|s)的价值函数为v(s,π),则贝尔曼期望方程为:

其中,π(a|s)是策略π在状态s下采取动作a的概率,r(s,a)是状态s下采取动作a的奖励,γ是折扣因子,p(s’|s,a)是状态s下采取动作a转移到状态s’的概率。

随机初始化所有状态的价值函数V(s)后,根据上一轮价值函数计算下一轮价值函数V’(s),随着即时奖励R(s,a)和未来奖励的传播积累,价值函数会越来越准确并收敛即(收敛性证明略)。

策略提升

贪心地在每一个状态选择动作价值最大的动作,得到新的策略:

策略迭代算法

alt text

实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import copy
class CliffWalkingEnv:
""" 悬崖漫步环境"""
def __init__(self, ncol=12, nrow=4):
self.ncol = ncol # 定义网格世界的列
self.nrow = nrow # 定义网格世界的行
# 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态、概率和奖励,done表示是否终止(悬崖或者终点)
self.P = self.createP()

def createP(self):
# 初始化
P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
# 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
# 定义在左上角
change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
for i in range(self.nrow):
for j in range(self.ncol):
for a in range(4):
# 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
if i == self.nrow - 1 and j > 0:
P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
True)]
continue
# 其他位置
next_x = min(self.ncol - 1, max(0, j + change[a][0]))
next_y = min(self.nrow - 1, max(0, i + change[a][1]))
next_state = next_y * self.ncol + next_x
reward = -1
done = False
# 下一个位置在悬崖或者终点
if next_y == self.nrow - 1 and next_x > 0:
done = True
if next_x != self.ncol - 1: # 下一个位置在悬崖
reward = -100
P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
return P

class PolicyIteration:
""" 策略迭代算法 """
def __init__(self, env, theta, gamma):
self.env = env
self.v = [0] * self.env.ncol * self.env.nrow # 初始化价值为0
self.pi = [[0.25, 0.25, 0.25, 0.25]
for i in range(self.env.ncol * self.env.nrow)] # 初始化为均匀随机策略
self.theta = theta # 策略评估收敛阈值
self.gamma = gamma # 折扣因子
def policy_evaluation(self):
""" 策略评估 """
cnt1 = 0
while True:
cnt1 += 1
new_v = [0] * self.env.ncol * self.env.nrow
for s in range(self.env.ncol * self.env.nrow):
qsa_list = []#状态s下所有动作的价值列表
for a in range(4):#遍历s下所有4个动作
qsa = 0
for res in self.env.P[s][a]:#遍历状态s下动作a的所有可能到达状态
p, next_state, reward, done = res
qsa += p * (reward + self.gamma * self.v[next_state]*(1-done))#计算动作价值
qsa_list.append(self.pi[s][a] * qsa)#后续求和所以乘以策略动作概率
new_v[s] = sum(qsa_list) # 状态s的价值等于动作价值之和
delta = max([abs(self.v[s] - new_v[s]) for s in range(self.env.ncol * self.env.nrow)])#这里使用最大范数差判断价值函数是否收敛,根据需要也可结合使用均方误差等其他方式
self.v = new_v # 更新价值函数
if delta < self.theta: # 价值函数收敛
break
print(f"策略评估{cnt1}次完成")
def policy_improvement(self):
""" 策略提升 """
for s in range(self.env.ncol * self.env.nrow):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, reward, done = res
qsa += p * (reward + self.gamma * self.v[next_state]*(1-done))
qsa_list.append(qsa)#后续比较动作价值,不乘以策略动作概率
max_qsa = max(qsa_list)
cntq=qsa_list.count(max_qsa)
self.pi[s]=[1/cntq if q==max_qsa else 0 for q in qsa_list]#更新策略
return self.pi
def policy_iteration(self):
""" 策略迭代 """
cnt = 0
while True:
cnt += 1
old_pi = copy.deepcopy(self.pi) # 保存上一次的策略
self.policy_evaluation() # 策略评估
new_pi = self.policy_improvement() # 策略提升
print("策略提升!")
if old_pi == new_pi: # 策略不再变化,收敛
break
print(f"策略迭代{cnt}轮完成!")
return new_pi
def print_result(self):
""" 打印结果 """
print("状态价值:")
for i in range(self.env.nrow):
for j in range(self.env.ncol):
print("{:.3f}".format(self.v[i * self.env.ncol + j]).center(8), end="")
print('\n')
print("策略:")
actions = ['^', 'v', '<', '>']
for i in range(self.env.nrow):
for j in range(self.env.ncol):
if i == self.env.nrow - 1 and j == self.env.ncol - 1:#终点
print("goal".center(5), end="")
elif i == self.env.nrow - 1 and self.env.ncol-1>j > 0:#悬崖
print("x".center(5), end="")
else:
a=self.pi[i*self.env.ncol+j]
a_str=''.join( actions[i] if a[i]>0 else 'o' for i in range(len(a)) )
print(a_str.center(5), end="")
print('\n')

if __name__ == '__main__':
env = CliffWalkingEnv()
agent = PolicyIteration(env, theta=0.001, gamma=0.9)
agent.policy_iteration()
agent.print_result()

结果:
alt text

价值迭代

相比策略迭代,价值迭代直接迭代优化、更新状态价值函数V(s)直至收敛,再从最优状态价值函数V*(s)中返回一个确定的最优策略

根据贝尔曼最优方程,状态价值函数V*(s)的最优解为:

迭代更新方式:

价值迭代算法:
alt text

实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class ValueIteration:
""" 策略迭代算法 """
def __init__(self, env, theta, gamma):
self.env = env
self.v = [0] * self.env.ncol * self.env.nrow # 初始化价值为0
self.pi = [[0.25, 0.25, 0.25, 0.25]
for i in range(self.env.ncol * self.env.nrow)] # 初始化为均匀随机策略
self.theta = theta # 策略评估收敛阈值
self.gamma = gamma # 折扣因子
def value_iteration(self):
""" 价值迭代 """
cnt1 = 0
while True:
cnt1 += 1
new_v = [0] * self.env.ncol * self.env.nrow
for s in range(self.env.ncol * self.env.nrow):
qsa_list = []#状态s下所有动作的价值列表
for a in range(4):#遍历s下所有4个动作
qsa = 0
for res in self.env.P[s][a]:#遍历状态s下动作a的所有可能到达状态
p, next_state, reward, done = res
qsa += p * (reward + self.gamma * self.v[next_state]*(1-done))#计算动作价值
qsa_list.append(qsa)#后续比较动作价值,不乘以策略动作概率
new_v[s] = max(qsa_list) # 价值迭代算法直接取最大动作价值进行状态价值函数更新
delta = max([abs(self.v[s] - new_v[s]) for s in range(self.env.ncol * self.env.nrow)])#这里使用最大范数差判断价值函数是否收敛,根据需要也可结合使用均方误差等其他方式
self.v = new_v # 更新价值函数
if delta < self.theta: # 价值函数收敛
break
print(f"价值迭代{cnt1}次完成")
self.get_policy() # 得到策略
def get_policy(self):
""" 从最优状态价值函数中得到策略 """
for s in range(self.env.nrow * self.env.ncol):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, r, done = res
qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
qsa_list.append(qsa)
maxq = max(qsa_list)
cntq = qsa_list.count(maxq) # 计算有几个动作得到了最大的Q值
# 让这些动作均分概率
self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
def print_result(self):
""" 打印结果 """
print("状态价值:")
for i in range(self.env.nrow):
for j in range(self.env.ncol):
print("{:.3f}".format(self.v[i * self.env.ncol + j]).center(8), end="")
print('\n')
print("策略:")
actions = ['^', 'v', '<', '>']
for i in range(self.env.nrow):
for j in range(self.env.ncol):
if i == self.env.nrow - 1 and j == self.env.ncol - 1:#终点
print("goal".center(5), end="")
elif i == self.env.nrow - 1 and self.env.ncol-1>j > 0:#悬崖
print("x".center(5), end="")
else:
a=self.pi[i*self.env.ncol+j]
a_str=''.join( actions[i] if a[i]>0 else 'o' for i in range(len(a)) )
print(a_str.center(5), end="")
print('\n')

if __name__ == '__main__':
"""价值迭代算法"""
env = CliffWalkingEnv()
agent = ValueIteration(env, theta=0.001, gamma=0.9)
agent.value_iteration()
agent.print_result()

结果:

alt text

总结

策略迭代和价值迭代是两种动态规划的算法,都需要事先知道奖励函数和状态转移概率的环境模型,适用于有限的状态和动作空间。策略迭代通过迭代求解最优策略,价值迭代通过迭代求解最优状态价值函数。两者的区别在于,策略迭代依赖于策略评估和策略提升,价值迭代直接优化状态价值函数。

具体表现在编码上:

  • 策略迭代更新状态价值函数时取状态s所有动作价值的和(sum),策略提升时取状态s动作价值最大的动作均分概率(max),策略评估、策略提升两部分循环进行收敛;
  • 价值迭代更新状态价值函数时直接取状态s的最大的动作价值(价值),迭代收敛为最优状态价值函数V*(s),再还原出最优策略。
  • 策略迭代需要多轮迭代,价值迭代只需一轮。