Outline

  1. DQN
  2. Cartpole
  3. Cartpole Code

DQN(Deep Q Network)

DQN이란?

DQN의 특징

경험 리플레이

리플레이 메모리

[DQN Algorithm 구조]

dqn

Cartpole

cart

Markov Decision Process (MDP)

Cartpole Code

Environments

1
2
3
4
5
6
7
# CartPole-v1 환경, v1은 최대 타임스텝 500, v0는 최대 타임스텝 200
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0] # 4
action_size = env.action_space.n # 2
print("state_size:", state_size)
print("action_size:", action_size)

DQNAgent Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DQNAgent:
  def __init__(self, state_size, action_size):
    # 클래스를 사용할 떄 자동으로 실행된다.

  def build_model(self):
    # 상태가 입력된다. 큐함수가 출력인 인공신경망을 생성한다.

  def update_target_model(self):
   # 타겟 모델을 모델의 가중치로 업데이트

  def get_action(self, state):
    # 입실론 탐욕 정책으로 행동 선택

  def append_sample(self, state, action, reward, next_state, done):
    # 샘플 <s, a, r, s'>을 리플레이 메모리에 저장
    # done : false 였다가 게임이 끝나면 True로 바뀜

  def train_model(self):
    # 리플레이 메모리에서 배치 사이즈 만큼 무작위로 추출해서 학습하는 함수

def __init__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def __init__(self, state_size, action_size):
        self.render = False
        self.load_model = False

        # 상태와 행동의 크기 정의
        self.state_size = state_size # 4
        self.action_size = action_size # 2

        # DQN hyperparameter
        self.discount_factor = 0.99
        self.learning_rate = 0.001
        # epsilon이 1이면 무조건 무작위로 행동을 선택한다.
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        # 지속적인 탐험을 위해 epsilon을 0으로 만들지 않고 하한선을 설정함.
        self.epsilon_min = 0.01
        self.batch_size = 64
        self.train_start = 1000

        # 리플레이 메모리, 최대크기 2000
        self.memory = deque(maxlen = 2000)

        # 모델과 타겟 모델 생성
        # DQN의 특징 중 하나는 타겟신경망(모델)을 사용한다는 것
        # 가중치가 무작위로 초기화 되기 때문에 현재 두 모델이 같지 않음
        self.model = self.build_model()
        self.target_model = self.build_model()

        # 타겟 모델 초기화
        self.update_target_model()

        if self.load_model:
            self.model.load_weights("./save_model/cartpole_dqn.h5")

I. Epsilon Greedy Algorithm

II. Replay Memory

III. Target Network

build_model

1
2
3
4
5
6
7
8
def build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim = self.state_size, activation = 'relu', kernel_initializer = 'he_uniform'))
        model.add(Dense(24, activation = 'relu', kernel_initializer = 'he_uniform'))
        model.add(Dense(self.action_size, activation = 'linear', kernel_initializer = 'he_uniform'))
        model.summary()
        model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
        return model

update_target_model

1
2
def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())

get_action

1
2
3
4
5
6
7
8
9
10
11
12
def get_action(self, state):
        # 2 <= 3 : 첫번째 숫자가 두번째 보다 같거나 더 작은가? -> True of False
        # np.random.rand() : 0~1 사이 실수 1개 / np.random.rand(5) : 0~1 사이 실수 5개
        # random.randrange(5) : 0~4 임의의 정수 / random.randrange(-5,5) : -5 ~ 4 임의의 정수
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            # q_value = [[-1.3104991 -1.6175464]]
            # q_value[0] = [-1.3104991 -1.6175464]
            # np.argmax(q_value[0]) = -1.3104991
            q_value = self.model.predict(state)
            return np.argmax(q_value[0])

append_ sample

1
2
def append_sample(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

triain_model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def train_model(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        # 메모리에서 배치 크기만큼 무작위로 샘플 추출
        # mini_batch의 모양: 64 x 5
        # np.shape(mini_batch)
        mini_batch = random.sample(self.memory, self.batch_size)

        # 모델의 업데이트는 배치로 샘플들을 모아서 한 번에 진행하기 때문에
        # model.fit(states, target)에 들어가는 states는 배치여야함
        # 따라서 np.zeros를 사용해 states의 형태를 배치 형태로 지정함.
        # np.zeros( (2, 3) ) : 2x3 영행렬
        states = np.zeros((self.batch_size, self.state_size)) # 64 x 4
        next_states = np.zeros((self.batch_size, self.state_size)) # 64 x 4
        actions, rewards, dones = [], [], []

        # def append_sample(self, state, action, reward, next_state, done):
        # mini_batch의 모양: 64 x 5
        # actions의 모양 : np.shape(actions)
        for i in range(self.batch_size):
            states[i] = mini_batch[i][0]
            actions.append(mini_batch[i][1])
            rewards.append(mini_batch[i][2])
            next_states[i] = mini_batch[i][3]
            dones.append(mini_batch[i][4])

        # self.model = self.build_model()
        # self.target_model = self.build_model()
        # target 의 size: 64 x 2
        # target_val 의 size : 64 x 2
        target = self.model.predict(states) # 큐함수 값
        target_val = self.target_model.predict(next_states) # 정답 큐함수 값 (부트스트랩)

        # 벨만 최적 방정식을 이용한 업데이트 타겟
        # amax 함수는 array 의 최댓값을 반환하는 함수
        for i in range(self.batch_size): # i: 0 ~ 63
            # actions[i] : 0 or 1
            # dones[i] : False or True
            if dones[i]:
                target[i][actions[i]] = rewards[i]
            else:
                target[i][actions[i]] = rewards[i] + self.discount_factor * (np.amax(target_val[i]))

Train

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
scores, episodes = [], []

N_EPISODES = 100
for e in range(N_EPISODES):
    done = False
    score = 0

    # env 초기화
    # state의 모양 : 4
    state = env.reset()
    # state의 모양 : 4 -> 1 x 4
    state = np.reshape(state, [1, state_size])

    # done : false 였다가 한 에피소드가 끝나면 True로 바뀜
    while not done:
        # render = True 이면 학습영상 보여줌
        if agent.render:
            env.render()

        # 현재 상태로 행동을 선택
        action = agent.get_action(state) # q함수를 얻었다.

        # 선택한 행동으로 환경에서 한 타임스텝 진행
        # next_state = np.reshape(next_state, [1, state_size]) : 1x1 -> 1x4
        # info : {} / 넣고싶은 정보가 있으면 추가하면 됨. 없으면 안넣어도 됨.
        next_state, reward, done, info = env.step(action)
        next_state = np.reshape(next_state, [1, state_size])

        # 에피소드가 중간에 끝나면 -100 보상
        # reward = reward if not done else -100
        reward = reward if not done or score == 499 else -100

        # 리플레이 메모리에 샘플 <s,a,r,s'> 저장
        agent.append_sample(state, action, reward, next_state, done)

        # 매 타임스텝마다 학습문
        # self.train_start = 1000
        # 이렇게 하는 이유는 DQN에서는 배치로 학습하기 때문에 샘플이 어느정도 모일때 까지 기다려야 하기때문.
        if len(agent.memory) >= agent.train_start:
            agent.train_model()

        score += reward
        state = next_state

        if done:
            # 각 에피소드마다 타겟 모델을 모델의 가중치로 업데이트
            agent.update_target_model()

            score = score if score == 500 else score + 100

            # 에피소드 마다 학습결과 출력
            scores.append(score)
            episodes.append(e)
            pylab.plot(episodes, scores, 'b')
            if not os.path.exists("./save_graph"):
              os.makedirs("./save_graph")
            pylab.savefig("./save_graph/cartpole_dqn.png")
            print("episode:", e, " score:", score, " memory length:", len(agent.memory), " epsilon:", agent.epsilon)

            # 이전 10개 에피소드의 점수 평균이 490보다 크면 학습 중단
            # np.mean([1, 2, 3]) = 2.0 / np.mean() : 평균
            # min([1, 2, 3]) = 1 / min : 가장 작은 값

            # a = [ 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
            # print(a[-10:])
            # b = [1,2,3,4,5,6,7,8,9]
            # print(b[-9:])
            if np.mean(scores[-min(10, len(scores)):]) > 490:
                if not os.path.exists("./save_model"):
                  os.makedirs("./save_model")
                agent.model.save_weights("./save_model/cartpole_dqn.h5")
                sys.exit()