I want an adaptive learning rate that is based on time steps instead of epochs, which is what most schedulers are based on. I have a model as:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras import layers

class DQNAgent:
    def __init__(self, state_size, action_size, step_size=4):
        self.state_size = state_size
        self.action_size = action_size
        self.step_size = step_size  # time axis of each state, e.g. 4 in the example below
        self.model = self.build_model()         # original model
        self.target_model = self.build_model()  # target model
        self.lr = 1e-2

    def build_model(self):
        x_in = layers.Input(shape=(self.step_size, self.state_size))
        x_out = layers.Dense(20, activation='relu')(x_in)
        output = layers.Dense(self.action_size, activation='linear')(x_out)
        self.learning_rate = CustomSchedule()
        opt = tf.keras.optimizers.Adam(self.learning_rate)
        model = Model(inputs=x_in, outputs=output, name="DQN")
        model.compile(loss='mse', optimizer=opt)
        return model
and I want to write a scheduler something like this:
class CustomSchedule:
    def __init__(self, lr=1e-2):
        self.lr = lr
        self.t = 0

    def __call__(self):
        self.t += 1
        if self.t % 100 == 0:
            self.lr /= 10
        return self.lr
and my main code (without the declaration of everything) looks something like this:
import numpy as np

dqn = DQNAgent(state_size, action_size)
for step in range(1000):
    states_all = np.array([[[0, 0, 1], [1, 0, 1], [0, -1, 1], [1, -1, 1]]])
    Q_values = dqn.model.predict(states_all)[0]
    # training
    batch = memory.sample(batch_size)
    batch_states = utils.get_states_user(batch)  # assuming I have generated states using this method
    Q_states = dqn.model.predict(batch_states)   # assuming I have sampled batch states
    dqn.model.fit(batch_states, Q_states, verbose=0)
I want to schedule my learning rate so that, let's say, whenever step % 100 == 0 the learning rate is divided by 10. It seems that with the CustomSchedule class I have created I would have to recompile the model each time, which doesn't seem efficient because I would have to save and load the weights. Is there any other way I can do this?
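For reference, the behaviour I want looks like the built-in step-based ExponentialDecay schedule; a minimal sketch for comparison (note it is driven by the optimizer's own update counter, not by my step variable):

import tensorflow as tf

# Every 100 optimizer steps the lr is multiplied by 0.1, i.e. divided by 10.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-2,
    decay_steps=100,
    decay_rate=0.1,
    staircase=True,  # hold the lr constant between multiples of 100
)
opt = tf.keras.optimizers.Adam(lr_schedule)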
EDIT:
I have edited my code following @FedericoMalerba's answer. I created a decay_func as:
def decay_func(step, lr):
    return lr / 10 ** (step / 100)
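A quick sanity check of the curve this produces (plain Python ints stand in here for the tf.Variable step used below); note that it decays smoothly at every step rather than dropping only at multiples of 100; step // 100 would give the hard staircase instead:

for s in (0, 50, 100, 200):
    print(s, decay_func(step=s, lr=1e-2))
# 0 -> 0.01, 50 -> ~0.00316, 100 -> 0.001, 200 -> 0.0001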
Then I added the following changes to my DQNAgent class:
from functools import partial

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.lr = 1e-2
        # non-trainable counter, incremented once per training step
        self.step = tf.Variable(0, trainable=False, name='Step', dtype=tf.int64)
        # zero-argument callable the optimizer calls to get the current lr
        self.decaying_lr = partial(decay_func, step=self.step, lr=self.lr)

    def __call__(self):
        self.step.assign_add(1)
        return self.step
and I call dqn() in my main code on every step. The callable decaying_lr is passed to the optimiser in build_model() as:

opt = tf.keras.optimizers.Adam(self.decaying_lr)
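Putting the pieces together, the training loop just advances the counter once per step; a minimal sketch, assuming build_model(), memory and utils are defined as in the original code above:

dqn = DQNAgent(state_size, action_size)
for step in range(1000):
    dqn()  # increments dqn.step; Adam reads decaying_lr() at its next update
    batch_states = utils.get_states_user(memory.sample(batch_size))
    Q_states = dqn.model.predict(batch_states)
    dqn.model.fit(batch_states, Q_states, verbose=0)  # trained with the decayed lr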