이 글은 다음 예제를 번역한 것입니다.


Introduction

이 예제는 JPEG 파일을 직접 저장하고, 미리 학습된 모델을 사용하지 않으면서 이미지 분류를 어떻게 행하는지 알아보는 예제입니다. 'Cats vs Dogs' 데이터셋을 케라스를 활용하여 분류해봅니다.

데이터셋 생성을 위해 image_dataset_from_directory를 사용하고, 표준화와 augmentation을 위해 Keras Preprocessing Layer를 사용해봅니다.

Setup

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

 

Load the data: the Cats vs Dogs dataset

Raw data download

먼저, 765M 크기의 데이터를 다운로드합니다.
예제와 다르게 그냥 쥬피터 노트북 상에서 zip파일을 다운받고 싶으면, 아래 코드를 활용하면 됩니다.

import requests

target_path = 'datasets.zip'

url = 'https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip'
response = requests.get(url, stream = True)
handle = open(target_path, 'wb')

for chunk in response.iter_content(chunk_size = 512):
    if chunk:
        handle.write(chunk)
        
handle.close()   

압축을 풀었다면, 최상위 폴더에는 Cat, Dog 하위 폴더가 존재합니다.

Filter out corrupted images

손상된 이미지는 어디에나 존재합니다. 여기서는 "JFIF" 헤더를 가진 이미지를 필터링해보겠습니다.
필터링 시, 해당 헤더가 아니면 손상된 이미지이므로 제거합니다.

import os

num_skipped = 0
for folder_name in ("Cat", "Dog"):
    folder_path = os.path.join("PetImages", folder_name)
    for fname in os.listdir(folder_path):
        fpath = os.path.join(folder_path, fname) # PetImages/Cat/63.jpg
        try:
            fobj = open(fpath, "rb")
            # fobj.peek(10)은 10바이트를 가져오는데, 여기서는 보통
            # 헤더를 읽기 위해 전체를 가져온다고 생각해도 무방합니다.
            is_jfif = tf.compat.as_bytes("JFIF") in fobj.peek(10)
        finally:
            fobj.close()

        if not is_jfif:
            num_skipped += 1
            # Delete corrupted image
            os.remove(fpath)

print("Deleted %d images" % num_skipped)

 

Generate a Dataset

image_size = (180, 180)
batch_size = 32

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "PetImages",
    validation_split=0.2,
    subset="training",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "PetImages",
    validation_split=0.2,
    subset="validation",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)

Visualize the data

9개 이미지를 그려봅니다. '1'은 Dog이고, '0'은 Cat에 해당합니다.

import matplotlib.pyplot as plt

# figure 크기를 조절합니다.
plt.figure(figsize=(10, 10))

# 배치 하나를 가져옵니다.
for images, labels in train_ds.take(1):
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(int(labels[i]))
        plt.axis("off")

Using Image data augmentation

데이터가 다양하지 않거나, 데이터 개수가 충분하지 않을때 랜덤으로 회전시키거나 fliping하는 등 augmentation을 활용하면 이를 해결할 수 있습니다. 이는 모델이 다양한 데이터를 볼 수 있게 도와줌과 동시에 이를 통해 Overfitting을 (어느정도) 극복할 수 있습니다.

data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomRotation(0.1),
    ]
)

data_augmentation을 통해 변형된 이미지를 그려봅니다.

plt.figure(figsize=(10, 10))
for images, _ in train_ds.take(1):
    for i in range(9):
        augmented_images = data_augmentation(images)
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(augmented_images[0].numpy().astype("uint8"))
        plt.axis("off")

 

Standardizing the data

현재 우리가 사용할 이미지는 픽셀 type float32, 크기는 180x180으로 이미 어느정도 표준화가 되어 있습니다. 하지만 RGB channel의 값들이 [0, 255] 값을 가지고 있어, 신경망을 사용하기엔 적합하지 않습니다. 따라서 Rescaling layer를 활용해서 [0, 1] 범위로 변환해줄 것입니다.

 

Two options to preprocess the data

augmentation을 적용할 수 있는 두 가지 방법이 있습니다.

>> Option 1: Make it part of the model like this:

inputs = keras.Input(shape=input_shape)
x = data_augmentation(inputs)
x = layers.experimental.preprocessing.Rescaling(1./255)(x)
...  # Rest of the model

이 방법은 모델 실행에서 동시에 사용될 수 있기 때문에 GPU를 사용한다는 큰 장점을 활용할 수 있습니다.

이 방법은 test time에는 동작하지 않기 때문에, evaluate(), predict()에서는 활용되지 않고, 오로지 fit() 동안에만 사용됩니다. 또한, GPU를 보유하고 있다면 이 방법을 긍정적으로 고려해볼 수 있습니다.

>> Option 2: apply it to the dataset, so as to obtain a dataset that yields batches of augmented images, like this:

augmented_train_ds = train_ds.map(
  lambda x, y: (data_augmentation(x, training=True), y))

이 방법은 CPU에서 주로 사용되고, 버퍼를 통해 모델에 입력됩니다.

CPU를 활용하여 학습하는 경우라면 이 방법이 더 효과적입니다.

 

Configure the dataset for performance

prefetch 옵션을 활용하면 더욱 효율적으로 데이터를 생성하여 모델에 입력할 수 있습니다.

train_ds = train_ds.prefetch(buffer_size=32)
val_ds = val_ds.prefetch(buffer_size=32)

 

Build a model

Xception의 small version을 작성해보겠습니다. 이 모델에 대해서는 최적화를 진행하지 않을 것이고, 만약 최적화에 관심이 있다면 Keras Tuner를 고려해보세요.

강조 사항:

  • data_augmentation을 사용하고, 추가로 Rescaling layer를 활용합니다.
  • 마지막 classification layer에 Dropout을 추가합니다.
def make_model(input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)
    # 본격적으로 모델에 입력하기 전에 여기서 augmentation이 진행됩니다.
    # inference time에는 동작하지 않습니다.
    x = data_augmentation(inputs)

    # [0, 1] 변환을 위해 Rescaling Layer를 활용합니다.
    x = layers.experimental.preprocessing.Rescaling(1.0 / 255)(x)
    x = layers.Conv2D(32, 3, strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.Conv2D(64, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    for size in [128, 256, 512, 728]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation)
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    x = layers.SeparableConv2D(1024, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.GlobalAveragePooling2D()(x)
    if num_classes == 2:
        activation = "sigmoid"
        units = 1
    else:
        activation = "softmax"
        units = num_classes

    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(units, activation=activation)(x)
    
    return keras.Model(inputs, outputs)

# image size에 (3, ) 채널을 추가합니다.
model = make_model(input_shape=image_size + (3,), num_classes=2)

 

Train the model

epochs = 50

callbacks = [
    keras.callbacks.ModelCheckpoint("save_at_{epoch}.h5"),
]
model.compile(optimizer=keras.optimizers.Adam(1e-3),
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.fit(train_ds, epochs=epochs, 
          callbacks=callbacks, validation_data=val_ds)

학습을 진행해보세요.

 

Run inference on new data

inference time에는 Dropout과 data augmentation이 비활성화 상태가 됩니다.
하지만 Rescaling layer는 그대로 활용되기 때문에 테스트용 데이터로 추론을 진행할 때 올바른 결과를 얻을 수 있습니다.

img = keras.preprocessing.image.load_img("PetImages/Cat/6779.jpg", 
                                         target_size=image_size)
img_array = keras.preprocessing.image.img_to_array(img)
img_array = tf.expand_dims(img_array, 0)  # Create batch axis

predictions = model.predict(img_array)
score = predictions[0]
print(
    "This image is %.2f percent cat and %.2f percent dog."
    % (100 * (1 - score), 100 * score)
)

Warm-up 방식의 학습 방법, 학습률을 높였다가 낮췄다가, 다시 높였다가 낮췄다가 등 학습 과정에서 다양한 학습률로 local optima를 빠져나오도록 장치하는 방법이 fixed learning rate 방법보다 성능이 더 좋다는 것은 이미 오래전부터 논문을 통해 증명되어 왔습니다.

Cosine Annealing을 사용하면 learning rate가 어떻게 변하는지 알아보겠습니다.

설명하는 코드는 케라스 콜백과 같이 등록하여 사용하면 됩니다.


import tensorflow as tf
import tensorflow.keras.backend as backend
import math

# CosineAnneling Example.
class CosineAnnealingLearningRateSchedule(tf.keras.callbacks.Callback):
    # constructor
    def __init__(self, n_epochs, n_cycles, lrate_max, min_lr, verbose = 0):
        self.epochs = n_epochs
        self.cycles=  n_cycles
        self.lr_max = lrate_max
        self.min_lr = min_lr
        self.lrates = list()
    
    # caculate learning rate for an epoch
    def cosine_annealing(self, epoch, n_epochs, n_cycles, lrate_max):
        # 전체 epoch / 설정 cycle 수만큼 cycle을 반복합니다.
        epochs_per_cycle = math.floor(n_epochs/n_cycles)
        cos_inner = (math.pi * (epoch % epochs_per_cycle)) / (epochs_per_cycle)
        
        return lrate_max/2 * (math.cos(cos_inner) + 1)
  
    # calculate and set learning rate at the start of the epoch
    def on_epoch_begin(self, epoch, logs = None):
        if(epoch < 101):
            # calculate learning rate
            lr = self.cosine_annealing(epoch, self.epochs, self.cycles, self.lr_max)
            print('\nEpoch %05d: CosineAnnealingScheduler setting learng rate to %s.' % (epoch + 1, lr))
        # 101번째 epoch부터는 해당 설정한 min_lr을 사용
        else:
            lr = self.min_lr
            
        #     elif((epoch >= 65) and (epoch < 75)):
        #       lr = 1e-5
        #       print('\n No CosineAnnealingScheduler set lr 1e-5')
        #     elif((epoch >= 75) and (epoch < 85)):
        #       lr = 1e-6
        #       print('\n No CosineAnnealingScheduler set lr 1e-6')
        #     elif((epoch >= 85)):
        #       lr = 1e-7
        #       print('\n No CosineAnnealingScheduler set lr 1e-7')

        # set learning rate
        # 아래 예제 코드 실행을 위해선 밑 코드를 주석 처리 해주세요.
        backend.set_value(self.model.optimizer.lr, lr)
        # log value
        self.lrates.append(lr)

위 코드를 사용하면, 아래와 같은 학습률 변화를 볼 수 있습니다.

cosine_schedule = CosineAnnealingLearningRateSchedule(n_epochs = 100, n_cycles = 5, lrate_max = 1e-3, min_lr = 1e-6)

for i in range(1, 100 + 1):
    cosine_schedule.on_epoch_begin(i)
    
import matplotlib.pyplot as plt

plt.plot(cosine_schedule.lrates)
plt.title('Cosine Annealing_Toy')
plt.xlabel('epochs'); plt.ylabel('learning_rate')
plt.grid()
plt.show()

n_cycle을 5로 지정한만큼, 20(100/5) 수를 기준으로 cycle이 반복되고 있습니다. 

사실 텐서플로우를 사용한다면 위처럼 직접 정의하여 사용하지 않아도 됩니다.
텐서플로우 공식 홈페이지를 보면 이미 학습률을 조절할 수 있는 다양한 방법들을 제공하고 있기 때문에 가져다 사용하면 됩니다.
(CosineDecayRestarts, CosineDecay 등)

다음 글에서는 텐서플로우에서 제공하는 함수를 사용하여 MNIST 데이터셋에 적용해보겠습니다.

아래 코드로 내용을 대체합니다.

더 많은 층의 output을 보고 싶으면, feature_maps를 for-loop로 구현하면 됩니다.

# 신경망 시각화(조휘용)
import tensorflow as tf

get_layer_name = [layer.name for layer in model.layers]
get_output = [layer.output for layer in model.layers]

# 모델 전체에서 output을 가져올 수 있습니다.
visual_model = tf.keras.models.Model(inputs = model.input, outputs = get_output)

test_img = np.expand_dims(testX[0], axis = 0)
feature_maps = visual_model.predict(test_img)

# 첫 번째 컨볼루션 층의 특징맵을 시각화합니다.
conv_featuremap = feature_maps[0]
conv_name = get_layer_name[0]

img_size = conv_featuremap.shape[1]
img_features = conv_featuremap.shape[-1]

display_grid = np.zeros((img_size, img_size * img_features))

for i in range(img_features):
    x = conv_featuremap[:, :, :, i]
    x -= x.mean(); x /= x.std()
    x *= 64
    x += 128
    x = np.clip(x, 0, 255).astype('uint8')
    display_grid[:, i * img_size : (i + 1) * img_size] = x
    
plt.figure(figsize = (20,20))
plt.title(conv_name)
plt.grid(False)
plt.imshow(display_grid, cmap = 'viridis')

 

https://www.youtube.com/watch?v=51YtxSH-U3Y&list=PLQY2H8rRoyvzuJw20FG82Lgm2SZjTdIXU&index=7


최근 텐서플로우는 파이토치 때문에 연구에는 불편하다는 인식이 있습니다(개인적인 의견일 수도..).

이번 영상에서는 텐서플로우가 효율적인 연구를 위해 제공하는 기능을 알아보도록 하겠습니다.

 

파라미터의 상태를 제어한다는 것은 연구에서 매우 중요한 작업입니다.
예를 들어, 케라스 Dense layer의 파라미터나 bias는 층에 저장되어 있긴 하지만, 여전히 state를 다루기엔 매우 불편합니다.

더욱 편리한 제어를 위해 tf.variable_creator_scope를 사용합니다.

class FactorizedVariable(tf.Module):
    def __init__(self, a, b):
        self.a = a
        self.b = b

tf.register_tensor_conversion_function(
  FactorizedVariable, lambda x, *a, **k: tf.matmul(x.a, x.b))

def scope(next_creator, **kwargs):
    shape = kwargs['initial_value']().shape
    if len(shape) != 2: return next_creator(**kwargs)
    return FactorizedVariable(tf.Variable(tf.random.normal([shape[0], 2])),
                                         tf.Variable(tf.random.normal([2, shape[1]])))

with tf.variable_creator_scope(scope):
    d = tf.keras.layer.Dense(10)
    d(tf.zeros[20, 10])
assert isinstance(d.kernel, FactorizedVariable)
  • 먼저, 저장하고 싶은 값을 선택하고, tf.Module을 상속받은 클래스를 정의합니다.
    tf.Module은 저장하고 싶은 변수를 자동으로 추적할 수 있도록 도와줍니다.

위의 코드는 매우 간단하지만, 실제로 사용하는 모델에서는 파라미터가 매우 많기 때문에 관리가 힘듭니다. 따라서 tf.variable_creator_scope를 사용하면 자동 추적 및 파라미터의 변화를 확인할 수 있기 때문에 매우 편리합니다.

딥러닝을 연구하는 데에 있어서 계산 속도는 매우 중요합니다. 텐서플로우는 TensorFlow compiler, XLA 등을 통해 빠른 연산 속도를 지원하고 있습니다. 더욱 효과적으로 사용하려면 @tf.function(experimental_compile=True)를 사용하세요.

활성화 함수의 예를 보겠습니다. 활성화 함수에서는 element-wise 연산 때문에 속도 측면에서 부정적인 영향을 줄지도 모릅니다.
다음 예제 코드에서 속도 차이를 볼 수 있습니다.

def f(x):
    return tf.math.log(2*tf.exp(tf.nn.relu(x+1)))

c_f = tf.function(f, experimental_compile=True)
c_f(tf.zeros([100, 100]))

f = tf.function(f)
f(tf.zeros([100, 100]))

print(timeit.timeit(lambda: f(tf.zeros([100, 100])), number = 10))
# 0.007

print(timeit.timeit(lambda: c_f(tf.zeros([100, 100])), number = 10))
# 0.005 -- ~25% faster!
  • tf.function 사용은 동일합니다. 단지, experimental_compile=True를 추가합니다.
  • linear operations가 포함된 함수나 Bert를 포함한 large-scale 모델에서 효과를 볼 수 있습니다.

element-wise 연산은 옵티마이저에서도 매우 빈번하게 일어납니다. @tf.function을 옵티마이저 코드에 추가한다면 효과를 볼 수 있습니다.
다음은 직접 옵티마이저를 정의해서 @tf.function을 사용하는 예제입니다.

class MyOptimizer(tf.keras.optimizers.Optimizer):
    def __init__(self, lr, power, avg):
        super().__init__(name="MyOptimizer")
        self.lrate, self.pow, self.avg = lr, power, avg
        
    def get_config(self): pass
    def _create_slots(self, var_list):
        for v in var_list: self.add_slot(v, "accum", tf.zeros_like(v))
    
    @tf.function(experimental_compile=True)
    def _resource_apply_dense(self, grad, var, apply_state = None):
        acc = self.get_slot(var, "accum")
        acc.assign(self.avg * tf.pow(grad, self.pow) + (1-self.avg) * acc)
        
        return var.assign_sub(self.lrate * grad/tf.pow(acc, self.pow))

 

다음은 Vectorization을 이야기해보겠습니다. 이는 성능 향상을 위해 매우~! 중요한 지표입니다.
머신 러닝 모델을 다루기 위해 Vectorization이 중요하다는 것은 이미 다 알고 있는 사실이지만, 다루기가 어렵습니다.

그래서 텐서플로우는 이를 위해 auto-Vectorization을 제공합니다. 이 기능은 element-wise 연산이나 batch computation에서 성능 향상을 위해 사용될 것입니다.

Jacobian 연산을 수행하는 예제 코드입니다. jacobian은 미분값을 저장해놓은 행렬입니다.
이를 위해선 tf.GradientTape에서 tape.gradient를 무수히 호출해야하고, 다수의 for-loop를 사용하고, Tensor를 쌓아야 합니다.
이러한 과정을 거치는 코드는 언제나 작동하지만, 좀 더 효율적으로 다룰 수 있는 방법을 텐서플로우가 제공합니다.

tf.vectorized_map을 사용하는 것입니다.

x = tf.random.normal([10, 10])

with tf.GradientTape(persistent=True) as t:
    t.watch(x)
    y = tf.exp(tf.matmul(x, x))
    jac = tf.vectorized_map(
                            lambda yi: tf.vectorized_map(
                            lambda yij: t.gradient(yij, x), yi), y)
  • tf.vectorized_map을 사용하면 빠른 속도로 연산을 수행할 수 있습니다. 하지만 코드가 복잡합니다.
  • 텐서플로우는 이를 위해 jacobian을 아예 함수로 제공합니다.
x = tf.random.normal([10, 10])

with tf.GradientTape() as t:
    t.watch(x)
    y = tf.exp(tf.matmul(x, x))
jac = t.jacobian(y, x)
  • 제공하는 jacobian을 사용하면, 기존 코드보다 10배는 빠르다고 합니다.

마지막으로 데이터에 관한 이야기입니다.
텐서플로우를 사용하는 우리는 항상 매우 커다란 크기의 array를 다루게 됩니다.

 

또, 머신 러닝 모델을 다루다보면 서로 다른 타입의 데이터를 다루기도 합니다. type도 다르고, shape 다르고...
예를 들어, 텐서플로우는 다음과 같은 예를 임베딩 형태로 만들어 줍니다.

텐서플로우는 서로 다른 길이의 데이터를 다루기 위해 ragged tensor 형태를 사용합니다.

data = [['this', 'is', 'a', 'sentence'],
       ['another', 'one'],
       ['a', 'somewhat', 'longer', 'one', ',', 'this']]

rt = tf.ragged.constant(data)
vocab = tf.lookup.StaticVocabularyTable(
    tf.lookup.KeyValueTensorInitializer(
    ['This', 'is', 'a', 'sentence', 'another', 'one', 'somewhat', 'longer'],
    tf.range(8, dtype = tf.int64)), 1)

rt = tf.ragged.map_flat_values(lambda x:vocab.lookup(x), rt)
embedding_table = tf.Variable(tf.random.normal([9, 10]))
rt = tf.gather(embedding_table, rt)
tf.math.reduce_mean(rt, axis = 1)
# Result has shape (3, 10)

길이가 다르고, type이 다르면 tf.ragged를 사용하세요!