
A Practice Implementation of Double DQN (Deep Reinforcement Learning with Double Q-learning, H. van Hasselt et al., arXiv, 2015)


The code uses Python 2.x and TensorFlow 1.1 (CPU).

Paper: https://arxiv.org/abs/1509.06461

=============== File 1: replay_memory.py ================================

    import numpy as np


    MEMORYSIZE = 600000

    class Replay_memory:
        def __init__(self):

            self.previous_state = np.empty((MEMORYSIZE, 4), dtype=np.float32)
            self.action = np.empty((MEMORYSIZE, 1), dtype=np.uint8)#0 is the 1st action,1 is the 2nd action
            self.reward = np.empty((MEMORYSIZE, 1), dtype=np.float32)
            self.next_state = np.empty((MEMORYSIZE, 4), dtype=np.float32)
            self.terminal = np.empty((MEMORYSIZE, 1), dtype=np.bool)

            self.index = 0
            self.full_memory = False

        def memory_in(self, previous_state, action, reward, next_state, terminal):
            self.previous_state[self.index] = previous_state
            self.action[self.index] = action
            self.reward[self.index] = reward
            self.next_state[self.index] = next_state

            self.terminal[self.index] = terminal

            self.index += 1
            if self.index == MEMORYSIZE:
                self.index = 0
                self.full_memory = True


        def memory_out(self, size_minibatch):
            minib_previous_state = []
            minib_action = []
            minib_reward = []
            minib_next_state = []
            minib_terminal = []

            if self.full_memory:
                index_sample = np.random.randint(0, MEMORYSIZE, size=size_minibatch).tolist()
            else:
                index_sample = np.random.randint(0, self.index, size=size_minibatch).tolist()

            for i in index_sample:
                minib_previous_state.append(self.previous_state[i])
                minib_action.append(self.action[i])
                minib_reward.append(self.reward[i])
                minib_next_state.append(self.next_state[i])
                minib_terminal.append(self.terminal[i])

            rs_minib_previous_state = np.asarray(minib_previous_state)
            rs_minib_action = np.asarray(minib_action)
            rs_minib_reward = np.asarray(minib_reward)
            rs_minib_next_state = np.asarray(minib_next_state)
            rs_minib_terminal = np.asarray(minib_terminal)
            #return 5 np arrays with shape (size_minibatch, num_fea)
            return rs_minib_previous_state, rs_minib_action, rs_minib_reward, rs_minib_next_state, rs_minib_terminal

        def test_mempry_in(self):
            for i in range(100):
                self.memory_in([1., 1., 1., 1.], [0], [0.1], [1., 1., 1., 1.], [False])
                #self.memory_in([1., 1., 1., 1.], [1], [0.1], [1., 1., 1., 1.], [False])
                #self.memory_in([1., 1, 1., 1.], [0], [-1], [1., 1., 1., 1.], [True])



    #test#test#test#test#test#test#test#test#test#test#test#test
    '''
    if __name__ == "__main__":
        rm = Replay_memory()
        for i in range(10):
            rm.memory_in((1., 2., 3., 4.), [1], [0.1], [1., 2., 3., 4.], True)
            rm.memory_in((2, 2, 3, 4), [0], [0.1], [2, 2, 3, 4], False)
            rm.memory_in((3, 2, 3, 4), [1], [0.1], [3, 2, 3, 4], False)
        s, a, r, ss, t = rm.memory_out(32)
        print ss

    '''

    if __name__ == "__main__":
        rm = Replay_memory()
        rm.test_mempry_in()
        s, a, r, ss, t = rm.memory_out(32)
        print ss

The Replay_memory class has two main methods, memory_in and memory_out, which respectively store experience into the replay memory and sample experience out of it. For efficiency it is implemented with preallocated NumPy arrays (used as a ring buffer) rather than a deque. The method test_mempry_in is only used for testing; calling it loads some dummy experience into the replay memory.
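
As a quick sanity check of the shapes memory_out returns, a minimal sketch (assuming replay_memory.py is importable from the current directory) could look like this:

    from replay_memory import Replay_memory

    rm = Replay_memory()
    for i in range(100):
        # store dummy transitions: previous_state, action, reward, next_state, terminal
        rm.memory_in([0., 0., 0., 0.], [i % 2], [0.1], [0., 0., 0., 0.], [False])

    s, a, r, ss, t = rm.memory_out(32)
    # each returned array has one row per sampled transition
    assert s.shape == (32, 4) and ss.shape == (32, 4)
    assert a.shape == (32, 1) and r.shape == (32, 1) and t.shape == (32, 1)
    print(s.dtype)  # float32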

=========== File 2: nn.py ============================

    import tensorflow as tf
    import math

    class Fcnn:
        def __init__(self):
            self.batch_size = 32
            self.h1_size = 20

            self.input = tf.placeholder(tf.float32, [None, 4])
            self.action = tf.placeholder(tf.uint8, [None, 1])

            self.create_and_init_var(self.h1_size)
            self.Q_net_forward()

            self.var_list = [self.Q_net_l1_w, self.Q_net_l1_b, self.Q_net_l2_w, self.Q_net_l2_b]

        def create_and_init_var(self, H1_SIZE):
            self.Q_net_l1_w = self.init_w([4, H1_SIZE], 0.01)
            self.Q_net_l1_b = self.init_b([H1_SIZE])
            self.Q_net_l2_w = self.init_w([H1_SIZE, 2], 0.01)
            self.Q_net_l2_b = self.init_b([2])

        def test_create_and_init_var(self, H1_SIZE):
            self.Q_net_l1_w = self.test_init_w_1([4, H1_SIZE], 0.001)
            self.Q_net_l1_b = self.test_init_b_1([H1_SIZE])
            self.Q_net_l2_w = self.test_init_w_1([H1_SIZE, 2], 0.01)
            self.Q_net_l2_b = self.test_init_b_1([2])


        def Q_net_forward(self):
            fc1 = tf.nn.relu(tf.matmul(self.input, self.Q_net_l1_w) + self.Q_net_l1_b)
            Q_value = tf.matmul(fc1, self.Q_net_l2_w) + self.Q_net_l2_b #shape is [batch_size,2]
            self.TEST_Q_value = Q_value
            self.Q_action = tf.expand_dims(tf.arg_max(Q_value, dimension=1), dim=1) #shape is [batch_size,1]
            self.Q_value = tf.reduce_sum(tf.multiply(Q_value, tf.one_hot(tf.squeeze(self.action, squeeze_dims=[1]), 2)), reduction_indices=1)  #shape is [batch_size]

        def init_w(self, shape, stddev):
            return tf.Variable(tf.truncated_normal(shape, stddev=stddev))

        def init_b(self, shape):
            return tf.Variable(tf.ones(shape) * 0.01)

        def test_init_w_1(self, shape, stddev):
            return tf.Variable(tf.ones(shape))

        def test_init_b_1(self, shape):
            return tf.Variable(tf.ones(shape))

    #test#test#test#test#test#test#test#test
    if __name__ == "__main__":
        import numpy as np
        a = Fcnn()

        sess = tf.Session()
        init = tf.initialize_all_variables()
        sess.run(init)
        print sess.run([a.Q_action, a.TEST_Q_value], feed_dict={a.input: np.array([[-1, -1, -1, -1], [1, 1, 1, 1]]).reshape([2, 4])})
        #print sess.run([a.Q_value, a.TEST_Q_value], feed_dict={a.input: np.array([1, 2, 3, 4]).reshape([1, 4]), a.action: np.array([0]).reshape([1, 1])})

        sess.close()

The Fcnn class builds a fully connected network and defines two computation graphs that share parameters: one takes a state as input and outputs the index of the action with the largest Q value; the other takes a state and an action index and outputs the Q value of that action. Methods whose names start with 'test' are only used for testing and can be ignored for now.
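
The line that is easiest to misread is the last one in Q_net_forward: multiplying the [batch_size, 2] matrix of Q values by a one-hot encoding of the chosen action and summing over the action axis simply picks out Q(s, a) for each sample. A NumPy-only sketch of the same trick (with made-up numbers, not the author's code):

    import numpy as np

    q_all = np.array([[1.0, 3.0],   # Q(s0, a0), Q(s0, a1)
                      [2.0, 0.5]])  # Q(s1, a0), Q(s1, a1)
    action = np.array([1, 0])       # chosen action index for each sample

    one_hot = np.eye(2)[action]     # [[0., 1.], [1., 0.]]
    q_chosen = np.sum(q_all * one_hot, axis=1)
    print(q_chosen)                 # [3.0, 0.5], mirrors tf.reduce_sum(Q_value * tf.one_hot(action, 2), 1)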

======================= File 3: double_dqn.py ===========================

    from nn import Fcnn
    import tensorflow as tf


    class Double_dqn:
        def __init__(self):

            self.a_net = Fcnn()
            self.b_net = Fcnn()
            self.gamma = 0.90

            self.reward = tf.placeholder(tf.float32, [None, 1])
            self.terminal = tf.placeholder(tf.bool, [None, 1])

            self.update_a_net()
            self.update_b_net()

        def update_a_net(self):

            self.a_action_next_state = self.a_net.Q_action #shape is [batch_size,1]
            a_q_value = self.a_net.Q_value   #shape is [batch_size]
            # squeeze reward/terminal from [batch_size,1] to [batch_size] so the TD target matches Q_value's shape
            reward = tf.squeeze(self.reward, squeeze_dims=[1])
            not_terminal = tf.to_float(tf.squeeze(tf.logical_not(self.terminal), squeeze_dims=[1]))
            a_td_value = reward + tf.multiply(not_terminal, self.gamma * self.b_net.Q_value)  #shape is [batch_size]
            self.a_net_cost = tf.clip_by_value(tf.reduce_mean(tf.square(a_td_value - a_q_value)), -1., 1.)
            #self.a_net_cost = tf.reduce_mean(tf.square(a_td_value - a_q_value))

        def update_b_net(self):

            self.b_action_next_state = self.b_net.Q_action  #shape is [batch_size,1]
            b_q_value = self.b_net.Q_value  # shape is [batch_size]
            reward = tf.squeeze(self.reward, squeeze_dims=[1])
            not_terminal = tf.to_float(tf.squeeze(tf.logical_not(self.terminal), squeeze_dims=[1]))
            b_td_value = reward + tf.multiply(not_terminal, self.gamma * self.a_net.Q_value)  #shape is [batch_size]
            self.b_net_cost = tf.clip_by_value(tf.reduce_mean(tf.square(b_td_value - b_q_value)), -1., 1.)
            #self.b_net_cost = tf.reduce_mean(tf.square(b_td_value - b_q_value))
    
    #test#test#test#test#test#test#test#test
    if __name__ == "__main__":
        from replay_memory import Replay_memory
        rm = Replay_memory()
        rm.test_mempry_in()
        DD = Double_dqn()

        s, a, r, ss, t = rm.memory_out(32)
        sess = tf.Session()
        init = tf.initialize_all_variables()
        sess.run(init)

        td_action = sess.run(DD.a_action_next_state, feed_dict={
                                                        DD.a_net.input: ss})

        print sess.run(DD.b_net_cost, feed_dict={
                                        DD.a_net.input:  s,
                                        DD.a_net.action: a,
                                        DD.b_net.input:  ss,
                                        DD.b_net.action: td_action,
                                        DD.reward:       r,
                                        DD.terminal:     t})
        sess.close()

For each of the two networks, the Double_dqn class builds two computation graphs:
1. Given the next state s' from the replay memory, compute the action index chosen by the Q network being updated and use it as the action fed to the TD (target) network; this corresponds to self.a_action_next_state and self.b_action_next_state (see the sketch below).
2. Given s, a, r, s', t from the replay memory together with the TD action index a' computed above, update the Q network; this corresponds to self.a_net_cost and self.b_net_cost.
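
In other words, the action at s' is selected by the network being updated but evaluated with the other network's Q function, which is exactly the Double DQN decoupling. A small NumPy sketch of the target computation for updating network A (made-up numbers, gamma = 0.9 as in the code):

    import numpy as np

    gamma = 0.90
    r = np.array([0.01, -1.0])            # minibatch rewards
    terminal = np.array([False, True])    # episode-end flags
    q_a_next = np.array([[0.2, 0.5],      # A's Q values at s' (A selects the action)
                         [0.1, 0.3]])
    q_b_next = np.array([[0.4, 0.6],      # B's Q values at s' (B evaluates A's choice)
                         [0.7, 0.2]])

    a_next = np.argmax(q_a_next, axis=1)                      # actions chosen by A: [1, 0]
    target = r + gamma * (~terminal) * q_b_next[np.arange(2), a_next]
    print(target)   # [0.01 + 0.9 * 0.6, -1.0] = [0.55, -1.0]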

====================================== File 4: train.py =================

    import tensorflow as tf
    from double_dqn import Double_dqn
    from replay_memory import Replay_memory
    import gym
    import random
    import numpy as np


    class Train:
        def __init__(self):
            self.START_E_GREEDY = 0.6
            self.END_E_GREEDY = 0.98
            self.LEARN_RATE = 0.0001

            self.POSITIVE_REWARD = 0.01
            self.NEGATIVE_REWARD = -1.
            self.rm = Replay_memory()
            self.DD = Double_dqn()

            self.e = self.START_E_GREEDY
            self.sess = tf.Session()
            self.a_adam_train_step = tf.train.AdamOptimizer(self.LEARN_RATE).minimize(self.DD.a_net_cost,
                                                                                      var_list=self.DD.a_net.var_list)
            self.b_adam_train_step = tf.train.AdamOptimizer(self.LEARN_RATE).minimize(self.DD.b_net_cost,
                                                                                      var_list=self.DD.b_net.var_list)
            self.sess.run(tf.global_variables_initializer())
            self.add_summary()
            self.merged = tf.summary.merge_all()
            self.writer = tf.summary.FileWriter('/home/wd/tf/summary')
            self.env = gym.make('CartPole-v0')
            self.done = False

        def a_generate_memory(self, observation):
            action = self.sess.run(self.DD.a_net.Q_action,
                                   feed_dict={self.DD.a_net.input: np.asarray(observation).reshape((1, 4))})
            greedy_action = self.egreedy_action(action)#output is int
            observation_next, _, self.done, __ = self.env.step(greedy_action)
            self.rm.memory_in(observation,
                              greedy_action,
                              self.NEGATIVE_REWARD if self.done else self.POSITIVE_REWARD,
                              observation_next,
                              self.done)

            return observation_next

        def b_generate_memory(self, observation):
            action = self.sess.run(self.DD.b_net.Q_action,
                                   feed_dict={self.DD.b_net.input: np.asarray(observation).reshape((1, 4))})
            greedy_action = self.egreedy_action(action)#output is int
            observation_next, _, self.done, __ = self.env.step(greedy_action)
            self.rm.memory_in(observation,
                              greedy_action,
                              self.NEGATIVE_REWARD if self.done else self.POSITIVE_REWARD,
                              observation_next,
                              self.done)

            return observation_next

        def a_train_model(self):
            s, a, r, ss, t = self.rm.memory_out(32)
            a_td_action = self.sess.run(self.DD.a_action_next_state, feed_dict={self.DD.a_net.input: ss})
            summary, _ = self.sess.run([self.merged, self.a_adam_train_step],
                          feed_dict={self.DD.a_net.input:  s,
                                     self.DD.a_net.action: a,
                                     self.DD.b_net.input:  ss,
                                     self.DD.b_net.action: a_td_action,
                                     self.DD.reward:       r,
                                     self.DD.terminal:     t})
            return summary

        def b_train_model(self):
            s, a, r, ss, t = self.rm.memory_out(32)
            b_td_action = self.sess.run(self.DD.b_action_next_state, feed_dict={self.DD.b_net.input: ss})
            summary, _ = self.sess.run([self.merged, self.b_adam_train_step],
                          feed_dict={self.DD.b_net.input:  s,
                                     self.DD.b_net.action: a,
                                     self.DD.a_net.input:  ss,
                                     self.DD.a_net.action: b_td_action,
                                     self.DD.reward:       r,
                                     self.DD.terminal:     t})
            return summary

        def egreedy_action(self, action):#output is int, input shape is [1,1]
            ee = random.random()
            if ee < self.e:
                return action[0, 0]
            else:
                return random.randint(0, 1)

        def e_decay(self):
            if self.e < self.END_E_GREEDY:
                self.e += (self.END_E_GREEDY - self.START_E_GREEDY)/2000# (0.98 - 0.6)/2000 per episode

        def session_close(self):
            self.sess.close()

        def variable_summary(self, var):
            mean = tf.reduce_mean(var)
            tf.summary.scalar('mean', mean)

        def add_summary(self):
            with tf.name_scope('a_net'):
                #with tf.name_scope('w1'):
                    #self.variable_summary(self.DD.a_net.Q_net_l1_w)
                tf.summary.scalar('q_value', tf.reduce_mean(self.DD.a_net.Q_value))
                tf.summary.scalar('diff_div_q_value', (tf.reduce_mean(self.DD.a_net.Q_value) - tf.reduce_mean(self.DD.a_net.TEST_Q_value))/tf.reduce_mean(self.DD.a_net.Q_value))
                tf.summary.scalar('diff_q_value', tf.reduce_mean(self.DD.a_net.Q_value) - tf.reduce_mean(self.DD.a_net.TEST_Q_value))

            with tf.name_scope('dd_net'):
                tf.summary.scalar('a_cost', self.DD.a_net_cost)
    
    tt = Train()

    count = 0
    iters = 0
    for i in range(10000):
        tt.e_decay()
        observation = tt.env.reset()
        if i % 100 == 0:
            print i, 'iterations'
            print 'average live time is :', count/100
            count = 0
        for j in range(10000):
            count += 2
            if i > 8000:
                tt.env.render()
            observation = tt.a_generate_memory(observation)
            summary = tt.a_train_model()
            if j % 10 == 0:
                tt.writer.add_summary(summary, iters)
                iters += 1
            if tt.done:
                break

            if i > 8000:
                tt.env.render()
            observation = tt.b_generate_memory(observation)
            summary = tt.b_train_model()
            if j % 10 == 0:
                tt.writer.add_summary(summary, iters)
                iters += 1
            if tt.done:
                break
    print iters

    tt.session_close()

The Train class implements four main methods:
1: a_generate_memory uses network A to interact with the environment and store the resulting experience;
2: b_generate_memory uses network B to interact with the environment and store the resulting experience;
3: a_train_model samples experience from the replay memory to train network A;
4: b_train_model samples experience from the replay memory to train network B.
It also uses an epsilon-greedy policy together with e_decay (note that e, the probability of taking the greedy action, actually increases over time), as sketched below.
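
Because egreedy_action takes the greedy action with probability e and a random action otherwise, increasing e from 0.6 to 0.98 means roughly 40% random actions at the start of training and about 2% at the end. A standalone sketch of the schedule implied by e_decay (the code increments e once per episode, which amounts to the same linear ramp):

    START_E, END_E, STEPS = 0.6, 0.98, 2000

    def exploit_prob(episode):
        # e grows linearly by (0.98 - 0.6) / 2000 per episode and is capped at 0.98
        return min(END_E, START_E + episode * (END_E - START_E) / STEPS)

    print(exploit_prob(0))      # 0.6  -> 40% of actions are random at the start
    print(exploit_prob(1000))   # 0.79
    print(exploit_prob(5000))   # 0.98 -> only ~2% random actions late in training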

Usually, after about 1500 episodes of training, the agent keeps balancing for 200 steps (the maximum episode length of this game) consistently.
