时序差分强化学习.本章将会结合一些实例来加深读者的理解.
的一个状态序列信息可以表示为如下的一个序列(即在初始状态执行某动作,获得离开该状态的即时奖励,到达下一个状态
):
时刻状态
的收获可以表述为:
时刻为终止时刻.该策略下某一状态
的价值:
时,先计算
与先前平均值
的差,再将这个差值乘以一定的系数
后作为误差对旧平均值进行修正.如果该式中平均值和新数据分别看成是状态的价值和该状态的收获,那么该公式就变成了递增式蒙特卡洛法更新状态价值.公式如下:
来代替状态计数的倒数,公式变成:
与下一时刻状态
的预估状态价值乘以衰减系数
组成,这符合贝尔曼方程的描述:
称为TD目标值.
称为TD误差.
的过程
)开始往序列终止状态方向观察至状态
,使用这n个状态产生的即时奖励(
)以及状态
的预估价值来计算当前的状态
的价值
时对应的预测值如下表所示.从该表可以看出,MC学习是基于
步预测的
,并定义
收获为:
的所有步收获的权重之和.其中,任意一个n-步收获的权重被设计为
,如下图所示.
收获的计算公式为:
)被描述为:
)对于n-收获的权重分配,左侧阴影部分是3-步收获的权重值,随着n的增大,其n-收获的权重呈几何级数的衰减.当在T时刻到达终止状态时,未分配的权重(右侧阴影部分)全部给予终止状态的实际收获值.如此设计可以使一个完整的状态序列中所有n-步收获的权重加起来为1,离当前状态越远的收获其权重越小.
)的设计使得在一个episode中,后一个状态的状态价值与之前所有状态的状态价值有关,同时也可以说成是一个状态价值参与决定了后续所有状态的状态价值。但是每个状态的价值对于后续状态价值的影响权重是不同的。
:TD
的设计使得在状态序列中,一个状态的价值
由
得到,而后者又间接由所有后续状态价值计算得到,因此可以认为更新一个状态的价值需要知道所有后续状态的价值.也就是说,必须要经历完整的状态序列获得包括终止状态的每一个状态的即时奖励才能更新当前状态的价值.这和MC算法的要求一样,因此TD
算法有着和MC算法一样的劣势.
取值区间为
,当
对应的就是MC算法.这个给实际计算带来了不便.这里可以用一个例子方便大家理解:前向认识就假设一个人坐在状态流上拿着望远镜看向前方,前方是那些将来的状态。当估计当前状态的值函数时,从TD(λ)的定义中可以看到,它需要用到将来时刻的值函数。
为TD
算法进行在线实时单步更新学习提供了理论依据.为了解释这一点,需要引入效用迹这个概念.我们通过一个之前的例子来解释这个问题,如下图所示:
)是一个真判断表达式,表示当
时取值为1,其余条件下取值为0.
发生,那么
对应的效用迹的值就加1,如果在某一段时间
未发生,则按照某个衰减因子进行衰减,这也就是上面的效用迹的计算公式了。
都保存了一个效用迹。我们可以将效用迹理解为一个权重,状态
被访问的时间离现在越久远,其对于值函数的影响就越小,状态
被访问的次数越少,其对于值函数的影响也越小。同样用一个例子说明:有个人坐在状态流上,手里拿着话筒,面朝着已经经历过的状态获得当前回报并利用下一个状态的值函数得到TD偏差之后,此人会向已经经历过的状态喊话告诉这些已经经历过的状态处的值函数需要利用当前时刻的TD偏差进行更新。此时过往的每个状态值函数更新的大小应该跟距离当前状态的步数有关。
时,
一直成立,此时价值更新等同于TD(0)算法:
= 1时,在每完成一个状态序列后更新状态价值时,其完全等同于MC学习;但在引入了效用迹后,可以每经历一个状态就更新状态的价值,这种实时更新的方法并不完全等同于MC.
时,在每完成一个状态序列后更新价值时,基于前向认识的TD(
)与基于反向认识的TD(
)完全等效;不过在进行在线实时学习时,两者存在一些差别.
from random import shuffle
from queue import Queue
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
from utils import str_key,set_dict,get_dict
class Gamer():
'''游戏者'''
def __init__(self, name = "", A = None, display = False):
self.name = name
self.cards = [] #手中的牌
self.display = display #是否显示对局文字信息
self.policy = None #策略
self.lerning_method = None #学习方法
self.A = A #行为空间
def __str__(self):
return self.name
def _value_of(self, card):
'''
根据牌的字符判断牌的数值大小,A被输出为1,JQK均为10,其余按牌字符对应的数字取值
:param card: 牌面信息str
:return: 牌的大小数值int, A返回1
'''
try:
v = int(card)
except:
if card == "A":
v = 1
elif card in ['J','Q','K']:
v = 10
else:
v = 0
finally:
return v
def get_points(self):
'''
统计一手牌分值,如果使用了A的1点,同时返回True
:return:tuple(返回牌的总点数,是否使用了可复用的Ace)
例如['A','10','3'] 返回 (14,False)
['A','10'] 返回(21, True)
'''
num_of_uneable_ace = 0 #默认没有拿到Ace
total_point = 0 #总值
cards = self.cards
if cards is None:
return 0, False
for card in cards:
v = self._value_of(card)
if v == 1:
num_of_uneable_ace += 1
v = 11
total_point += v
while total_point > 21 and num_of_uneable_ace > 0:
total_point -= 10
num_of_uneable_ace -= 1
return total_point,bool(num_of_uneable_ace)
def receive(self, cards = []):
'''
玩家获得一张或者多张牌
:param cards: 玩家获得的牌
:return: None
'''
cards = list(cards)
for card in cards:
self.cards.append(card)
def discharge_cards(self):
'''
玩家把手中的牌清空,扔牌
:return: None
'''
self.cards.clear()
def cards_info(self):
'''
展示牌面具体信息
:return: None
'''
self._info("{}{}现在的牌:{}\n".format(self.role,self,self.cards))
def _info(self,msg):
if self.display:
print(msg, end="")
class Dealer(Gamer):
'''
庄家
'''
def __init__(self, name = "", A = None, display = False):
super(Dealer,self).__init__(name, A, display)
self.role = "庄家" #角色
self.policy = self.dealer_policy #庄家策略
def first_card_value(self):
'''
显示第一张明牌
:return: 明牌的点数
'''
if self.cards is None or len(self.cards) == 0:
return 0
return self._value_of(self.cards[0])
def dealer_policy(self, Dealer = None):
'''
庄家策略的细节
:param Dealer:庄家
:return:庄家的策略(行为)
'''
action = ""
dealer_points, _ = self.get_points()
if dealer_points >= 17:
action = self.A[1] #停止叫牌
else:
action = self.A[0] #继续叫牌
return action
class Player(Gamer):
'''
玩家
'''
def __init__(self, name = "", A = None, display = False):
super(Player, self).__init__(name, A, display)
self.policy = self.native_policy
self.role = "玩家"
def get_state(self, dealer):
dealer_first_card_value = dealer.first_card_value()
player_points, useable_ace = self.get_points()
return dealer_first_card_value, player_points, useable_ace
def get_state_name(self, dealer):
return str_key(self.get_state(dealer))
def native_policy(self, dealer = None):
player_points, _ = self.get_points()
if player_points < 20:
action = self.A[0]
else:
action = self.A[1]
return action
class Arena():
'''
负责游戏管理
'''
def __init__(self, display = None, A = None):
self.cards = ['A','2','3','4','5','6','7','8','9','10','J','Q',"K"] * 4
self.card_q = Queue(maxsize=52) #发牌器,里面是洗好的牌
self.cards_in_pool = [] #已经用过的公开的牌
self.display = display
self.episodes = [] #产生的对局信息列表
self.load_cards(self.cards) #把初始状态的52张牌装入发牌器
self.A = A #获得行为空间
def load_cards(self, cards):
'''
把收集的牌洗一洗,重新装到发牌器中
:param cards: 要装入发牌器的多张牌list
:return: None
'''
shuffle(cards) #洗牌
for card in cards: #deque数据结构只能一个一个添加
self.card_q.put(card)
cards.clear()#把原来的牌清空
return
def reward_of(self, dealer, player):
'''
判断玩家奖励值,附带玩家,庄家的牌点信息
:param dealer: 庄家
:param player:玩家
:return: tuple 奖励值 玩家点数 庄家点数 是否使用A
'''
dealer_points, _ = dealer.get_points()
player_points, useable_ace = player.get_points()
if player_points > 21:
reward = -1
else:
if player_points > dealer_points or dealer_points > 21:
reward = 1
elif player_points == dealer_points:
reward = 0
else:
reward = -1
return reward, player_points, dealer_points, useable_ace
def serve_card_to(self, player, n=1):
'''
给庄家或玩家发牌,如果牌不够则将公开牌池的牌洗一洗重新发牌
:param player: 一个庄家或玩家
:param n: 一次连续发牌的数量
:return:None
'''
cards = [] #将要发出的牌
for _ in range(n):
#要考虑发牌器没有牌的情况
if self.card_q.empty():
self._info("\n发牌器没牌了,整理废牌,重新发牌;")
shuffle(self.cards_in_pool)
self._info("一共整理了{}张已用牌, 重新放入发牌器\n".format(len(self.cards_in_pool)))
assert (len(self.cards_in_pool) > 20)
# 确保一次能收集较多的牌
# 代码编写不合理时,可能会出现即使某一玩家爆点了也还持续地叫牌,会导致玩家手中的牌变多而发牌器和
# 已使用的牌都很少,需避免这种情况
self.load_cards(self.cards_in_pool) # 将收集来的用过的牌洗好送入发牌器重新使用
cards.append(self.card_q.get()) # 从发牌器发出一张牌
self._info("发了{}张牌({})给{}{}".format(n, cards, player.role,player))
player.receive(cards) #某玩家接受发出的牌
player.cards_info()
def _info(self, message):
if self.display:
print(message, end = "")
def recycle_cards(self, *players):
'''
回收玩家手中的牌到公开使用过的牌池中
:param players: 一个玩家或庄家
:return:None
'''
if len(players) == 0:
return
for player in players:
for card in player.cards:
self.cards_in_pool.append(card)
player.discharge_cards()#玩家不再留有这些牌
def play_game(self, dealer, player):
'''
玩一局21点,生成一个状态序列以及最终奖励(中间奖励为0)
:param dealer: 庄家
:param player: 玩家
:return: tuple: episode, reward
'''
self._info("=============开始新一局=============\n")
self.serve_card_to(player, n=2) #发两张牌给玩家
self.serve_card_to(dealer, n=2) #发两张牌给庄家
episode = [] #记录一个对局信息
if player.policy is None:
self._info("玩家需要一个策略")
return
if dealer.policy is None:
self._info("庄家需要一个策略")
return
while True:
action = player.policy(dealer)
# 玩家的策略产生一个行为
self._info("{}{}选择:{};".format(player.role, player, action))
episode.append((player.get_state_name(dealer), action)) #记录一个(s,a)
if action == self.A[0]: #继续叫牌
self.serve_card_to(player) # 发一张牌给玩家
else: #停止叫牌
break
#玩家停止叫牌后要计算下玩家手中的点数,玩家如果爆了,庄家就不用继续了
reward, player_points, dealer_points, useable_ace = self.reward_of(dealer, player)
if player_points > 21:
self._info("玩家爆点{}输了,得分:{}\n".format(player_points,reward))
self.recycle_cards(player, dealer)
self.episodes.append((episode, reward)) #预测的时候需要行为episode list后集中学习V
# 在蒙特卡洛控制的时候, 可以不需要episodes list,生成一个episode学习一个,下同
self._info("===============本局结束===============")
return episode, reward
#玩家并没有超过21点
self._info("\n")
while True:
action = dealer.policy() #庄家从其策略中获取一个行为
self._info("{}{}选择:{};".format(dealer.role,dealer,action))
#状态只记录庄家第一张牌信息,此时玩家不再叫牌,(s,a)不必重复记录
if action == self.A[0]: #庄家继续叫牌
self.serve_card_to(dealer)
else:
break;
#双方均停止叫牌了
self._info("\n双方均停止叫牌")
reward, player_points, dealer_points,useable_ace = self.reward_of(dealer,player)
player.cards_info()
dealer.cards_info()
if reward == +1:
self._info("玩家赢了!")
elif reward == -1:
self._info("玩家输了!")
else:
self._info("双方和局!")
self._info("玩家{}点,庄家{}点\n".format(player_points, dealer_points))
self._info("=================本局结束==================")
self.recycle_cards(player, dealer) #回收玩家和庄家手中的牌至公开牌池
self.episodes.append((episode, reward)) #将刚才产生的完整对局添加值状态序列列表,蒙特卡洛控制不需要
return episode, reward
def play_games(self, dealer, player, num = 2, show_statistic = True):
'''
一次性玩多局游戏
:param dealer:
:param player:
:param num:
:param show_statistic:
:return:
'''
results = [0, 0, 0] #玩家负, 和, 胜局数
self.episodes.clear()
for i in tqdm(range(num)):
episode, reward = self.play_game(dealer, player)
results[1 + reward] += 1
if player.lerning_method is not None:
player.lerning_method(episode, reward)
if show_statistic:
print("共玩了{}局,玩家赢{}局,和{}局,输{}局,胜率:{:.2f},不输率:{:.2f}"\
.format(num, results[2], results[1], results[0], results[2]/num,\
(results[2] + results[1]) / num))
return
def _info(self, message):
if self.display:
print(message, end="")
A = ["继续叫牌","停止叫牌"]
display = False
#创建一个玩家一个庄家,玩家使用原始策略,庄家使用其固定的策略
player = Player(A = A, display = display)
dealer = Dealer(A = A, display = display)
#创建一个场景
arena = Arena(A = A, display = display)
#生成num个完整的对局
arena.play_games(dealer,player,num=200000)
# 共玩了200000局,玩家赢58564局,和11369局,输130067局,胜率:0.29,不输率:0.35
# 100%|██████████| 200000/200000 [00:17<00:00, 11140.95it/s]
def policy_evaluate(episodes, V, Ns):
'''
统计每个状态的价值,衰减因子为1,中间状态的即时奖励为0,递增式蒙特卡洛评估
:param episodes: 状态序列
:param V:状态价值字典
:param Ns:状态被访问的次数节点
:return:
'''
for episode, r in episodes:
for s, a in episode:
ns = get_dict(Ns, s)
v = get_dict(V, s)
set_dict(Ns, ns+1, s)
set_dict(V, v+(r-v)/(ns+1), s)
V = {} #状态价值字典
Ns = {}#状态被访问的次数节点
policy_evaluate(arena.episodes, V, Ns) #学习V值
def draw_value(value_dict, useable_ace = 0, is_q_dict = False, A = None):
# 定义figure
fig = plt.figure()
# 将figure变为3d
ax = Axes3D(fig)
# 定义x, y
x = np.arange(1, 11, 1) # 庄家第一张牌
y = np.arange(12, 22, 1) # 玩家总分数
# 生成网格数据
X, Y = np.meshgrid(x, y)
# 从V字典检索Z轴的高度
row, col = X.shape
Z = np.zeros((row,col))
if is_q_dict:
n = len(A)
for i in range(row):
for j in range(col):
state_name = str(X[i,j])+"_"+str(Y[i,j])+"_"+str(useable_ace)
if not is_q_dict:
Z[i,j] = get_dict(value_dict, state_name)
else:
assert(A is not None)
for a in A:
new_state_name = state_name + "_" + str(a)
q = get_dict(value_dict, new_state_name)
if q >= Z[i,j]:
Z[i,j] = q
# 绘制3D曲面
ax.plot_surface(X, Y, Z, rstride = 1, cstride = 1, cmap = plt.cm.cool)
plt.show()
draw_value(V, useable_ace=True, A = A)#绘制有可用的A时状态价值图
draw_value(V, useable_ace=False, A = A)#绘制无可用的A时状态价值图
display = True
player.display, dealer.display, arena.display = display,display,display
arena.play_games(dealer,player, num=2)
# =============开始新一局=============
# 发了2张牌(['2', 'Q'])给玩家玩家现在的牌:['2', 'Q']
# 发了2张牌(['J', 'A'])给庄家庄家现在的牌:['J', 'A']
# 玩家选择:继续叫牌;发了1张牌(['10'])给玩家玩家现在的牌:['2', 'Q', '10']
# 玩家选择:停止叫牌;玩家爆点22输了,得分:-1
# ===============本局结束===============共玩了2局,玩家赢0局,和0局,输1局,胜率:0.00,不输率:0.00
# =============开始新一局=============
# 发了2张牌(['5', 'Q'])给玩家玩家现在的牌:['5', 'Q']
# 发了2张牌(['5', '5'])给庄家庄家现在的牌:['5', '5']
# 玩家选择:继续叫牌;发了1张牌(['9'])给玩家玩家现在的牌:['5', 'Q', '9']
# 玩家选择:停止叫牌;玩家爆点24输了,得分:-1
# ===============本局结束===============共玩了2局,玩家赢0局,和0局,输2局,胜率:0.00,不输率:0.00