强化学习中有两种重要的方法:Policy Gradients和Q-learning。其中Policy Gradients方法直接预测在某个环境下应该采起的Action,而Q-learning方法预测某个环境下全部Action的指望值(即Q值)。通常来讲,Q-learning方法只适合有少许离散取值的Action环境,而Policy Gradients方法适合有连续取值的Action环境。在与深度学习方法结合后,这两种算法就变成了DPG(Deterministic Policy Gradient)和DQN(Deep Q-learning Network),他们都是由DeepMind提出来的。DDPG(Deep Deterministic Policy Gradient)则是利用 DQN 扩展 Q 学习算法的思路对DPG方法进行改造获得的(Actor-Critic,AC)框架的算法,该算法可用于解决连续动做空间上的 DRL 问题。git
这里推荐莫烦大神的关于强化学习的代码总结,git地址:github
https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow/tree/master/contents算法
你们也能够搜索他的博客,里面有不少关于强化的系列文章,颇有价值。网络
一、gym环境框架
先安装openai的gym环境,gym是一个模拟游戏环境的小引擎,openai开发gym就是为了方便广大机器学习爱好者作一些游戏交互方面的机器学习尝试。dom
pip install gym安装完成后,试试gym能不能正常运行。机器学习
运行下面代码:函数
import gym # 建立一个小车倒立摆模型 env = gym.make('CartPole-v0') # 初始化环境 env.reset() # 刷新当前环境,并显示 env.render()
CartPole-v0’是小车游戏名称。出现上面这个窗口就表示gym环境已经能够正常运行了!学习
第二步,让小车动起来!优化
count = 0 for t in range(100): action = env.action_space.sample() #随机采样动做 print("action:%s"%(action)) observation, reward, done, info = env.step(action) #与环境交互,得到下一步的时刻 if done: break env.render() #绘制场景 count+=1 time.sleep(0.2) #每次等待0.2s print(count)
env.step(action)是让小车按照action的动做动起来,返回动起来以后的小车状态,回报值,是否结束等信息。
done=True表示本次游戏结束;observation是小车状态,reward是本次游戏动做的回报值。
二、Q-learning
Q-learning是一种基于Q值计算的强化学习,还有一种是基于策略梯度(Policy Gradient)的强化学习。
在Q-learning中,咱们维护一张Q值表,表的维数为:状态数S * 动做数A,表中每一个数表明在当前状态S下能够采用动做A能够得到的将来收益的折现和。咱们不断的迭代咱们的Q值表使其最终收敛,而后根据Q值表咱们就能够在每一个状态下选取一个最优策略。
其中更新的方法是用 Bellman Equation:
Q(S,a)=r+γ*max(Q(S',a')
其中,
S 表明当前的状态,a 表明当前状态所采起的行动,
S’ 表明这个行动所引发的下一个状态,a’ 是这个新状态时采起的行动,
r 表明采起这个行动所获得的奖励 reward,γ 是 discount 因子,
Q(S,a)称为Q-target,即咱们使用贝尔曼方程加贪心策略认为实际应该获得的奖励,咱们的目标就是使咱们的Q值不断的接近Q-target值。
Q-learning只要明白它的原理就能够了,具体算法的代码咱们不用太过于深究,由于如今直接用Q-learning的场景很是很是的少,重点能够看下DQN的具体实现。
三、深度Q网络(DQN)
为何会出现DQN呢
在普通的Q-learning中,当状态和动做空间是离散且维数不高时可以使用Q-Table储存每一个状态动做对的Q值,而当状态和动做空间是高维连续时,使用Q-Table不现实。
如何将原始的Q-learning转换成深度学习问题
将Q-Table的更新问题变成一个函数拟合问题,相近的状态获得相近的输出动做。以下式,经过更新参数 θ 使Q函数逼近最优Q值 。所以,DQN就是要设计一个神经网络结构,经过函数来拟合Q值。DQN网络输入状态s,输出动做action。
这里的Deep即一样使用DQN中的经验池和双网络结构来促进神经网络可以有效学习。DQN的主要特色:
1、经过Q-Learning使用reward来构造标签。
2、经过experience replay(经验池)的方法来解决相关性及非静态分布问题。
3、使用一个神经网络产生当前Q值,使用另一个神经网络产生Target Q值。
经验回放
经验池的功能主要是解决相关性及非静态分布问题。具体作法是把每一个时间步agent与环境交互获得的转移样本 (st,at,rt,st+1) 储存到回放记忆单元,要训练时就随机拿出一些(minibatch)来训练。(其实就是将游戏的过程打成碎片存储,训练时随机抽取就避免了相关性问题)。
s是当前状态,a是当前动做,r是回报值,s_是下阶段状态。经验存储的代码以下。
def store_transition(self,s,a,r,s_):
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0
# hstack:Stack arrays in sequence horizontally
transition = np.hstack((s,[a,r],s_))
index = self.memory_counter % self.memory_size
self.memory[index,:] = transition
self.memory_counter += 1
双网络结构
DQN使用单个网络来进行选择动做和计算目标Q值;Nature DQN使用了两个网络,一个当前主网络用来选择动做,更新模型参数,另外一个目标网络用于计算目标Q值,两个网络的结构是如出一辙的。目标网络的网络参数不须要迭代更新,而是每隔一段时间从当前主网络复制过来,即延时更新,这样能够减小目标Q值和当前的Q值相关性。Nature DQN和DQN相比,除了用一个新的相同结构的目标网络来计算目标Q值之外,其他部分基本是彻底相同的。
Nature DQN的实现流程以下:
(1)首先构建神经网络,一个主网络,一个目标网络,他们的输入都为obervation,输出为不一样action对应的Q值。
(2)在一个episode结束时(游戏胜利或死亡),将env重置,即observation恢复到了初始状态observation,经过贪婪选择法ε-greedy选择action。根据选择的action,获取到新的next_observation、reward和游戏状态。将[observation, action, reward, next_observation, done]放入到经验池中。经验池有必定的容量,会将旧的数据删除。
(3)从经验池中随机选取batch个大小的数据,计算出observation的Q值做为Q_target。对于done为False的数据,使用reward和next_observation计算discount_reward。而后将discount_reward更新到Q_traget中。
(4)每个action进行一次梯度降低更新,使用MSE做为损失函数。注意参数更新不是发生在每次游戏结束,而是发生在游戏进行中的每一步。
(5)每一个batch咱们更新参数epsilon,egreedy的epsilon是不断变小的,也就是随机性不断变小。
(6)每隔固定的步数,从主网络中复制参数到目标网络。
引入target_net后,再一段时间里目标Q值使保持不变的,必定程度下降了当前Q值和目标Q值的相关性,提升了算法稳定性。
接下来,咱们重点看一下咱们DQN相关的代码。
定义相关输入
这了,咱们用s表明当前状态,用a表明当前状态下采起的动做,r表明得到的奖励,s_表明转移后的状态。
self.s = tf.placeholder(tf.float32,[None,self.n_features],name='s')
self.s_ = tf.placeholder(tf.float32,[None,self.n_features],name='s_')
self.r = tf.placeholder(tf.float32,[None,],name='r')
self.a = tf.placeholder(tf.int32,[None,],name='a')
双网络DQN代码
w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1) # ------------------ build evaluate_net ------------------ with tf.variable_scope('eval_net'): e1 = tf.layers.dense(self.s,20,tf.nn.relu,kernel_initializer=w_initializer, bias_initializer=b_initializer,name='e1' ) self.q_eval = tf.layers.dense(e1,self.n_actions,kernel_initializer=w_initializer, bias_initializer=b_initializer,name='q') # ------------------ build target_net ------------------ with tf.variable_scope('target_net'): t1 = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer, bias_initializer=b_initializer, name='t1') self.q_next = tf.layers.dense(t1, self.n_actions, kernel_initializer=w_initializer, bias_initializer=b_initializer, name='t2')
网络结构也能够本身定义,总的原则就是输入是状态,输出是N个action的可能性。
每隔必定的步数,咱们就要将target_net中的参数复制到eval_net中:
t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='target_net')
e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='eval_net')
with tf.variable_scope('soft_replacement'):
self.target_replace_op = [tf.assign(t,e) for t,e in zip(t_params,e_params)]
计算损失并优化
首先,对于eval_net来讲,咱们只要获得当前的网络输出便可,可是咱们定义的网络输出是N个动做对应的q-eval值,咱们要根据实际的a来选择对应的q-eval值,这一部分的代码以下:
with tf.variable_scope('q_eval'):
a_indices = tf.stack([tf.range(tf.shape(self.a)[0], dtype=tf.int32), self.a], axis=1)
# 用indices从张量params获得新张量
# 这里self.q_eval是batch * action_number,a_indices是batch * 1,也就是说选择当前估计每一个动做的Q值
self.q_eval_wrt_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
第一部分的R咱们是已经获得了的,剩下的就是根据贪心策略选择N个输出中最大的一个便可:
with tf.variable_scope('q_target'):
q_target = self.r + self.gamma * tf.reduce_max(self.q_next,axis=1,name='Qmax_s_')
# 一个节点被 stop以后,这个节点上的梯度,就没法再向前BP了
self.q_target = tf.stop_gradient(q_target)
接下来,咱们就能够定义咱们的损失函数并选择优化器进行优化:
with tf.variable_scope('loss'): self.loss = tf.reduce_mean(tf.squared_difference(self.q_target,self.q_eval_wrt_a,name='TD_error')) with tf.variable_scope('train'): self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
网络的训练
每隔必定的步数,咱们就要将eval_net中的参数复制到target_net中,同时咱们要从经验池中选择batch大小的数据输入到网络中进行训练。
def learn(self): if self.learn_step_counter % self.replace_target_iter == 0: self.sess.run(self.target_replace_op) print('\ntarget_params_replaced\n') if self.memory_counter > self.memory_size: sample_index = np.random.choice(self.memory_size,size=self.batch_size) else: sample_index = np.random.choice(self.memory_counter,size = self.batch_size) batch_memory = self.memory[sample_index,:] _,cost = self.sess.run( [self._train_op,self.loss], feed_dict={ self.s:batch_memory[:,:self.n_features], self.a:batch_memory[:,self.n_features], self.r:batch_memory[:,self.n_features+1], self.s_:batch_memory[:,-self.n_features:] } )