DDPG always chooses boundary actions


I am trying to implement the DDPG algorithm, taking a state of 8 values and outputting an action of size 4. The actions are lower-bounded by [5, 5, 0, 0] and upper-bounded by [40, 40, 15, 15].

When I train my DDPG agent it always chooses one of the boundaries, for example [5, 40, 0, 15] or [40, 40, 0, 0]. I implemented the SAC algorithm afterwards and it works, and I have also tried my DDPG agent on a gym environment, where it works as well. Maybe the problem is with upscaling the actions in the policy of the agent.
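For context, the actor squashes its output with tanh and then rescales it onto these bounds. Here is a minimal standalone sketch of that mapping (the same scale/bias formula used in the code below), just to show what a boundary action corresponds to:

import numpy as np

# Minimal sketch of the tanh-to-bounds rescaling (bounds as stated above).
lower = np.array([5.0, 5.0, 0.0, 0.0])
upper = np.array([40.0, 40.0, 15.0, 15.0])
scale = (upper - lower) / 2.0   # [17.5, 17.5, 7.5, 7.5]
bias = (upper + lower) / 2.0    # [22.5, 22.5, 7.5, 7.5]

def rescale(tanh_out):
    # Map a tanh output in [-1, 1] onto [lower, upper]
    return scale * tanh_out + bias

print(rescale(np.array([-1.0, 1.0, -1.0, 1.0])))  # -> [ 5. 40.  0. 15.]
print(rescale(np.zeros(4)))                       # -> [22.5 22.5  7.5  7.5]

So an action that sits exactly on the boundaries, like [5, 40, 0, 15], corresponds to the tanh layer being saturated at ±1.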

Here is the model I have:

class Buffers:
    def __init__(self, buffer_capacity=100000, batch_size=64):
        # Maximum number of "experiences" to store
        self.buffer_capacity = buffer_capacity
        num_states = 8
        num_actions = 4
        # Number of tuples to train on per update
        self.batch_size = batch_size

        # Tells us how many times record() was called
        self.buffer_counter = 0

        # Instead of a list of tuples as in the experience-replay concept,
        # we use a separate np.array for each tuple element
        self.state_buffer = np.zeros((self.buffer_capacity, num_states))
        self.action_buffer = np.zeros((self.buffer_capacity, num_actions))
        self.reward_buffer = np.zeros((self.buffer_capacity, 1))
        self.next_state_buffer = np.zeros((self.buffer_capacity, num_states))

    # Takes an (s, a, r, s') observation tuple as input
    def record(self, obs_tuple):
        # Wrap the index around when buffer_capacity is exceeded,
        # replacing old records
        index = self.buffer_counter % self.buffer_capacity

        self.state_buffer[index] = obs_tuple[0]
        self.action_buffer[index] = obs_tuple[1]
        self.reward_buffer[index] = obs_tuple[2]
        self.next_state_buffer[index] = obs_tuple[3]

        self.buffer_counter += 1




import random
import numpy as np
from collections import deque
import tensorflow as tf
from tensorflow.keras import layers
import import_ipynb
from Noise import OUActionNoise

keras = tf.keras
# tf.compat.v1.disable_eager_execution()
class DQLearningAgent:
    def __init__(self, seed, discount_factor=0.95):

        self.tau = 0.05
        self.gamma = discount_factor

        self.critic_lr = 0.002
        self.actor_lr = 0.001
        self.std_dev = [0.7,0.7,0.2,0.2]
        self.buffer = Buffers(50000, 64)
        self.M = 16
        
        self.upper_bound = [40, 40, self.M - 1, self.M - 1]
        self.lower_bound = [5, 5, 0, 0]
        # Linear mapping from the tanh output range [-1, 1] to [lower_bound, upper_bound]
        self.action_scale = (np.array(self.upper_bound) - np.array(self.lower_bound)) / 2.0
        self.action_bias = (np.array(self.upper_bound) + np.array(self.lower_bound)) / 2.0

        
        self._state_size = 8  # unchanged
        self._action_size = 4

        self.seed = seed
#         random.seed(self.seed)
#         np.random.seed(self.seed)
        
        self.actor_model = self.get_actor()
        self.critic_model = self.get_critic()
        self.target_actor = self.get_actor()
        self.target_critic = self.get_critic()

        # Making the weights equal initially
        self.target_actor.set_weights(self.actor_model.get_weights())
        self.target_critic.set_weights(self.critic_model.get_weights())
        
        self.critic_optimizer = tf.keras.optimizers.Adam(self.critic_lr)
        self.actor_optimizer = tf.keras.optimizers.Adam(self.actor_lr)
        
        
        self.ou_noise = OUActionNoise(mean=np.zeros(self._action_size ), std_deviation=np.array(self.std_dev))
        



    def get_actor(self):
        # Initialize the weights of the output layer between -3e-3 and 3e-3
        last_init = tf.random_uniform_initializer(minval=-0.003, maxval=0.003)
        inputs = layers.Input(shape=(self._state_size,))
        out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(inputs)
        # out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(out)
        # out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(out)
        out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(out)
        outputs = layers.Dense(self._action_size, activation="tanh", kernel_initializer=last_init)(out)

        # Rescale the tanh output from [-1, 1] to [lower_bound, upper_bound]
        def antirectifier(x):
            return self.action_scale * x + self.action_bias
        outputs = layers.Lambda(antirectifier)(outputs)
        model = tf.keras.Model(inputs, outputs)
        
        return model
    
    def get_critic(self):
        # State as input
        state_input = layers.Input(shape=(self._state_size,))
        # state_out = layers.Dense(28, activation="relu")(state_input)

        # Action as input
        action_input = layers.Input(shape=(self._action_size,))
        # action_out = layers.Dense(16, activation="relu")(action_input)

        # State and action are concatenated before the shared hidden layers
        concat = layers.Concatenate()([state_input, action_input])

        out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(concat)
        # out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(out)
        # out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(out)
        out = layers.Dense(28, activation=keras.layers.LeakyReLU(alpha=0.01))(out)
        # Outputs a single Q-value for the given state-action pair
        outputs = layers.Dense(1)(out)
        model = tf.keras.Model([state_input, action_input], outputs)
        return model
    
    
    def learn(self):
        # Get sampling range
        record_range = min(self.buffer.buffer_counter, self.buffer.buffer_capacity)
        # Randomly sample batch indices (np.random.choice samples with replacement)
        batch_indices = np.random.choice(record_range, self.buffer.batch_size)
#         print(self.buffer.action_buffer[batch_indices].shape)

        # Convert to tensors
        state_batch = tf.convert_to_tensor(self.buffer.state_buffer[batch_indices])
        action_batch = tf.convert_to_tensor(self.buffer.action_buffer[batch_indices])
        reward_batch = tf.convert_to_tensor(self.buffer.reward_buffer[batch_indices])
        reward_batch = tf.cast(reward_batch, dtype=tf.float32)
        next_state_batch = tf.convert_to_tensor(self.buffer.next_state_buffer[batch_indices])

        return self.update(state_batch, action_batch, reward_batch, next_state_batch)
        
        
    def update(self, state_batch, action_batch, reward_batch, next_state_batch):
        with tf.GradientTape() as tape:
            target_actions_new = self.target_actor(next_state_batch)
            y = reward_batch + self.gamma * self.target_critic([next_state_batch,target_actions_new])
            q = self.critic_model([state_batch,action_batch])
            critic_loss = tf.math.reduce_mean(tf.math.square(y - q))
        critic_grad = tape.gradient(critic_loss, self.critic_model.trainable_variables)
        self.critic_optimizer.apply_gradients( zip(critic_grad, self.critic_model.trainable_variables))
        
        with tf.GradientTape() as tape:
            actions = self.actor_model(state_batch)
            critic_value = self.critic_model([state_batch , actions])
            actor_loss = -tf.math.reduce_mean(critic_value)
        actor_grad = tape.gradient(actor_loss , self.actor_model.trainable_variables)
        self.actor_optimizer.apply_gradients( zip(actor_grad, self.actor_model.trainable_variables))
        
        self.update_target(self.target_actor.variables , self.actor_model.variables)
        self.update_target(self.target_critic.variables , self.critic_model.variables)   
        
        return actor_loss,critic_loss
    
    
    def update_target(self,target_weights, weights):
        for (a, b) in zip(target_weights, weights):
            a.assign(b * self.tau + a * (1 - self.tau))
    

    def policy(self,state):
        sampled_actions = tf.squeeze(self.actor_model(state))
        noise = self.ou_noise()
        # Adding noise to action
        sampled_actions = sampled_actions.numpy() + noise

        # We make sure action is within bounds
        legal_action = np.clip(sampled_actions, self.lower_bound, self.upper_bound)

        return [np.squeeze(legal_action)]
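A quick check I can run on top of the code above (a hypothetical diagnostic snippet, not part of my training loop) is to probe the actor's raw tanh output before the rescaling Lambda, to see whether it is saturated at ±1:

# Hypothetical diagnostic: build a sub-model that stops at the tanh layer
# of the actor defined above and compare its output with the rescaled action.
agent = DQLearningAgent(seed=0)
tanh_layer = agent.actor_model.layers[-2]   # Dense(4, activation="tanh"), just before the Lambda
probe = tf.keras.Model(agent.actor_model.input, tanh_layer.output)

dummy_state = np.random.uniform(size=(1, 8)).astype(np.float32)  # placeholder state
print("raw tanh output:", probe(dummy_state).numpy())
print("rescaled action:", agent.actor_model(dummy_state).numpy())

If the raw tanh values drift to ±1 during training, the rescaled actions will sit exactly on the bounds regardless of the clipping in policy().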
  

