결론부터 말하자면, Gradient Accumulation 방법은 GPU memory issue를 보완하기 위한 방법입니다.

배치 크기는 성능에 영향을 주는 중요한 하이퍼파라미터 중 하나인데요.
이 때문에 배치 크기(Batch Size)와 성능(Performance) 관계는 이전부터 많은 방법(다양한 조건에서의 실험, 수학적 풀이, 최적화 관점 등)으로 연구되어 왔습니다.

대표적인 논문만 간단히 보자면,

위 논문뿐만 아니라 관련 논문을 보면 아래와 같이 결론을 얻을 수 있습니다.

    ① LB(Large Batch)는 학습 속도도 빠르고, 성능도 좋을 수 있다
    ② LB 일수록 더 큰 학습률(Learning Rate)를 사용하면 좋다 (하지만 큰 학습률을 사용하면 수렴이 안되는 문제가?)
    ③ 하지만 LB보다 SB(Small Batch)가 안정적으로 학습할 수 있어 일반화(Generalization)에 강하다

Gradient Accumulation 방법이 배치 크기와 관련이 있다보니 간단하게 배치 크기 관련 이야기를 해보았는데, 이번 글은 어떤 이유에서든 "비록 가지고 있는 GPU memory가 제한적이지만 일단 LB를 사용해서 학습시키고 싶다."라는 생각입니다.

Gradient Accumulation?

큰 배치 크기를 활용할때 가장 큰 문제는 '성능이 좋아질 수 있다', '안좋아질 수 있다'가 아닐겁니다. 아마도 가장 중요하게 고려하는 것은 지금 보유하고 있는 GPU를 가지고 '1,024, 2,048, ... 처럼 큰 배치 크기를 돌려볼 수 있느냐'입니다.

누구나 다 겪는 문제인데요. 만약 이와 같은 문제에 직면했다면, 이를 해결하기 위한 방법 중 하나로 Gradient Accumulation 방법을 생각해볼 수 있습니다. 가장 좋은 방법은 돈을 들이는 것...

일반적인 학습 방법과 Gradient Accumulation 방법의 차이는 아래 그림에서 바로 확인할 수 있습니다.

General Training vs Training with GA

일반적인 학습 방법이 미니 배치를 통해 gradient를 구한 후, Update를 즉시 진행하는 방법이라면,

Gradient Accumulation 방법을 포함한 학습은 미니 배치를 통해 구해진 gradient를 n-step동안 Global Gradients에 누적시킨 후, 한번에 업데이트하는 방법입니다.

매우 간단합니다. 즉, 핵심 아이디어는 256 mini-batch를 통해 gradient를 업데이트하는 것과 64 mini-batch를 4번(64 * 4 = 256) 누적하여 업데이트하는 것이 비슷한 결과를 가져다 줄 것이라는 생각입니다.

이 예와 같다면, 우리가 보유하고 있는 GPU memory 한계상 64 mini-batch 밖에 사용할 수 없다면, 이 학습 방법을 통해 4번 누적하여 업데이트할 경우 256 mini-batch를 사용하여 학습하는 것과 다름없게 되는 셈이죠.

하지만 이 방법은 memory issue를 중점적으로 해결하고자 하는 방법이지 속도, 성능과는 아직 explicit하게 증명된 바가 없습니다.

아래의 간단한 구현을 통해 위에서 글로서 살펴본 것보다 좀 더 쉽게 이해할 수 있을 거에요.


전체 코드는 깃허브_저장소에 있습니다.

Setup

from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.models import Model
import tensorflow as tf

print(tf.__version__)

MNIST Dataset Laod

(x_train, y_train), (x_test, y_test) = mnist.load_data()

Make TF Dataset

def make_datasets(x, y):
    # (28, 28) -> (28, 28, 1)
    def _new_axis(x, y):
        y = tf.one_hot(y, depth = 10)
        
        return x[..., tf.newaxis], y
            
    ds = tf.data.Dataset.from_tensor_slices((x, y))
    ds = ds.map(_new_axis, num_parallel_calls = tf.data.experimental.AUTOTUNE)
    ds = ds.shuffle(100).batch(32) # 배치 크기 조절하세요
    ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
    
    return ds
    
ds = make_datasets(x_train, y_train)

Make Models

# rescaling, 1 / 255
preprocessing_layer = tf.keras.models.Sequential([
        tf.keras.layers.experimental.preprocessing.Rescaling(1./255),
    ])

# simple CNN model
def get_model():
    inputs = Input(shape = (28, 28, 1))
    preprocessing_inputs = preprocessing_layer(inputs)
    
    x = Conv2D(filters = 32, kernel_size = (3, 3), activation='relu')(preprocessing_inputs)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(filters = 64, kernel_size = (3, 3), activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(filters = 64, kernel_size =(3, 3), activation='relu')(x)
    
    x = Flatten()(x)
    x = Dense(64, activation = 'relu')(x)
    outputs = Dense(10, activation = 'softmax')(x)
    
    model = Model(inputs = inputs, outputs = outputs)
    
    return model

model = get_model()
model.summary()

Training with Gradient Accumulation

epochs = 10
num_accum = 4 # 누적 횟수

loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits = True)
optimizer = tf.keras.optimizers.Adam()
train_acc_metric = tf.keras.metrics.CategoricalAccuracy()
@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x)
        loss_value = loss_fn(y, logits)
    gradients = tape.gradient(loss_value, model.trainable_weights)
    
    # update metrics    
    train_acc_metric.update_state(y, logits)
    
    return gradients, loss_value

def train():
    for epoch in range(epochs):
        print(f'################ Start of epoch: {epoch} ################')
        # 누적 gradient를 담기 위한 zeros_like 선언
        accumulation_gradients = [tf.zeros_like(ele) for ele in model.trainable_weights]
        
        for step, (batch_x_train, batch_y_train) in enumerate(ds):
            gradients, loss_value = train_step(batch_x_train, batch_y_train)
            
            if step % num_accum == 0:
                accumulation_gradients = [grad / num_accum for grad in accumulation_gradients]
                optimizer.apply_gradients(zip(gradients, model.trainable_weights))

                # zero-like init
                accumulation_gradients = [tf.zeros_like(ele) for ele in model.trainable_weights]
            else:
                accumulation_gradients = [(accum_grad + grad) for accum_grad, grad in zip(accumulation_gradients, gradients)]

            if step % 100 == 0:
                print(f"Loss at Step: {step} : {loss_value:.4f}")
            
        train_acc = train_acc_metric.result()
        print(f'Accuracy : {(train_acc * 100):.4f}%')
        train_acc_metric.reset_states()
        
# start training
train()

mecab 설치 관련 내용을 찾아본 결과, .tar 확장자를 다운받아 여러 가지를 직접 설치해주는 방법도 있지만 가장 편한 것은 아래 명령어를 쓰는 것입니다. 쉘에 그대로 복붙해주세요

bash <(curl -s https://raw.githubusercontent.com/konlpy/konlpy/master/scripts/mecab.sh)

 

만약, 에러가 난다면

  1. brew upgrade zsh
    - 에러랑은 거의 무관하지만 zsh 쓴다는 가정하에 업그레이드를 한번 해주고 시작합니다
    - 셀을 실행시켰을 때 dydl library not loded ~ 와 같은 에러 로그가 발생한다면, zsh 업그레이드로 해결됩니다
  2. xcode-select --install
    - xcrun 등 로그가 발생한다면 2번에서 해결될 가능성이 높습니다. gcc를 먼저 설치하기 전에 맥 개발자 도구부터 설치 또는 업데이트 해줍니다
  3. brew install gcc
    - 2번을 진행하고 나서 gcc 설치 명령어를 한번 더 수행한 후에, 맨 위 명령어를 다시 실행해보세요
    - 보통 C compiler 관련 로그가 여기서 해결됩니다
  4. pip install mecab-python
    - 만약 mecab을 설치하는 마지막 과정에서 mecab-python 설치 에러 로그가 발생한다면, pip를 활용해서 직접 설치해줍니다. 여기까지 전부 성공하면 mecab 실행과 관련한 모든 패키지 설치가 완료된 겁니다

이 글은 다음 튜토리얼을 번역한 것입니다.

https://keras.io/examples/vision/mlp_image_classification/

 

Keras documentation: Image classification with modern MLP models

Image classification with modern MLP models Author: Khalid Salama Date created: 2021/05/30 Last modified: 2021/05/30 Description: Implementing the MLP-Mixer, FNet, and gMLP models for CIFAR-100 image classification. View in Colab • GitHub source Introduc

keras.io

 


Introduction

이 예제는 최근 많이 사용되고 있는 Attention 방법을 사용하지 않고 MLP(multi-layer perceptron)을 사용하는 세 가지 모델에 대해 CIFAR-100 데이터셋을 활용하여 이미지 분류 문제를 해결해봅니다.

  1. 두 가지 타입의 MLP를 사용하는 MLP Mixer model(by Ilya Tolstikhin et al.)을 구현합니다
  2. based on unparameterized Fourier Transform, FNet model을 구현합니다
  3. gating 방법을 사용하는 gMLP model을 구현합니다

이 예제의 목적은 다른 데이터셋에서 서로 다른 성능을 낼 수 있기 때문에 각 모델의 성능을 비교하는 것이 아닙니다. 모델을 구성하는 main block들을 간단히 구현해보는 예제입니다.

+ 이 예제에서 살펴볼 세 개 모델은 다른 방법에 비해 꽤 간단하지만, 성능은 좋기 때문에 논문을 한번 찾아볼 필요가 있을 것 같습니다.
+ Attention mechnism도 여전히 강력하고, 훌륭한 방법이지만, 최근 연구 트렌드는 MLP인 것 같습니다.

tensorflow version 2.4 또는 그 이상에서 실행할 수 있으며, tf-addons 모듈 설치가 필요합니다.

pip install -U tensorflow-addons

Setup

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

Prepare the data

num_classes = 100
input_shape = (32, 32, 3)

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

Configure the hyperparameters

weight_decay = 0.0001
batch_size = 128
num_epochs = 50
dropout_rate = 0.2
image_size = 64  # 이미지 resize 크기
patch_size = 8  # 이미지 패치 크기
num_patches = (image_size // patch_size) ** 2  # 패치로 이루어진 data array 크기
embedding_dim = 256  # Number of hidden units.
num_blocks = 4  # Number of blocks.

print(f"Image size: {image_size} X {image_size} = {image_size ** 2}")
print(f"Patch size: {patch_size} X {patch_size} = {patch_size ** 2} ")
print(f"Patches per image: {num_patches}")
print(f"Elements per patch (3 channels): {(patch_size ** 2) * 3}")

Build a classification model

processing block을 포함한 classifier를 구현합니다.

def build_classifier(blocks, positional_encoding=False):
    inputs = layers.Input(shape=input_shape) # inputs: [batch_size, 32, 32, 3]
    # Augment data.
    # augmented: [batch_size, 64, 64, 3]
    augmented = data_augmentation(inputs)

    # Create patches.
    # patches: [batch_size, 64, 192]
    patches = Patches(patch_size, num_patches)(augmented)
    
    # [batch_size, num_patches, embedding_dim] tensor를 생성하기 위한 패치 인코딩 부분입니다.
    x = layers.Dense(units=embedding_dim)(patches)
    
    if positional_encoding:
        positions = tf.range(start=0, limit=num_patches, delta=1)
        position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=embedding_dim
        )(positions)
        x = x + position_embedding

    # Process x using the module blocks.
    x = blocks(x)
    
    # global average pooling을 사용
    # [batch_size, embedding_dim] 형태의 representation tensor를 만듭니다.
    representation = layers.GlobalAveragePooling1D()(x)
    representation = layers.Dropout(rate=dropout_rate)(representation)
    
    # Compute logits outputs.
    logits = layers.Dense(num_classes)(representation)
    
    # Create the Keras model.
    return keras.Model(inputs=inputs, outputs=logits)

Define an experiment

compile, train, evaluate를 위한 구현입니다.

def run_experiment(model):
    # Create Adam optimizer with weight decay.
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay,
    )
    # Compile the model.
    # top5-acc -> name = "top5-acc"
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="acc"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top5-acc"),
        ],
    )
    # learning rate 스케쥴러
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=5
    )
    # early stopping callback
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=10, restore_best_weights=True
    )
    # Fit the model.
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[early_stopping, reduce_lr],
    )

    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    # Return history to plot learning curves.
    return history

Use data augmentation

data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.Normalization(),
        layers.experimental.preprocessing.Resizing(image_size, image_size),
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# normalization을 위해 training data의 mean, varfmf 계산합니다.
data_augmentation.layers[0].adapt(x_train)

Implement patch extraction as a layer

Patches class를 이해하려면 tf.image.extract_patches를 이해할 필요가 있습니다.

class Patches(layers.Layer):
    def __init__(self, patch_size, num_patches):
        super(Patches, self).__init__()
        self.patch_size = patch_size
        self.num_patches = num_patches

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, self.num_patches, patch_dims])
        return patches

The MLP-Mixer model

MLP-Mixer model은 MLP만 사용하는 아키텍처이며, 두 가지 유형의 MLP layer를 포함합니다.

  1. 각 이미지 패치를 독립적으로 사용함으로써 per-location feature를 혼합합니다
  2. 채널축으로 적용하여 spatial information을 혼합합니다.

이 방법은 어떻게 보면 Xception model에서 대표적으로 사용한 depthwise separable convolution 구조와 유사해보이지만, 차이점으로는 two chained dense transforms, no max pooling, layer normalization instead of batch normalization 사용에 있습니다.

Implement the MLP-Mixer model

class MLPMixerLayer(layers.Layer):
    def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs):
        super(MLPMixerLayer, self).__init__(*args, **kwargs)

        self.mlp1 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=num_patches),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.mlp2 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=embedding_dim),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.normalize = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize(inputs)

        # [num_batches, num_patches, hidden_units] -> [num_batches, hidden_units, num_patches]
        x_channels = tf.linalg.matrix_transpose(x)
        
        # mlp1을 채널 독립적으로 적용합니다.
        # Dense Layer는 2-D 이상일 경우, 마지막 차원에서 가중치 연산이 일어납니다 -> 일종의 trick으로 사용
        mlp1_outputs = self.mlp1(x_channels)
        
        # [num_batches, hidden_dim, num_patches] -> [num_batches, num_patches, hidden_units]
        mlp1_outputs = tf.linalg.matrix_transpose(mlp1_outputs)
        
        # Add skip connection.
        x = mlp1_outputs + inputs
        
        # Apply layer normalization.
        x_patches = self.normalize(x)
        
        # mlp2를 각 패치에 독립적으로 적용합니다.
        mlp2_outputs = self.mlp2(x_patches)
        
        # Add skip connection.
        x = x + mlp2_outputs
        
        return x

Build, train, and evaluate the MLP-Mixer model

mlpmixer_blocks = keras.Sequential(
    [MLPMixerLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.005
mlpmixer_classifier = build_classifier(mlpmixer_blocks)
history = run_experiment(mlpmixer_classifier)

MLP-Mixer 모델은 다른 모델에 비해 사용되는 파라미터 수가 적습니다. 

MLP-Mixer 논문에 언급된 바에 따르면, large-dataset에 pre-trained하고 modern regularization 방법을 함께 사용한다면 SOTA 모델들의 score와 경쟁할 수 있는 수준의 score를 얻을 수 있다고 합니다. 임베딩 차원, 블록 수, 입력 이미지 크기를 늘리거나 다른 패치 크기를 사용하거나 모델을 더 오랫동안 학습시키면 더 좋은 성능을 얻을 수 있습니다.


The FNet model

FNet 모델은 Transformer block과 유사한 구조의 블록을 사용합니다. 하지만 FNet 모델은 self-attention layer 대신, 파라미터에서 자유로운 2D Fourier transformation layer를 사용합니다.

위 모델과 동일하게 패치, 채널 독립적 연산이 사용됩니다.

Implement the FNet module

class FNetLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(FNetLayer, self).__init__(*args, **kwargs)

        self.ffn = keras.Sequential(
            [
                layers.Dense(units=embedding_dim),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
                layers.Dense(units=embedding_dim),
            ]
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply fourier transformations.
        x = tf.cast(
            tf.signal.fft2d(tf.cast(inputs, dtype=tf.dtypes.complex64)),
            dtype=tf.dtypes.float32,
        )
        # Add skip connection.
        x = x + inputs
        # Apply layer normalization.
        x = self.normalize1(x)
        # Apply Feedfowrad network.
        x_ffn = self.ffn(x)
        # Add skip connection.
        x = x + x_ffn
        # Apply layer normalization.
        return self.normalize2(x)

Build, train, and evaluate the FNet model

fnet_blocks = keras.Sequential(
    [FNetLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.001
fnet_classifier = build_classifier(fnet_blocks, positional_encoding=True)
history = run_experiment(fnet_classifier)

The gMLP model

gMLP 모델은 Spatial Gating Unit(SGU)를 사용하는 MLP 아키텍처입니다. SGU는 spatial(channel) dimention을 따라 각 패치가 상호작용할 수 있도록 합니다.

  1.  각 패치를 따라 linear projection을 적용해서 input을 spatially transform합니다
  2. inputs와 spatially transform input을 element-wise multiplication합니다

Implement the gMLP module

class gMLPLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(gMLPLayer, self).__init__(*args, **kwargs)

        self.channel_projection1 = keras.Sequential(
            [
                layers.Dense(units=embedding_dim * 2),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
            ]
        )

        self.channel_projection2 = layers.Dense(units=embedding_dim)

        self.spatial_projection = layers.Dense(
            units=num_patches, bias_initializer="Ones"
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def spatial_gating_unit(self, x):
        # Split x along the channel dimensions.
        # Tensors u and v will in th shape of [batch_size, num_patchs, embedding_dim].
        u, v = tf.split(x, num_or_size_splits=2, axis=2)
        # Apply layer normalization.
        v = self.normalize2(v)
        # Apply spatial projection.
        # [batch_size, num_patches, embedding_dim] -> [batch_size, embedding_dim, num_patches]
        v_channels = tf.linalg.matrix_transpose(v)
        v_projected = self.spatial_projection(v_channels)
        v_projected = tf.linalg.matrix_transpose(v_projected)
        
        # Apply element-wise multiplication.
        return u * v_projected

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize1(inputs)
        
        # 여기서 embedding_dim을 2배로 만들어주고,
        # Apply the first channel projection. x_projected shape: [batch_size, num_patches, embedding_dim * 2].
        x_projected = self.channel_projection1(x)
        
        # 2배로 만들어진 channel을 두 개로 쪼개서 하나는 projection을 적용, 하나는 기존걸 유지한 뒤,
        # element-wise multiplication을 적용함. 연산은 skip-connection과 비슷한 원리
        # Apply the spatial gating unit. x_spatial shape: [batch_size, num_patches, embedding_dim].
        x_spatial = self.spatial_gating_unit(x_projected)
        
        # Apply the second channel projection. x_projected shape: [batch_size, num_patches, embedding_dim].
        x_projected = self.channel_projection2(x_spatial)
        
        # Add skip connection.
        return x + x_projected

Build, train, and evaluate the gMLP model

gmlp_blocks = keras.Sequential(
    [gMLPLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.003
gmlp_classifier = build_classifier(gmlp_blocks)
history = run_experiment(gmlp_classifier)

gMLP 논문에 따르면, embedding dimension, gMLP blocks을 늘리거나 더 오랫동안 학습시키면 더 나은 결과를 얻을 수 있다고 합니다. 또한, 입력 이미지 크기나 패치 크기를 조절해가면서 학습시켜보세요.

또, gMLP 논문에서의 구현은 advanced regularization strategies나 MixUp, CutMix 뿐만 아니라 AutoAugmentation을 사용했습니다.

이 글은 다음 튜토리얼을 번역한 것입니다.

 

Keras documentation: Classification with Gated Residual and Variable Selection Networks

Classification with Gated Residual and Variable Selection Networks Author: Khalid Salama Date created: 2021/02/10 Last modified: 2021/02/10 Description: Using Gated Residual and Variable Selection Networks for income level prediction. View in Colab • Git

keras.io


Introduction

이 예제에선 Structured data classification을 위해 Bryan Lim et al.(arxiv.org/abs/1912.09363)에서 제안된 Gated Residual NetworksVariable Selection Networks를 사용해봅니다. GRN은 필요에 따라 비선형성을 제공할 수 있는 유연성을 제공하고, VSN은 성능에 부정적인 영향을 주는 불필요한 feature를 제거하는 역할을 수행합니다. 또한, 이 두 가지를 함께 사용했을 때 모델의 학습 능력을 주요하게 향상시킬 수 있습니다.

이 예제는 논문에서 제안된 전체 모델을 구현하진 않고, 부분적으로 GRN과 VSN만 구현하게 됩니다.

※ TensorFlow 2.3 이상의 버전에서 정상 작동합니다.


The Dataset

데이터셋은 UC Irvine Machine Learning Repo에서 제공하는 United States Census Income Dataset을 사용합니다. 이진 분류이며, 연소득 50K가 넘는 사람인지를 구분하는 문제입니다.

데이터셋은 41개의 feature(7 numerical + 34 categorical)과 ~300K의 인스턴스로 이루어져 있습니다.


Setup

import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

Prepare Data

데이터를 로드합니다.

# Column names.
CSV_HEADER = [
    "age", "class_of_worker", "detailed_industry_recode",
    "detailed_occupation_recode", "education", "wage_per_hour",
    "enroll_in_edu_inst_last_wk", "marital_stat", "major_industry_code",
    "major_occupation_code", "race", "hispanic_origin",
    "sex", "member_of_a_labor_union", "reason_for_unemployment",
    "full_or_part_time_employment_stat", "capital_gains", "capital_losses",
    "dividends_from_stocks", "tax_filer_stat", "region_of_previous_residence",
    "state_of_previous_residence", "detailed_household_and_family_stat",
    "detailed_household_summary_in_household", "instance_weight",
    "migration_code-change_in_msa", "migration_code-change_in_reg",
    "migration_code-move_within_reg", "live_in_this_house_1_year_ago",
    "migration_prev_res_in_sunbelt", "num_persons_worked_for_employer",
    "family_members_under_18", "country_of_birth_father",
    "country_of_birth_mother", "country_of_birth_self",
    "citizenship", "own_business_or_self_employed",
    "fill_inc_questionnaire_for_veteran's_admin", "veterans_benefits",
    "weeks_worked_in_year", "year", "income_level"
]

data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.data.gz"
data = pd.read_csv(data_url, header=None, names=CSV_HEADER)

test_data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.test.gz"
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Data shape: {data.shape}") # (199523, 42)
print(f"Test data shape: {test_data.shape}") # (99762, 42)

target은 ' - 50000.', ' 50000+.' 두 가지로 이루어져 있습니다.

따라서 이 두 가지를 Integer 형태로 바꿔줍니다.

data["income_level"] = data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

test_data["income_level"] = test_data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

훈련/검증 데이터로 분할합니다.

random_selection = np.random.rand(len(data.index)) <= 0.85
train_data = data[random_selection]
valid_data = data[~random_selection]

분할한 데이터를 csv 파일로 저장해둡니다.

train_data_file = "train_data.csv"
valid_data_file = "valid_data.csv"
test_data_file = "test_data.csv"

train_data.to_csv(train_data_file, index=False, header=False)
valid_data.to_csv(valid_data_file, index=False, header=False)
test_data.to_csv(test_data_file, index=False, header=False)

Define dataset metadata

reading, parsing, encoding이 원할하게 수행될 수 있도록 metadata를 정의하겠습니다.

# Target feature name.
TARGET_FEATURE_NAME = "income_level"

# Weight column name.
WEIGHT_COLUMN_NAME = "instance_weight"

# Numeric feature names.
NUMERIC_FEATURE_NAMES = [
    "age", "wage_per_hour",
    "capital_gains", "capital_losses",
    "dividends_from_stocks", "num_persons_worked_for_employer",
    "weeks_worked_in_year"
]

# Categorical feature와 feature가 가지고 있는 unique label을 구합니다.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    feature_name: sorted([str(value) for value in list(data[feature_name].unique())])
    for feature_name in CSV_HEADER
    if feature_name
    not in list(NUMERIC_FEATURE_NAMES + [WEIGHT_COLUMN_NAME, TARGET_FEATURE_NAME])
}

# All features names.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + list(
    CATEGORICAL_FEATURES_WITH_VOCABULARY.keys()
)

# default 값 처리를 위해 categorical은 ["NA"], 나머지는 [0.0]으로 초기화합니다.
COLUMN_DEFAULTS = [
    [0.0]
    if feature_name in NUMERIC_FEATURE_NAMES + [TARGET_FEATURE_NAME, WEIGHT_COLUMN_NAME]
    else ["NA"]
    for feature_name in CSV_HEADER
]

Create a tf.data.Dataset for training and evaluation

file을 read and parsing하고 feature, label 변환을 수행하기 위해 tf.data.Dataset을 사용합니다.

from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def process(features, target):
    for feature_name in features:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            # Categorical feature이면 string으로 cast합니다.
            features[feature_name] = tf.cast(features[feature_name], tf.dtypes.string)
    # Get the instance weight.
    weight = features.pop(WEIGHT_COLUMN_NAME)
    
    return features, target, weight

def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    # csv 파일을 읽어오는 method
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        shuffle=shuffle,
    ).map(process)

    return dataset

Create model inputs

def create_model_inputs():
    inputs = {}
    
    for feature_name in FEATURE_NAMES:
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
            
    return inputs

Encode input features

Categorical featureencoding_size dimension을 가지는 Embedding Layer를 사용해서 인코딩합니다. Numerical featurelayers.Dense를 사용해서 encoding_size dimension을 가지도록 선형 변환합니다. 따라서, 모든 인코딩된 feature는 동일한 dimension을 가지도록 구성됩니다.

embedding representation으로 변환하는 작업은 아래 테스트 코드로 확인해보세요.

index = StringLookup(
                vocabulary=CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father'], 
                mask_token=None, num_oov_indices=0
                )
test = train_data['country_of_birth_father'][:32]

# print(index(test))

embedding_ecoder = layers.Embedding(
                input_dim=len(CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father']),
                output_dim=200
            )

print(embedding_ecoder(index(test)))

 

from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def encode_inputs(inputs, encoding_size):
    encoded_features = []
    
    for feature_name in inputs:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # Create a lookup to convert a string values to an integer indices.
            # String value를 Integer index로 변환합니다.
            # 따라서, 동일한 label은 동일한 index로 변환됩니다.
            index = StringLookup(
                vocabulary=vocabulary, mask_token=None, num_oov_indices=0
            )
            # String -> Integer index
            value_index = index(inputs[feature_name])
            # encoding_size dimension을 가지는 Embedding layer를 정의합니다.
            embedding_ecoder = layers.Embedding(
                input_dim=len(vocabulary), output_dim=encoding_size
            )
            # index value를 embedding representation으로 변환합니다.
            encoded_feature = embedding_ecoder(value_index)
        else:
            # encoding_size unit을 가지는 Dense layer를 활용해서 numeric feature를 인코딩합니다.
            encoded_feature = tf.expand_dims(inputs[feature_name], -1)
            encoded_feature = layers.Dense(units=encoding_size)(encoded_feature)
        encoded_features.append(encoded_feature)
        
    return encoded_features

Implement the Gated Linear Unit

GLU는 주어진 작업에서 연관없는 input을 사용하지 않게끔 하는 유연성을 제공합니다.

class GatedLinearUnit(layers.Layer):
    def __init__(self, units):
        super(GatedLinearUnit, self).__init__()
        self.linear = layers.Dense(units)
        self.sigmoid = layers.Dense(units, activation="sigmoid")

    def call(self, inputs):
        return self.linear(inputs) * self.sigmoid(inputs)

Implement the Gated Residual Network

GRN은 다음과 같이 구성됩니다.

  1. ELU Activation을 적용합니다.
  2. Dropout을 적용합니다.
  3. GLU를 적용하고, original input과 Adding합니다. dimension이 다르면 project layer를 활용합니다.
  4. normalization을 적용하고, output을 반환합니다.
class GatedResidualNetwork(layers.Layer):
    def __init__(self, units, dropout_rate):
        super(GatedResidualNetwork, self).__init__()
        self.units = units
        self.elu_dense = layers.Dense(units, activation="elu")
        self.linear_dense = layers.Dense(units)
        self.dropout = layers.Dropout(dropout_rate)
        self.gated_linear_unit = GatedLinearUnit(units)
        self.layer_norm = layers.LayerNormalization()
        self.project = layers.Dense(units)

    def call(self, inputs):
        # 1.
        x = self.elu_dense(inputs)
        # 2.
        x = self.linear_dense(x)
        x = self.dropout(x)
        # 3.
        if inputs.shape[-1] != self.units:
            inputs = self.project(inputs)
        x = inputs + self.gated_linear_unit(x)
        # 4.
        x = self.layer_norm(x)
        
        return x

Implement the Variable Selection Network

VSN은 다음과 같이 구성됩니다.

  1. 각 feature에 개별적으로 GRN을 적용합니다.
  2. GRN을 적용한 feature를 concat하고, softmax 함수를 적용합니다.
  3. 개별 GRN의 가중합을 구합니다.

VSN은 input feature 개수와 상관없이 [batch_size, encoding_size] 형태를 가지는 output을 반환합니다.

class VariableSelection(layers.Layer):
    def __init__(self, num_features, units, dropout_rate):
        super(VariableSelection, self).__init__()
        self.grns = list()
        # Create a GRN for each feature independently
        for idx in range(num_features):
            grn = GatedResidualNetwork(units, dropout_rate)
            self.grns.append(grn)
        # Create a GRN for the concatenation of all the features
        self.grn_concat = GatedResidualNetwork(units, dropout_rate)
        self.softmax = layers.Dense(units=num_features, activation="softmax")

    def call(self, inputs):
        v = layers.concatenate(inputs) # (batch_size, embedding_size * num_features)
        v = self.grn_concat(v) # (batch_size, units)
        v = tf.expand_dims(self.softmax(v), axis=-1) # (batch_size, num_features, 1)

        x = []
        for idx, input in enumerate(inputs):
            x.append(self.grns[idx](input))
        x = tf.stack(x, axis=1) # (batch_size, num_features, units)

        # (batch_size, units)
        # (1, num_features) by (num_features, units) -> (1, units)
        outputs = tf.squeeze(tf.matmul(v, x, transpose_a=True), axis=1)

        return outputs

# test용 코드
# VariableSelection(num_features = 2, units = 100, dropout_rate = 0.1)([embedding_ecoder(index(test)), embedding_ecoder(index(test))])

Create Gated Residual and Variable Selection Networks model

def create_model(encoding_size):
    inputs = create_model_inputs()
    
    feature_list = encode_inputs(inputs, encoding_size)
    num_features = len(feature_list)

    features = VariableSelection(num_features, encoding_size, dropout_rate)(
        feature_list
    )

    outputs = layers.Dense(units=1, activation="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)

    return model

Compile, train, and evaluate the model

learning_rate = 0.001
dropout_rate = 0.15
batch_size = 265
num_epochs = 20
encoding_size = 16

model = create_model(encoding_size)
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[keras.metrics.BinaryAccuracy(name="accuracy")],
)


# Create an early stopping callback.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)

print("Start training the model...")
train_dataset = get_dataset_from_csv(
    train_data_file, shuffle=True, batch_size=batch_size
)
valid_dataset = get_dataset_from_csv(valid_data_file, batch_size=batch_size)
model.fit(
    train_dataset,
    epochs=num_epochs,
    validation_data=valid_dataset,
    callbacks=[early_stopping],
)
print("Model training finished.")

print("Evaluating model performance...")
test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)
_, accuracy = model.evaluate(test_dataset)

print(f"Test accuracy: {round(accuracy * 100, 2)}%")

예제에서는 95% acc를 달성합니다. 

acc 향상을 위해 encoding_size를 조절하거나 여러 개의 GRN 또는 VSN을 사용해볼 수 있습니다. 또, dropout_rate 조절은 overfitting을 피할 수 있도록 도울겁니다.

StringLookup은 엑셀에서 굳이 비슷한 이름을 가진 함수를 찾아보자면, VLookup, HLookup처럼 인덱스를 찾아주는 함수인데, tensorflow version 2.4부터 tf.keras.layers.experimental.preprocessing.StringLookUp으로 만나볼 수 있습니다.

tf.keras.layers.experimental.preprocessing.StringLookup(
    max_tokens=None, num_oov_indices=1, mask_token='',
    oov_token='[UNK]', vocabulary=None, encoding=None, invert=False,
    **kwargs
)

사용 방법은 예제만 봐도 단번에 알 수 있는데요. vocabulary를 만들어서 StringLookUp에게 던져주면, word Embedding처럼 각 data value들이 index로 표현될 수 있도록 도와주는 함수입니다.

vocab = ["a", "b", "c", "d"]
data = tf.constant([["a", "c", "d"], ["d", "z", "b"]])
layer = StringLookup(vocabulary=vocab)
layer(data)
# <tf.Tensor: shape = (2, 3), dtype=int64, numpy =
# array([[2, 4, 5],
#       [5, 1, 3]])>

"a"가 index 2인 이유는 바로 다음 예제를 보면 단번에 알 수 있습니다. OOV 처리 등.

data = tf.constant([["a", "c", "d"], ["d", "z", "b"]])
layer = StringLookup()
layer.adapt(data)

layer.get_vocabulary()
# ['','[UNK]', 'd', 'z', 'c', 'b', 'a']

 

그런데 TesnorFlow 2.4 이전 버전은 StringLookUp 함수가 제공되지 않기 때문에 다른 방법으로 사용해야 합니다.
물론 다양한 방법이 있겠지만, tensorflow가 제공하는 함수를 활용해볼거라면 tensorflow lookup 모듈을 고려해볼 수 있었습니다.

이를 위해선 두 가지를 사용해야 합니다.

  • tf.lookup.StaticHashTable
  • tf.lookup.KeyValueTensorInitializer

tf.lookup.KeyValueTensorInitializer에서 vocab(key)와 data value(value)를 준비해주면, StaticHashTable에 넣어 Table을 만드는 구조인 것으로 보입니다.

이 역시 텐서플로우 공식 홈페이지 예제를 보면 쉽게 이해할 수 있습니다.

keys_tensor = tf.constant(['a', 'b', 'c'])
vals_tensor = tf.constant([7, 8, 9])
input_tensor = tf.constant(['a', 'f'])

init = tf.lookup.KeyValueTensorInitializer(keys_tensor, vals_tensor)
table = tf.lookup.StaticHashTable(
    init,
    default_value=-1)
    
table.lookup(input_tensor).numpy()
# array([7, -1], dtype = int32)

중요하진 않지만, tf 2.2에서는 table.lookup(input_tensor).numpy()가 작동하지 않습니다.
예제가 완벽히 설명해주고 있으니 추가로 설명하진 않겠습니다.

이를 활용하면 StringLookUp처럼 사용할 수 있는데, 형변환 에러가 발생합니다
(tf 2.4에서는 아마 발생하지 않을거에요).

그래서 예제처럼 사용하기보다는 Custom_Layer로 만든 후, Tensor로 변환시키게끔해서 사용해야 하는 것 같습니다.

다음처럼 구현할 수 있고, 물론 중요한 Embedding Layer와도 연결해서 사용할 수 있습니다.

import tensorflow as tf

key = tf.range(start = 0, limit = 100, delta = 1, dtype = tf.int64)
value = tf.range(start = 0, limit = 100, delta = 1, dtype = tf.int64)
table = tf.lookup.StaticHashTable(initializer=tf.lookup.KeyValueTensorInitializer(keys = key, values = value),
                                  default_value = -1)

class custom_layer(tf.keras.layers.Layer):
  def __init__(self):
    super(custom_layer, self).__init__()

  def call(self, inputs):
    return table.lookup(inputs)

custom_layer = custom_layer()

# layer 연결
inputs = tf.keras.layers.Input(shape = (1, ), dtype = tf.int64)
encoded_feature = custom_layer(inputs)
embedding_feature = tf.keras.layers.Embedding(input_dim = 100, 
                                              output_dim = 100)(encoded_feature)

model = tf.keras.models.Model(inputs = inputs, outputs = embedding_feature)

# Embedding을 사용하지 않는 경우의 test example
# test_data = tf.constant([[50],
#                         [37]], dtype = tf.int64)

# model.predict(test_data) # array([[50], [37]])

- 어쨌든 버전이 높으면 뭐가 많아서 사용하긴 편해보인다...가 결론