Following up on the slot machine post, this post studies policy-based reinforcement learning with the CartPole game environment from gym.
CartPole is a game in which the goal is to keep a pole upright for as long as possible by pushing the cart left or right.
https://gym.openai.com/envs/CartPole-v0
Compared with the game from the previous post, CartPole adds the following conditions.
- Observations: to balance the pole, the agent needs to know where the pole currently is and at what angle it is leaning (the snippet below shows what the environment actually reports).
- Delayed reward: the longer the pole stays balanced, the more reward should be earned.
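For reference, here is a quick way to inspect what CartPole hands the agent. This is a small standalone snippet, not part of the post's training script, and the reset values in the comment are only illustrative:

```python
import gym

env = gym.make('CartPole-v0')
print(env.observation_space)  # Box(4,): cart position, cart velocity, pole angle, pole angular velocity
print(env.action_space)       # Discrete(2): 0 pushes the cart left, 1 pushes it right
print(env.reset())            # e.g. [ 0.03 -0.01  0.02  0.04] -- a small random starting state
```

The full script for this post is shown below; the rest of the post then walks through it piece by piece.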
```python
import numpy as np
import cPickle as pickle
import tensorflow as tf
import matplotlib.pyplot as plt
import math
import gym

env = gym.make('CartPole-v0')

env.reset()
random_episodes = 0
reward_sum = 0
while random_episodes < 10:
    env.render()
    observation, reward, done, _ = env.step(np.random.randint(0,2))
    reward_sum += reward
    if done:
        random_episodes += 1
        print "Reward for this episode was:", reward_sum
        reward_sum = 0
        env.reset()

# hyperparameters
H = 10                # number of hidden layer neurons
batch_size = 5        # every how many episodes to do a param update?
learning_rate = 1e-2  # feel free to play with this to train faster or more stably.
gamma = 0.99          # discount factor for reward

D = 4                 # input dimensionality

tf.reset_default_graph()

# This defines the network as it goes from taking an observation of the environment
# to giving a probability of choosing the action of moving left or right.
observations = tf.placeholder(tf.float32, [None, D], name="input_x")
W1 = tf.get_variable("W1", shape=[D, H],
                     initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observations, W1))
W2 = tf.get_variable("W2", shape=[H, 1],
                     initializer=tf.contrib.layers.xavier_initializer())
score = tf.matmul(layer1, W2)
probability = tf.nn.sigmoid(score)

# From here we define the parts of the network needed for learning a good policy.
tvars = tf.trainable_variables()
input_y = tf.placeholder(tf.float32, [None, 1], name="input_y")
advantages = tf.placeholder(tf.float32, name="reward_signal")

# The loss function. This sends the weights in the direction of making actions
# that gave good advantage (reward over time) more likely, and actions that didn't less likely.
loglik = tf.log(input_y*(input_y - probability) + (1 - input_y)*(input_y + probability))
loss = -tf.reduce_mean(loglik * advantages)
newGrads = tf.gradients(loss, tvars)

# Once we have collected a series of gradients from multiple episodes, we apply them.
# We don't just apply gradients after every episode in order to account for noise in the reward signal.
adam = tf.train.AdamOptimizer(learning_rate=learning_rate)  # Our optimizer
W1Grad = tf.placeholder(tf.float32, name="batch_grad1")  # Placeholders to send the final gradients through when we update.
W2Grad = tf.placeholder(tf.float32, name="batch_grad2")
batchGrad = [W1Grad, W2Grad]
updateGrads = adam.apply_gradients(zip(batchGrad, tvars))

def discount_rewards(r):
    """ take 1D float array of rewards and compute discounted reward """
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(xrange(0, r.size)):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r

xs, hs, dlogps, drs, ys, tfps = [], [], [], [], [], []
running_reward = None
reward_sum = 0
episode_number = 1
total_episodes = 10000
init = tf.initialize_all_variables()

# Launch the graph
with tf.Session() as sess:
    rendering = False
    sess.run(init)
    observation = env.reset()  # Obtain an initial observation of the environment

    # Reset the gradient placeholder. We will collect gradients in
    # gradBuffer until we are ready to update our policy network.
    gradBuffer = sess.run(tvars)
    for ix, grad in enumerate(gradBuffer):
        gradBuffer[ix] = grad * 0

    while episode_number <= total_episodes:

        # Rendering the environment slows things down,
        # so let's only look at it once our agent is doing a good job.
        if reward_sum / batch_size > 100 or rendering == True:
            env.render()
            rendering = True

        # Make sure the observation is in a shape the network can handle.
        x = np.reshape(observation, [1, D])

        # Run the policy network and get an action to take.
        tfprob = sess.run(probability, feed_dict={observations: x})
        action = 1 if np.random.uniform() < tfprob else 0

        xs.append(x)  # observation
        y = 1 if action == 0 else 0  # a "fake label"
        ys.append(y)

        # step the environment and get new measurements
        observation, reward, done, info = env.step(action)
        reward_sum += reward

        drs.append(reward)  # record reward (has to be done after we call step() to get reward for previous action)

        if done:
            episode_number += 1
            # stack together all inputs, hidden states, action gradients, and rewards for this episode
            epx = np.vstack(xs)
            epy = np.vstack(ys)
            epr = np.vstack(drs)
            tfp = tfps
            xs, hs, dlogps, drs, ys, tfps = [], [], [], [], [], []  # reset array memory

            # compute the discounted reward backwards through time
            discounted_epr = discount_rewards(epr)
            # size the rewards to be unit normal (helps control the gradient estimator variance)
            discounted_epr -= np.mean(discounted_epr)
            discounted_epr /= np.std(discounted_epr)

            # Get the gradient for this episode, and save it in the gradBuffer
            tGrad = sess.run(newGrads, feed_dict={observations: epx, input_y: epy, advantages: discounted_epr})
            for ix, grad in enumerate(tGrad):
                gradBuffer[ix] += grad

            # If we have completed enough episodes, then update the policy network with our gradients.
            if episode_number % batch_size == 0:
                sess.run(updateGrads, feed_dict={W1Grad: gradBuffer[0], W2Grad: gradBuffer[1]})
                for ix, grad in enumerate(gradBuffer):
                    gradBuffer[ix] = grad * 0

                # Give a summary of how well our network is doing for each batch of episodes.
                running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
                print 'Average reward for episode %f. Total average reward %f.' % (reward_sum / batch_size, running_reward / batch_size)

                if reward_sum / batch_size > 200:
                    print "Task solved in", episode_number, 'episodes!'
                    break

            reward_sum = 0
            observation = env.reset()

print episode_number, 'Episodes completed.'
```
First, the following code shows what happens when we feed the CartPole game random inputs.
```python
env.reset()
random_episodes = 0
reward_sum = 0
while random_episodes < 10:
    env.render()
    observation, reward, done, _ = env.step(np.random.randint(0,2))
    reward_sum += reward
    if done:
        random_episodes += 1
        print "Reward for this episode was:", reward_sum
        reward_sum = 0
        env.reset()
```
A random agent scores roughly 20 points per episode. Next, we define the hyperparameters.
```python
# hyperparameters
H = 10                # number of hidden layer neurons
batch_size = 5        # every how many episodes to do a param update?
learning_rate = 1e-2  # feel free to play with this to train faster or more stably.
gamma = 0.99          # discount factor for reward

D = 4                 # input dimensionality
```
This is the feed-forward part. There is a single hidden layer, which uses a relu activation. W1 is 4 x 10 and W2 is 10 x 1, and the score coming out of W2 is passed through a sigmoid, since the output we need is a probability of choosing one of the two actions.
```python
# This defines the network as it goes from taking an observation of the environment
# to giving a probability of choosing the action of moving left or right.
observations = tf.placeholder(tf.float32, [None, D], name="input_x")
W1 = tf.get_variable("W1", shape=[D, H],
                     initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observations, W1))
W2 = tf.get_variable("W2", shape=[H, 1],
                     initializer=tf.contrib.layers.xavier_initializer())
score = tf.matmul(layer1, W2)
probability = tf.nn.sigmoid(score)
```
This is where the loss function is built.
Here input_y is a "fake label": the opposite of the chosen action. For example, if action 1 is chosen, input_y becomes 0.
probability is the value predicted by the model. Instead of taking the difference between the prediction and the label as the error, the error is defined as input_y*(input_y - probability) + (1 - input_y)*(input_y + probability).
You can see why by plugging suitable values into input_y and probability (a worked check follows the code below).
As in the previous post, we take the natural log of this quantity and multiply it by the reward.
```python
# From here we define the parts of the network needed for learning a good policy.
tvars = tf.trainable_variables()
input_y = tf.placeholder(tf.float32, [None, 1], name="input_y")
advantages = tf.placeholder(tf.float32, name="reward_signal")

# The loss function. This sends the weights in the direction of making actions
# that gave good advantage (reward over time) more likely, and actions that didn't less likely.
loglik = tf.log(input_y*(input_y - probability) + (1 - input_y)*(input_y + probability))
loss = -tf.reduce_mean(loglik * advantages)
newGrads = tf.gradients(loss, tvars)
```
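Here is that check, as a small standalone sketch (the value 0.8 is just an illustrative network output, not something computed by the script):

```python
def loglik_inner(input_y, probability):
    # the expression inside tf.log in the loss above
    return input_y*(input_y - probability) + (1 - input_y)*(input_y + probability)

p = 0.8  # hypothetical probability of choosing action 1
print(loglik_inner(0.0, p))  # action 1 was taken (fake label 0) -> p, i.e. 0.8
print(loglik_inner(1.0, p))  # action 0 was taken (fake label 1) -> 1 - p, i.e. roughly 0.2
```

In both cases the expression reduces to the probability the network assigned to the action that was actually taken, so loglik is the log-likelihood of that action and -mean(loglik * advantages) pushes up the probability of actions that led to a high advantage.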
This is the training part. We use AdamOptimizer, and rather than applying the gradients right away, we accumulate them and apply them in batches.
In CartPole the outcome of each individual step is strongly correlated with its neighbours; the pole only moves a little bit at a time. If we updated the network after every single episode, we would also be learning meaningless noise. By accumulating gradients and applying them together, the network learns from the bigger picture instead.
```python
# Once we have collected a series of gradients from multiple episodes, we apply them.
# We don't just apply gradients after every episode in order to account for noise in the reward signal.
adam = tf.train.AdamOptimizer(learning_rate=learning_rate)  # Our optimizer
W1Grad = tf.placeholder(tf.float32, name="batch_grad1")  # Placeholders to send the final gradients through when we update.
W2Grad = tf.placeholder(tf.float32, name="batch_grad2")
batchGrad = [W1Grad, W2Grad]
updateGrads = adam.apply_gradients(zip(batchGrad, tvars))
```
This function computes the discounted reward. Since the goal of CartPole is to keep the pole up for as long as possible, we steer learning in that direction by accumulating the rewards backwards from the last action, multiplying by the discount factor gamma at each step. Each action is thereby credited with the discounted sum of all the rewards that came after it, so actions taken early in a long episode end up with the largest values (a small numeric example follows the code).
```python
def discount_rewards(r):
    """ take 1D float array of rewards and compute discounted reward """
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(xrange(0, r.size)):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r
```
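To make the discounting concrete, here is a hand-worked check on a hypothetical three-step episode (CartPole pays +1 at every step). This is a standalone sketch that repeats the same backwards loop, not part of the training script:

```python
import numpy as np

gamma = 0.99
r = np.array([1.0, 1.0, 1.0])      # three steps, reward 1 each
discounted = np.zeros_like(r)
running_add = 0.0
for t in reversed(range(r.size)):  # same backwards pass as discount_rewards
    running_add = running_add * gamma + r[t]
    discounted[t] = running_add
print(discounted)                  # [2.9701 1.99   1.    ]
```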
We create arrays to store the following values:
- xs: states (observations)
- hs: hidden states (not actually filled in this example)
- dlogps: policy gradients (not actually filled in this example)
- drs: rewards
- ys: actions, stored as fake labels
- tfps: also unused in this example
```python
xs, hs, dlogps, drs, ys, tfps = [], [], [], [], [], []
```
```python
running_reward = None
reward_sum = 0
episode_number = 1
total_episodes = 10000
```
```python
# Reset the gradient placeholder. We will collect gradients in
# gradBuffer until we are ready to update our policy network.
gradBuffer = sess.run(tvars)
for ix, grad in enumerate(gradBuffer):
    gradBuffer[ix] = grad * 0
```
We reshape the observation into x and run a feed-forward pass to get the probability of each action; the action is then sampled from that probability (a small standalone check of this sampling step follows the code).
```python
# Make sure the observation is in a shape the network can handle.
x = np.reshape(observation, [1, D])

# Run the policy network and get an action to take.
tfprob = sess.run(probability, feed_dict={observations: x})
action = 1 if np.random.uniform() < tfprob else 0
```
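The sampling line above draws the action from a Bernoulli distribution instead of always taking the most likely action, which gives the agent some exploration. A minimal standalone sketch, assuming a hypothetical network output of 0.7:

```python
import numpy as np

tfprob = 0.7  # hypothetical network output: probability of action 1
actions = [1 if np.random.uniform() < tfprob else 0 for _ in range(1000)]
print(sum(actions) / 1000.0)  # roughly 0.7 -- action 1 is chosen about 70% of the time
```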
We store the state and the action (converted into its fake label).
```python
xs.append(x)  # observation
y = 1 if action == 0 else 0  # a "fake label"
ys.append(y)
```
Calling step() advances the environment and returns the new state, the reward, and whether the episode is done.
```python
# step the environment and get new measurements
observation, reward, done, info = env.step(action)
reward_sum += reward

drs.append(reward)  # record reward (has to be done after we call step() to get reward for previous action)
```
If the episode has ended, we stack the collected values into arrays for training.
```python
if done:
    episode_number += 1
    # stack together all inputs, hidden states, action gradients, and rewards for this episode
    epx = np.vstack(xs)
    epy = np.vstack(ys)
    epr = np.vstack(drs)
    tfp = tfps
    xs, hs, dlogps, drs, ys, tfps = [], [], [], [], [], []  # reset array memory
```
After computing the discounted rewards, we standardize them by subtracting the mean and dividing by the standard deviation, giving mean 0 and standard deviation 1. CartPole hands out a reward of 1 at every single step, so every action would otherwise receive a positive reward, and learning does not work well in that situation. Standardizing turns better-than-average actions into positive advantages and worse-than-average actions into negative ones, which encourages the good actions and makes learning work much better (see the small numeric example after the code).
```python
# compute the discounted reward backwards through time
discounted_epr = discount_rewards(epr)
# size the rewards to be unit normal (helps control the gradient estimator variance)
discounted_epr -= np.mean(discounted_epr)
discounted_epr /= np.std(discounted_epr)
```
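Continuing the three-step example from above (a standalone sketch; np.std here is the population standard deviation, matching the code):

```python
import numpy as np

discounted_epr = np.array([2.9701, 1.99, 1.0])  # discounted returns from the earlier example
discounted_epr -= np.mean(discounted_epr)
discounted_epr /= np.std(discounted_epr)
print(discounted_epr)  # roughly [ 1.22  0.00 -1.23]
```

The early actions, which kept the pole up the longest, get positive advantages, while the action taken right before the pole fell gets a negative one.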
We compute the gradients for this episode and add them to the buffer.
```python
# Get the gradient for this episode, and save it in the gradBuffer
tGrad = sess.run(newGrads, feed_dict={observations: epx, input_y: epy, advantages: discounted_epr})
for ix, grad in enumerate(tGrad):
    gradBuffer[ix] += grad
```
Every batch_size episodes we take the accumulated gradients out of the buffer, apply them to the network, and reset the buffer to zero.
```python
# If we have completed enough episodes, then update the policy network with our gradients.
if episode_number % batch_size == 0:
    sess.run(updateGrads, feed_dict={W1Grad: gradBuffer[0], W2Grad: gradBuffer[1]})
    for ix, grad in enumerate(gradBuffer):
        gradBuffer[ix] = grad * 0
```