Policy-based Sequential Decision
Don't be put off by the English below; it really is simple. Read it and see.
If you find it useful, please give it a like; it took a lot of effort to write.

In Reinforcement Learning the goal must be clear: find the optimal behavior policy that lets the agent obtain the maximum return,

$$\pi^*_\theta = \arg\max_\theta\ \mathbb{E}_{\pi_\theta}\Big[\sum_t \gamma^t r_t\Big],$$

so directly modeling the policy and improving it by gradient ascent is a very natural idea.

Vanilla Policy Gradient / REINFORCE

- on-policy
- either discrete or continuous action spaces

Policy gradient does not output the value of each action but the action itself; in this way policy gradient skips the value-estimation stage and evaluates the policy directly.

Theory

$$\pi_\theta(a|s) = P[a \mid s;\ \theta]$$

We must find the best parameters (θ) to maximize a score function, J(θ).

$$J(\theta) = \mathbb{E}_{\pi_\theta}\Big[\sum_t \gamma^t r_t\Big]$$

There are two steps:

  • Measure the quality of a π (policy) with a policy score function J(θ) (policy evaluation)
  • Use policy gradient ascent to find the best parameter θ that improves our π (policy improvement)

Policy score function

  • Episodic environment with the same start state s1: use the expected return from that state,
    $$J_1(\theta) = \mathbb{E}_\pi[G_1] = \mathbb{E}_\pi[V(s_1)]$$
  • Continuous environment: use the average value,
    $$J_{avgV}(\theta) = \sum_s d^\pi(s)\, V^\pi(s)$$
    where
    $$d^\pi(s) = \frac{N(s)}{\sum_{s'} N(s')},$$
    $N(s)$ is the number of occurrences of state $s$ and $\sum_{s'} N(s')$ is the total number of occurrences of all states. So $d^\pi(s)$ is the stationary distribution of the Markov chain under policy $\pi$ (the on-policy state distribution under π); see Policy Gradient Algorithms - lilianweng's blog for details.
  • Use the average reward per time step. The idea here is that we want to get the most reward per time step:
    $$J_{avgR}(\theta) = \sum_s d^\pi(s) \sum_a \pi_\theta(a|s)\, R(s,a)$$

Policy gradient ascent

In contrast to the gradient descent we normally use, here we use gradient ascent:

$$\theta^* = \arg\max_\theta J(\theta)$$

$$\theta \leftarrow \theta + \alpha\, \nabla_\theta J(\theta)$$

Our score function J(θ) can also be written in one unified form. Summarizing the three policy score functions from the previous section:

$$J(\theta) = \sum_s d^\pi(s) \sum_a \pi_\theta(a|s)\, Q^\pi(s,a)$$

Since J(θ) is composed of a state distribution and an action distribution, when we take the gradient with respect to θ, the effect of the action is simple to compute, but the effect on the state distribution is much more complicated because the environment is unknown. The solution is the Policy Gradient Theorem: it provides a nice reformulation of the derivative of the objective function that does not involve the derivative of the state distribution $d^\pi(\cdot)$ and simplifies the computation of the gradient $\nabla_\theta J(\theta)$ a lot:

$$\nabla_\theta J(\theta) \propto \sum_s d^\pi(s) \sum_a Q^\pi(s,a)\, \nabla_\theta \pi_\theta(a|s)$$

Proof: Policy Gradient Algorithms - lilianweng's blog

It is also hard to differentiate $\pi_\theta(a|s)$ directly, unless we can transform it into a logarithm (the likelihood-ratio trick).

Decomposing the log-probability of a trajectory, $\log P(\tau;\theta) = \sum_t \big(\log P(s_{t+1} \mid s_t, a_t) + \log \pi_\theta(a_t \mid s_t)\big)$, and dropping the transition terms that do not affect the partial derivative with respect to θ, we are left with a maximum-likelihood-style term that depends only on the state-action pairs actually taken.

So how do we compute the partial derivative of this log term?
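
A short worked step (the standard likelihood-ratio identity plus the softmax log-derivative; the logit symbol $z$ is notation introduced here for illustration):

$$\nabla_\theta \pi_\theta(a|s) = \pi_\theta(a|s)\, \nabla_\theta \log \pi_\theta(a|s)$$

For a softmax policy over logits $z$ (the pre-softmax output of the network), $\pi_\theta(a|s) = e^{z_a} / \sum_{a'} e^{z_{a'}}$, so

$$\frac{\partial \log \pi_\theta(a|s)}{\partial z_i} = \mathbf{1}[i = a] - \pi_\theta(i|s),$$

i.e. "one-hot of the chosen action minus the predicted probabilities", which is exactly what the snippet below stores.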

In the code, this corresponds to:

y = np.zeros([self.act_space])
y[act] = 1 # one-hot over the discrete action space: the executed action is set to 1
self.gradients.append(np.array(y).astype('float32')-prob) # = d log pi(a|s) / d logits for a softmax policy

Finally, we arrive at the VPG update rule:

$$\theta \leftarrow \theta + \alpha\, G_t\, \nabla_\theta \log \pi_\theta(a_t \mid s_t),$$

where $G_t$ is the discounted return from step t.

The corresponding code is below; note that the rewards are normalized here:

def learn(self):
    gradients = np.vstack(self.gradients)
    rewards = np.vstack(self.rewards)
    rewards = self.discount_rewards(rewards)
    # normalize the discounted rewards
    rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-7)
    gradients *= rewards
    X = np.squeeze(np.vstack([self.states]))
    Y = self.act_probs + self.alpha * np.squeeze(np.vstack([gradients]))
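
A brief note on why this target works (my own explanation, relying only on the standard softmax + categorical cross-entropy gradient): training the softmax output toward

$$Y = \pi_\theta(\cdot|s) + \alpha\, G_t\, \big(\mathbf{1}_a - \pi_\theta(\cdot|s)\big)$$

gives a loss gradient with respect to the logits proportional to $-\alpha\, G_t\, \big(\mathbf{1}_a - \pi_\theta(\cdot|s)\big)$, so minimizing the cross-entropy moves the logits along $+\alpha\, G_t\, \nabla \log \pi_\theta(a|s)$, which is exactly the update rule above.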

Pseudocode

REINFORCE: an update that uses an entire episode of data. Remember that? It is a Monte-Carlo method!
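
A sketch of the standard REINFORCE loop (the implementation below follows the same procedure):

  1. Initialize the policy parameters θ at random.
  2. Generate one episode with the current policy $\pi_\theta$ and store $(s_t, a_t, r_t)$ for every step.
  3. For each step t, compute the discounted return $G_t = r_t + \gamma r_{t+1} + \gamma^2 r_{t+2} + \dots$
  4. Update $\theta \leftarrow \theta + \alpha\, G_t\, \nabla_\theta \log \pi_\theta(a_t|s_t)$ for every step t.
  5. Repeat from step 2 until convergence.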

Here, $\nabla_\theta \log \pi_\theta(s, a) \cdot V$ can be understood as the degree of "surprise" about the chosen action a in state s: the smaller the probability $\pi_\theta(s, a)$, the larger $-\log \pi_\theta(s, a)$ becomes. If we receive a large R (that is, a large V) in a situation where Policy(s, a) is small, then $-\log \pi_\theta(s, a) \cdot V$ is even larger, meaning we are more surprised ("I chose an action I rarely choose and discovered that it actually earns a good reward, so I should make a large adjustment to my parameters this time"). That is the physical meaning of the surprise term.
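
A tiny numeric illustration of this intuition (the probabilities and returns here are made-up values):

import numpy as np

# The "surprise" signal -log(pi) * V: a rarely chosen action (low probability)
# that earns a large return produces a much larger update signal.
for prob, V in [(0.9, 1.0), (0.1, 1.0), (0.1, 5.0)]:
    print(f"pi(s,a)={prob:.1f}, V={V:.1f} -> -log(pi)*V = {-np.log(prob) * V:.2f}")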

VPG (as defined by OpenAI Spinning Up)

Notice that a value function / advantage function has been introduced; this is a later, improved version that makes the method usable in non-episodic environments.
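
For reference, the policy gradient that Spinning Up's VPG estimates is the advantage-weighted log-probability gradient:

$$\nabla_\theta J(\pi_\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\Big[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t)\, A^{\pi_\theta}(s_t, a_t)\Big]$$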

Implementation

'''
REINFORCE for discrete control, updated once per episode (Monte-Carlo)
'''
# Imports assumed by this snippet (Keras 2.x style API)
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Reshape, Flatten, Conv2D
from keras.optimizers import Adam
class Skylark_VPG():
    def __init__(self, env, alpha = 0.1, gamma = 0.6, epsilon=0.1, update_freq = 200):
        self.obs_space = 80*80  # depends on the state output format of the specific gym environment (here: an 80*80 preprocessed frame)
        self.act_space = env.action_space.n
        self.env = env
        self.alpha = alpha      # learning rate
        self.gamma = gamma      # discount rate
        self.states = []
        self.gradients = []
        self.rewards = []
        self.act_probs = []
        self.total_step = 0

        self.model = self._build_model()
        self.model.summary()

    def _build_model(self):
        model = Sequential()
        model.add(Reshape((1, 80, 80), input_shape=(self.obs_space,)))
        model.add(Conv2D(32, (6, 6), activation="relu", strides=(3, 3), 
                        padding="same", kernel_initializer="he_uniform"))
        model.add(Flatten())
        model.add(Dense(64, activation='relu', kernel_initializer='he_uniform'))
        model.add(Dense(32, activation='relu', kernel_initializer='he_uniform'))
        # The softmax policy weighs the probability of each action with a linear combination
        # of the state-action features phi(s,a) and the parameters theta; the output is a probability for every action
        model.add(Dense(self.act_space, activation='softmax'))
        opt = Adam(lr=self.alpha)
        model.compile(loss='categorical_crossentropy', optimizer=opt)
        return model
    
    def choose_action(self, state):
        state = state.reshape([1, self.obs_space])
        act_prob = self.model.predict(state).flatten()
        prob = act_prob / np.sum(act_prob)
        self.act_probs.append(act_prob)
        # sample an action according to the predicted probabilities
        action = np.random.choice(self.act_space, 1, p=prob)[0]
        return action, prob
        
    def store_trajectory(self, s, a, r, prob):
        y = np.zeros([self.act_space])
        y[a] = 1 # one-hot over the discrete action space: the executed action is set to 1
        self.gradients.append(np.array(y).astype('float32')-prob)
        self.states.append(s)
        self.rewards.append(r)

    def discount_rewards(self, rewards):
        '''
        Propagate the reward backward from the end of the episode to form discounted returns
        '''
        discounted_rewards = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(range(0, rewards.size)):
            if rewards[t] != 0:
                running_add = 0
            running_add = running_add * self.gamma + rewards[t]
            discounted_rewards[t] = np.array(running_add)
        return discounted_rewards

    def learn(self):
        gradients = np.vstack(self.gradients)
        rewards = np.vstack(self.rewards)
        rewards = self.discount_rewards(rewards)
        # normalize the discounted rewards
        rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-7)
        gradients *= rewards
        X = np.squeeze(np.vstack([self.states]))
        Y = self.act_probs + self.alpha * np.squeeze(np.vstack([gradients]))
        self.model.train_on_batch(X, Y)
        self.states, self.act_probs, self.gradients, self.rewards = [], [], [], []

    def train(self, num_episodes, batch_size = 128, num_steps = 100):
        for i in range(num_episodes):
            state = self.env.reset()

            steps, penalties, reward, sum_rew = 0, 0, 0, 0
            done = False
            while not done:
                # self.env.render()
                state = preprocess(state) # external helper (defined elsewhere) that flattens the raw frame to obs_space
                action, prob = self.choose_action(state)
                # Interaction with Env
                next_state, reward, done, info = self.env.step(action) 
                
                self.store_trajectory(state, action, reward, prob)

                sum_rew += reward
                state = next_state
                steps += 1
                self.total_step += 1
            if done:
                self.learn()
                print('Episode: {} | Avg_reward: {} | Length: {}'.format(i, sum_rew/steps, steps))
        print("Training finished.")

More implementations can be found in the GitHub repository associated with this column.

Features

Advantages

  1. The output action can be a continuous value. The value-based methods discussed earlier all output values for discrete actions and then choose the action with the largest value, whereas policy gradient can pick an action from a continuous distribution.
  2. Convergence: the problem with value-based methods is that they can oscillate heavily while training, because the choice of action may change dramatically for an arbitrarily small change in the estimated action values.

On the other hand, with policy gradient, we just follow the gradient to find the best parameters. We see a smooth update of our policy at each step.

Because we follow the gradient to find the best parameters, we’re guaranteed to converge on a local maximum (worst case) or global maximum (best case).

  3. Policy gradients can learn stochastic policies. As a consequence:
     • we don't need to hand-craft an exploration/exploitation trade-off;
     • we get rid of the problem of perceptual aliasing.

Disadvantages

  1. A lot of the time, they converge on a local maximum rather than on the global optimum.
  2. Because this is a Monte-Carlo method, we have to wait until the end of the episode to calculate the return, which makes training slow.

Reference

  1. Policy Gradients - 莫烦
  2. Policy Gradient Algorithms - lilianweng's blog
  3. An introduction to Policy Gradients with Cartpole and Doom