Warm-up 방식의 학습 방법, 학습률을 높였다가 낮췄다가, 다시 높였다가 낮췄다가 등 학습 과정에서 다양한 학습률로 local optima를 빠져나오도록 장치하는 방법이 fixed learning rate 방법보다 성능이 더 좋다는 것은 이미 오래전부터 논문을 통해 증명되어 왔습니다.

Cosine Annealing을 사용하면 learning rate가 어떻게 변하는지 알아보겠습니다.

설명하는 코드는 케라스 콜백과 같이 등록하여 사용하면 됩니다.


import tensorflow as tf
import tensorflow.keras.backend as backend
import math

# CosineAnneling Example.
class CosineAnnealingLearningRateSchedule(tf.keras.callbacks.Callback):
    # constructor
    def __init__(self, n_epochs, n_cycles, lrate_max, min_lr, verbose = 0):
        self.epochs = n_epochs
        self.cycles=  n_cycles
        self.lr_max = lrate_max
        self.min_lr = min_lr
        self.lrates = list()
    
    # caculate learning rate for an epoch
    def cosine_annealing(self, epoch, n_epochs, n_cycles, lrate_max):
        # 전체 epoch / 설정 cycle 수만큼 cycle을 반복합니다.
        epochs_per_cycle = math.floor(n_epochs/n_cycles)
        cos_inner = (math.pi * (epoch % epochs_per_cycle)) / (epochs_per_cycle)
        
        return lrate_max/2 * (math.cos(cos_inner) + 1)
  
    # calculate and set learning rate at the start of the epoch
    def on_epoch_begin(self, epoch, logs = None):
        if(epoch < 101):
            # calculate learning rate
            lr = self.cosine_annealing(epoch, self.epochs, self.cycles, self.lr_max)
            print('\nEpoch %05d: CosineAnnealingScheduler setting learng rate to %s.' % (epoch + 1, lr))
        # 101번째 epoch부터는 해당 설정한 min_lr을 사용
        else:
            lr = self.min_lr
            
        #     elif((epoch >= 65) and (epoch < 75)):
        #       lr = 1e-5
        #       print('\n No CosineAnnealingScheduler set lr 1e-5')
        #     elif((epoch >= 75) and (epoch < 85)):
        #       lr = 1e-6
        #       print('\n No CosineAnnealingScheduler set lr 1e-6')
        #     elif((epoch >= 85)):
        #       lr = 1e-7
        #       print('\n No CosineAnnealingScheduler set lr 1e-7')

        # set learning rate
        # 아래 예제 코드 실행을 위해선 밑 코드를 주석 처리 해주세요.
        backend.set_value(self.model.optimizer.lr, lr)
        # log value
        self.lrates.append(lr)

위 코드를 사용하면, 아래와 같은 학습률 변화를 볼 수 있습니다.

cosine_schedule = CosineAnnealingLearningRateSchedule(n_epochs = 100, n_cycles = 5, lrate_max = 1e-3, min_lr = 1e-6)

for i in range(1, 100 + 1):
    cosine_schedule.on_epoch_begin(i)
    
import matplotlib.pyplot as plt

plt.plot(cosine_schedule.lrates)
plt.title('Cosine Annealing_Toy')
plt.xlabel('epochs'); plt.ylabel('learning_rate')
plt.grid()
plt.show()

n_cycle을 5로 지정한만큼, 20(100/5) 수를 기준으로 cycle이 반복되고 있습니다. 

사실 텐서플로우를 사용한다면 위처럼 직접 정의하여 사용하지 않아도 됩니다.
텐서플로우 공식 홈페이지를 보면 이미 학습률을 조절할 수 있는 다양한 방법들을 제공하고 있기 때문에 가져다 사용하면 됩니다.
(CosineDecayRestarts, CosineDecay 등)

다음 글에서는 텐서플로우에서 제공하는 함수를 사용하여 MNIST 데이터셋에 적용해보겠습니다.

load_model() 함수를 사용하면, h5 또는 hdf5로 저장된 모델 구조, 가중치를 한꺼번에 불러올 수 있습니다.

model = load_model('your saved model path')

그런데 만약 모델에 커스텀 객체가 포함되어 있다면, 커스텀 객체를 명시해주지 않는 경우 다음과 같은 에러가 발생할 수 있습니다.

이를 알아보기 전에, 케라스에서 커스텀 객체를 선언하는 방법은 다음과 같습니다.

커스텀 객체 선언

def Mish(x):
    return x * K.tanh(K.softplus(x))

get_custom_objects().update({'mish': Mish})

Mish Activation 함수를 커스텀 객체로 선언하고 사용한 모델을 load_model() 함수를 사용하여 불러올 때, 커스텀 객체를 명시해주지 않으면(인자로 전달하지 않으면) 다음과 같은 에러를 만날 수 있습니다.

ex) Mish Activation 함수를 커스텀 객체로 선언하고 사용한 모델일 경우

ValueError: Unknown activation function:Mish

모델에선 Mish Activation 함수를 사용하여 구조가 형성되어 있는데, 로드시 이에 대한 정보를 넘겨주지 않았기 때문에 발생합니다. 따라서, 이를 해결하기 위해 다음과 같이 인자로 전달해주면 쉽게 해결할 수 있습니다.

 

커스텀 객체를 포함한 모델 로드

model = load_model('./model/saved_model.hdf5', custom_objects={'Mish':Mish}

'Mish'는 커스텀 객체 선언 시 사용한 객체의 이름이고, Mish는 해당 객체를 넘겨주는 것입니다.

시계열 데이터를 다룰 때 사용하면 매우 유용합니다.
시계열 데이터를 다룰 때 다음 함수와 비슷한 것들을 직접 정의하여 sequence를 만들어주어야 하는 번거로움이 있습니다. 

def make_sequence(data, n):
    X, y = list(), list()
    
    for i in range(len(data)):
        _X = data.iloc[i:(i + n), :-1]
        if(i + n) < len(data):
            X.append(np.array(_X))
            y.append(data.iloc[i + n, -1])
        else:
            break
            
    return np.array(X), np.array(y)

tf.data를 사용하면 여러 줄로 구성되어 있는 위의 코드가 단 하나의 함수로 해결됩니다.

dataset = tf.data.Dataset.range(10)
dataset = dataset.window(5, shift=1)
for window_dataset in dataset:
  for val in window_dataset:
    print(val.numpy(), end=" ")
  print()
  • dataset.window의 첫 번째 인자는 window size이고, 두 번째는 shift 크기를 전달합니다.
  • 결과는 다음과 같습니다.

0 1 2 3 4 
1 2 3 4 5 
2 3 4 5 6 
3 4 5 6 7 
4 5 6 7 8 
5 6 7 8 9 
6 7 8 9 
7 8 9 
8 9 

결과에서 window_size = 5만큼의 데이터를 얻다가, 끝 부분에서 [6, 7, 8, 9], [7, 8, 9], ... 의 원치않는 결과를 얻고 있습니다.
이는 가져오려는 window_size가 데이터셋의 크기를 초과했기 때문에 그렇습니다.

이를 방지하기 위해 drop_remainder = True 인자를 사용합니다.

dataset = tf.data.Dataset.range(10)
dataset = dataset.window(5, shift=1, drop_remainder=True)
for window_dataset in dataset:
  for val in window_dataset:
    print(val.numpy(), end=" ")
  print()
  • for-loop를 2중으로 사용하는 이유는 dataset.windowTensor가 아닌 Dataset을 반환하기 때문입니다.
  • 이는 flat_map 함수를 사용해서 window_dataset을 flat해주어 바로 사용할 수 있습니다.
  • 이 말은 쉽게 설명하면 원래 같은 경우 5 -> 4 -> 3 -> 처럼 iter 형식으로 받을 수 있었는데, flat_map을 사용하면 [5, 4, 3, 2, 1]로 바로 받을 수 있습니다.
dataset = tf.data.Dataset.range(10)
dataset = dataset.window(5, shift=1, drop_remainder=True)
dataset = dataset.flat_map(lambda window: window.batch(5))
for window in dataset:
  print(window.numpy())

마지막으로 다음과 같이 사용할 수도 있습니다.

dataset = tf.data.Dataset.range(10)
dataset = dataset.window(5, shift=1, drop_remainder=True)
dataset = dataset.flat_map(lambda window: window.batch(5))
dataset = dataset.map(lambda window: (window[:-1], window[-1:]))
for x,y in dataset:
  print(x.numpy(), y.numpy())
  • 결과는 다음과 같습니다.
  • [0 1 2 3] [4]
    [1 2 3 4] [5]
    [2 3 4 5] [6]
    [3 4 5 6] [7]
    [4 5 6 7] [8]
    [5 6 7 8] [9]

 

reference

https://www.tensorflow.org/guide/data

 

tf.data: Build TensorFlow input pipelines  |  TensorFlow Core

The tf.data API enables you to build complex input pipelines from simple, reusable pieces. For example, the pipeline for an image model might aggregate data from files in a distributed file system, apply random perturbations to each image, and merge random

www.tensorflow.org

 

https://www.youtube.com/watch?v=51YtxSH-U3Y&list=PLQY2H8rRoyvzuJw20FG82Lgm2SZjTdIXU&index=7


최근 텐서플로우는 파이토치 때문에 연구에는 불편하다는 인식이 있습니다(개인적인 의견일 수도..).

이번 영상에서는 텐서플로우가 효율적인 연구를 위해 제공하는 기능을 알아보도록 하겠습니다.

 

파라미터의 상태를 제어한다는 것은 연구에서 매우 중요한 작업입니다.
예를 들어, 케라스 Dense layer의 파라미터나 bias는 층에 저장되어 있긴 하지만, 여전히 state를 다루기엔 매우 불편합니다.

더욱 편리한 제어를 위해 tf.variable_creator_scope를 사용합니다.

class FactorizedVariable(tf.Module):
    def __init__(self, a, b):
        self.a = a
        self.b = b

tf.register_tensor_conversion_function(
  FactorizedVariable, lambda x, *a, **k: tf.matmul(x.a, x.b))

def scope(next_creator, **kwargs):
    shape = kwargs['initial_value']().shape
    if len(shape) != 2: return next_creator(**kwargs)
    return FactorizedVariable(tf.Variable(tf.random.normal([shape[0], 2])),
                                         tf.Variable(tf.random.normal([2, shape[1]])))

with tf.variable_creator_scope(scope):
    d = tf.keras.layer.Dense(10)
    d(tf.zeros[20, 10])
assert isinstance(d.kernel, FactorizedVariable)
  • 먼저, 저장하고 싶은 값을 선택하고, tf.Module을 상속받은 클래스를 정의합니다.
    tf.Module은 저장하고 싶은 변수를 자동으로 추적할 수 있도록 도와줍니다.

위의 코드는 매우 간단하지만, 실제로 사용하는 모델에서는 파라미터가 매우 많기 때문에 관리가 힘듭니다. 따라서 tf.variable_creator_scope를 사용하면 자동 추적 및 파라미터의 변화를 확인할 수 있기 때문에 매우 편리합니다.

딥러닝을 연구하는 데에 있어서 계산 속도는 매우 중요합니다. 텐서플로우는 TensorFlow compiler, XLA 등을 통해 빠른 연산 속도를 지원하고 있습니다. 더욱 효과적으로 사용하려면 @tf.function(experimental_compile=True)를 사용하세요.

활성화 함수의 예를 보겠습니다. 활성화 함수에서는 element-wise 연산 때문에 속도 측면에서 부정적인 영향을 줄지도 모릅니다.
다음 예제 코드에서 속도 차이를 볼 수 있습니다.

def f(x):
    return tf.math.log(2*tf.exp(tf.nn.relu(x+1)))

c_f = tf.function(f, experimental_compile=True)
c_f(tf.zeros([100, 100]))

f = tf.function(f)
f(tf.zeros([100, 100]))

print(timeit.timeit(lambda: f(tf.zeros([100, 100])), number = 10))
# 0.007

print(timeit.timeit(lambda: c_f(tf.zeros([100, 100])), number = 10))
# 0.005 -- ~25% faster!
  • tf.function 사용은 동일합니다. 단지, experimental_compile=True를 추가합니다.
  • linear operations가 포함된 함수나 Bert를 포함한 large-scale 모델에서 효과를 볼 수 있습니다.

element-wise 연산은 옵티마이저에서도 매우 빈번하게 일어납니다. @tf.function을 옵티마이저 코드에 추가한다면 효과를 볼 수 있습니다.
다음은 직접 옵티마이저를 정의해서 @tf.function을 사용하는 예제입니다.

class MyOptimizer(tf.keras.optimizers.Optimizer):
    def __init__(self, lr, power, avg):
        super().__init__(name="MyOptimizer")
        self.lrate, self.pow, self.avg = lr, power, avg
        
    def get_config(self): pass
    def _create_slots(self, var_list):
        for v in var_list: self.add_slot(v, "accum", tf.zeros_like(v))
    
    @tf.function(experimental_compile=True)
    def _resource_apply_dense(self, grad, var, apply_state = None):
        acc = self.get_slot(var, "accum")
        acc.assign(self.avg * tf.pow(grad, self.pow) + (1-self.avg) * acc)
        
        return var.assign_sub(self.lrate * grad/tf.pow(acc, self.pow))

 

다음은 Vectorization을 이야기해보겠습니다. 이는 성능 향상을 위해 매우~! 중요한 지표입니다.
머신 러닝 모델을 다루기 위해 Vectorization이 중요하다는 것은 이미 다 알고 있는 사실이지만, 다루기가 어렵습니다.

그래서 텐서플로우는 이를 위해 auto-Vectorization을 제공합니다. 이 기능은 element-wise 연산이나 batch computation에서 성능 향상을 위해 사용될 것입니다.

Jacobian 연산을 수행하는 예제 코드입니다. jacobian은 미분값을 저장해놓은 행렬입니다.
이를 위해선 tf.GradientTape에서 tape.gradient를 무수히 호출해야하고, 다수의 for-loop를 사용하고, Tensor를 쌓아야 합니다.
이러한 과정을 거치는 코드는 언제나 작동하지만, 좀 더 효율적으로 다룰 수 있는 방법을 텐서플로우가 제공합니다.

tf.vectorized_map을 사용하는 것입니다.

x = tf.random.normal([10, 10])

with tf.GradientTape(persistent=True) as t:
    t.watch(x)
    y = tf.exp(tf.matmul(x, x))
    jac = tf.vectorized_map(
                            lambda yi: tf.vectorized_map(
                            lambda yij: t.gradient(yij, x), yi), y)
  • tf.vectorized_map을 사용하면 빠른 속도로 연산을 수행할 수 있습니다. 하지만 코드가 복잡합니다.
  • 텐서플로우는 이를 위해 jacobian을 아예 함수로 제공합니다.
x = tf.random.normal([10, 10])

with tf.GradientTape() as t:
    t.watch(x)
    y = tf.exp(tf.matmul(x, x))
jac = t.jacobian(y, x)
  • 제공하는 jacobian을 사용하면, 기존 코드보다 10배는 빠르다고 합니다.

마지막으로 데이터에 관한 이야기입니다.
텐서플로우를 사용하는 우리는 항상 매우 커다란 크기의 array를 다루게 됩니다.

 

또, 머신 러닝 모델을 다루다보면 서로 다른 타입의 데이터를 다루기도 합니다. type도 다르고, shape 다르고...
예를 들어, 텐서플로우는 다음과 같은 예를 임베딩 형태로 만들어 줍니다.

텐서플로우는 서로 다른 길이의 데이터를 다루기 위해 ragged tensor 형태를 사용합니다.

data = [['this', 'is', 'a', 'sentence'],
       ['another', 'one'],
       ['a', 'somewhat', 'longer', 'one', ',', 'this']]

rt = tf.ragged.constant(data)
vocab = tf.lookup.StaticVocabularyTable(
    tf.lookup.KeyValueTensorInitializer(
    ['This', 'is', 'a', 'sentence', 'another', 'one', 'somewhat', 'longer'],
    tf.range(8, dtype = tf.int64)), 1)

rt = tf.ragged.map_flat_values(lambda x:vocab.lookup(x), rt)
embedding_table = tf.Variable(tf.random.normal([9, 10]))
rt = tf.gather(embedding_table, rt)
tf.math.reduce_mean(rt, axis = 1)
# Result has shape (3, 10)

길이가 다르고, type이 다르면 tf.ragged를 사용하세요!