Learning Rate WarmUp은 많은 논문에서 사용하고 있는 유명한 기법입니다.

WarmUp 방법을 통해 학습률은 시간이 지남에 따라 아래 그림처럼 변화합니다.

 

구현은 아래 두 가지 코드(scheduler, callback 버전)을 참고하시고, decay_fn 등 하이퍼파라미터는 알맞게 변경해서 사용하면 됩니다.

 

Scheduler 버전

class LRSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, init_lr, warmup_epoch,
                 steps_per_epoch,
                 decay_fn, *,
                 continue_epoch = 0):
        self.init_lr = init_lr
        self.decay_fn = decay_fn
        self.warmup_epoch = warmup_epoch
        self.continue_epoch = continue_epoch
        self.steps_per_epoch = steps_per_epoch
        self.lr = 1e-4 # remove

    def on_epoch_begin(self, epoch):
        epoch = tf.cast(epoch, tf.float64)
        
        global_epoch = tf.cast(epoch + 1, tf.float64)
        warmup_epoch_float = tf.cast(self.warmup_epoch, tf.float64)
        
        lr = tf.cond(
            global_epoch < warmup_epoch_float,
            lambda: tf.cast(self.init_lr * (global_epoch / warmup_epoch_float), tf.float64),
            lambda: tf.cast(self.decay_fn(epoch - warmup_epoch_float), tf.float64)
        )
        self.lr = lr
    
    def __call__(self, step):
        def compute_epoch(step):
            return step // self.steps_per_epoch
        
        epoch = compute_epoch(step)
        epoch = epoch + self.continue_epoch
        
        self.on_epoch_begin(epoch)
        
        return self.lr

def get_steps(x_size, batch_size):
    if x_size / batch_size == 0:
        return x_size // batch_size
    else:
        return x_size // batch_size + 1

# data_size: train_set 크기
data_size = 100000
BATCH_SIZE = 512
EPOCHS = 100
warmup_epoch = int(EPOCHS * 0.1)
init_lr = 0.1
min_lr = 1e-6
power = 1.
    
lr_scheduler = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate = init_lr,
    decay_steps = EPOCHS - warmup_epoch,
    end_learning_rate = min_lr,
    power = power
)

# get_steps: epoch당 전체 step 수 계산
lr_schedule = LRSchedule(init_lr, warmup_epoch,
                         steps_per_epoch = get_steps(data_size, BATCH_SIZE),
                         decay_fn = lr_scheduler,
                         continue_epoch = 0)

# 사용 예시
optimizer = tf.keras.optimizers.Adam(learning_rate = lr_schedule)

 

Callback 버전

class LRSchedule(tf.keras.callbacks.Callback):
    def __init__(self, init_lr, warmup_epoch, decay_fn):
        self.init_lr = init_lr
        self.decay_fn = decay_fn
        self.warmup_epoch = warmup_epoch
        self.lrs = []

    def on_epoch_begin(self, epoch, logs = None):
        global_epoch = tf.cast(epoch + 1, tf.float64)
        warmup_epoch_float = tf.cast(self.warmup_epoch, tf.float64)

        lr = tf.cond(
                global_epoch < warmup_epoch_float,
                lambda: init_lr * (global_epoch / warmup_epoch_float),
                lambda: self.decay_fn(global_epoch - warmup_epoch_float),
                )

        tf.print('learning rate: ', lr)
        tf.keras.backend.set_value(self.model.optimizer.lr, lr)
        
        self.lrs.append(lr)
        
        
epochs = 1000
warmup_epoch = int(epochs * 0.1)
init_lr = 0.1
min_lr = 1e-6
power = 1.
    
lr_scheduler = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate = init_lr,
    decay_steps = epochs - warmup_epoch,
    end_learning_rate = min_lr,
    power = power
)

# lr_schedule = LRSchedule(init_lr = init_lr,
#                          warmup_epoch = warmup_epoch,
#                          decay_fn = lr_scheduler)

# for i in range(epochs):
#     lr_schedule.on_epoch_begin(i)

# 사용 예시
model.fit(..., callbacks = [LRSchedule(init_lr = init_lr,
                                      warmup_epoch = warmup_epoch,
                                      decay_fn = lr_scheduler)],
          initial_epoch = 0)

 

결론부터 말하자면, Gradient Accumulation 방법은 GPU memory issue를 보완하기 위한 방법입니다.

배치 크기는 성능에 영향을 주는 중요한 하이퍼파라미터 중 하나인데요.
이 때문에 배치 크기(Batch Size)와 성능(Performance) 관계는 이전부터 많은 방법(다양한 조건에서의 실험, 수학적 풀이, 최적화 관점 등)으로 연구되어 왔습니다.

대표적인 논문만 간단히 보자면,

위 논문뿐만 아니라 관련 논문을 보면 아래와 같이 결론을 얻을 수 있습니다.

    ① LB(Large Batch)는 학습 속도도 빠르고, 성능도 좋을 수 있다
    ② LB 일수록 더 큰 학습률(Learning Rate)를 사용하면 좋다 (하지만 큰 학습률을 사용하면 수렴이 안되는 문제가?)
    ③ 하지만 LB보다 SB(Small Batch)가 안정적으로 학습할 수 있어 일반화(Generalization)에 강하다

Gradient Accumulation 방법이 배치 크기와 관련이 있다보니 간단하게 배치 크기 관련 이야기를 해보았는데, 이번 글은 어떤 이유에서든 "비록 가지고 있는 GPU memory가 제한적이지만 일단 LB를 사용해서 학습시키고 싶다."라는 생각입니다.

Gradient Accumulation?

큰 배치 크기를 활용할때 가장 큰 문제는 '성능이 좋아질 수 있다', '안좋아질 수 있다'가 아닐겁니다. 아마도 가장 중요하게 고려하는 것은 지금 보유하고 있는 GPU를 가지고 '1,024, 2,048, ... 처럼 큰 배치 크기를 돌려볼 수 있느냐'입니다.

누구나 다 겪는 문제인데요. 만약 이와 같은 문제에 직면했다면, 이를 해결하기 위한 방법 중 하나로 Gradient Accumulation 방법을 생각해볼 수 있습니다. 가장 좋은 방법은 돈을 들이는 것...

일반적인 학습 방법과 Gradient Accumulation 방법의 차이는 아래 그림에서 바로 확인할 수 있습니다.

General Training vs Training with GA

일반적인 학습 방법이 미니 배치를 통해 gradient를 구한 후, Update를 즉시 진행하는 방법이라면,

Gradient Accumulation 방법을 포함한 학습은 미니 배치를 통해 구해진 gradient를 n-step동안 Global Gradients에 누적시킨 후, 한번에 업데이트하는 방법입니다.

매우 간단합니다. 즉, 핵심 아이디어는 256 mini-batch를 통해 gradient를 업데이트하는 것과 64 mini-batch를 4번(64 * 4 = 256) 누적하여 업데이트하는 것이 비슷한 결과를 가져다 줄 것이라는 생각입니다.

이 예와 같다면, 우리가 보유하고 있는 GPU memory 한계상 64 mini-batch 밖에 사용할 수 없다면, 이 학습 방법을 통해 4번 누적하여 업데이트할 경우 256 mini-batch를 사용하여 학습하는 것과 다름없게 되는 셈이죠.

하지만 이 방법은 memory issue를 중점적으로 해결하고자 하는 방법이지 속도, 성능과는 아직 explicit하게 증명된 바가 없습니다.

아래의 간단한 구현을 통해 위에서 글로서 살펴본 것보다 좀 더 쉽게 이해할 수 있을 거에요.


전체 코드는 깃허브_저장소에 있습니다.

Setup

from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.models import Model
import tensorflow as tf

print(tf.__version__)

MNIST Dataset Laod

(x_train, y_train), (x_test, y_test) = mnist.load_data()

Make TF Dataset

def make_datasets(x, y):
    # (28, 28) -> (28, 28, 1)
    def _new_axis(x, y):
        y = tf.one_hot(y, depth = 10)
        
        return x[..., tf.newaxis], y
            
    ds = tf.data.Dataset.from_tensor_slices((x, y))
    ds = ds.map(_new_axis, num_parallel_calls = tf.data.experimental.AUTOTUNE)
    ds = ds.shuffle(100).batch(32) # 배치 크기 조절하세요
    ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
    
    return ds
    
ds = make_datasets(x_train, y_train)

Make Models

# rescaling, 1 / 255
preprocessing_layer = tf.keras.models.Sequential([
        tf.keras.layers.experimental.preprocessing.Rescaling(1./255),
    ])

# simple CNN model
def get_model():
    inputs = Input(shape = (28, 28, 1))
    preprocessing_inputs = preprocessing_layer(inputs)
    
    x = Conv2D(filters = 32, kernel_size = (3, 3), activation='relu')(preprocessing_inputs)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(filters = 64, kernel_size = (3, 3), activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(filters = 64, kernel_size =(3, 3), activation='relu')(x)
    
    x = Flatten()(x)
    x = Dense(64, activation = 'relu')(x)
    outputs = Dense(10, activation = 'softmax')(x)
    
    model = Model(inputs = inputs, outputs = outputs)
    
    return model

model = get_model()
model.summary()

Training with Gradient Accumulation

epochs = 10
num_accum = 4 # 누적 횟수

loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits = True)
optimizer = tf.keras.optimizers.Adam()
train_acc_metric = tf.keras.metrics.CategoricalAccuracy()
@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x)
        loss_value = loss_fn(y, logits)
    gradients = tape.gradient(loss_value, model.trainable_weights)
    
    # update metrics    
    train_acc_metric.update_state(y, logits)
    
    return gradients, loss_value

def train():
    for epoch in range(epochs):
        print(f'################ Start of epoch: {epoch} ################')
        # 누적 gradient를 담기 위한 zeros_like 선언
        accumulation_gradients = [tf.zeros_like(ele) for ele in model.trainable_weights]
        
        for step, (batch_x_train, batch_y_train) in enumerate(ds):
            gradients, loss_value = train_step(batch_x_train, batch_y_train)
            
            if step % num_accum == 0:
                accumulation_gradients = [grad / num_accum for grad in accumulation_gradients]
                optimizer.apply_gradients(zip(gradients, model.trainable_weights))

                # zero-like init
                accumulation_gradients = [tf.zeros_like(ele) for ele in model.trainable_weights]
            else:
                accumulation_gradients = [(accum_grad + grad) for accum_grad, grad in zip(accumulation_gradients, gradients)]

            if step % 100 == 0:
                print(f"Loss at Step: {step} : {loss_value:.4f}")
            
        train_acc = train_acc_metric.result()
        print(f'Accuracy : {(train_acc * 100):.4f}%')
        train_acc_metric.reset_states()
        
# start training
train()

예전에 Pytorch의 Image augmentation 방법을 보고 커스텀이 되게 편리하게 구성된 것 같아 TensorFlow도 이와 같은 방법으로 augmentation하도록 만들어줬으면 좋겠다는 생각을 하고 있었는데, 연동 가능한 라이브러리 albumentation이 있었습니다.
(만들어진지 좀 됬는데 늦게 알게 됨..)

transforms.Compose([
	transforms.CenterCrop(10),
	transforms.ToTensor(),
])

torchvision의 augmentation 방법

사실 아래 공식 홈페이지 튜토리얼 코드를 보면 TensorFlow도 이와 같은 방법으로 Keras Layer를 활용한 augmentation이 가능토록 제공하고 있고, 앞으로도 이러한 방법으로 제공할 예정인가 싶습니다.

data_augmentation = tf.keras.Sequential([
  layers.experimental.preprocessing.RandomFlip("horizontal_and_vertical"),
  layers.experimental.preprocessing.RandomRotation(0.2),
])

출처: www.tensorflow.org/tutorials/images/data_augmentation?hl=ko


tf.image를 사용하는 방법

albumentation 라이브러리를 사용해보기 전에, tf.image를 사용해서 어떻게 augmentation 할 수 있는지 코드를 첨부합니다.

test = tf.data.Dataset.from_tensor_slices(dict(df))

# 이미지와 레이블을 얻습니다.
def get_image_label(dt):
    img_path = dt['image']
    
    image = tf.io.read_file(img_path)
    image = tf.image.decode_jpeg(image)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, [IMG_SIZE, IMG_SIZE])
    image = (image / 255.0)

    label = []
    
    for key in class_col:
        label.append(dt[key])
    
    return image, label

# data augment
# refer: https://www.tensorflow.org/tutorials/images/data_augmentation
def augment(image,label):
    # Add 6 pixels of padding
    image = tf.image.resize_with_crop_or_pad(image, IMG_SIZE + 6, IMG_SIZE + 6) 
    # Random crop back to the original size
    image = tf.image.random_crop(image, size=[IMG_SIZE, IMG_SIZE, 3])
    image = tf.image.random_brightness(image, max_delta=0.5) # Random brightness
    image = tf.clip_by_value(image, 0, 1)

    return image, label

    
dataset = test.map(get_image_label)
dataset = dataset.shuffle(50).map(augment).batch(4)

이 방법도 괜찮긴하지만 문제는 tf.Dataset 작동 구조를 알아야 정확히 쓸 수 있습니다.
map 함수 작동 방식이라던가, shuffle과 batch의 위치 등등..

확실히 위 방법보다 augmentation 방법을 Layer 구조로 가져가는게 훨씬 가독성이 좋아보이기도 합니다. 아래처럼요.

model = tf.keras.Sequential([
  resize_and_rescale,
  data_augmentation,
  layers.Conv2D(16, 3, padding='same', activation='relu'),
  layers.MaxPooling2D(),
  # Rest of your model
])

resize_and_rescale과 data_augmentation을 Sequential 안에 Layer 형태로 사용하고 있습니다.

하지만 아직 많이 쓰이고 있는 방법은 아닌 것 같기에 확실하게 자리잡기 전(?)까지 다른 방법을 사용해도 꽤나 무방합니다.

 

Albumentation 라이브러리를 사용하는 방법

사용하는 방법은 꽤나 단순합니다. 위에서 예제로 보여드렸던 코드와 형식이 동일하니까요.

먼저, 필요 라이브러리를 불러옵니다.

from tensorflow.keras.datasets import cifar10

import albumentations
import cv2
import numpy as np
import matplotlib.pyplot as plt
import cv2
# augmentation method를 import합니다.
from albumentations import (
    Compose, HorizontalFlip, CLAHE, HueSaturationValue,
    RandomBrightness, RandomContrast, RandomGamma,
    ToFloat, ShiftScaleRotate
)

torchvision의 Compose, TensorFlow의 Sequential과 Albumentation의 Compose의 사용되는 장소가 같습니다. 위에서 호출한 함수 외에도 다양한 augmentation 방법을 제공합니다. 더 궁금하면 공식 문서를 참조하고, 자세히 설명되어 있습니다.

albumentations.ai/docs/getting_started/image_augmentation/

이제 다양한 함수를 Compose안에 list 형태로 제공하고, 사용할 준비를 끝마칩니다.

# 각 함수에 대한 설명은
# https://albumentations.ai/docs/
# document를 참고하세요.
Aug_train = Compose([
    HorizontalFlip(p=0.5),
    RandomContrast(limit=0.2, p=0.5),
    RandomGamma(gamma_limit=(80, 120), p=0.5),
    RandomBrightness(limit=0.2, p=0.5),
    HueSaturationValue(hue_shift_limit=5, sat_shift_limit=20,
                       val_shift_limit=10, p=.9),
    ShiftScaleRotate(
        shift_limit=0.0625, scale_limit=0.1, 
        rotate_limit=15, border_mode=cv2.BORDER_REFLECT_101, p=0.8), 
    ToFloat(max_value=255)
])

Aug_test = Compose([
    ToFloat(max_value=255)
])

 

실험하기에 가장 좋은 예제는 tensorflow.keras.datasets에서 제공하는 CIFAR-10입니다.
또, Sequence 클래스를 상속받아 제네레이터처럼 활용할 수 있도록 합니다.
아래 코드 __getitem__ 부분의 self.augment(image=x)["image"]에서 변환 작업이 수행됩니다.

from tensorflow.python.keras.utils.data_utils import Sequence

# Sequence 클래스를 상속받아 generator 형태로 사용합니다.
class CIFAR10Dataset(Sequence):
    def __init__(self, x_set, y_set, batch_size, augmentations):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size
        self.augment = augmentations

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    # 지정 배치 크기만큼 데이터를 로드합니다.
    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        # augmentation을 적용해서 numpy array에 stack합니다.
        return np.stack([
            self.augment(image=x)["image"] for x in batch_x
        ], axis=0), np.array(batch_y)

# CIFAR-10 Dataset을 불러옵니다.
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

BATCH_SIZE = 8

# Dataset을 생성합니다.
train_gen = CIFAR10Dataset(x_train, y_train, BATCH_SIZE, Aug_train)
test_gen = CIFAR10Dataset(x_test, y_test, BATCH_SIZE, Aug_test)

train_gen을 통해 이미지를 그려보면 다음과 같이 변환이 일어난 것을 볼 수 있습니다.

# 데이터를 그려봅시다.
images, labels = next(iter(train_gen))

fig = plt.figure()

for i, (image, label) in enumerate(zip(images, labels)):
    ax = fig.add_subplot(3, 3, i + 1)
    ax.imshow(image)
    ax.set_xlabel(label)
    ax.set_xticks([]); ax.set_yticks([])

plt.tight_layout()
plt.show()

Augmentation이 수행된 이미지

 

Reference

텐서플로우 공식 홈페이지

Albumentation Document

medium.com/the-artificial-impostor/custom-image-augmentation-with-keras-70595b01aeac

Warm-up 방식의 학습 방법, 학습률을 높였다가 낮췄다가, 다시 높였다가 낮췄다가 등 학습 과정에서 다양한 학습률로 local optima를 빠져나오도록 장치하는 방법이 fixed learning rate 방법보다 성능이 더 좋다는 것은 이미 오래전부터 논문을 통해 증명되어 왔습니다.

Cosine Annealing을 사용하면 learning rate가 어떻게 변하는지 알아보겠습니다.

설명하는 코드는 케라스 콜백과 같이 등록하여 사용하면 됩니다.


import tensorflow as tf
import tensorflow.keras.backend as backend
import math

# CosineAnneling Example.
class CosineAnnealingLearningRateSchedule(tf.keras.callbacks.Callback):
    # constructor
    def __init__(self, n_epochs, n_cycles, lrate_max, min_lr, verbose = 0):
        self.epochs = n_epochs
        self.cycles=  n_cycles
        self.lr_max = lrate_max
        self.min_lr = min_lr
        self.lrates = list()
    
    # caculate learning rate for an epoch
    def cosine_annealing(self, epoch, n_epochs, n_cycles, lrate_max):
        # 전체 epoch / 설정 cycle 수만큼 cycle을 반복합니다.
        epochs_per_cycle = math.floor(n_epochs/n_cycles)
        cos_inner = (math.pi * (epoch % epochs_per_cycle)) / (epochs_per_cycle)
        
        return lrate_max/2 * (math.cos(cos_inner) + 1)
  
    # calculate and set learning rate at the start of the epoch
    def on_epoch_begin(self, epoch, logs = None):
        if(epoch < 101):
            # calculate learning rate
            lr = self.cosine_annealing(epoch, self.epochs, self.cycles, self.lr_max)
            print('\nEpoch %05d: CosineAnnealingScheduler setting learng rate to %s.' % (epoch + 1, lr))
        # 101번째 epoch부터는 해당 설정한 min_lr을 사용
        else:
            lr = self.min_lr
            
        #     elif((epoch >= 65) and (epoch < 75)):
        #       lr = 1e-5
        #       print('\n No CosineAnnealingScheduler set lr 1e-5')
        #     elif((epoch >= 75) and (epoch < 85)):
        #       lr = 1e-6
        #       print('\n No CosineAnnealingScheduler set lr 1e-6')
        #     elif((epoch >= 85)):
        #       lr = 1e-7
        #       print('\n No CosineAnnealingScheduler set lr 1e-7')

        # set learning rate
        # 아래 예제 코드 실행을 위해선 밑 코드를 주석 처리 해주세요.
        backend.set_value(self.model.optimizer.lr, lr)
        # log value
        self.lrates.append(lr)

위 코드를 사용하면, 아래와 같은 학습률 변화를 볼 수 있습니다.

cosine_schedule = CosineAnnealingLearningRateSchedule(n_epochs = 100, n_cycles = 5, lrate_max = 1e-3, min_lr = 1e-6)

for i in range(1, 100 + 1):
    cosine_schedule.on_epoch_begin(i)
    
import matplotlib.pyplot as plt

plt.plot(cosine_schedule.lrates)
plt.title('Cosine Annealing_Toy')
plt.xlabel('epochs'); plt.ylabel('learning_rate')
plt.grid()
plt.show()

n_cycle을 5로 지정한만큼, 20(100/5) 수를 기준으로 cycle이 반복되고 있습니다. 

사실 텐서플로우를 사용한다면 위처럼 직접 정의하여 사용하지 않아도 됩니다.
텐서플로우 공식 홈페이지를 보면 이미 학습률을 조절할 수 있는 다양한 방법들을 제공하고 있기 때문에 가져다 사용하면 됩니다.
(CosineDecayRestarts, CosineDecay 등)

다음 글에서는 텐서플로우에서 제공하는 함수를 사용하여 MNIST 데이터셋에 적용해보겠습니다.

load_model() 함수를 사용하면, h5 또는 hdf5로 저장된 모델 구조, 가중치를 한꺼번에 불러올 수 있습니다.

model = load_model('your saved model path')

그런데 만약 모델에 커스텀 객체가 포함되어 있다면, 커스텀 객체를 명시해주지 않는 경우 다음과 같은 에러가 발생할 수 있습니다.

이를 알아보기 전에, 케라스에서 커스텀 객체를 선언하는 방법은 다음과 같습니다.

커스텀 객체 선언

def Mish(x):
    return x * K.tanh(K.softplus(x))

get_custom_objects().update({'mish': Mish})

Mish Activation 함수를 커스텀 객체로 선언하고 사용한 모델을 load_model() 함수를 사용하여 불러올 때, 커스텀 객체를 명시해주지 않으면(인자로 전달하지 않으면) 다음과 같은 에러를 만날 수 있습니다.

ex) Mish Activation 함수를 커스텀 객체로 선언하고 사용한 모델일 경우

ValueError: Unknown activation function:Mish

모델에선 Mish Activation 함수를 사용하여 구조가 형성되어 있는데, 로드시 이에 대한 정보를 넘겨주지 않았기 때문에 발생합니다. 따라서, 이를 해결하기 위해 다음과 같이 인자로 전달해주면 쉽게 해결할 수 있습니다.

 

커스텀 객체를 포함한 모델 로드

model = load_model('./model/saved_model.hdf5', custom_objects={'Mish':Mish}

'Mish'는 커스텀 객체 선언 시 사용한 객체의 이름이고, Mish는 해당 객체를 넘겨주는 것입니다.