드디어 DQN 포스팅입니다.
DQN 은 Deep Q-Network 의 약자로써 기존의 Q-Network 를 개선하여 DeepMind 팀이 NIPS 2013 과 Nature 2015 에 발표한 내용입니다.
위와 같이 DQN 은 기존의 Q-Network 에 비해 3가지 특징을 가지고 있습니다.
1. Convolution Layers
지금까지 다뤘던 frozenLake 나 cartPole 은 그 input 이 매우 단순해 큰 문제없이 신경망을 구성할 수 있었습니다. 하지만 대부분의 게임은 매우 복잡한 환경과 input을 가지고 있습니다. 이런 복잡한 input 에 대응하기 위한 좋은 방법은 CNN 을 이용하는 것입니다. 그리고 CNN 은 기존의 NN 에 비해 더 deep 하게 학습을 하는데 도움을 줄 수 있습니다.
2. Experience Replay
이 방법은 기존의 정책 학습에서도 비슷하게 사용했던 방법입니다. 우선 첫 번째 아이디어는 step 에 따른 결과로 바로 학습을 하는 것이 아니라 버퍼에 담아 두는 것입니다. 그리고 두 번째 아이디어는 이 버퍼에 담긴 값들을 전부 사용하는 것이 아니라 랜덤하게 뽑아 학습에 이용합니다. 이렇게 랜덤하게 선택을 하여 학습을 하는 이유는 일반적인 환경에서는 에이전트가 연속적으로 움직이기 때문에 연속적으로 쌓인 결과가 매우 근접하고 비슷하며 특정 상황에서 쌓긴 결과일 가능성이 높습니다. 하지만 이렇게 랜덤하게 뽑아 학습을 한다면 전체적인 다양한 경험을 통해 올바른 학습을 할 수 있습니다.
3. Separate Target Network
기존 Q-Network 의 문제점 중 predict Y(행동) 와 target(label) Y 를 산출하는 중간에 모델이 계속 훈련된다는 문제가 있었습니다. 이 것이 문제가 되는 이유는 predict Y 가 target Y 를 향해 가까이 가도록 점점 훈련을 하는 것이 이 학습의 목적인데 target이 계속 바뀜으로써 predict Y 가 어디로 갈지 갈길을 잃는 경우가 생긴다는 것입니다.
이 문제를 해결하기 위해 DQN 에서는 간단하게 두 개의 모델 main, target 을 사용하는 방법을 이용했습니다. 이렇게 모델을 두 개로 분리하여 main 신경망에서는 행동을 계산하고 target 신경망에서는 label Y 를 산출하여 Q value 를 계산합니다. 그리고 이 target 신경망은 일정 batch 에 한 번씩 학습된 main 모델을 target 모델로 복사를 하여 학습을 하게 됩니다.
DQN 이외에도 이 포스팅에서는 추가적으로 두 가지 방법을 사용했습니다.
1. Double DQN
DQN 의 3번째 방법 즉 target 신경망을 따로 만드는 방법 때문에 main 신경망에 비해 훈련이 더딘 target 신경망이 main 신경망의 상황과 맞지 않는 부적확한 target Y 를 만들어낼 확률이 존재한다는 것입니다. 이 부분을 보완하기 위해 Double DQN 논문의 저자는 학습 단계에서 target Y 를 target 신경망에서 최대값을 가져오는 방법을 통해 얻는 것이 아니라 main 신경망에서 선택한 행동에 대한 target Q value 을 생성하는 방법을 사용했습니다. 이렇게 target 신경망에서 행동을 선택하는 부분을 하지 않음으로써 더 믿을만한 값으로 학습을 할 수 있게 됩니다. target Q value 를 업데이트하기 위한 공식은 아래와 같습니다.
Q-Target = r + γQ(s’,argmax(Q(s’,a,ϴ),ϴ’)
2. Dueling DQN
지금까지 Q Network 에서 행동을 구하기 위해 각 행동의 Q-value 를 얻어야했고 이를 위해 상태와 각 행동을 신경망에 넣어 어떤 행동이 가장 좋을지 선택을 하였습니다. 이것을 Q(s,a) 으로 표현했습니다. 하지만 Dueling DQN 은 이 과정에서 신경망을 value function 와 advantage function 로 나눠 각각 value와 advantage 을 계산하여 그것을 다시 결합해 행동을 구합니다. value function 은 상태를 넣어서 이 상태가 얼마나 좋은지 판단하는 함수이며 advantage function 은 행동을 넣어서 다른 것들과 비교하여 주어진 행동이 얼마나 좋은지를 판단해주는 함수입니다.
이는 가치를 다양하게(두 가지로) 분리함으로써 더 좋은 행동을 얻을 수 있게 만들어줍니다. 공식은 다음과 같습니다.
Q(s,a) =V(s) + A(a)
V(s) 는 value function, A(a)는 advantage function 입니다.
원래 DeepMind 팀의 논문에서 사용한 게임은 Atari 게임을 사용했지만 학습 시간이 오래 걸리기에 이 포스팅에서는 gridworld 라는 게임을 만들어 사용했습니다.
이 게임은 빨간 박스를 피해서 파란 박스를 이동시켜 녹색 박스로 이동시키는 게임입니다. 녹색 박스에 도달할 경우 보상 +1, 빨간 박스로 갈 경우 보상 -1 입니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 | from __future__ import division import numpy as np import random import tensorflow as tf import tensorflow.contrib.slim as slim import os from gridworld import gameEnv env = gameEnv(partial=False,size=5) class Qnetwork(): def __init__(self, h_size): # The network recieves a frame from the game, flattened into an array. # It then resizes it and processes it through four convolutional layers. self.scalarInput = tf.placeholder(shape=[None, 21168], dtype=tf.float32) self.imageIn = tf.reshape(self.scalarInput, shape=[-1, 84, 84, 3]) self.conv1 = slim.conv2d( \ inputs=self.imageIn, num_outputs=32, kernel_size=[8, 8], stride=[4, 4], padding='VALID', biases_initializer=None) self.conv2 = slim.conv2d( \ inputs=self.conv1, num_outputs=64, kernel_size=[4, 4], stride=[2, 2], padding='VALID', biases_initializer=None) self.conv3 = slim.conv2d( \ inputs=self.conv2, num_outputs=64, kernel_size=[3, 3], stride=[1, 1], padding='VALID', biases_initializer=None) self.conv4 = slim.conv2d( \ inputs=self.conv3, num_outputs=h_size, kernel_size=[7, 7], stride=[1, 1], padding='VALID', biases_initializer=None) # We take the output from the final convolutional layer and split it into separate advantage and value streams. self.streamAC, self.streamVC = tf.split(self.conv4, 2, 3) self.streamA = slim.flatten(self.streamAC) self.streamV = slim.flatten(self.streamVC) self.AW = tf.Variable(tf.random_normal([h_size // 2, env.actions])) self.VW = tf.Variable(tf.random_normal([h_size // 2, 1])) self.Advantage = tf.matmul(self.streamA, self.AW) self.Value = tf.matmul(self.streamV, self.VW) # Then combine them together to get our final Q-values. self.Qout = self.Value + tf.subtract(self.Advantage, tf.reduce_mean(self.Advantage, axis=1, keep_dims=True)) self.predict = tf.argmax(self.Qout, 1) # Below we obtain the loss by taking the sum of squares difference between the target and prediction Q values. self.targetQ = tf.placeholder(shape=[None], dtype=tf.float32) self.actions = tf.placeholder(shape=[None], dtype=tf.int32) self.actions_onehot = tf.one_hot(self.actions, env.actions, dtype=tf.float32) self.Q = tf.reduce_sum(tf.multiply(self.Qout, self.actions_onehot), axis=1) self.td_error = tf.square(self.targetQ - self.Q) self.loss = tf.reduce_mean(self.td_error) self.trainer = tf.train.AdamOptimizer(learning_rate=0.0001) self.updateModel = self.trainer.minimize(self.loss) ### Experience Replay # This class allows us to store experies and sample then randomly to train the network. class experience_buffer(): def __init__(self, buffer_size=50000): self.buffer = [] self.buffer_size = buffer_size def add(self, experience): if len(self.buffer) + len(experience) >= self.buffer_size: self.buffer[0:(len(experience) + len(self.buffer)) - self.buffer_size] = [] self.buffer.extend(experience) def sample(self, size): return np.reshape(np.array(random.sample(self.buffer, size)), [size, 5]) # This is a simple function to resize our game frames. def processState(states): return np.reshape(states,[21168]) # These functions allow us to update the parameters of our target network with those of the primary network. def updateTargetGraph(tfVars,tau): total_vars = len(tfVars) op_holder = [] for idx,var in enumerate(tfVars[0:total_vars//2]): op_holder.append(tfVars[idx+total_vars//2].assign((var.value()*tau) + ((1-tau)*tfVars[idx+total_vars//2].value()))) return op_holder def updateTarget(op_holder,sess): for op in op_holder: sess.run(op) ### Training the network # Setting all the training parameters batch_size = 32 #How many experiences to use for each training step. update_freq = 4 #How often to perform a training step. y = .99 #Discount factor on the target Q-values startE = 1 #Starting chance of random action endE = 0.1 #Final chance of random action anneling_steps = 10000. #How many steps of training to reduce startE to endE. num_episodes = 1000 #How many episodes of game environment to train network with. pre_train_steps = 10000 #How many steps of random actions before training begins. max_epLength = 50 #The max allowed length of our episode. load_model = False #Whether to load a saved model. path = "./dqn" #The path to save our model to. h_size = 512 #The size of the final convolutional layer before splitting it into Advantage and Value streams. tau = 0.001 #Rate to update target network toward primary network tf.reset_default_graph() mainQN = Qnetwork(h_size) targetQN = Qnetwork(h_size) init = tf.global_variables_initializer() saver = tf.train.Saver() trainables = tf.trainable_variables() targetOps = updateTargetGraph(trainables, tau) myBuffer = experience_buffer() # Set the rate of random action decrease. e = startE stepDrop = (startE - endE) / anneling_steps # create lists to contain total rewards and steps per episode jList = [] rList = [] total_steps = 0 # Make a path for our model to be saved in. if not os.path.exists(path): os.makedirs(path) with tf.Session() as sess: sess.run(init) if load_model == True: print('Loading Model...') ckpt = tf.train.get_checkpoint_state(path) saver.restore(sess, ckpt.model_checkpoint_path) updateTarget(targetOps, sess) # Set the target network to be equal to the primary network. for i in range(num_episodes): episodeBuffer = experience_buffer() # Reset environment and get first new observation s = env.reset() s = processState(s) d = False rAll = 0 j = 0 # The Q-Network while j < max_epLength: # If the agent takes longer than 200 moves to reach either of the blocks, end the trial. j += 1 # Choose an action by greedily (with e chance of random action) from the Q-network if np.random.rand(1) < e or total_steps < pre_train_steps: a = np.random.randint(0, 4) else: a = sess.run(mainQN.predict, feed_dict={mainQN.scalarInput: [s]})[0] s1, r, d = env.step(a) s1 = processState(s1) total_steps += 1 episodeBuffer.add( np.reshape(np.array([s, a, r, s1, d]), [1, 5])) # Save the experience to our episode buffer. if total_steps > pre_train_steps: if e > endE: e -= stepDrop if total_steps % (update_freq) == 0: trainBatch = myBuffer.sample(batch_size) # Get a random batch of experiences. # Below we perform the Double-DQN update to the target Q-values Q1 = sess.run(mainQN.predict, feed_dict={mainQN.scalarInput: np.vstack(trainBatch[:, 3])}) Q2 = sess.run(targetQN.Qout, feed_dict={targetQN.scalarInput: np.vstack(trainBatch[:, 3])}) end_multiplier = -(trainBatch[:, 4] - 1) doubleQ = Q2[range(batch_size), Q1] targetQ = trainBatch[:, 2] + (y * doubleQ * end_multiplier) # Update the network with our target values. _ = sess.run(mainQN.updateModel, \ feed_dict={mainQN.scalarInput: np.vstack(trainBatch[:, 0]), mainQN.targetQ: targetQ, mainQN.actions: trainBatch[:, 1]}) updateTarget(targetOps, sess) # Set the target network to be equal to the primary network. rAll += r s = s1 if d == True: break myBuffer.add(episodeBuffer.buffer) jList.append(j) rList.append(rAll) # Periodically save the model. if i % 1000 == 0: saver.save(sess, path + '/model-' + str(i) + '.cptk') print("Saved Model") if len(rList) % 10 == 0: print(total_steps, np.mean(rList[-10:]), e) saver.save(sess, path + '/model-' + str(i) + '.cptk') print("Percent of succesful episodes: " + str(sum(rList) / num_episodes) + "%") | cs |
먼저 신경망을 구성하는 클래스입니다.
input 은 84,84,3 배열이며 84*84*3 = 21168 의 input 을 받은 후 84,84,3 으로 reshape 합니다.
conv1 은 input 에 8,8 사이즈의 필터를 사용하고 stride는 4,4 이며 padding은 사용하지 않고 output은 32개로 설정하였기에 20,20,32의 activation maps가 나오게 됩니다.
가로 세로 사이즈가 20인 이유는 (84 - 8(필터 크기)) / 4(stride) + 1 으로 계산되기 때문입니다.
conv2 는 conv1 에 4,4 사이즈의 피렅를 사용하기 stride 는 2,2 이며 역시 padding은 사용하지 않고 output은 64개로 설정하였기에 9,9,64의 activation maps가 나오게 됩니다.
같은 방식으로 conv3 는 7,7,64 마지막으로 conv4 는 1,1, h_size 입니다.
class Qnetwork(): def __init__(self, h_size): # The network recieves a frame from the game, flattened into an array. # It then resizes it and processes it through four convolutional layers. self.scalarInput = tf.placeholder(shape=[None, 21168], dtype=tf.float32) self.imageIn = tf.reshape(self.scalarInput, shape=[-1, 84, 84, 3]) self.conv1 = slim.conv2d( \ inputs=self.imageIn, num_outputs=32, kernel_size=[8, 8], stride=[4, 4], padding='VALID', biases_initializer=None) self.conv2 = slim.conv2d( \ inputs=self.conv1, num_outputs=64, kernel_size=[4, 4], stride=[2, 2], padding='VALID', biases_initializer=None) self.conv3 = slim.conv2d( \ inputs=self.conv2, num_outputs=64, kernel_size=[3, 3], stride=[1, 1], padding='VALID', biases_initializer=None) self.conv4 = slim.conv2d( \ inputs=self.conv3, num_outputs=h_size, kernel_size=[7, 7], stride=[1, 1], padding='VALID', biases_initializer=None) | cs |
최종적으로 만들어진 conv4 를 Dueling DQN 방법을 사용하기 위해 color 3을 유지하며 두 개로 나눕니다. (streamA, streamV)
다음으로 h_size / 2 의 크기로 정규 분포를 이용하여 AW 와 VW weights 을 만듭니다. 이때 AW 는 환경의 행동 초기값(4)을 표준 편차로 하여 만들고 VW 는 1을 표준 편차로 하여 만듭니다.
만들어진 AW 는 conv4 을 반으로 나눈 streamA 와 곱하여 Advantage 을 만들고 VW 는 streamV 와 곱하여 Value 을 만듭니다.
# We take the output from the final convolutional layer and split it into separate advantage and value streams. self.streamAC, self.streamVC = tf.split(self.conv4, 2, 3) self.streamA = slim.flatten(self.streamAC) self.streamV = slim.flatten(self.streamVC) self.AW = tf.Variable(tf.random_normal([h_size // 2, env.actions])) self.VW = tf.Variable(tf.random_normal([h_size // 2, 1])) self.Advantage = tf.matmul(self.streamA, self.AW) self.Value = tf.matmul(self.streamV, self.VW) | cs |
Advantage 에서 Advantage 의 평균을 뺀 값과 Value 을 더해 Qout 을 만들어 가장 큰 값 하나를 선택 해 predict 을 구합니다.
# Then combine them together to get our final Q-values. self.Qout = self.Value + tf.subtract(self.Advantage, tf.reduce_mean(self.Advantage, axis=1, keep_dims=True)) self.predict = tf.argmax(self.Qout, 1) | cs |
targetQ 와 actions 을 받을 변수를 만들고 env.actions 값(4)을 depth 로 하여 actions 을 one hot encoding 한 actions_onehot 을 만듭니다. 그 이유는 env.actions 이 초기값 4로 할당되어 있고 이는 actions 의 가지수이기 때문입니다. (상하좌우)
# Below we obtain the loss by taking the sum of squares difference between the target and prediction Q values. self.targetQ = tf.placeholder(shape=[None], dtype=tf.float32) self.actions = tf.placeholder(shape=[None], dtype=tf.int32) self.actions_onehot = tf.one_hot(self.actions, env.actions, dtype=tf.float32) | cs |
신경망에서 예상한 Qout 와 행동의 곱에 axis을 1로 하여 합을 구해 Q에 할당합니다. 이렇게 곱을 하는 이유는 actions_onehot 은 one hot encoding 이 된 상태이기 때문에 각 행동에 해당되는 index 의 값만 1이고 나머지는 0이기 때문입니다. 이런 곱을 통해 Qout 에서 필요한 실질적인 값만 얻어낼 수 있습니다.
self.Q = tf.reduce_sum(tf.multiply(self.Qout, self.actions_onehot), axis=1) | cs |
targetQ 와 위에서 얻은 Q 의 차이의 제곱으로 error 을 구하고 error 의 평균으로 loss 을 정의합니다.
그리고 AdamOptimizer minimize 을 이용하여 모델을 학습시킵니다.
self.td_error = tf.square(self.targetQ - self.Q) self.loss = tf.reduce_mean(self.td_error) self.trainer = tf.train.AdamOptimizer(learning_rate=0.0001) self.updateModel = self.trainer.minimize(self.loss) | cs |
Experience Replay 방법을 위한 클래스입니다.
버퍼 변수를 만들고 버퍼에 값을 넣는 add 함수와 버퍼에서 랜덤하게 값을 얻어낼 수 있는 sample 함수를 만듭니다.
버퍼에 add 시 버퍼가 가득차면 queue 방식으로 제일 먼저 들어온 값을 버립니다.
### Experience Replay # This class allows us to store experies and sample then randomly to train the network. class experience_buffer(): def __init__(self, buffer_size=50000): self.buffer = [] self.buffer_size = buffer_size def add(self, experience): if len(self.buffer) + len(experience) >= self.buffer_size: self.buffer[0:(len(experience) + len(self.buffer)) - self.buffer_size] = [] self.buffer.extend(experience) def sample(self, size): return np.reshape(np.array(random.sample(self.buffer, size)), [size, 5]) | cs |
배열을 21168 으로 reshape 해주는 함수입니다.
# This is a simple function to resize our game frames. def processState(states): return np.reshape(states,[21168]) | cs |
updateTargetGraph 은 학습 가능한 변수들 tfVars 과 main 신경망에서 target 신경망을 업데이트할 때 업데이트되는 비율 tau 을 파라미터로 받습니다.
tfVars 의 앞 절반은 main 신경망의 값들이고 뒤 절반은 target 신경망의 값들이기 때문에
반을 나눠 앞의 절반에 tau 을 곱하였기 때문에 main 신경망의 weights 에 tau 이 곱해지고
뒤에 절반에 1 - tau 을 곱했기 때문에 target 신경망의 weights 에 1 - tau 이 곱해지게 됩니다.
이 tau 가 여기서는 0.001 으로 정의되었기 때문에 99.999% 의 비율로 target 신경망의 값들이 유지되고 여기에 0.001% 의 비율의 main 신경망의 값이 더해지게 됩니다.
이렇게 만들어진 operation 배열을 반환하는 함수가 updateTargetGraph 입니다.
다음으로 updateTarget 은 op_holder 을 받아 배열안의 operation 들을 실행시키는 역할을 하는데 여기서는 사실상 target 신경망을 업데이트 시키는 역할을 합니다.
# These functions allow us to update the parameters of our target network with those of the primary network. def updateTargetGraph(tfVars,tau): total_vars = len(tfVars) op_holder = [] for idx,var in enumerate(tfVars[0:total_vars//2]): op_holder.append(tfVars[idx+total_vars//2].assign((var.value()*tau) + ((1-tau)*tfVars[idx+total_vars//2].value()))) return op_holder def updateTarget(op_holder,sess): for op in op_holder: sess.run(op) | cs |
신경망 학습에 필요한 파라미터 값들을 설정하는 부분입니다.
batch_size: 각각의 step 에서 experience buffer 에서 셈플을 랜덤하게 몇 개나 가져올지에 대한 값
update_freq: update_freq 번에 한 번씩 experience buffer 에서 셈플을 랜덤하게 꺼내와 학습을 합니다.
y: target 신경망을 학습할 때 사용되는 Discount factor
startE: e-greedy 의 시작 확률 값. 이후 학습 과정에서 1을 표준 편차로 하는 정규 분포의 값을 통해 랜덤 여부를 결정합니다.
endE: e-greedy 의 종료 확률 값.
anneling_steps: startE 에서 endE 까지 몇 단계에 걸쳐 값을 줄일 것인지. anneling_steps 번 이후에 e-greedy 의 확률 값이 endE가 됩니다.
num_episodes: 에피소드 반복 횟수
pre_train_steps: total_steps 이 pre_train_steps 보다 작을 경우 무조건 랜덤한 선택을 합니다. 즉 학습 시작 전에 pre_train_steps 번은 무조건 랜덤한 행동을 선택합니다.
max_epLength: 한 번의 에피소드에서 최대 이동(step) 거리
load_model: 저장해둔 모델을 로드할지.
path: 모델 저장 경로
h_size: convolutional layer 크기 (Advantage and Value streams 의 합)
tau: main 신경망에서 target 신경망을 업데이트할 때 업데이트되는 비율
### Training the network # Setting all the training parameters batch_size = 32 #How many experiences to use for each training step. update_freq = 4 #How often to perform a training step. y = .99 #Discount factor on the target Q-values startE = 1 #Starting chance of random action endE = 0.1 #Final chance of random action anneling_steps = 10000. #How many steps of training to reduce startE to endE. num_episodes = 10000 #How many episodes of game environment to train network with. pre_train_steps = 10000 #How many steps of random actions before training begins. max_epLength = 50 #The max allowed length of our episode. load_model = False #Whether to load a saved model. path = "./dqn" #The path to save our model to. h_size = 512 #The size of the final convolutional layer before splitting it into Advantage and Value streams. tau = 0.001 #Rate to update target network toward primary network | cs |
Qnetwork 클래스를 통해 main, target 신경망을 만듭니다.
mainQN = Qnetwork(h_size) targetQN = Qnetwork(h_size) | cs |
trainables: 학습 가능한 변수들
targetOps: updateTargetGraph 함수의 반환값
myBuffer: experience buffer
e: e-greedy 확률값
stepDrop: 각 step 마다 e가 얼마나 차감되는지
jList: 각 에피소드의 총 이동(step) 을 담아둘 list
rList: 각 에피소드의 보상을 담아둘 list
trainables = tf.trainable_variables() targetOps = updateTargetGraph(trainables, tau) myBuffer = experience_buffer() # Set the rate of random action decrease. e = startE stepDrop = (startE - endE) / anneling_steps # create lists to contain total rewards and steps per episode jList = [] rList = [] total_steps = 0 | cs |
먼저 target 신경망을 업데이트 시킵니다. 이 과정을 통해 target 신경망과 main 신경망은 동일한 weights 값들을 가지게 됩니다.
updateTarget(targetOps, sess) # Set the target network to be equal to the primary network. | cs |
에피소드가 시작이 되면 먼저 초기화된 experience_buffer 을 만들어 episodeBuffer에 할당합니다.
그리고 환경을 초기화하고 상태값 s 를 processState 으로 reshape 합니다.
d: 완료 여부
rAll: 에피도스의 보상의 합
j: 해당 에피소드의 이동의 합
for i in range(num_episodes): episodeBuffer = experience_buffer() # Reset environment and get first new observation s = env.reset() s = processState(s) d = False rAll = 0 j = 0 | cs |
만약 이동 수가 max_epLength(50)보다 클 경우 종료 시킵니다.
j에 1을 더해주고,
e-greedy 방식을 통해 main 신경망 또는 무작위로 행동을 결정합니다. 이때 total_steps 이 pre_train_steps 보다 작을 경우에도 항상 랜덤하게 행동을 선택합니다.
# The Q-Network while j < max_epLength: # If the agent takes longer than 200 moves to reach either of the blocks, end the trial. j += 1 # Choose an action by greedily (with e chance of random action) from the Q-network if np.random.rand(1) < e or total_steps < pre_train_steps: a = np.random.randint(0, 4) else: a = sess.run(mainQN.predict, feed_dict={mainQN.scalarInput: [s]})[0] | cs |
환경 step 을 통해 상태, 보상, 완료 여부를 얻스빈다.
얻어진 상태 s1을 reshape 하고 experience_buffer 에 얻어진 결과를 담습니다.
s1, r, d = env.step(a) s1 = processState(s1) total_steps += 1 episodeBuffer.add( np.reshape(np.array([s, a, r, s1, d]), [1, 5])) # Save the experience to our episode buffer. | cs |
e-greedy 확률 값을 조정합니다.
if total_steps > pre_train_steps: if e > endE: e -= stepDrop | cs |
update_freq 에 한 번씩 학습과정을 거칩니다.
먼저 experience_buffer 인 myBuffer 에서 랜덤하게 셈플을 가지고 옵니다.
다음으로 Double DQN을 위해 main 신경망에서는 행동(Q1)을 선택하고 target 신경망에서 각 행동에 대한 모든 Q values(Q2)을 얻습니다.
end_multiplier 은 Q-Learning 에서는 해당 에피소드가 완료된 경우 얻어진 보상으로 바로 학습하고 완료되지 않은 경우 r + γ(max(Q(s’,a’)) 와 같은 계산을 하여 학습을 하게 되는데 여기서는 이 과정을 if 문으로 나누지 않고 하나의 식으로 표현하기 위해 만들어진 변수입니다. trainBatch[:, 4] 은 trainBatch 의 index 4(d 즉 완료 여부) 만을 slicing 한 것이므로 0 또는 1의 값이고 만약 완료하지 못한 경우라면 값이 0이기에 end_multiplier 은 1, 완료했을 경우 값이 1이기에 end_multiplier은 0이 됩니다. 그리고 이 값을 y * doubleQ 에 곱함으로써 완료했을 경우에는 y * doubleQ 가 의미없게 만들어줍니다.
다음으로 main 신경망에서 얻어진 행동에 따른 target 신경망의 Q value 을 doubleQ 에 담습니다.
이제 학습할 보상값 targetQ 을 계산하기 위해 trainBatch[:, 2] 으로 보상만을 slicing하고 y * doubleQ * end_multiplier 을 더해서 (y 는 discount factor) 값을 구합니다.
이제 학습을 하기 위해 main 신경망을 updateModel 하고 updateTarget 을 통해 target 신경망도 일부 학습합니다.
여기서는 DQN 의 방법인 일정 횟수에 한 번씩 main 신경망을 target 신경망으로 복사하는 것이 아닌 main 신경망을 학습시킬 때 마다 main 신경망의 일부 비율값을 target 신경망으로 복사하는 방법을 사용했습니다.
if total_steps % (update_freq) == 0: trainBatch = myBuffer.sample(batch_size) # Get a random batch of experiences. # Below we perform the Double-DQN update to the target Q-values Q1 = sess.run(mainQN.predict, feed_dict={mainQN.scalarInput: np.vstack(trainBatch[:, 3])}) Q2 = sess.run(targetQN.Qout, feed_dict={targetQN.scalarInput: np.vstack(trainBatch[:, 3])}) end_multiplier = -(trainBatch[:, 4] - 1) doubleQ = Q2[range(batch_size), Q1] targetQ = trainBatch[:, 2] + (y * doubleQ * end_multiplier) # Update the network with our target values. _ = sess.run(mainQN.updateModel, \ feed_dict={mainQN.scalarInput: np.vstack(trainBatch[:, 0]), mainQN.targetQ: targetQ, mainQN.actions: trainBatch[:, 1]}) updateTarget(targetOps, sess) # Set the target network to be equal to the primary network. | cs |
배열에 값을 담고 해당 에피소드가 완료된 경우 break 합니다.
rAll += r s = s1 if d == True: break | cs |
experience_buffer 에 해당 에피소드의 버퍼를 더합니다.
myBuffer.add(episodeBuffer.buffer) jList.append(j) rList.append(rAll) | cs |