이 글은 다음 튜토리얼을 번역한 것입니다.

https://keras.io/examples/vision/mlp_image_classification/

 

Keras documentation: Image classification with modern MLP models

Image classification with modern MLP models Author: Khalid Salama Date created: 2021/05/30 Last modified: 2021/05/30 Description: Implementing the MLP-Mixer, FNet, and gMLP models for CIFAR-100 image classification. View in Colab • GitHub source Introduc

keras.io

 


Introduction

이 예제는 최근 많이 사용되고 있는 Attention 방법을 사용하지 않고 MLP(multi-layer perceptron)을 사용하는 세 가지 모델에 대해 CIFAR-100 데이터셋을 활용하여 이미지 분류 문제를 해결해봅니다.

  1. 두 가지 타입의 MLP를 사용하는 MLP Mixer model(by Ilya Tolstikhin et al.)을 구현합니다
  2. based on unparameterized Fourier Transform, FNet model을 구현합니다
  3. gating 방법을 사용하는 gMLP model을 구현합니다

이 예제의 목적은 다른 데이터셋에서 서로 다른 성능을 낼 수 있기 때문에 각 모델의 성능을 비교하는 것이 아닙니다. 모델을 구성하는 main block들을 간단히 구현해보는 예제입니다.

+ 이 예제에서 살펴볼 세 개 모델은 다른 방법에 비해 꽤 간단하지만, 성능은 좋기 때문에 논문을 한번 찾아볼 필요가 있을 것 같습니다.
+ Attention mechnism도 여전히 강력하고, 훌륭한 방법이지만, 최근 연구 트렌드는 MLP인 것 같습니다.

tensorflow version 2.4 또는 그 이상에서 실행할 수 있으며, tf-addons 모듈 설치가 필요합니다.

pip install -U tensorflow-addons

Setup

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

Prepare the data

num_classes = 100
input_shape = (32, 32, 3)

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

Configure the hyperparameters

weight_decay = 0.0001
batch_size = 128
num_epochs = 50
dropout_rate = 0.2
image_size = 64  # 이미지 resize 크기
patch_size = 8  # 이미지 패치 크기
num_patches = (image_size // patch_size) ** 2  # 패치로 이루어진 data array 크기
embedding_dim = 256  # Number of hidden units.
num_blocks = 4  # Number of blocks.

print(f"Image size: {image_size} X {image_size} = {image_size ** 2}")
print(f"Patch size: {patch_size} X {patch_size} = {patch_size ** 2} ")
print(f"Patches per image: {num_patches}")
print(f"Elements per patch (3 channels): {(patch_size ** 2) * 3}")

Build a classification model

processing block을 포함한 classifier를 구현합니다.

def build_classifier(blocks, positional_encoding=False):
    inputs = layers.Input(shape=input_shape) # inputs: [batch_size, 32, 32, 3]
    # Augment data.
    # augmented: [batch_size, 64, 64, 3]
    augmented = data_augmentation(inputs)

    # Create patches.
    # patches: [batch_size, 64, 192]
    patches = Patches(patch_size, num_patches)(augmented)
    
    # [batch_size, num_patches, embedding_dim] tensor를 생성하기 위한 패치 인코딩 부분입니다.
    x = layers.Dense(units=embedding_dim)(patches)
    
    if positional_encoding:
        positions = tf.range(start=0, limit=num_patches, delta=1)
        position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=embedding_dim
        )(positions)
        x = x + position_embedding

    # Process x using the module blocks.
    x = blocks(x)
    
    # global average pooling을 사용
    # [batch_size, embedding_dim] 형태의 representation tensor를 만듭니다.
    representation = layers.GlobalAveragePooling1D()(x)
    representation = layers.Dropout(rate=dropout_rate)(representation)
    
    # Compute logits outputs.
    logits = layers.Dense(num_classes)(representation)
    
    # Create the Keras model.
    return keras.Model(inputs=inputs, outputs=logits)

Define an experiment

compile, train, evaluate를 위한 구현입니다.

def run_experiment(model):
    # Create Adam optimizer with weight decay.
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay,
    )
    # Compile the model.
    # top5-acc -> name = "top5-acc"
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="acc"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top5-acc"),
        ],
    )
    # learning rate 스케쥴러
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=5
    )
    # early stopping callback
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=10, restore_best_weights=True
    )
    # Fit the model.
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[early_stopping, reduce_lr],
    )

    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    # Return history to plot learning curves.
    return history

Use data augmentation

data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.Normalization(),
        layers.experimental.preprocessing.Resizing(image_size, image_size),
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# normalization을 위해 training data의 mean, varfmf 계산합니다.
data_augmentation.layers[0].adapt(x_train)

Implement patch extraction as a layer

Patches class를 이해하려면 tf.image.extract_patches를 이해할 필요가 있습니다.

class Patches(layers.Layer):
    def __init__(self, patch_size, num_patches):
        super(Patches, self).__init__()
        self.patch_size = patch_size
        self.num_patches = num_patches

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, self.num_patches, patch_dims])
        return patches

The MLP-Mixer model

MLP-Mixer model은 MLP만 사용하는 아키텍처이며, 두 가지 유형의 MLP layer를 포함합니다.

  1. 각 이미지 패치를 독립적으로 사용함으로써 per-location feature를 혼합합니다
  2. 채널축으로 적용하여 spatial information을 혼합합니다.

이 방법은 어떻게 보면 Xception model에서 대표적으로 사용한 depthwise separable convolution 구조와 유사해보이지만, 차이점으로는 two chained dense transforms, no max pooling, layer normalization instead of batch normalization 사용에 있습니다.

Implement the MLP-Mixer model

class MLPMixerLayer(layers.Layer):
    def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs):
        super(MLPMixerLayer, self).__init__(*args, **kwargs)

        self.mlp1 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=num_patches),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.mlp2 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=embedding_dim),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.normalize = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize(inputs)

        # [num_batches, num_patches, hidden_units] -> [num_batches, hidden_units, num_patches]
        x_channels = tf.linalg.matrix_transpose(x)
        
        # mlp1을 채널 독립적으로 적용합니다.
        # Dense Layer는 2-D 이상일 경우, 마지막 차원에서 가중치 연산이 일어납니다 -> 일종의 trick으로 사용
        mlp1_outputs = self.mlp1(x_channels)
        
        # [num_batches, hidden_dim, num_patches] -> [num_batches, num_patches, hidden_units]
        mlp1_outputs = tf.linalg.matrix_transpose(mlp1_outputs)
        
        # Add skip connection.
        x = mlp1_outputs + inputs
        
        # Apply layer normalization.
        x_patches = self.normalize(x)
        
        # mlp2를 각 패치에 독립적으로 적용합니다.
        mlp2_outputs = self.mlp2(x_patches)
        
        # Add skip connection.
        x = x + mlp2_outputs
        
        return x

Build, train, and evaluate the MLP-Mixer model

mlpmixer_blocks = keras.Sequential(
    [MLPMixerLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.005
mlpmixer_classifier = build_classifier(mlpmixer_blocks)
history = run_experiment(mlpmixer_classifier)

MLP-Mixer 모델은 다른 모델에 비해 사용되는 파라미터 수가 적습니다. 

MLP-Mixer 논문에 언급된 바에 따르면, large-dataset에 pre-trained하고 modern regularization 방법을 함께 사용한다면 SOTA 모델들의 score와 경쟁할 수 있는 수준의 score를 얻을 수 있다고 합니다. 임베딩 차원, 블록 수, 입력 이미지 크기를 늘리거나 다른 패치 크기를 사용하거나 모델을 더 오랫동안 학습시키면 더 좋은 성능을 얻을 수 있습니다.


The FNet model

FNet 모델은 Transformer block과 유사한 구조의 블록을 사용합니다. 하지만 FNet 모델은 self-attention layer 대신, 파라미터에서 자유로운 2D Fourier transformation layer를 사용합니다.

위 모델과 동일하게 패치, 채널 독립적 연산이 사용됩니다.

Implement the FNet module

class FNetLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(FNetLayer, self).__init__(*args, **kwargs)

        self.ffn = keras.Sequential(
            [
                layers.Dense(units=embedding_dim),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
                layers.Dense(units=embedding_dim),
            ]
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply fourier transformations.
        x = tf.cast(
            tf.signal.fft2d(tf.cast(inputs, dtype=tf.dtypes.complex64)),
            dtype=tf.dtypes.float32,
        )
        # Add skip connection.
        x = x + inputs
        # Apply layer normalization.
        x = self.normalize1(x)
        # Apply Feedfowrad network.
        x_ffn = self.ffn(x)
        # Add skip connection.
        x = x + x_ffn
        # Apply layer normalization.
        return self.normalize2(x)

Build, train, and evaluate the FNet model

fnet_blocks = keras.Sequential(
    [FNetLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.001
fnet_classifier = build_classifier(fnet_blocks, positional_encoding=True)
history = run_experiment(fnet_classifier)

The gMLP model

gMLP 모델은 Spatial Gating Unit(SGU)를 사용하는 MLP 아키텍처입니다. SGU는 spatial(channel) dimention을 따라 각 패치가 상호작용할 수 있도록 합니다.

  1.  각 패치를 따라 linear projection을 적용해서 input을 spatially transform합니다
  2. inputs와 spatially transform input을 element-wise multiplication합니다

Implement the gMLP module

class gMLPLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(gMLPLayer, self).__init__(*args, **kwargs)

        self.channel_projection1 = keras.Sequential(
            [
                layers.Dense(units=embedding_dim * 2),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
            ]
        )

        self.channel_projection2 = layers.Dense(units=embedding_dim)

        self.spatial_projection = layers.Dense(
            units=num_patches, bias_initializer="Ones"
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def spatial_gating_unit(self, x):
        # Split x along the channel dimensions.
        # Tensors u and v will in th shape of [batch_size, num_patchs, embedding_dim].
        u, v = tf.split(x, num_or_size_splits=2, axis=2)
        # Apply layer normalization.
        v = self.normalize2(v)
        # Apply spatial projection.
        # [batch_size, num_patches, embedding_dim] -> [batch_size, embedding_dim, num_patches]
        v_channels = tf.linalg.matrix_transpose(v)
        v_projected = self.spatial_projection(v_channels)
        v_projected = tf.linalg.matrix_transpose(v_projected)
        
        # Apply element-wise multiplication.
        return u * v_projected

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize1(inputs)
        
        # 여기서 embedding_dim을 2배로 만들어주고,
        # Apply the first channel projection. x_projected shape: [batch_size, num_patches, embedding_dim * 2].
        x_projected = self.channel_projection1(x)
        
        # 2배로 만들어진 channel을 두 개로 쪼개서 하나는 projection을 적용, 하나는 기존걸 유지한 뒤,
        # element-wise multiplication을 적용함. 연산은 skip-connection과 비슷한 원리
        # Apply the spatial gating unit. x_spatial shape: [batch_size, num_patches, embedding_dim].
        x_spatial = self.spatial_gating_unit(x_projected)
        
        # Apply the second channel projection. x_projected shape: [batch_size, num_patches, embedding_dim].
        x_projected = self.channel_projection2(x_spatial)
        
        # Add skip connection.
        return x + x_projected

Build, train, and evaluate the gMLP model

gmlp_blocks = keras.Sequential(
    [gMLPLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.003
gmlp_classifier = build_classifier(gmlp_blocks)
history = run_experiment(gmlp_classifier)

gMLP 논문에 따르면, embedding dimension, gMLP blocks을 늘리거나 더 오랫동안 학습시키면 더 나은 결과를 얻을 수 있다고 합니다. 또한, 입력 이미지 크기나 패치 크기를 조절해가면서 학습시켜보세요.

또, gMLP 논문에서의 구현은 advanced regularization strategies나 MixUp, CutMix 뿐만 아니라 AutoAugmentation을 사용했습니다.