결론부터 말하자면, Gradient Accumulation 방법은 GPU memory issue를 보완하기 위한 방법입니다.

배치 크기는 성능에 영향을 주는 중요한 하이퍼파라미터 중 하나인데요.
이 때문에 배치 크기(Batch Size)와 성능(Performance) 관계는 이전부터 많은 방법(다양한 조건에서의 실험, 수학적 풀이, 최적화 관점 등)으로 연구되어 왔습니다.

대표적인 논문만 간단히 보자면,

위 논문뿐만 아니라 관련 논문을 보면 아래와 같이 결론을 얻을 수 있습니다.

    ① LB(Large Batch)는 학습 속도도 빠르고, 성능도 좋을 수 있다
    ② LB 일수록 더 큰 학습률(Learning Rate)를 사용하면 좋다 (하지만 큰 학습률을 사용하면 수렴이 안되는 문제가?)
    ③ 하지만 LB보다 SB(Small Batch)가 안정적으로 학습할 수 있어 일반화(Generalization)에 강하다

Gradient Accumulation 방법이 배치 크기와 관련이 있다보니 간단하게 배치 크기 관련 이야기를 해보았는데, 이번 글은 어떤 이유에서든 "비록 가지고 있는 GPU memory가 제한적이지만 일단 LB를 사용해서 학습시키고 싶다."라는 생각입니다.

Gradient Accumulation?

큰 배치 크기를 활용할때 가장 큰 문제는 '성능이 좋아질 수 있다', '안좋아질 수 있다'가 아닐겁니다. 아마도 가장 중요하게 고려하는 것은 지금 보유하고 있는 GPU를 가지고 '1,024, 2,048, ... 처럼 큰 배치 크기를 돌려볼 수 있느냐'입니다.

누구나 다 겪는 문제인데요. 만약 이와 같은 문제에 직면했다면, 이를 해결하기 위한 방법 중 하나로 Gradient Accumulation 방법을 생각해볼 수 있습니다. 가장 좋은 방법은 돈을 들이는 것...

일반적인 학습 방법과 Gradient Accumulation 방법의 차이는 아래 그림에서 바로 확인할 수 있습니다.

General Training vs Training with GA

일반적인 학습 방법이 미니 배치를 통해 gradient를 구한 후, Update를 즉시 진행하는 방법이라면,

Gradient Accumulation 방법을 포함한 학습은 미니 배치를 통해 구해진 gradient를 n-step동안 Global Gradients에 누적시킨 후, 한번에 업데이트하는 방법입니다.

매우 간단합니다. 즉, 핵심 아이디어는 256 mini-batch를 통해 gradient를 업데이트하는 것과 64 mini-batch를 4번(64 * 4 = 256) 누적하여 업데이트하는 것이 비슷한 결과를 가져다 줄 것이라는 생각입니다.

이 예와 같다면, 우리가 보유하고 있는 GPU memory 한계상 64 mini-batch 밖에 사용할 수 없다면, 이 학습 방법을 통해 4번 누적하여 업데이트할 경우 256 mini-batch를 사용하여 학습하는 것과 다름없게 되는 셈이죠.

하지만 이 방법은 memory issue를 중점적으로 해결하고자 하는 방법이지 속도, 성능과는 아직 explicit하게 증명된 바가 없습니다.

아래의 간단한 구현을 통해 위에서 글로서 살펴본 것보다 좀 더 쉽게 이해할 수 있을 거에요.


전체 코드는 깃허브_저장소에 있습니다.

Setup

from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.models import Model
import tensorflow as tf

print(tf.__version__)

MNIST Dataset Laod

(x_train, y_train), (x_test, y_test) = mnist.load_data()

Make TF Dataset

def make_datasets(x, y):
    # (28, 28) -> (28, 28, 1)
    def _new_axis(x, y):
        y = tf.one_hot(y, depth = 10)
        
        return x[..., tf.newaxis], y
            
    ds = tf.data.Dataset.from_tensor_slices((x, y))
    ds = ds.map(_new_axis, num_parallel_calls = tf.data.experimental.AUTOTUNE)
    ds = ds.shuffle(100).batch(32) # 배치 크기 조절하세요
    ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
    
    return ds
    
ds = make_datasets(x_train, y_train)

Make Models

# rescaling, 1 / 255
preprocessing_layer = tf.keras.models.Sequential([
        tf.keras.layers.experimental.preprocessing.Rescaling(1./255),
    ])

# simple CNN model
def get_model():
    inputs = Input(shape = (28, 28, 1))
    preprocessing_inputs = preprocessing_layer(inputs)
    
    x = Conv2D(filters = 32, kernel_size = (3, 3), activation='relu')(preprocessing_inputs)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(filters = 64, kernel_size = (3, 3), activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(filters = 64, kernel_size =(3, 3), activation='relu')(x)
    
    x = Flatten()(x)
    x = Dense(64, activation = 'relu')(x)
    outputs = Dense(10, activation = 'softmax')(x)
    
    model = Model(inputs = inputs, outputs = outputs)
    
    return model

model = get_model()
model.summary()

Training with Gradient Accumulation

epochs = 10
num_accum = 4 # 누적 횟수

loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits = True)
optimizer = tf.keras.optimizers.Adam()
train_acc_metric = tf.keras.metrics.CategoricalAccuracy()
@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x)
        loss_value = loss_fn(y, logits)
    gradients = tape.gradient(loss_value, model.trainable_weights)
    
    # update metrics    
    train_acc_metric.update_state(y, logits)
    
    return gradients, loss_value

def train():
    for epoch in range(epochs):
        print(f'################ Start of epoch: {epoch} ################')
        # 누적 gradient를 담기 위한 zeros_like 선언
        accumulation_gradients = [tf.zeros_like(ele) for ele in model.trainable_weights]
        
        for step, (batch_x_train, batch_y_train) in enumerate(ds):
            gradients, loss_value = train_step(batch_x_train, batch_y_train)
            
            if step % num_accum == 0:
                accumulation_gradients = [grad / num_accum for grad in accumulation_gradients]
                optimizer.apply_gradients(zip(gradients, model.trainable_weights))

                # zero-like init
                accumulation_gradients = [tf.zeros_like(ele) for ele in model.trainable_weights]
            else:
                accumulation_gradients = [(accum_grad + grad) for accum_grad, grad in zip(accumulation_gradients, gradients)]

            if step % 100 == 0:
                print(f"Loss at Step: {step} : {loss_value:.4f}")
            
        train_acc = train_acc_metric.result()
        print(f'Accuracy : {(train_acc * 100):.4f}%')
        train_acc_metric.reset_states()
        
# start training
train()

mecab 설치 관련 내용을 찾아본 결과, .tar 확장자를 다운받아 여러 가지를 직접 설치해주는 방법도 있지만 가장 편한 것은 아래 명령어를 쓰는 것입니다. 쉘에 그대로 복붙해주세요

bash <(curl -s https://raw.githubusercontent.com/konlpy/konlpy/master/scripts/mecab.sh)

 

만약, 에러가 난다면

  1. brew upgrade zsh
    - 에러랑은 거의 무관하지만 zsh 쓴다는 가정하에 업그레이드를 한번 해주고 시작합니다
    - 셀을 실행시켰을 때 dydl library not loded ~ 와 같은 에러 로그가 발생한다면, zsh 업그레이드로 해결됩니다
  2. xcode-select --install
    - xcrun 등 로그가 발생한다면 2번에서 해결될 가능성이 높습니다. gcc를 먼저 설치하기 전에 맥 개발자 도구부터 설치 또는 업데이트 해줍니다
  3. brew install gcc
    - 2번을 진행하고 나서 gcc 설치 명령어를 한번 더 수행한 후에, 맨 위 명령어를 다시 실행해보세요
    - 보통 C compiler 관련 로그가 여기서 해결됩니다
  4. pip install mecab-python
    - 만약 mecab을 설치하는 마지막 과정에서 mecab-python 설치 에러 로그가 발생한다면, pip를 활용해서 직접 설치해줍니다. 여기까지 전부 성공하면 mecab 실행과 관련한 모든 패키지 설치가 완료된 겁니다

jupyter notebook으로 파이썬을 사용할 때, 자주 사용하는 라이브러리(NumPy, Pandas 등)를 항상 임포트(import)하는게 어려운 것은 아니지만 노트북을 생성할때마다 임포트하는 것은 여간 번거로운 일이 아닙니다.

startup 설정을 진행하면 jupyter notebook을 실행할 때 00-first_.py 파일이 실행되면서 여기에 입력해둔 import 구문을 별도로 다시 입력해주지 않아도 되는 편리함을 느낄 수 있습니다.


  1. 배쉬 or 명령 프롬프트(cmd)를 실행합니다.
  2. ipython profile create 명령어 입력
    1. 이 명령어를 입력하면 프로파일이 생깁니다.
    2. 만약, 프로파일을 처음 생성하는 것이면 아래와 같이 몇 가지 라인이 자동으로 입력되는데, 바로 밑에서 .ipython으로 들어갈 경로를 모른다면 이때 생기는 코멘트를 참고하면 됩니다
      [ProfileCreate] Generating default config file: ~~~~~​
  3. "startup"이 존재하는 경로로 진입합니다
    1. cd .ipython/profile_default\
    2. dir 명령어로 startup이 있는지 확인합니다
    3. startup 경로로 진입합니다 cd startup
  4. 00-first_.py 파일을 작성합니다
    1. 00-first_.py 파일은 jupyter notebook이 실행될 때 먼저 자동으로 실행됩니다.
      즉, import 구문을 넣어두면 우리가 선언해주지 않아도 선언된 상태로 노트북을 사용할 수 있습니다.
    2. startup 경로에 진입한 상태에서 jupyter notebook을 실행시킵니다.
    3. New -> Text File, 00-first_.py로 이름 변경
    4. 자신이 사용할 코드를 삽입하고 저장합니다.
      # 구문 예시
      import numpy as np
      import pandas as pd
      import matplotlib.pyplot as plt
      import seaborn as sns
  5. 이제 쥬피터 노트북을 재실행하고, 임포트를 직접 선언해주지 않아도 라이브러리 사용이 가능한지 확인하면 끝!

 

 

매번 찾아보기 번거로워 사용하는 것만 정리해서 올려두었습니다.


가장 먼저 정규표현식하면 필수적으로 나오는 이메일과 핸드폰 번호 예제입니다.

# 스타일이나 표현하고 싶은 방법에 따라 다름, 여기선 가장 기본적인 문법
# 경우에 따라 특수문자를 표현하기 위해 %, - 등을 집어넣을 수 있다
# ex) r'([\w.%-]+@ 등)'
re.findall(r'(\w+)@(\w+).(\w+)', 'test123@tistory.com')
# [('test123', 'tistory', 'com')]
re.findall(r'(\w+)-(\w+)-(\w+)', '010-1234-5678')
# [('010', '1234', '5678')]

 

아래 예제에서는 findall로 대부분 예제를 작성했지만, 주로 search, match, findall을 사용합니다.

  • []
    • [] 안에 정의한 문자들에 포함되는 것을 찾는다
    • 단, '^'가 []의 맨 앞에 올 경우, 해당 패턴이 아닌 것을 찾는다
re.findall('[#4]', '1#a2b3c4')
# ['#', '4']

re.findall('[A-Z]', '12a3B4DZ')
# ['B', 'D', 'Z']

re.findall('[^0-9]', '12a3B4DZ')
# ['a', 'B', 'D', 'Z']
  • . (점)
    • 한 개 chracter(모든 문자)와 일치하는 것을 찾는다, .. 이면 character 두 개가 이어진 것을 찾는다
    • 만약 '점' 자체를 찾고 싶다면, \.으로 표현해주면 된다
re.findall('[0-9].[a-z]', '1#a2b3c4')
# ['1#a']
  • \w
    • character를 의미, \w\w는 character 두 개가 이어진 것을 의미한다
    • \W는 character가 아닌 문자를 의미
re.findall('[0-9]\w', '1a2b3c4')
# ['1a', '2b', '3c']
  • \d
    • 숫자를 의미, \d\d는 숫자 두 개가 이어진 것을 의미한다
    • \D는 숫자가 아닌 문자를 의미
re.findall('\d[a-z]', '1a2b3c4')
# ['1a', '2b', '3c']
  • \s
    • 공백을 의미(space, tab 등)
    • \S는 공백이 아닌 문자를 의미
  • ^과 $의 의미
    • ^: 문자열의 시작부터 일치
    • $: 문자열의 끝이 일치
re.findall(r'^abc', 'abc') # 'abc' -> a로 시작해야함
re.findall(r'^abc', '@abc') # 못찾음

re.findall(r'abc$', 'abc') # 'abc' -> c로 끝나야함
re.findall(r'abc$', 'abc@') # 못찾음
  • 문자열 앞에 r을 붙임
    • \t, \n 등 escape 표현을 무시하고 그대로 표현
a = r'test\n'
print(a) # test\n
  • '+', '*', '?'
    • +: 패턴이 적어도 한번은 나와야 추출
    • *: 패턴이 0번 이상인 경우 추출, 즉 없어도 추출함(+와 차이점)
    • ?: 패턴이 아예 없거나(0) 한번만 나오거나(1)
re.findall(r'a1+dz+', 'a1dzzzc') # 'a1dzzz'
re.findall(r'a1+dz+', 'adzzzc') # 못찾음

re.findall(r'a1*dz*', 'a1dzzzc') # 'a1dzzz'
re.findall(r'a1*dz*', 'adzzzc') # adzzz 찾음

re.findall(r'a1?dz?', 'a1dzzzc') # 'a1dz' -> 1 한번, z 한번
re.findall(r'a1?dz?', 'adzzzc') # adz -> 1 없음, z 한번
  • {}
    • 패턴 횟수를 설정해줄 때 사용
re.findall(r'te+st', 'test,teest,teeest') # ['test', 'teest', 'teeest']
re.findall(r'te{3}st', 'test,teest,teeest') # ['teeest'] -> e가 3번 반복되는 것만 추출
re.findall(r'te{2,3}st', 'test,teest,teeest') # ['teest', 'teeest'] -> e가 2~3번 반복되는 것만 추출
  • {}?
    • 최소 기준으로 매칭
re.search(r't.{2,3}', 'test,teest,teeest') # test
re.search(r't.{2,3}?', 'test,teest,teeest') # tes
  • 예제) 특정 문자로 시작되는 패턴 찾기
    • 여기서 다루는 예는 매우 한정적일 수 있습니다~
    • 여기서는 w로 시작되면서 숫자로 끝나는 패턴을 찾아보겠습니다. 단, w와 숫자 사이에는 공백이 있을 수 있고, 없을 수 있습니다.
re.search(r'[^A-Za-z0-9]w[\s]?\d+\d$', 'benz sclass w212') # w212
re.search(r'[^A-Za-z0-9]w[\s]?\d+\d$', 'benz sclass w 212') # w 212

 

1. @staticmethod

함수 객체를 선언하지 않고, 바로 method에 접근해서 사용할 수 있습니다.

class Test:
    def __init__(self):
        pass

    @staticmethod
    def print_x(x):
    	return print(x)
        
Test.print_x(10) # 객체 선언하지않고 바로 사용        

 

2. list의 extend

주로 list 뒤에 값을 붙인다고 하면, append 함수만 생각나는 경우가 많습니다.
extend 함수는 익숙하지 않으면 잘 생각이...

[1, 2, 3]과 [10, 20]에 각각 함수를 사용했을 때 반환 결과는 다음과 같습니다.

  • extend 함수: [1, 2, 3, 10, 20]
  • append 함수: [1, 2, 3, [10, 20]]
a = [1, 2, 3]
b = [10, 20]

a.append(b)
print(a) # [1, 2, 3, [10, 20]]

a.extend(b)
print(a) # [1, 2, 3, 10, 20]

 

3. Pandas -> Numpy

Pandas 라이브러리는 데이터 분석과 관련한 다양한 함수를 제공해줌과 동시에 NumPy 라이브러리와 같이 빠른 성능을 보여줍니다.

하지만 만약 하고 있는 작업을 NumPy Array를 활용한 연산으로 수행할 수 있다면, 되도록이면 NumPy 연산으로 바꿔보는 것을 추천합니다.

생각치 못한 곳에서 속도 향상이 일어날 수 있고, 이후 처리 작업으로 전환하기에도 매우 편리합니다.

 

4. 속도가 정말 느린 것 같다면 배치 연산을 활용

전체 데이터에 전처리(preprocessing) 작업을 수행한 후, 처리된 데이터를 stack, concat과 같은 함수를 사용하여
새로운 저장 공간(ex: list or DataFrame etc.)에 쌓는 경우 속도 저하 현상이 발생할 수 있습니다.

이는 전체 데이터를 한번에 쌓고자하는 작업 때문에 병목 현상이 발생한 경우인데
특히, 우리가 자주쓰는 loc, iloc 함수와 같이 인덱싱 기능이 포함된 함수를 사용할 경우 자주 만나볼 수 있는 문제입니다.

속도 향상을 위해 전체를 한번에 쌓는 방법보다 분할하여 쌓은 뒤, 합쳐(merge)주는 순서로 변경해보세요.
신경망 모델 학습 시에 배치 연산을 하는 것처럼 바꿔보면 큰 속도 향상을 기대해볼 수 있습니다.
(ex: 1,000개 데이터를 100개씩 나누어 10번 처리한 뒤, 한번에 합쳐주는 방법을 의미)

매우 쉬운 방법이지만 한번에 떠오르기 쉽지 않습니다.

 

5. Class Config 설정

클래스를 정의할 때 여러 가지 하이퍼파라미터 정의가 필요할 수 있습니다.

__init__함수내 변수로 선언하여 관리할 수 있지만, 다루는 함수가 많아지거나 복잡해질수록 무언가 실수로 하나씩 변경하지 못하는 실수가 발생하기 마련입니다.

이를 방지하고자 관리하는 스타일에 따라 (1) Config 클래스 (2) 외부 파일 저장 (3) Config dict를 인자로 넘기기 등 방법을 사용할 수 있는데 여기서 볼 것은 (3)번 방법입니다.

**kwargs(dict type)를 사용해서 정의된 하이퍼파라미터를 넘겨준 후, 클래스 변수로 등록하는 과정입니다.
(+ *args는 tuple type)

class Test:
    def __init__(self):
        pass
    
    def set_config(self, **kwargs):
        self.add_entries(**kwargs)
    
    def add_entries(self, **kwargs):
        for key, value in kwargs.items():
            self.__dict__[key] = value
            
config = {
    'num_epochs': 10,
    'batch_size': 1024,
    'hidden_nums': 16,
}

t = Test()
t.set_config(**config)

# 결과는 __dict__로 확인할 수 있다
# {'num_epochs': 10, 'batch_size': 1024, 'hidden_nums': 16}
print(t.__dict__)

 

6. Jupyter Notebook으로 수식 쓰기

수학 공식을 표현하고 싶을 때, LaTeX 방법으로 수식을 만들 수 있지만 복잡하고 어렵습니다.

# RMSE 예시
# $$RMSE=\sqrt {\sum_{i} (Y_{i} - \hat Y_{i})^2}$$

handcalcs 패키지를 활용하면 매우 쉽게 수식을 렌더링할 수 있습니다.

pip install handcalcs로 쉽게 설치할 수 있습니다. 설치 후, 모듈을 임포트합니다.

import handcalcs.render

%%render magic 명령어로 아래와 같이 쉽게 사용할 수 있습니다.

%%render

r = sqrt(a**2 +b**2)
x = (-b + sqrt(b**2 -4*a*c))/(2*a)

실제 Jupyter Notebook Output

언더바(_)를 통해 밑 이름도 지정할 수 있습니다.

%%render

# under -> '_'
a_x = 1
b_under_underunder = 2

7. 파이썬 크롤링하기

파이썬으로 크롤링을 해야할 때, 스크래피(Scrapy) 등 다양한 방법이 있지만 기본적으로 우리가 사용하는 방법은 BeautifulSoupSelenium 라이브러리입니다.

특히, Selenium은 ChromeDriver을 활용해서 뷰(View)를 확인할 수 있는 장점이 있어 명확하고, 몇몇 함수를 사용해 더욱 편리하게 크롤링을 진행할 수 있습니다. 뿐만 아니라 버튼 클릭부터 페이지 변경까지 매우 편리합니다.

그런데 만약 버튼 클릭, 페이지네이션(Pagination), 링크 들어가서 또 링크를 들어가야하는 등 여러 가지 복잡한 액션이 포함된 크롤링을 진행해야할 때는 어떨까요? 뿐만 아니라 파싱해야할 정보량도 많다면?
이때, Selenium만 활용하면 속도가 매우 느리다는 단점을 느낄 수 있습니다. 음... 굳이 별다른 이유를 설명하지 않아도 일단 뷰를 우리에게 제공한다는 점을 생각해보면 일반적으로 납득이 가는 상황입니다.

이에 대한 해결방법으로 BeautifulSoup만 활용해보는 것입니다. BeautifulSoup은 HTTP 통신으로 시각적인 정보를 제공하지 않고 바로 파싱을 진행할 수 있기 때문에 크롤링 속도가 매우 빠르다는 장점이 있습니다.

만약, 여기서 페이지네이션이나 링크를 타고 들어가는 크롤링 과정을 뷰로 확인하고 싶다면, Selenium을 활용하면 되겠죠. 그런데 단점은 속도잖아요. 속도가 걱정된다면?

두 방법을 섞어서 활용해보세요. 여러 가지 액션은 Selenium, 파싱은 BeautifulSoup를 활용하면 빠른 속도로 크롤링을 진행할 수 있습니다.

이 글은 다음 튜토리얼을 번역한 것입니다.

https://keras.io/examples/vision/mlp_image_classification/

 

Keras documentation: Image classification with modern MLP models

Image classification with modern MLP models Author: Khalid Salama Date created: 2021/05/30 Last modified: 2021/05/30 Description: Implementing the MLP-Mixer, FNet, and gMLP models for CIFAR-100 image classification. View in Colab • GitHub source Introduc

keras.io

 


Introduction

이 예제는 최근 많이 사용되고 있는 Attention 방법을 사용하지 않고 MLP(multi-layer perceptron)을 사용하는 세 가지 모델에 대해 CIFAR-100 데이터셋을 활용하여 이미지 분류 문제를 해결해봅니다.

  1. 두 가지 타입의 MLP를 사용하는 MLP Mixer model(by Ilya Tolstikhin et al.)을 구현합니다
  2. based on unparameterized Fourier Transform, FNet model을 구현합니다
  3. gating 방법을 사용하는 gMLP model을 구현합니다

이 예제의 목적은 다른 데이터셋에서 서로 다른 성능을 낼 수 있기 때문에 각 모델의 성능을 비교하는 것이 아닙니다. 모델을 구성하는 main block들을 간단히 구현해보는 예제입니다.

+ 이 예제에서 살펴볼 세 개 모델은 다른 방법에 비해 꽤 간단하지만, 성능은 좋기 때문에 논문을 한번 찾아볼 필요가 있을 것 같습니다.
+ Attention mechnism도 여전히 강력하고, 훌륭한 방법이지만, 최근 연구 트렌드는 MLP인 것 같습니다.

tensorflow version 2.4 또는 그 이상에서 실행할 수 있으며, tf-addons 모듈 설치가 필요합니다.

pip install -U tensorflow-addons

Setup

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

Prepare the data

num_classes = 100
input_shape = (32, 32, 3)

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

Configure the hyperparameters

weight_decay = 0.0001
batch_size = 128
num_epochs = 50
dropout_rate = 0.2
image_size = 64  # 이미지 resize 크기
patch_size = 8  # 이미지 패치 크기
num_patches = (image_size // patch_size) ** 2  # 패치로 이루어진 data array 크기
embedding_dim = 256  # Number of hidden units.
num_blocks = 4  # Number of blocks.

print(f"Image size: {image_size} X {image_size} = {image_size ** 2}")
print(f"Patch size: {patch_size} X {patch_size} = {patch_size ** 2} ")
print(f"Patches per image: {num_patches}")
print(f"Elements per patch (3 channels): {(patch_size ** 2) * 3}")

Build a classification model

processing block을 포함한 classifier를 구현합니다.

def build_classifier(blocks, positional_encoding=False):
    inputs = layers.Input(shape=input_shape) # inputs: [batch_size, 32, 32, 3]
    # Augment data.
    # augmented: [batch_size, 64, 64, 3]
    augmented = data_augmentation(inputs)

    # Create patches.
    # patches: [batch_size, 64, 192]
    patches = Patches(patch_size, num_patches)(augmented)
    
    # [batch_size, num_patches, embedding_dim] tensor를 생성하기 위한 패치 인코딩 부분입니다.
    x = layers.Dense(units=embedding_dim)(patches)
    
    if positional_encoding:
        positions = tf.range(start=0, limit=num_patches, delta=1)
        position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=embedding_dim
        )(positions)
        x = x + position_embedding

    # Process x using the module blocks.
    x = blocks(x)
    
    # global average pooling을 사용
    # [batch_size, embedding_dim] 형태의 representation tensor를 만듭니다.
    representation = layers.GlobalAveragePooling1D()(x)
    representation = layers.Dropout(rate=dropout_rate)(representation)
    
    # Compute logits outputs.
    logits = layers.Dense(num_classes)(representation)
    
    # Create the Keras model.
    return keras.Model(inputs=inputs, outputs=logits)

Define an experiment

compile, train, evaluate를 위한 구현입니다.

def run_experiment(model):
    # Create Adam optimizer with weight decay.
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay,
    )
    # Compile the model.
    # top5-acc -> name = "top5-acc"
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="acc"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top5-acc"),
        ],
    )
    # learning rate 스케쥴러
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=5
    )
    # early stopping callback
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=10, restore_best_weights=True
    )
    # Fit the model.
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[early_stopping, reduce_lr],
    )

    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    # Return history to plot learning curves.
    return history

Use data augmentation

data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.Normalization(),
        layers.experimental.preprocessing.Resizing(image_size, image_size),
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# normalization을 위해 training data의 mean, varfmf 계산합니다.
data_augmentation.layers[0].adapt(x_train)

Implement patch extraction as a layer

Patches class를 이해하려면 tf.image.extract_patches를 이해할 필요가 있습니다.

class Patches(layers.Layer):
    def __init__(self, patch_size, num_patches):
        super(Patches, self).__init__()
        self.patch_size = patch_size
        self.num_patches = num_patches

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, self.num_patches, patch_dims])
        return patches

The MLP-Mixer model

MLP-Mixer model은 MLP만 사용하는 아키텍처이며, 두 가지 유형의 MLP layer를 포함합니다.

  1. 각 이미지 패치를 독립적으로 사용함으로써 per-location feature를 혼합합니다
  2. 채널축으로 적용하여 spatial information을 혼합합니다.

이 방법은 어떻게 보면 Xception model에서 대표적으로 사용한 depthwise separable convolution 구조와 유사해보이지만, 차이점으로는 two chained dense transforms, no max pooling, layer normalization instead of batch normalization 사용에 있습니다.

Implement the MLP-Mixer model

class MLPMixerLayer(layers.Layer):
    def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs):
        super(MLPMixerLayer, self).__init__(*args, **kwargs)

        self.mlp1 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=num_patches),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.mlp2 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=embedding_dim),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.normalize = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize(inputs)

        # [num_batches, num_patches, hidden_units] -> [num_batches, hidden_units, num_patches]
        x_channels = tf.linalg.matrix_transpose(x)
        
        # mlp1을 채널 독립적으로 적용합니다.
        # Dense Layer는 2-D 이상일 경우, 마지막 차원에서 가중치 연산이 일어납니다 -> 일종의 trick으로 사용
        mlp1_outputs = self.mlp1(x_channels)
        
        # [num_batches, hidden_dim, num_patches] -> [num_batches, num_patches, hidden_units]
        mlp1_outputs = tf.linalg.matrix_transpose(mlp1_outputs)
        
        # Add skip connection.
        x = mlp1_outputs + inputs
        
        # Apply layer normalization.
        x_patches = self.normalize(x)
        
        # mlp2를 각 패치에 독립적으로 적용합니다.
        mlp2_outputs = self.mlp2(x_patches)
        
        # Add skip connection.
        x = x + mlp2_outputs
        
        return x

Build, train, and evaluate the MLP-Mixer model

mlpmixer_blocks = keras.Sequential(
    [MLPMixerLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.005
mlpmixer_classifier = build_classifier(mlpmixer_blocks)
history = run_experiment(mlpmixer_classifier)

MLP-Mixer 모델은 다른 모델에 비해 사용되는 파라미터 수가 적습니다. 

MLP-Mixer 논문에 언급된 바에 따르면, large-dataset에 pre-trained하고 modern regularization 방법을 함께 사용한다면 SOTA 모델들의 score와 경쟁할 수 있는 수준의 score를 얻을 수 있다고 합니다. 임베딩 차원, 블록 수, 입력 이미지 크기를 늘리거나 다른 패치 크기를 사용하거나 모델을 더 오랫동안 학습시키면 더 좋은 성능을 얻을 수 있습니다.


The FNet model

FNet 모델은 Transformer block과 유사한 구조의 블록을 사용합니다. 하지만 FNet 모델은 self-attention layer 대신, 파라미터에서 자유로운 2D Fourier transformation layer를 사용합니다.

위 모델과 동일하게 패치, 채널 독립적 연산이 사용됩니다.

Implement the FNet module

class FNetLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(FNetLayer, self).__init__(*args, **kwargs)

        self.ffn = keras.Sequential(
            [
                layers.Dense(units=embedding_dim),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
                layers.Dense(units=embedding_dim),
            ]
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply fourier transformations.
        x = tf.cast(
            tf.signal.fft2d(tf.cast(inputs, dtype=tf.dtypes.complex64)),
            dtype=tf.dtypes.float32,
        )
        # Add skip connection.
        x = x + inputs
        # Apply layer normalization.
        x = self.normalize1(x)
        # Apply Feedfowrad network.
        x_ffn = self.ffn(x)
        # Add skip connection.
        x = x + x_ffn
        # Apply layer normalization.
        return self.normalize2(x)

Build, train, and evaluate the FNet model

fnet_blocks = keras.Sequential(
    [FNetLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.001
fnet_classifier = build_classifier(fnet_blocks, positional_encoding=True)
history = run_experiment(fnet_classifier)

The gMLP model

gMLP 모델은 Spatial Gating Unit(SGU)를 사용하는 MLP 아키텍처입니다. SGU는 spatial(channel) dimention을 따라 각 패치가 상호작용할 수 있도록 합니다.

  1.  각 패치를 따라 linear projection을 적용해서 input을 spatially transform합니다
  2. inputs와 spatially transform input을 element-wise multiplication합니다

Implement the gMLP module

class gMLPLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(gMLPLayer, self).__init__(*args, **kwargs)

        self.channel_projection1 = keras.Sequential(
            [
                layers.Dense(units=embedding_dim * 2),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
            ]
        )

        self.channel_projection2 = layers.Dense(units=embedding_dim)

        self.spatial_projection = layers.Dense(
            units=num_patches, bias_initializer="Ones"
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def spatial_gating_unit(self, x):
        # Split x along the channel dimensions.
        # Tensors u and v will in th shape of [batch_size, num_patchs, embedding_dim].
        u, v = tf.split(x, num_or_size_splits=2, axis=2)
        # Apply layer normalization.
        v = self.normalize2(v)
        # Apply spatial projection.
        # [batch_size, num_patches, embedding_dim] -> [batch_size, embedding_dim, num_patches]
        v_channels = tf.linalg.matrix_transpose(v)
        v_projected = self.spatial_projection(v_channels)
        v_projected = tf.linalg.matrix_transpose(v_projected)
        
        # Apply element-wise multiplication.
        return u * v_projected

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize1(inputs)
        
        # 여기서 embedding_dim을 2배로 만들어주고,
        # Apply the first channel projection. x_projected shape: [batch_size, num_patches, embedding_dim * 2].
        x_projected = self.channel_projection1(x)
        
        # 2배로 만들어진 channel을 두 개로 쪼개서 하나는 projection을 적용, 하나는 기존걸 유지한 뒤,
        # element-wise multiplication을 적용함. 연산은 skip-connection과 비슷한 원리
        # Apply the spatial gating unit. x_spatial shape: [batch_size, num_patches, embedding_dim].
        x_spatial = self.spatial_gating_unit(x_projected)
        
        # Apply the second channel projection. x_projected shape: [batch_size, num_patches, embedding_dim].
        x_projected = self.channel_projection2(x_spatial)
        
        # Add skip connection.
        return x + x_projected

Build, train, and evaluate the gMLP model

gmlp_blocks = keras.Sequential(
    [gMLPLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.003
gmlp_classifier = build_classifier(gmlp_blocks)
history = run_experiment(gmlp_classifier)

gMLP 논문에 따르면, embedding dimension, gMLP blocks을 늘리거나 더 오랫동안 학습시키면 더 나은 결과를 얻을 수 있다고 합니다. 또한, 입력 이미지 크기나 패치 크기를 조절해가면서 학습시켜보세요.

또, gMLP 논문에서의 구현은 advanced regularization strategies나 MixUp, CutMix 뿐만 아니라 AutoAugmentation을 사용했습니다.

특정 가상 환경에 설치되어있는 패키지만 복사하고 싶다던지, 아니면 자신의 환경을 다른 사람에게 그대로 전달해줄 작업이 있으면 다음 방법을 사용하면 편리합니다.

(Copy) 의존 패키지 복사하기

1. activate env 가상환경 활성화

2. pip freeze > requirements.txt
명령어를 실행시키면 현재 경로에 txt 파일이 생성됩니다.
requirements는 관례적인 이름이기 때문에 다른 이름을 사용해도 무방합니다.

3. type requirements.txt로 확인 또는 직접 경로에 들어가서 열어봐도 됩니다.

(Install) 의존 패키지 설치하기

freeze 명령어를 통해 생성된 파일이 있는 경로에서 다음 명령어를 실행

1. pip install -r requirements.txt

 

이 글은 다음 튜토리얼을 번역한 것입니다.

 

Keras documentation: Classification with Gated Residual and Variable Selection Networks

Classification with Gated Residual and Variable Selection Networks Author: Khalid Salama Date created: 2021/02/10 Last modified: 2021/02/10 Description: Using Gated Residual and Variable Selection Networks for income level prediction. View in Colab • Git

keras.io


Introduction

이 예제에선 Structured data classification을 위해 Bryan Lim et al.(arxiv.org/abs/1912.09363)에서 제안된 Gated Residual NetworksVariable Selection Networks를 사용해봅니다. GRN은 필요에 따라 비선형성을 제공할 수 있는 유연성을 제공하고, VSN은 성능에 부정적인 영향을 주는 불필요한 feature를 제거하는 역할을 수행합니다. 또한, 이 두 가지를 함께 사용했을 때 모델의 학습 능력을 주요하게 향상시킬 수 있습니다.

이 예제는 논문에서 제안된 전체 모델을 구현하진 않고, 부분적으로 GRN과 VSN만 구현하게 됩니다.

※ TensorFlow 2.3 이상의 버전에서 정상 작동합니다.


The Dataset

데이터셋은 UC Irvine Machine Learning Repo에서 제공하는 United States Census Income Dataset을 사용합니다. 이진 분류이며, 연소득 50K가 넘는 사람인지를 구분하는 문제입니다.

데이터셋은 41개의 feature(7 numerical + 34 categorical)과 ~300K의 인스턴스로 이루어져 있습니다.


Setup

import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

Prepare Data

데이터를 로드합니다.

# Column names.
CSV_HEADER = [
    "age", "class_of_worker", "detailed_industry_recode",
    "detailed_occupation_recode", "education", "wage_per_hour",
    "enroll_in_edu_inst_last_wk", "marital_stat", "major_industry_code",
    "major_occupation_code", "race", "hispanic_origin",
    "sex", "member_of_a_labor_union", "reason_for_unemployment",
    "full_or_part_time_employment_stat", "capital_gains", "capital_losses",
    "dividends_from_stocks", "tax_filer_stat", "region_of_previous_residence",
    "state_of_previous_residence", "detailed_household_and_family_stat",
    "detailed_household_summary_in_household", "instance_weight",
    "migration_code-change_in_msa", "migration_code-change_in_reg",
    "migration_code-move_within_reg", "live_in_this_house_1_year_ago",
    "migration_prev_res_in_sunbelt", "num_persons_worked_for_employer",
    "family_members_under_18", "country_of_birth_father",
    "country_of_birth_mother", "country_of_birth_self",
    "citizenship", "own_business_or_self_employed",
    "fill_inc_questionnaire_for_veteran's_admin", "veterans_benefits",
    "weeks_worked_in_year", "year", "income_level"
]

data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.data.gz"
data = pd.read_csv(data_url, header=None, names=CSV_HEADER)

test_data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.test.gz"
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Data shape: {data.shape}") # (199523, 42)
print(f"Test data shape: {test_data.shape}") # (99762, 42)

target은 ' - 50000.', ' 50000+.' 두 가지로 이루어져 있습니다.

따라서 이 두 가지를 Integer 형태로 바꿔줍니다.

data["income_level"] = data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

test_data["income_level"] = test_data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

훈련/검증 데이터로 분할합니다.

random_selection = np.random.rand(len(data.index)) <= 0.85
train_data = data[random_selection]
valid_data = data[~random_selection]

분할한 데이터를 csv 파일로 저장해둡니다.

train_data_file = "train_data.csv"
valid_data_file = "valid_data.csv"
test_data_file = "test_data.csv"

train_data.to_csv(train_data_file, index=False, header=False)
valid_data.to_csv(valid_data_file, index=False, header=False)
test_data.to_csv(test_data_file, index=False, header=False)

Define dataset metadata

reading, parsing, encoding이 원할하게 수행될 수 있도록 metadata를 정의하겠습니다.

# Target feature name.
TARGET_FEATURE_NAME = "income_level"

# Weight column name.
WEIGHT_COLUMN_NAME = "instance_weight"

# Numeric feature names.
NUMERIC_FEATURE_NAMES = [
    "age", "wage_per_hour",
    "capital_gains", "capital_losses",
    "dividends_from_stocks", "num_persons_worked_for_employer",
    "weeks_worked_in_year"
]

# Categorical feature와 feature가 가지고 있는 unique label을 구합니다.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    feature_name: sorted([str(value) for value in list(data[feature_name].unique())])
    for feature_name in CSV_HEADER
    if feature_name
    not in list(NUMERIC_FEATURE_NAMES + [WEIGHT_COLUMN_NAME, TARGET_FEATURE_NAME])
}

# All features names.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + list(
    CATEGORICAL_FEATURES_WITH_VOCABULARY.keys()
)

# default 값 처리를 위해 categorical은 ["NA"], 나머지는 [0.0]으로 초기화합니다.
COLUMN_DEFAULTS = [
    [0.0]
    if feature_name in NUMERIC_FEATURE_NAMES + [TARGET_FEATURE_NAME, WEIGHT_COLUMN_NAME]
    else ["NA"]
    for feature_name in CSV_HEADER
]

Create a tf.data.Dataset for training and evaluation

file을 read and parsing하고 feature, label 변환을 수행하기 위해 tf.data.Dataset을 사용합니다.

from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def process(features, target):
    for feature_name in features:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            # Categorical feature이면 string으로 cast합니다.
            features[feature_name] = tf.cast(features[feature_name], tf.dtypes.string)
    # Get the instance weight.
    weight = features.pop(WEIGHT_COLUMN_NAME)
    
    return features, target, weight

def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    # csv 파일을 읽어오는 method
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        shuffle=shuffle,
    ).map(process)

    return dataset

Create model inputs

def create_model_inputs():
    inputs = {}
    
    for feature_name in FEATURE_NAMES:
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
            
    return inputs

Encode input features

Categorical featureencoding_size dimension을 가지는 Embedding Layer를 사용해서 인코딩합니다. Numerical featurelayers.Dense를 사용해서 encoding_size dimension을 가지도록 선형 변환합니다. 따라서, 모든 인코딩된 feature는 동일한 dimension을 가지도록 구성됩니다.

embedding representation으로 변환하는 작업은 아래 테스트 코드로 확인해보세요.

index = StringLookup(
                vocabulary=CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father'], 
                mask_token=None, num_oov_indices=0
                )
test = train_data['country_of_birth_father'][:32]

# print(index(test))

embedding_ecoder = layers.Embedding(
                input_dim=len(CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father']),
                output_dim=200
            )

print(embedding_ecoder(index(test)))

 

from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def encode_inputs(inputs, encoding_size):
    encoded_features = []
    
    for feature_name in inputs:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # Create a lookup to convert a string values to an integer indices.
            # String value를 Integer index로 변환합니다.
            # 따라서, 동일한 label은 동일한 index로 변환됩니다.
            index = StringLookup(
                vocabulary=vocabulary, mask_token=None, num_oov_indices=0
            )
            # String -> Integer index
            value_index = index(inputs[feature_name])
            # encoding_size dimension을 가지는 Embedding layer를 정의합니다.
            embedding_ecoder = layers.Embedding(
                input_dim=len(vocabulary), output_dim=encoding_size
            )
            # index value를 embedding representation으로 변환합니다.
            encoded_feature = embedding_ecoder(value_index)
        else:
            # encoding_size unit을 가지는 Dense layer를 활용해서 numeric feature를 인코딩합니다.
            encoded_feature = tf.expand_dims(inputs[feature_name], -1)
            encoded_feature = layers.Dense(units=encoding_size)(encoded_feature)
        encoded_features.append(encoded_feature)
        
    return encoded_features

Implement the Gated Linear Unit

GLU는 주어진 작업에서 연관없는 input을 사용하지 않게끔 하는 유연성을 제공합니다.

class GatedLinearUnit(layers.Layer):
    def __init__(self, units):
        super(GatedLinearUnit, self).__init__()
        self.linear = layers.Dense(units)
        self.sigmoid = layers.Dense(units, activation="sigmoid")

    def call(self, inputs):
        return self.linear(inputs) * self.sigmoid(inputs)

Implement the Gated Residual Network

GRN은 다음과 같이 구성됩니다.

  1. ELU Activation을 적용합니다.
  2. Dropout을 적용합니다.
  3. GLU를 적용하고, original input과 Adding합니다. dimension이 다르면 project layer를 활용합니다.
  4. normalization을 적용하고, output을 반환합니다.
class GatedResidualNetwork(layers.Layer):
    def __init__(self, units, dropout_rate):
        super(GatedResidualNetwork, self).__init__()
        self.units = units
        self.elu_dense = layers.Dense(units, activation="elu")
        self.linear_dense = layers.Dense(units)
        self.dropout = layers.Dropout(dropout_rate)
        self.gated_linear_unit = GatedLinearUnit(units)
        self.layer_norm = layers.LayerNormalization()
        self.project = layers.Dense(units)

    def call(self, inputs):
        # 1.
        x = self.elu_dense(inputs)
        # 2.
        x = self.linear_dense(x)
        x = self.dropout(x)
        # 3.
        if inputs.shape[-1] != self.units:
            inputs = self.project(inputs)
        x = inputs + self.gated_linear_unit(x)
        # 4.
        x = self.layer_norm(x)
        
        return x

Implement the Variable Selection Network

VSN은 다음과 같이 구성됩니다.

  1. 각 feature에 개별적으로 GRN을 적용합니다.
  2. GRN을 적용한 feature를 concat하고, softmax 함수를 적용합니다.
  3. 개별 GRN의 가중합을 구합니다.

VSN은 input feature 개수와 상관없이 [batch_size, encoding_size] 형태를 가지는 output을 반환합니다.

class VariableSelection(layers.Layer):
    def __init__(self, num_features, units, dropout_rate):
        super(VariableSelection, self).__init__()
        self.grns = list()
        # Create a GRN for each feature independently
        for idx in range(num_features):
            grn = GatedResidualNetwork(units, dropout_rate)
            self.grns.append(grn)
        # Create a GRN for the concatenation of all the features
        self.grn_concat = GatedResidualNetwork(units, dropout_rate)
        self.softmax = layers.Dense(units=num_features, activation="softmax")

    def call(self, inputs):
        v = layers.concatenate(inputs) # (batch_size, embedding_size * num_features)
        v = self.grn_concat(v) # (batch_size, units)
        v = tf.expand_dims(self.softmax(v), axis=-1) # (batch_size, num_features, 1)

        x = []
        for idx, input in enumerate(inputs):
            x.append(self.grns[idx](input))
        x = tf.stack(x, axis=1) # (batch_size, num_features, units)

        # (batch_size, units)
        # (1, num_features) by (num_features, units) -> (1, units)
        outputs = tf.squeeze(tf.matmul(v, x, transpose_a=True), axis=1)

        return outputs

# test용 코드
# VariableSelection(num_features = 2, units = 100, dropout_rate = 0.1)([embedding_ecoder(index(test)), embedding_ecoder(index(test))])

Create Gated Residual and Variable Selection Networks model

def create_model(encoding_size):
    inputs = create_model_inputs()
    
    feature_list = encode_inputs(inputs, encoding_size)
    num_features = len(feature_list)

    features = VariableSelection(num_features, encoding_size, dropout_rate)(
        feature_list
    )

    outputs = layers.Dense(units=1, activation="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)

    return model

Compile, train, and evaluate the model

learning_rate = 0.001
dropout_rate = 0.15
batch_size = 265
num_epochs = 20
encoding_size = 16

model = create_model(encoding_size)
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[keras.metrics.BinaryAccuracy(name="accuracy")],
)


# Create an early stopping callback.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)

print("Start training the model...")
train_dataset = get_dataset_from_csv(
    train_data_file, shuffle=True, batch_size=batch_size
)
valid_dataset = get_dataset_from_csv(valid_data_file, batch_size=batch_size)
model.fit(
    train_dataset,
    epochs=num_epochs,
    validation_data=valid_dataset,
    callbacks=[early_stopping],
)
print("Model training finished.")

print("Evaluating model performance...")
test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)
_, accuracy = model.evaluate(test_dataset)

print(f"Test accuracy: {round(accuracy * 100, 2)}%")

예제에서는 95% acc를 달성합니다. 

acc 향상을 위해 encoding_size를 조절하거나 여러 개의 GRN 또는 VSN을 사용해볼 수 있습니다. 또, dropout_rate 조절은 overfitting을 피할 수 있도록 도울겁니다.

StringLookup은 엑셀에서 굳이 비슷한 이름을 가진 함수를 찾아보자면, VLookup, HLookup처럼 인덱스를 찾아주는 함수인데, tensorflow version 2.4부터 tf.keras.layers.experimental.preprocessing.StringLookUp으로 만나볼 수 있습니다.

tf.keras.layers.experimental.preprocessing.StringLookup(
    max_tokens=None, num_oov_indices=1, mask_token='',
    oov_token='[UNK]', vocabulary=None, encoding=None, invert=False,
    **kwargs
)

사용 방법은 예제만 봐도 단번에 알 수 있는데요. vocabulary를 만들어서 StringLookUp에게 던져주면, word Embedding처럼 각 data value들이 index로 표현될 수 있도록 도와주는 함수입니다.

vocab = ["a", "b", "c", "d"]
data = tf.constant([["a", "c", "d"], ["d", "z", "b"]])
layer = StringLookup(vocabulary=vocab)
layer(data)
# <tf.Tensor: shape = (2, 3), dtype=int64, numpy =
# array([[2, 4, 5],
#       [5, 1, 3]])>

"a"가 index 2인 이유는 바로 다음 예제를 보면 단번에 알 수 있습니다. OOV 처리 등.

data = tf.constant([["a", "c", "d"], ["d", "z", "b"]])
layer = StringLookup()
layer.adapt(data)

layer.get_vocabulary()
# ['','[UNK]', 'd', 'z', 'c', 'b', 'a']

 

그런데 TesnorFlow 2.4 이전 버전은 StringLookUp 함수가 제공되지 않기 때문에 다른 방법으로 사용해야 합니다.
물론 다양한 방법이 있겠지만, tensorflow가 제공하는 함수를 활용해볼거라면 tensorflow lookup 모듈을 고려해볼 수 있었습니다.

이를 위해선 두 가지를 사용해야 합니다.

  • tf.lookup.StaticHashTable
  • tf.lookup.KeyValueTensorInitializer

tf.lookup.KeyValueTensorInitializer에서 vocab(key)와 data value(value)를 준비해주면, StaticHashTable에 넣어 Table을 만드는 구조인 것으로 보입니다.

이 역시 텐서플로우 공식 홈페이지 예제를 보면 쉽게 이해할 수 있습니다.

keys_tensor = tf.constant(['a', 'b', 'c'])
vals_tensor = tf.constant([7, 8, 9])
input_tensor = tf.constant(['a', 'f'])

init = tf.lookup.KeyValueTensorInitializer(keys_tensor, vals_tensor)
table = tf.lookup.StaticHashTable(
    init,
    default_value=-1)
    
table.lookup(input_tensor).numpy()
# array([7, -1], dtype = int32)

중요하진 않지만, tf 2.2에서는 table.lookup(input_tensor).numpy()가 작동하지 않습니다.
예제가 완벽히 설명해주고 있으니 추가로 설명하진 않겠습니다.

이를 활용하면 StringLookUp처럼 사용할 수 있는데, 형변환 에러가 발생합니다
(tf 2.4에서는 아마 발생하지 않을거에요).

그래서 예제처럼 사용하기보다는 Custom_Layer로 만든 후, Tensor로 변환시키게끔해서 사용해야 하는 것 같습니다.

다음처럼 구현할 수 있고, 물론 중요한 Embedding Layer와도 연결해서 사용할 수 있습니다.

import tensorflow as tf

key = tf.range(start = 0, limit = 100, delta = 1, dtype = tf.int64)
value = tf.range(start = 0, limit = 100, delta = 1, dtype = tf.int64)
table = tf.lookup.StaticHashTable(initializer=tf.lookup.KeyValueTensorInitializer(keys = key, values = value),
                                  default_value = -1)

class custom_layer(tf.keras.layers.Layer):
  def __init__(self):
    super(custom_layer, self).__init__()

  def call(self, inputs):
    return table.lookup(inputs)

custom_layer = custom_layer()

# layer 연결
inputs = tf.keras.layers.Input(shape = (1, ), dtype = tf.int64)
encoded_feature = custom_layer(inputs)
embedding_feature = tf.keras.layers.Embedding(input_dim = 100, 
                                              output_dim = 100)(encoded_feature)

model = tf.keras.models.Model(inputs = inputs, outputs = embedding_feature)

# Embedding을 사용하지 않는 경우의 test example
# test_data = tf.constant([[50],
#                         [37]], dtype = tf.int64)

# model.predict(test_data) # array([[50], [37]])

- 어쨌든 버전이 높으면 뭐가 많아서 사용하긴 편해보인다...가 결론

KL-Divergence는 텐서플로우 공식 문서에 다음과 같이 구현되어 있습니다.

loss = y_true * log(y_true / y_pred)

의미는 다르지만 y_true / y_pred을 보니 분포를 비교할 때 사용하는 F-Distribution이 생각나기도 하네요.(그냥 여담)

소스: 위키백과

KLD(이하 KL-Divergence)는 P 분포와 Q 분포가 얼마나 다른지를 측정하는 방법입니다. 여기서 통계적으로 P는 사후, Q는 사전분포를 의미합니다.

텐서플로우 공식 문서에 정의되어있는 용어로 설명해보면, KLD는 y_true(P)가 가지는 분포값과 y_pred(Q)가 가지는 분포값이 얼마나 다른지를 확인하는 방법입니다.

KLD는 값이 낮을수록 두 분포가 유사하다라고 해석합니다. 정보이론에서 흔히 볼 수 있는 엔트로피(Entropy) 또한, 값이 낮을수록 랜덤성이 낮다고 해석하는 것과 비슷합니다.

두 가지의 해석 방법이 비슷한 것은 바로 KLD에 크로스-엔트로피(Cross-Entropy) 개념이 이미 포함되어 있기 때문입니다.

 

KLD와 크로스 엔트로피

정보이론에서 정보량은 다음을 효과적으로 표현하기 위해 로그를 사용하여 표현합니다.

  • 확률이 높을수록 → 매우 당연하게 일어날 사건
  • 확률이 낮으면 → 자주 일어나지 않는 특별한 사건

또, 우리가 흔히 볼 수 있는 엔트로피는 평균 정보량을 나타내므로 다음과 같이 표현합니다.

Entropy

예측할 수 있는 5개 상황이 각각 벌어질 확률을 [0.2, 0.2, 0.2, 0.2, 0.2]라고 가정했을 때, 엔트로피는 0.2 x 5 x log(0.2)가 되겠죠.

이제 KLD에 왜 Cross-Entropy가 포함되어 있는지 보겠습니다. 또, 편의를 위해 아래에서 볼 수 있는 p와 q를 다음과 같이 생각하겠습니다.

  • p : 실제 세계에서 관찰하여 얻어낸 확률 ; 실제 확률분포 P
  • q : 모델이 예측한 확률 ; 확률분포 P로 근사될 분포 Q

KL-Divergence = Cross-Entropy - Entropy

KLD 식을 다시 나누면 그림에서 왼쪽항처럼 나눌 수 있는데, 이는 분명 Entorpy - Entropy같아 보이지만, 맨 앞의 식에서 로그 안의 값이 모델이 예측한 확률 q를 나타내고 있기 때문에 Cross-Entropy - Entropy가 됩니다.

Cross-Etnropy

결과적으로 모델이 예측한 확률분포(Q)의 정보량과 실제 확률분포(P) 정보량의 차이를 계속 학습함으로써 Q를 P에 근사한다고 표현할 수 있습니다. 그래서 이에 대한 차이(정보량)를 분포가 유사한지에 대한 정도로 다시 해석할 수 있는 것입니다.

 

모델 학습에서의 KLD

우리가 보통 Classification 문제에서 Binary 또는 Categorical Cross-Entropy를 쓰는데, 사실 KLD를 사용하는 것과 동일하다고 표현해도 무방합니다.

위 식에서 Entropy에 해당하는 부분은 실제 값으로 고정된 값이기 때문에 (Loss 최소화에 영향을 주지 않아) 생략할 수 있고, 실제 모델이 학습하면서 최소화할 부분은 KLD 식의 앞부분에 해당하는 Cross-Entropy이기 때문이죠.

하지만 실제 진짜를 모방하기 위해 가짜의 분포를 정말 잘 만들어내야 하는 GAN에서는 이에 대한 정보가 굉장히 중요한 것 같습니다.

그래서 실제로 증명 과정에서 KLD를 사용하진 않고, KLD를 거리 개념으로 해석할 수 있게 변환한 Jensen-Shannon divergence를 사용합니다. 

Jensen-Shannon Divergence

우리가 흔히 생각하는 거리는 A와 B를 보았을 때, 다음을 만족해야 합니다.

  • A 기준에서 바라본 B까지의 거리 = B 기준에서 바라본 A까지의 거리

 

하지만 KLD는 KLD(P || Q) ≠ KLD(Q || P) 이기 때문에 거리로서 해석될 수 없습니다.