KL-Divergence는 텐서플로우 공식 문서에 다음과 같이 구현되어 있습니다.

loss = y_true * log(y_true / y_pred)

의미는 다르지만 y_true / y_pred을 보니 분포를 비교할 때 사용하는 F-Distribution이 생각나기도 하네요.(그냥 여담)

소스: 위키백과

KLD(이하 KL-Divergence)는 P 분포와 Q 분포가 얼마나 다른지를 측정하는 방법입니다. 여기서 통계적으로 P는 사후, Q는 사전분포를 의미합니다.

텐서플로우 공식 문서에 정의되어있는 용어로 설명해보면, KLD는 y_true(P)가 가지는 분포값과 y_pred(Q)가 가지는 분포값이 얼마나 다른지를 확인하는 방법입니다.

KLD는 값이 낮을수록 두 분포가 유사하다라고 해석합니다. 정보이론에서 흔히 볼 수 있는 엔트로피(Entropy) 또한, 값이 낮을수록 랜덤성이 낮다고 해석하는 것과 비슷합니다.

두 가지의 해석 방법이 비슷한 것은 바로 KLD에 크로스-엔트로피(Cross-Entropy) 개념이 이미 포함되어 있기 때문입니다.

 

KLD와 크로스 엔트로피

정보이론에서 정보량은 다음을 효과적으로 표현하기 위해 로그를 사용하여 표현합니다.

  • 확률이 높을수록 → 매우 당연하게 일어날 사건
  • 확률이 낮으면 → 자주 일어나지 않는 특별한 사건

또, 우리가 흔히 볼 수 있는 엔트로피는 평균 정보량을 나타내므로 다음과 같이 표현합니다.

Entropy

예측할 수 있는 5개 상황이 각각 벌어질 확률을 [0.2, 0.2, 0.2, 0.2, 0.2]라고 가정했을 때, 엔트로피는 0.2 x 5 x log(0.2)가 되겠죠.

이제 KLD에 왜 Cross-Entropy가 포함되어 있는지 보겠습니다. 또, 편의를 위해 아래에서 볼 수 있는 p와 q를 다음과 같이 생각하겠습니다.

  • p : 실제 세계에서 관찰하여 얻어낸 확률 ; 실제 확률분포 P
  • q : 모델이 예측한 확률 ; 확률분포 P로 근사될 분포 Q

KL-Divergence = Cross-Entropy - Entropy

KLD 식을 다시 나누면 그림에서 왼쪽항처럼 나눌 수 있는데, 이는 분명 Entorpy - Entropy같아 보이지만, 맨 앞의 식에서 로그 안의 값이 모델이 예측한 확률 q를 나타내고 있기 때문에 Cross-Entropy - Entropy가 됩니다.

Cross-Etnropy

결과적으로 모델이 예측한 확률분포(Q)의 정보량과 실제 확률분포(P) 정보량의 차이를 계속 학습함으로써 Q를 P에 근사한다고 표현할 수 있습니다. 그래서 이에 대한 차이(정보량)를 분포가 유사한지에 대한 정도로 다시 해석할 수 있는 것입니다.

 

모델 학습에서의 KLD

우리가 보통 Classification 문제에서 Binary 또는 Categorical Cross-Entropy를 쓰는데, 사실 KLD를 사용하는 것과 동일하다고 표현해도 무방합니다.

위 식에서 Entropy에 해당하는 부분은 실제 값으로 고정된 값이기 때문에 (Loss 최소화에 영향을 주지 않아) 생략할 수 있고, 실제 모델이 학습하면서 최소화할 부분은 KLD 식의 앞부분에 해당하는 Cross-Entropy이기 때문이죠.

하지만 실제 진짜를 모방하기 위해 가짜의 분포를 정말 잘 만들어내야 하는 GAN에서는 이에 대한 정보가 굉장히 중요한 것 같습니다.

그래서 실제로 증명 과정에서 KLD를 사용하진 않고, KLD를 거리 개념으로 해석할 수 있게 변환한 Jensen-Shannon divergence를 사용합니다. 

Jensen-Shannon Divergence

우리가 흔히 생각하는 거리는 A와 B를 보았을 때, 다음을 만족해야 합니다.

  • A 기준에서 바라본 B까지의 거리 = B 기준에서 바라본 A까지의 거리

 

하지만 KLD는 KLD(P || Q) ≠ KLD(Q || P) 이기 때문에 거리로서 해석될 수 없습니다.

이 글은 다음 튜토리얼을 번역한 것입니다.


Introduction

이 예제는 Structured Data Classification을 위해 P. Kontschieder et al에서 제안된 Deep neural Dicision Forest 구현에 관한 내용입니다. 확률적이고 미분가능한 dicision tree 구성, end-to-end 학습 방법, deep representation learning을 통한 decision tree 통합을 어떻게 하는지를 다룹니다.

 

The Dataset

이 예제는  UC Irvine Machine Learning Repository에서 제공하고 있는 United states Census Income Dataset을 활용합니다. 한 개인이 연간 50,000 USD 이상 버는지, 벌지 못하는지에 관한 이진 분류 문제입니다.

5개 numerical, 9개 categorical로 이루어진 총 14개 특징(나이, 직업 수준, 교육, 직업 등)과 함께 48,842개의 데이터를 포함하고 있습니다.

 

Setup

import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
import math

 

Prepare the data

CSV_HEADER = [
    "age", "workclass", "fnlwgt",
    "education", "education_num", "marital_status",
    "occupation", "relationship", "race",
    "gender", "capital_gain", "capital_loss",
    "hours_per_week", "native_country","income_bracket"
]

train_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
)
train_data = pd.read_csv(train_data_url, header=None, names=CSV_HEADER)

test_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test"
)
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Train dataset shape: {train_data.shape}")
print(f"Test dataset shape: {test_data.shape}")

테스트 데이터에서 첫 번째 데이터는 유효하지 않으므로 삭제하고, 레이블 데이터에 포함되어 있는 '.'을 없애줍니다.

test_data = test_data[1:]
# <=50K. -> <=50K
test_data.income_bracket = test_data.income_bracket.apply(
    lambda value: value.replace(".", "")
)

학습 데이터와 트레이닝 데이터를 csv로 저장합니다.

 

Define dataset metadata

이제 데이터에 포함되어 있는 특징들에 대해 분석과 encoding이 용이하도록 메타데이터를 정의합니다.

# numerical feature 입니다.
NUMERIC_FEATURE_NAMES = [
    "age", "education_num",
    "capital_gain","capital_loss",
    "hours_per_week"
]

# categorical feature과 unique한 값만 모아놓은 vocabulary 입니다.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "workclass": sorted(list(train_data["workclass"].unique())),
    "education": sorted(list(train_data["education"].unique())),
    "marital_status": sorted(list(train_data["marital_status"].unique())),
    "occupation": sorted(list(train_data["occupation"].unique())),
    "relationship": sorted(list(train_data["relationship"].unique())),
    "race": sorted(list(train_data["race"].unique())),
    "gender": sorted(list(train_data["gender"].unique())),
    "native_country": sorted(list(train_data["native_country"].unique())),
}
# fnlwgt는 특징으로 사용하지 않습니다.
IGNORE_COLUMN_NAMES = ["fnlwgt"]
# categorical feature 입니다.
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())
# 모든 input feature 입니다.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
# 각 특징별로 default value를 지정해놓습니다.
COLUMN_DEFAULTS = [
    [0.0] if feature_name in NUMERIC_FEATURE_NAMES + IGNORE_COLUMN_NAMES else ["NA"]
    for feature_name in CSV_HEADER
]
# target feature 입니다.
TARGET_FEATURE_NAME = "income_bracket"
# target feature에 포함되어 있는 label입니다.
TARGET_LABELS = [" <=50K", " >50K"]

Create tf.data.Dataset objects for training and validation

tf.data.Dataset을 활용해서 파일, feature과 label을 읽고 분석할 수 있는 Dataset 객체로 변환합니다. 또, label은 각각 index로 매핑합니다.

from tensorflow.keras.layers.experimental.preprocessing import StringLookup

target_label_lookup = StringLookup(
    vocabulary=TARGET_LABELS, mask_token=None, num_oov_indices=0
)


def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        na_value="?",
        shuffle=shuffle,
    ).map(lambda features, target: (features, target_label_lookup(target)))
    return dataset.cache()

Create model inputs

def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        # Numerical과 Categorical을 구분합니다.
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
    return inputs

Encode input features

from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def encode_inputs(inputs, use_embedding=False):
    encoded_features = []
    for feature_name in inputs:
        # Categorical일 경우
        if feature_name in CATEGORICAL_FEATURE_NAMES:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # vocabulary에 포함되어 있는 unique value를 integer indice로 변환합니다.
            # (oov), mask token은 사용하지 않으므로
            # mask_token = None, num_oov_indices = 0으로 설정합니다.
            index = StringLookup(
                vocabulary=vocabulary, mask_token=None, num_oov_indices=0
            )
            # String value -> Integer value
            value_index = index(inputs[feature_name])

            # Embedding을 사용하는 경우
            if use_embedding:
                # embedding_dims를 정의하고, Embedding을 만듭니다.
                embedding_dims = int(math.sqrt(len(vocabulary)))
                embedding_ecoder = layers.Embedding(
                    input_dim=len(vocabulary), output_dim=embedding_dims
                )
                # 해당 feature에 포함된 값을 embedding representation으로 변환합니다.
                encoded_feature = embedding_ecoder(value_index)
            # Embedding을 사용하지 않는 경우
            else:
                # one-hot encoder를 선언하고, vocab을 adapt한 뒤, 인덱스로 변환합니다.
                onehot_encoder = CategoryEncoding(output_mode="binary")
                onehot_encoder.adapt(index(vocabulary))
                encoded_feature = onehot_encoder(value_index)
        # Numerical일 경우
        else:
            # Numerical Feature는 shape이 None이므로
            # Concat을 위해 dim을 늘려줍니다.
            encoded_feature = inputs[feature_name]
            if inputs[feature_name].shape[-1] is None:
                print(feature_name)
                encoded_feature = tf.expand_dims(encoded_feature, -1)

        encoded_features.append(encoded_feature)

    encoded_features = layers.concatenate(encoded_features)
    return encoded_features

Deep Neural Decision Tree

neural decision tree를 학습시키기 위해 두 가지 가중치 집합을 사용합니다. 첫 번째 집합인 pi는 tree leaves에서 클래스 확률 분포를 표현합니다. 두 번째 집합인 가중치를 포함한 layer decision_fn은 어떤 leave로 향할지에 대한 확률을 표현하는 집합입니다. forward pass는 아래 순서로 진행됩니다.

  1. 모델은 배치에서 instance의 모든 특성이 single vector로 인코딩된 형태의 input feature를 받습니다. 여기서 single vector는 CNN이나 dense transformation을 통해 만들어집니다. 여기서는 structured data를 사용하므로 dense layer를 사용합니다.
  2. used_features_mask를 사용해서 input feature 중 사용할 subset feature를 랜덤하게 선택합니다.
  3. 그리고 나서 전체 트리를 순회하면서 해당 instance가 leaves에 도달할 확률을 담을 mu를 계산합니다.
  4. 마지막으로 leaves에 도달할 확률과 leaves가 가지고 있는 class 확률이 합쳐져 최종 outputs을 생성하게 됩니다.
class NeuralDecisionTree(keras.Model):
    def __init__(self, depth, num_features, used_features_rate, num_classes):
        super(NeuralDecisionTree, self).__init__()
        self.depth = depth
        self.num_leaves = 2 ** depth
        self.num_classes = num_classes

        # input feature 중 subset feature를 랜덤으로 선택하기 위한 mask를 생성합니다.
        num_used_features = int(num_features * used_features_rate)
        one_hot = np.eye(num_features)
        sampled_feature_indicies = np.random.choice(
            np.arange(num_features), num_used_features, replace=False
        )
        self.used_features_mask = one_hot[sampled_feature_indicies]

        # leaves가 가지고 있는 class weights를 초기화합니다.
        self.pi = tf.Variable(
            initial_value=tf.random_normal_initializer()(
                shape=[self.num_leaves, self.num_classes]
            ),
            dtype="float32",
            trainable=True,
        )

        # 어떤 leave로 향할지에 대한 확률을 담은 decision_fn을 선언합니다.
        self.decision_fn = layers.Dense(
            units=self.num_leaves, activation="sigmoid", name="decision"
        )

    def call(self, features):
        batch_size = tf.shape(features)[0]

        # 마스크를 통해 subset feature를 선택합니다.
        # [batch_size, num_used_features]
        features = tf.matmul(
            features, self.used_features_mask, transpose_b=True
        )  
        # leave로 향할 확률을 계산합니다.
        # [batch_size, num_leaves, 1]
        decisions = tf.expand_dims(
            self.decision_fn(features), axis=2
        )  
        # 향하지 않을 확률을 계산하고, concat합니다.
        # [batch_size, num_leaves, 2]
        decisions = layers.concatenate(
            [decisions, 1 - decisions], axis=2
        )  

        mu = tf.ones([batch_size, 1, 1])

        begin_idx = 1
        end_idx = 2
        # 트리를 순회하면서 instance가 각 leaves에 도달할 확률을 mu에 담습니다.
        for level in range(self.depth):
            mu = tf.reshape(mu, [batch_size, -1, 1])  # [batch_size, 2 ** level, 1]
            mu = tf.tile(mu, (1, 1, 2))  # [batch_size, 2 ** level, 2]
            level_decisions = decisions[
                :, begin_idx:end_idx, :
            ]  # [batch_size, 2 ** level, 2]
            mu = mu * level_decisions  # [batch_size, 2**level, 2]
            begin_idx = end_idx
            end_idx = begin_idx + 2 ** (level + 1)

        mu = tf.reshape(mu, [batch_size, self.num_leaves])  # [batch_size, num_leaves]
        probabilities = keras.activations.softmax(self.pi)  # [num_leaves, num_classes]
        outputs = tf.matmul(mu, probabilities)  # [batch_size, num_classes]
        
        return outputs

Deep Neural Decision Forest

Forest 모델은 동시에 학습되는 여러 개의 Decision Tree로 이루어집니다. 마치 앙상블처럼, Forest 모델의 output은 Tree model output의 평균값입니다. 

class NeuralDecisionForest(keras.Model):
    def __init__(self, num_trees, depth, num_features, used_features_rate, num_classes):
        super(NeuralDecisionForest, self).__init__()
        self.ensemble = []
        # num_trees만큼 DecisionTree 모델을 ensemble에 담습니다.
        # 트리는 각각 input feature를 랜덤하게 선택하기 때문에 서로 다르게 학습됩니다.
        for _ in range(num_trees):
            self.ensemble.append(
                NeuralDecisionTree(depth, num_features, used_features_rate, num_classes)
            )

    def call(self, inputs):
        # Initialize the outputs: a [batch_size, num_classes] matrix of zeros.
        batch_size = tf.shape(inputs)[0]
        outputs = tf.zeros([batch_size, num_classes])

        # 각 트리 output을 모두 더해주고,
        # 평균을 구한 뒤 반환합니다.
        for tree in self.ensemble:
            outputs += tree(inputs)
        outputs /= len(self.ensemble)
        return outputs

마지막으로 학습과 평가를 위한 코드는 아래와 같습니다.

learning_rate = 0.01
batch_size = 265
num_epochs = 10
hidden_units = [64, 64]


def run_experiment(model):

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    print("Start training the model...")
    train_dataset = get_dataset_from_csv(
        train_data_file, shuffle=True, batch_size=batch_size
    )

    model.fit(train_dataset, epochs=num_epochs)
    print("Model training finished")

    print("Evaluating the model on the test data...")
    test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)

    _, accuracy = model.evaluate(test_dataset)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Experiment 1: train a decision tree model

전체 input feature를 활용해서 Decision tree model을 학습시켜 봅니다.

num_trees = 10
depth = 10
used_features_rate = 1.0 # 전체 feature를 사용합니다.
num_classes = len(TARGET_LABELS)


def create_tree_model():
    inputs = create_model_inputs()
    features = encode_inputs(inputs, use_embedding=True)
    features = layers.BatchNormalization()(features)
    num_features = features.shape[1]

    tree = NeuralDecisionTree(depth, num_features, used_features_rate, num_classes)

    outputs = tree(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


tree_model = create_tree_model()
run_experiment(tree_model)

Experiment 2: train a forest model

이 실험에서는 input feature의 50%만 사용하는 num_trees만큼의 트리를 사용하여 forest model을 학습합니다. used_features_rate를 활용해서 각 트리가 얼만큼의 input feature를 활용할지 정할 수 있습니다. 또, 이전 실험과 다르게 depth는 10에서 5로 조정합니다.

num_trees = 25
depth = 5
used_features_rate = 0.5


def create_forest_model():
    inputs = create_model_inputs()
    features = encode_inputs(inputs, use_embedding=True)
    features = layers.BatchNormalization()(features)
    num_features = features.shape[1]

    forest_model = NeuralDecisionForest(
        num_trees, depth, num_features, used_features_rate, num_classes
    )

    outputs = forest_model(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


forest_model = create_forest_model()

run_experiment(forest_model)

이 글은 다음 예제를 번역한 것입니다.

 

이번 튜토리얼에서는 Embedding Layer를 굉장히 자주 사용하는데, 작동 방식을 간단하게라도 알고 가면 좋을 것 같다. 물론 아래 정리한 글은 매우 정확하게 확인한 것은 아니므로 틀릴 수 있다. 만약 틀리다면 댓글로 정정좀 부탁드립니다 : )

Embedding Layer를 사용했을 때, 결과물 shape가 만들어지는 방식을 보자면

  • data_shape: (None, 500)
  • Embedding(input_dim = 10,000, output_dim = 32)을 사용했을 때,

결과적으로 (None, 500, 32) 형태를 얻게 된다.

여기서 한 단계 더 들어가면 어떻게 (500, 32) shape이 만들어지는지 궁금해질 수 있다. 추측하건대 (500, 32)가 만들어지는 방식은 다음과 같다.

  1. 먼저, Embedding Vector를 (10,000, 32) 형태로 Random weight를 가지도록 초기화한다.
  2. 다음으로 tokenizing된 [0, 1, 20, 54, ..., 100] 으로 이루어져 있는 데이터를 index로 사용해서 인덱스가 존재하는 부분은 활성화하여 Gradient Descent에 반영되도록 한다.
  3. 이 때문에 Embedding_input_dim은 주로 vacab_size를 사용하는데, index를 통해 해당 단어가 들어왔다는 것을 알려주어야 하기 때문인 것 같다.
  4. 이와 같은 방법으로 GD를 통해 계속 업데이트해서 특정 단어나 feature가 가지고 있는 특성을 Vector화시키는 것 같다.

Introduction

Movielens dataset을 활용하여 Behavior Sequence Transformer (BST) model by Qiwei Chen et al.을 사용해보는 예제입니다. BST 모델은 대상 영화에 대한 유저의 평점을 예측하기 위해 유저 프로파일, 영화 특성뿐만 아니라 영화를 보고 평가할 때 유저의 순차적인 행동을 활용합니다.

더 구체적으로는 BST 모델이 아래 input을 사용하면서 대상 영화의 평점을 예측하게 됩니다.

  1. 유저가 시청한 영화를 나타내는 고정 길이 시퀀스 movie_ids
  2. 유저가 시청한 영화의 평점을 나타내는 고정 길이 시퀀스 ratings
  3. user_id, sesx, occupation, age_group을 포함한 유저 특성
  4. 각 영화의 장르를 나타내는 genre
  5. 평점을 예측하기 위한 대상 영화를 나타내는 target_movie_id

이번 예제에서는 논문에서 나타난 BST 모델 그대로를 사용하지 않고 다음처럼 변형하여 사용합니다.

  1. 영화 특성을 "other_features"로 정의하여 transformer layer input으로 사용하지 않고 따로 concat하여 사용하는 대신, 대상 영화와 인풋 시퀀스에서의 각 영화를 임베딩을 활용하여 통합합니다.
  2. 영화 평점과 position 임베딩을 활용하여 self-attention layer에 입력할 transformer feature를 만듭니다.

TensorFlow 2.4 또는 그 이상 버전에서 정상적으로 작동합니다.


The dataset

1M version of the Movielens dataset을 활용하여 예제를 진행합니다. 데이터셋은 유저 특성, 영화 장르를 포함하여 4,000개 영화에 대해 6,000명 유저가 평가한 백만개의 평점을 포함하고 있습니다. 또, 사용자가 영화를 평가한 timestamp를 포함하고 있기 때문에 학습을 위해 BST 모델이 필요로 하는 시퀀스 형태의 영화 평점 데이터를 만들 수 있습니다.


Setup

import os
import math
from zipfile import ZipFile
from urllib.request import urlretrieve
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

Prepare the data

Download and prepare the DataFrames

데이터를 다운로드합니다.

users.dat, movies.dat, ratings.dat 파일을 얻을 수 있습니다.

urlretrieve("http://files.grouplens.org/datasets/movielens/ml-1m.zip", "movielens.zip")
ZipFile("movielens.zip", "r").extractall()

사용할 column을 명시하고, 데이터를 로드합니다.

users = pd.read_csv(
    "ml-1m/users.dat",
    sep="::",
    names=["user_id", "sex", "age_group", "occupation", "zip_code"],
)

ratings = pd.read_csv(
    "ml-1m/ratings.dat",
    sep="::",
    names=["user_id", "movie_id", "rating", "unix_timestamp"],
)

movies = pd.read_csv(
    "ml-1m/movies.dat", sep="::", names=["movie_id", "title", "genres"]
)

각 column의 타입에 대해 아래와 같이 간단한 전처리 작업을 수행합니다.

users["user_id"] = users["user_id"].apply(lambda x: f"user_{x}")
users["age_group"] = users["age_group"].apply(lambda x: f"group_{x}")
users["occupation"] = users["occupation"].apply(lambda x: f"occupation_{x}")

movies["movie_id"] = movies["movie_id"].apply(lambda x: f"movie_{x}")

ratings["movie_id"] = ratings["movie_id"].apply(lambda x: f"movie_{x}")
ratings["user_id"] = ratings["user_id"].apply(lambda x: f"user_{x}")
ratings["rating"] = ratings["rating"].apply(lambda x: float(x))

다양한 영화 장르를 특성으로 다루기 위해 영화 장르를 각 컬럼으로 분리하도록 하겠습니다.
(Animation|Children's|Comedy -> Animation, Children's, Comedy) = one-hot encoding과 같습니다.

genres = [
    "Action", "Adventure", "Animation",
    "Children's", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film-Noir",
    "Horror", "Musical", "Mystery", "Romance",
    "Sci-Fi", "Thriller", "War", "Western"
]

for genre in genres:
    movies[genre] = movies["genres"].apply(
        lambda values: int(genre in values.split("|"))
    )

Transform the movie ratings data into sequences

unix_timestamp 특성을 기준으로 정렬한 뒤, user_id로 그룹핑하면, 각 유저별로 특정 timestamprating을 매긴 movie_ids를 얻을 수 있습니다.

ratings_group = ratings.sort_values(by=["unix_timestamp"]).groupby("user_id")

ratings_data = pd.DataFrame(
    data={
        "user_id": list(ratings_group.groups.keys()),
        "movie_ids": list(ratings_group.movie_id.apply(list)),
        "ratings": list(ratings_group.rating.apply(list)),
        "timestamps": list(ratings_group.unix_timestamp.apply(list)),
    }
)

모델 입력을 위해 시퀀스를 생성해야 합니다. movie_ids, ratings에 대해 고정 길이 시퀀스를 생성하는 작업을 수행합니다. sequence_length 변수에 따라 model input sequence의 길이가 달라집니다. 또한, step_size 변수를 통해 각 유저마다 보유하고 있는 시퀀스를 제어할 수 있습니다.

sequence_length = 4
step_size = 2

# [4, 4, 5, 5, 6, 6]이라면, 아래 함수를 통해
# [[4, 4, 5, 5], [5, 5, 6, 6]]으로 다시 만들어집니다.
def create_sequences(values, window_size, step_size):
    sequences = []
    start_index = 0
    while True:
        end_index = start_index + window_size
        seq = values[start_index:end_index]
        # 뒤에 남은 데이터가 window_size보다 작으면 중지합니다.
        if len(seq) < window_size:
            seq = values[-window_size:]
            # window_size와 같으면 list에 붙여줍니다.
            if len(seq) == window_size:
                sequences.append(seq)
            break
        sequences.append(seq)
        start_index += step_size
    return sequences


ratings_data.movie_ids = ratings_data.movie_ids.apply(
    lambda ids: create_sequences(ids, sequence_length, step_size)
)

ratings_data.ratings = ratings_data.ratings.apply(
    lambda ids: create_sequences(ids, sequence_length, step_size)
)

del ratings_data["timestamps"]

다음으로 유저 특성을 ratings data에 포함시킵니다. 뿐만 아니라 데이터를 시퀀스 형태로 변형합니다.

# list로 되어있는 movie_ids를 전부 펼칩니다.
ratings_data_movies = ratings_data[["user_id", "movie_ids"]].explode(
    "movie_ids", ignore_index=True
)

# ratings도 펼칩니다.
ratings_data_rating = ratings_data[["ratings"]].explode("ratings", ignore_index=True)

# movie_ids, ratings를 합칩니다.
ratings_data_transformed = pd.concat([ratings_data_movies, ratings_data_rating], axis=1)

print(ratings_data_transformed)

# user_id를 바탕으로 유저 특성을 합칩니다.
ratings_data_transformed = ratings_data_transformed.join(
    users.set_index("user_id"), on="user_id"
)

# [a, b, c, d] -> a, b, c, d
ratings_data_transformed.movie_ids = ratings_data_transformed.movie_ids.apply(
    lambda x: ",".join(x)
)

# [a, b, c, d] -> a, b, c, d
ratings_data_transformed.ratings = ratings_data_transformed.ratings.apply(
    lambda x: ",".join([str(v) for v in x])
)

del ratings_data_transformed["zip_code"]

ratings_data_transformed.rename(
    columns={"movie_ids": "sequence_movie_ids", "ratings": "sequence_ratings"},
    inplace=True,
)

sequence_length 4, step_size 2로 작업을 수행하면 총 498,623개 데이터를 만들 수 있습니다.

마지막으로 training/testing 데이터를 85%/15% 비율로 분리하고 csv로 저장합니다.


Define metadata

CSV_HEADER = list(ratings_data_transformed.columns)

CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "user_id": list(users.user_id.unique()),
    "movie_id": list(movies.movie_id.unique()),
    "sex": list(users.sex.unique()),
    "age_group": list(users.age_group.unique()),
    "occupation": list(users.occupation.unique()),
}

USER_FEATURES = ["sex", "age_group", "occupation"]

MOVIE_FEATURES = ["genres"]

Create tf.data.Dataset for training and evaluation

def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    def process(features):
        movie_ids_string = features["sequence_movie_ids"]
        # to_tensor() 변환을 해주지 않으면, RaggedTensor로 반환되기 때문에
        # 후에 indexing이 되지 않아 에러가 납니다.
        sequence_movie_ids = tf.strings.split(movie_ids_string, ",").to_tensor()

        # 마지막 movid_id를 target 값으로 사용합니다.
        features["target_movie_id"] = sequence_movie_ids[:, -1]
        features["sequence_movie_ids"] = sequence_movie_ids[:, :-1]

        ratings_string = features["sequence_ratings"]
        sequence_ratings = tf.strings.to_number(
            tf.strings.split(ratings_string, ","), tf.dtypes.float32
        ).to_tensor()

        # 마지막 rating을 target 값으로 사용합니다.
        target = sequence_ratings[:, -1]
        features["sequence_ratings"] = sequence_ratings[:, :-1]

        return features, target

    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        num_epochs=1,
        header=False,
        field_delim="|",
        shuffle=shuffle,
    ).map(process)

    return dataset

Create model inputs

def create_model_inputs():
    return {
        "user_id": layers.Input(name="user_id", shape=(1,), dtype=tf.string),
        "sequence_movie_ids": layers.Input(
            name="sequence_movie_ids", shape=(sequence_length - 1,), dtype=tf.string
        ),
        "target_movie_id": layers.Input(
            name="target_movie_id", shape=(1,), dtype=tf.string
        ),
        "sequence_ratings": layers.Input(
            name="sequence_ratings", shape=(sequence_length - 1,), dtype=tf.float32
        ),
        "sex": layers.Input(name="sex", shape=(1,), dtype=tf.string),
        "age_group": layers.Input(name="age_group", shape=(1,), dtype=tf.string),
        "occupation": layers.Input(name="occupation", shape=(1,), dtype=tf.string),
    }

Encode input features

encode_input_features 함수는 다음 기능을 가지고 있습니다.

  1. Categorical 특성은 vocabulary size(여기선 예를 들면 movie_id의 unique 갯수)^1/2만큼의 output_dim과 vocabulary size input_dim 크기를 가지는 layers.Embedding을 사용하여 인코딩합니다.
    --> embedding_encoder
  2. movie sequence에 포함되어 있는 영화와 target movie는 영화 갯수^1/2만큼의 output_dim과 movie_vocabulary size input_dim 크기를 가지는 layers.Embedding으로 인코딩합니다.
    --> movie_embedding_encoder
  3. movie feature에 포함되는 genre 특성은 임베딩을 통과시킨뒤, movie_embedding에 이어붙여 layers.Dense를 다시 통과시킵니다.
    --> movie_embedding_processor
  4. 그 다음, movie_embedding에 만들어진 position 임베딩을 더하고, rating을 곱합니다.(Positional Encoding 부분)
    --> encoded_sequence_movies_with_position_and_rating
  5. movie_embedding과 target_movie_embedding을 이어붙이고, 이렇게 만들어진 결과는 최종적으로 트랜스포머 attentioon layer의 입력으로 사용하기 위한 [batch size, sequence length, embedding size] 형태를 가집니다.
    --> encoded_transformer_features
  6. 이렇게 최종적으로 encoded_transformer_features, encoded_other_features 두 가지를 반환합니다.
def encode_input_features(
    inputs,
    include_user_id=True,
    include_user_features=True,
    include_movie_features=True,
):

    encoded_transformer_features = []

    # other_feature_names에 존재하는 특성들의 임베딩 표현을 담고 있습니다.
    encoded_other_features = []

    # 여기서는
    # ['user_id', 'sex', 'age_group', 'occupation']이 됩니다.
    other_feature_names = []
    if include_user_id:
        other_feature_names.append("user_id")
    if include_user_features:
        other_feature_names.extend(USER_FEATURES)

    ## 유저 정보를 인코딩합니다.
    for feature_name in other_feature_names:
        # string으로 된 값들을 전부 index 형식으로 변환합니다.
        # vocabulary가 만들어지면, input[feature_name]이 만들어진 사전에 따라
        # 해당 문자가 부여받은 index로 변환됩니다.
        vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
        idx = StringLookup(vocabulary=vocabulary, mask_token=None, num_oov_indices=0)(
            inputs[feature_name]
        )
        # 임베딩 차원을 만듭니다.
        embedding_dims = int(math.sqrt(len(vocabulary)))
        # 임베딩을 만듭니다.
        embedding_encoder = layers.Embedding(
            input_dim=len(vocabulary),
            output_dim=embedding_dims,
            name=f"{feature_name}_embedding",
        )
        # feature에 들어있는 각 값들을 임베딩 표현으로 변환합니다.
        encoded_other_features.append(embedding_encoder(idx))

    # 위에서 만들어진 임베딩 표현을 하나로 만들어줍니다.
    # 여러 개면 concatenate를 사용해서 펼쳐줍니다.
    if len(encoded_other_features) > 1:
        encoded_other_features = layers.concatenate(encoded_other_features)
    elif len(encoded_other_features) == 1:
        encoded_other_features = encoded_other_features[0]
    else:
        encoded_other_features = None

    # movie_id를 가져와서 StringLookup으로 index화 시켜줍니다.
    movie_vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY["movie_id"]
    movie_embedding_dims = int(math.sqrt(len(movie_vocabulary)))
    movie_index_lookup = StringLookup(
        vocabulary=movie_vocabulary,
        mask_token=None, num_oov_indices=0,
        name="movie_index_lookup")
    
    # movie_id로 만든 영화 임베딩입니다.
    movie_embedding_encoder = layers.Embedding(
        input_dim=len(movie_vocabulary),
        output_dim=movie_embedding_dims,
        name=f"movie_embedding")

    # genre 임베딩을 만듭니다.
    genre_vectors = movies[genres].to_numpy()
    movie_genres_lookup = layers.Embedding(
        input_dim=genre_vectors.shape[0],
        output_dim=genre_vectors.shape[1],
        embeddings_initializer=tf.keras.initializers.Constant(genre_vectors),
        trainable=False,
        name="genres_vector")
    
    # genre를 임베딩한 후, 통과시킬 Dense layer를 선언합니다.
    movie_embedding_processor = layers.Dense(
        units=movie_embedding_dims,
        activation="relu",
        name="process_movie_embedding_with_genres")

    ## Define a function to encode a given movie id.
    def encode_movie(movie_id):
        # 해당 movie_id의 index를 얻습니다.
        movie_idx = movie_index_lookup(movie_id)
        # movie 임베딩에 통과시켜 index의 임베딩 표현을 얻습니다.
        movie_embedding = movie_embedding_encoder(movie_idx)
        encoded_movie = movie_embedding
        # 영화 특성을 포함시킬거라면,(예를 들어, 장르 등)
        if include_movie_features:
            # movie_id의 index를 genre 임베딩에 통과시킵니다.
            # 그럼 어떤 값을 얻을텐데,
            movie_genres_vector = movie_genres_lookup(movie_idx)
            # 그 값을 movie_embedding과 concat한 후, Dense layer에 통과시킵니다.
            encoded_movie = movie_embedding_processor(
                layers.concatenate([movie_embedding, movie_genres_vector])
            )
        # 최종적으로 인코딩된 movie_id와 genre 특성을 합한 값 encoded_movie를 얻습니다.
        return encoded_movie

    ## target movie id를 인코딩합니다.
    target_movie_id = inputs["target_movie_id"]
    encoded_target_movie = encode_movie(target_movie_id)

    ## 시퀀스 형태로 이뤄진 movie_id를 인코딩합니다.
    sequence_movies_ids = inputs["sequence_movie_ids"]
    encoded_sequence_movies = encode_movie(sequence_movies_ids)

    # position 처리를 위한 임베딩을 만듭니다.
    position_embedding_encoder = layers.Embedding(
        input_dim=sequence_length,
        output_dim=movie_embedding_dims,
        name="position_embedding")
    
    # 단순 포지션 번호를 만들고, 임베딩에 통과시켜 임베딩 표현을 얻습니다.
    # positions가 [0, 1, 2], 즉 길이가 3이고, movie_embedding_dims가 62라면,
    positions = tf.range(start=0, limit=sequence_length - 1, delta=1)
    # 아래 임베딩 통과 후, (None, 3, 62) 형태를 얻습니다.
    encodded_positions = position_embedding_encoder(positions)
    # 인코딩된 영화와 평점을 multiply하기 위한 형태로 바꿔줍니다.
    # (None, 3, 1) 형태로 Multiply가 가능합니다.
    sequence_ratings = tf.expand_dims(inputs["sequence_ratings"], -1)
    # 인코딩된 영화 값과 포지션 값을 더해주고, 평점과 곱해줍니다.
    # 위 형태라면 최종적으로 (None, 3, 62) 형태가 됩니다.
    encoded_sequence_movies_with_poistion_and_rating = layers.Multiply()(
        [(encoded_sequence_movies + encodded_positions), sequence_ratings]
    )

    # 위 형태를 그대로 가져와서 설명하면,
    # (1, 62)로 tf.unstack을 통해 찢고, encoded_transformer_features에 넣어줍니다.
    for encoded_movie in tf.unstack(
        encoded_sequence_movies_with_poistion_and_rating, axis=1
    ):
        encoded_transformer_features.append(tf.expand_dims(encoded_movie, 1))
    # target_movie랑 기존 movie랑 합쳐줍니다.
    encoded_transformer_features.append(encoded_target_movie)

    # 각각 따로있는 상태이므로 concat을 통해, (None, 4, 62)로 변환합니다.
    encoded_transformer_features = layers.concatenate(encoded_transformer_features, axis=1)

    return encoded_transformer_features, encoded_other_features

Create a BST model

include_user_id = False
include_user_features = False
include_movie_features = False

hidden_units = [256, 128]
dropout_rate = 0.1
num_heads = 3


def create_model():
    inputs = create_model_inputs()
    transformer_features, other_features = encode_input_features(
        inputs, include_user_id, include_user_features, include_movie_features
    )

    # MultiHead Attention 사용
    attention_output = layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=transformer_features.shape[2], dropout=dropout_rate
    )(transformer_features, transformer_features)

    # Transformer block.
    attention_output = layers.Dropout(dropout_rate)(attention_output)
    x1 = layers.Add()([transformer_features, attention_output])
    x1 = layers.LayerNormalization()(x1)
    x2 = layers.LeakyReLU()(x1)
    x2 = layers.Dense(units=x2.shape[-1])(x2)
    x2 = layers.Dropout(dropout_rate)(x2)
    transformer_features = layers.Add()([x1, x2])
    transformer_features = layers.LayerNormalization()(transformer_features)
    features = layers.Flatten()(transformer_features)

    # 임베딩 표현으로 이루어진 other_features를 포함합니다.
    # Concat 시에는 shape[-1]을 통해 Flatten 후에 Concat합니다.
    if other_features is not None:
        features = layers.concatenate(
            [features, layers.Reshape([other_features.shape[-1]])(other_features)]
        )

    # Fully-connected layers, Linear 부분입니다.
    for num_units in hidden_units:
        features = layers.Dense(num_units)(features)
        features = layers.BatchNormalization()(features)
        features = layers.LeakyReLU()(features)
        features = layers.Dropout(dropout_rate)(features)

    outputs = layers.Dense(units=1)(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


model = create_model()

# 모델 컴파일
model.compile(
    optimizer=keras.optimizers.Adagrad(learning_rate=0.01),
    loss=keras.losses.MeanSquaredError(),
    metrics=[keras.metrics.MeanAbsoluteError()],
)

# 학습 데이터
train_dataset = get_dataset_from_csv("train_data.csv", shuffle=True, batch_size=265)

# 학습
model.fit(train_dataset, epochs=5)

# 테스트 데이터
test_dataset = get_dataset_from_csv("test_data.csv", batch_size=265)

# 평가
_, rmse = model.evaluate(test_dataset, verbose=0)
print(f"Test MAE: {round(rmse, 3)}")

대략 0.7 MAE를 얻을 수 있습니다.


Conclusion

BST 모델은 시퀀스 형태의 유저 행동을 통해 추천할 때 필요한 연속적인 신호를 잡아내는 Transformer 구조를 사용합니다.

sequence length, epoch을 높이는 등 다양한 configuration으로 학습을 시도해볼 수 있습니다. 또, 'sex * genre'와 같은 병합 특성이나 고객 주소, 영화 개봉년도와 같은 다른 특성을 포함해볼 수도 있습니다.

이 글은 다음 예제를 번역한 것입니다.


Introduction

이번 예제는 CSV 파일에 포함되어 있는 구조 데이터(structured data)를 활용하여 분류 문제를 해결합니다. 데이터는 numerical과 categorical 특성들로 구성되어 있습니다. 우리는 Keras Preprocessing Layer를 사용해서 numerical 특성을 정규화하고 categorical 특성을 벡터화할 것입니다.

주의! 이번 예제는 TensorFlow 2.3 이상의 버전에서 실행해야 합니다.

The Dataset

사용할 데이터셋은 Cleveland Clinic Foundation에서 제공한 심장병 관련 데이터셋입니다. 환자 정보를 포함하고 있는 303개의 데이터(sample)로 이루어져 있으며, 각 열에서 해당 속성(feature)을 살펴볼 수 있습니다. 이를 활용하여 심장병의 여부(binary classification)를 예측해 볼 것입니다.

Age Age in years Numerical
Sex (1 = male; 0 = female) Categorical
CP Chest pain type (0, 1, 2, 3, 4) Categorical
Trestbpd Resting blood pressure (in mm Hg on admission) Numerical
Chol Serum cholesterol in mg/dl Numerical
FBS fasting blood sugar in 120 mg/dl (1 = true; 0 = false) Categorical
RestECG Resting electrocardiogram results (0, 1, 2) Categorical
Thalach Maximum heart rate achieved Numerical
Exang Exercise induced angina (1 = yes; 0 = no) Categorical
Oldpeak ST depression induced by exercise relative to rest Numerical
Slope Slope of the peak exercise ST segment Numerical
CA Number of major vessels (0-3) colored by fluoroscopy Both numerical & categorical
Thal 3 = normal; 6 = fixed defect; 7 = reversible defect Categorical
Target Diagnosis of heart disease (1 = true; 0 = false) Target

 

Setup

import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers

Preparing the data

데이터를 다운로드받습니다.

file_url = "http://storage.googleapis.com/download.tensorflow.org/data/heart.csv"
dataframe = pd.read_csv(file_url)

303개 데이터와 14개 column(13 feature + 1 target label)으로 이루어져 있습니다.

dataframe.shape
# (303, 14)

head() 함수로 간단히 살펴봅니다.

dataframe.head()

마지막 속성은 "target"으로 '1'이면 심장병, '0'이면 심장병이 아닌 환자를 의미합니다.

training, validation set으로 나누겠습니다.

# 20% 샘플을 뽑고,
val_dataframe = dataframe.sample(frac=0.2, random_state=1337)
# 위에서 뽑힌 샘플을 제거해서 훈련 데이터셋으로 활용합니다.
train_dataframe = dataframe.drop(val_dataframe.index)

print(
    "Using %d samples for training and %d for validation"
    % (len(train_dataframe), len(val_dataframe))
)

tf.data.Dataset 객체를 만들어줍니다.

def dataframe_to_dataset(dataframe):
    dataframe = dataframe.copy()
    labels = dataframe.pop("target")
    ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
    ds = ds.shuffle(buffer_size=len(dataframe))
    
    return ds


train_ds = dataframe_to_dataset(train_dataframe)
val_ds = dataframe_to_dataset(val_dataframe)

각 Dataset은 (input, target)을 반환합니다.

  • input: dictionary of feature
  • target: 1 or 0 value
for x, y in train_ds.take(1):
    print("Input:", x)
    print("Target:", y)
    
# Input: {'age': <tf.Tensor: shape=(), dtype=int64, numpy=60>, 'sex': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'cp': <tf.Tensor: shape=(), dtype=int64, numpy=4>, 'trestbps': <tf.Tensor: shape=(), dtype=int64, numpy=117>, 'chol': <tf.Tensor: shape=(), dtype=int64, numpy=230>, 'fbs': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'restecg': <tf.Tensor: shape=(), dtype=int64, numpy=0>, 'thalach': <tf.Tensor: shape=(), dtype=int64, numpy=160>, 'exang': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'oldpeak': <tf.Tensor: shape=(), dtype=float64, numpy=1.4>, 'slope': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'ca': <tf.Tensor: shape=(), dtype=int64, numpy=2>, 'thal': <tf.Tensor: shape=(), dtype=string, numpy=b'reversible'>}
# Target: tf.Tensor(1, shape=(), dtype=int64)

Dataset에 배치 크기를 설정합니다.

train_ds = train_ds.batch(32)
val_ds = val_ds.batch(32)

 

Feature preprocessing with Keras layers

정수형태로 인코딩되어 있는 categorical feature은 다음과 같습니다.

  • sex, cp, fbs, restecg, exang, ca

CategoryEncoding() layer를 사용해서 one-hot encoding 형태로 바꿔줄 것입니다. 또, thal은 문자형태로 인코딩되어 있는 categorical feature입니다. 먼저 StringLookup() layer를 사용해서 해당 속성이 가지고 있는 값들을 전부 index 형태로 변환하고, CategoryEncoding() layer를 사용해서 one-hot 인코딩을 동일하게 진행해줄 것입니다.

마지막으로, 연속적인 정수 형태인 feature는 다음과 같습니다.

  • age, trestbps, chol, thalach, oldpeak, slope

이들에게는 Normalization() layer를 사용해서 평균 0, 표준 편차를 1로 만들어 줄 것입니다.

이 작업을 수행하기 위해 세 가지 유틸리티 함수를 정의해서 사용합니다.

  • encode_numerical_feature: Normalization을 적용하기 위한 함수
  • encode_string_categorical_feature: 문자 형태로 된 열에 대해 one-hot encoding을 적용
  • encode_integer_categorical_feature: 숫자 형태로 된 열에 대해 one-hot encoding을 적용
from tensorflow.keras.layers.experimental.preprocessing import Normalization
from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup


def encode_numerical_feature(feature, name, dataset):
    # Normalization layer 정의
    normalizer = Normalization()

    # 적용할 열만 가져옵니다.
    feature_ds = dataset.map(lambda x, y: x[name])
    feature_ds = feature_ds.map(lambda x: tf.expand_dims(x, -1))

    # 데이터의 통계치 학습
    normalizer.adapt(feature_ds)

    # 정규화 진행
    encoded_feature = normalizer(feature)
    
    return encoded_feature


def encode_string_categorical_feature(feature, name, dataset):
    # 문자열로 되어 있는 값을 integer 형태로 변환하기 위한 StringLookup() 함수 정의
    index = StringLookup()
    # one-hot encoding 진행을 위한 함수 정의
    encoder = CategoryEncoding(output_mode="binary")
    
    # 적용할 열만 가져옵니다.
    feature_ds = dataset.map(lambda x, y: x[name])
    feature_ds = feature_ds.map(lambda x: tf.expand_dims(x, -1))
    
    # 전체 순서: index adapt -> index 형태로 변환 -> categorical encoding adapt -> categorical 형태로 변환
    
    # feature_ds가 반환하는 문자열 데이터가 어떤 것이 있는지 확인합니다.
    # 문자열로 된 값과, integer index를 부여하기 위한 작업입니다.
    index.adapt(feature_ds)

    # (실제 반환 데이터) integer 형태로 변환합니다.
    encoded_feature = index(feature)
    
    # Dataset 객체가 index 변환 작업을 수행하도록 적용해줍니다.
    feature_ds = feature_ds.map(index)
    
    # feature_ds가 반환하는 index 데이터가 어떤 것이 있는지 확인합니다.
    encoder.adapt(feature_ds)

    # (실제 반환 데이터) one-hot encoding을 적용하고 return합니다.
    # 데이터셋 객체에 다시 map 해줄 필요는 없습니다.
    encoded_feature = encoder(encoded_feature)
    
    return encoded_feature

# 위와 동일
def encode_integer_categorical_feature(feature, name, dataset):
    # Create a CategoryEncoding for our integer indices
    encoder = CategoryEncoding(output_mode="binary")

    # Prepare a Dataset that only yields our feature
    feature_ds = dataset.map(lambda x, y: x[name])
    feature_ds = feature_ds.map(lambda x: tf.expand_dims(x, -1))

    # Learn the space of possible indices
    encoder.adapt(feature_ds)

    # Apply one-hot encoding to our indices
    encoded_feature = encoder(feature)
    return encoded_feature

 

Build a model

end-to-end 모델을 만들어봅니다.

# Numerical -> one-hot encoding
sex = keras.Input(shape=(1,), name="sex", dtype="int64")
cp = keras.Input(shape=(1,), name="cp", dtype="int64")
fbs = keras.Input(shape=(1,), name="fbs", dtype="int64")
restecg = keras.Input(shape=(1,), name="restecg", dtype="int64")
exang = keras.Input(shape=(1,), name="exang", dtype="int64")
ca = keras.Input(shape=(1,), name="ca", dtype="int64")

# String -> one-hot encoding
thal = keras.Input(shape=(1,), name="thal", dtype="string")

# Numerical features
age = keras.Input(shape=(1,), name="age")
trestbps = keras.Input(shape=(1,), name="trestbps")
chol = keras.Input(shape=(1,), name="chol")
thalach = keras.Input(shape=(1,), name="thalach")
oldpeak = keras.Input(shape=(1,), name="oldpeak")
slope = keras.Input(shape=(1,), name="slope")

all_inputs = [sex, cp, fbs,
                restecg, exang, ca,
                thal, age, trestbps,
                chol, thalach, oldpeak,slope]

# Integer categorical features
sex_encoded = encode_integer_categorical_feature(sex, "sex", train_ds)
cp_encoded = encode_integer_categorical_feature(cp, "cp", train_ds)
fbs_encoded = encode_integer_categorical_feature(fbs, "fbs", train_ds)
restecg_encoded = encode_integer_categorical_feature(restecg, "restecg", train_ds)
exang_encoded = encode_integer_categorical_feature(exang, "exang", train_ds)
ca_encoded = encode_integer_categorical_feature(ca, "ca", train_ds)

# String categorical features
thal_encoded = encode_string_categorical_feature(thal, "thal", train_ds)

# Numerical features
age_encoded = encode_numerical_feature(age, "age", train_ds)
trestbps_encoded = encode_numerical_feature(trestbps, "trestbps", train_ds)
chol_encoded = encode_numerical_feature(chol, "chol", train_ds)
thalach_encoded = encode_numerical_feature(thalach, "thalach", train_ds)
oldpeak_encoded = encode_numerical_feature(oldpeak, "oldpeak", train_ds)
slope_encoded = encode_numerical_feature(slope, "slope", train_ds)

all_features = layers.concatenate(
    [
        sex_encoded,
        cp_encoded,
        fbs_encoded,
        restecg_encoded,
        exang_encoded,
        slope_encoded,
        ca_encoded,
        thal_encoded,
        age_encoded,
        trestbps_encoded,
        chol_encoded,
        thalach_encoded,
        oldpeak_encoded,
    ]
)

x = layers.Dense(32, activation="relu")(all_features)
x = layers.Dropout(0.5)(x)

output = layers.Dense(1, activation="sigmoid")(x)

# 모델 생성
model = keras.Model(all_inputs, output)
model.compile("adam", "binary_crossentropy", metrics=["accuracy"])

 

Train the model

model.fit(train_ds, epochs=50, validation_data=val_ds)

약 82% 정확도를 얻을 수 있었습니다.

Inference on new data

model.predict()를 활용해서 새로운 환자의 심장병 여부를 확인해보죠.

  1. batch dimension을 까먹지 않도록 주의해야 합니다. 모델은 batch_dimension에서 과정에 수행됩니다.
  2. 새로운 샘플에서 값을 tensor 형태로 변환하기 위해 convert_to_tensor 활용
sample = {
    "age": 60,
    "sex": 1,
    "cp": 1,
    "trestbps": 145,
    "chol": 233,
    "fbs": 1,
    "restecg": 2,
    "thalach": 150,
    "exang": 0,
    "oldpeak": 2.3,
    "slope": 3,
    "ca": 0,
    "thal": "fixed",
}

input_dict = {name: tf.convert_to_tensor([value]) for name, value in sample.items()}
predictions = model.predict(input_dict)

print(
    "This particular patient had a %.1f percent probability "
    "of having a heart disease, as evaluated by our model." % (100 * predictions[0][0],)
)

# This particular patient had a 20.8 percent probability of having a heart disease, 
# as evaluated by our model.

 

이 글은 다음 예제를 번역한 것입니다.


Introduction

이 예제는 JPEG 파일을 직접 저장하고, 미리 학습된 모델을 사용하지 않으면서 이미지 분류를 어떻게 행하는지 알아보는 예제입니다. 'Cats vs Dogs' 데이터셋을 케라스를 활용하여 분류해봅니다.

데이터셋 생성을 위해 image_dataset_from_directory를 사용하고, 표준화와 augmentation을 위해 Keras Preprocessing Layer를 사용해봅니다.

Setup

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

 

Load the data: the Cats vs Dogs dataset

Raw data download

먼저, 765M 크기의 데이터를 다운로드합니다.
예제와 다르게 그냥 쥬피터 노트북 상에서 zip파일을 다운받고 싶으면, 아래 코드를 활용하면 됩니다.

import requests

target_path = 'datasets.zip'

url = 'https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip'
response = requests.get(url, stream = True)
handle = open(target_path, 'wb')

for chunk in response.iter_content(chunk_size = 512):
    if chunk:
        handle.write(chunk)
        
handle.close()   

압축을 풀었다면, 최상위 폴더에는 Cat, Dog 하위 폴더가 존재합니다.

Filter out corrupted images

손상된 이미지는 어디에나 존재합니다. 여기서는 "JFIF" 헤더를 가진 이미지를 필터링해보겠습니다.
필터링 시, 해당 헤더가 아니면 손상된 이미지이므로 제거합니다.

import os

num_skipped = 0
for folder_name in ("Cat", "Dog"):
    folder_path = os.path.join("PetImages", folder_name)
    for fname in os.listdir(folder_path):
        fpath = os.path.join(folder_path, fname) # PetImages/Cat/63.jpg
        try:
            fobj = open(fpath, "rb")
            # fobj.peek(10)은 10바이트를 가져오는데, 여기서는 보통
            # 헤더를 읽기 위해 전체를 가져온다고 생각해도 무방합니다.
            is_jfif = tf.compat.as_bytes("JFIF") in fobj.peek(10)
        finally:
            fobj.close()

        if not is_jfif:
            num_skipped += 1
            # Delete corrupted image
            os.remove(fpath)

print("Deleted %d images" % num_skipped)

 

Generate a Dataset

image_size = (180, 180)
batch_size = 32

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "PetImages",
    validation_split=0.2,
    subset="training",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "PetImages",
    validation_split=0.2,
    subset="validation",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)

Visualize the data

9개 이미지를 그려봅니다. '1'은 Dog이고, '0'은 Cat에 해당합니다.

import matplotlib.pyplot as plt

# figure 크기를 조절합니다.
plt.figure(figsize=(10, 10))

# 배치 하나를 가져옵니다.
for images, labels in train_ds.take(1):
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(int(labels[i]))
        plt.axis("off")

Using Image data augmentation

데이터가 다양하지 않거나, 데이터 개수가 충분하지 않을때 랜덤으로 회전시키거나 fliping하는 등 augmentation을 활용하면 이를 해결할 수 있습니다. 이는 모델이 다양한 데이터를 볼 수 있게 도와줌과 동시에 이를 통해 Overfitting을 (어느정도) 극복할 수 있습니다.

data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomRotation(0.1),
    ]
)

data_augmentation을 통해 변형된 이미지를 그려봅니다.

plt.figure(figsize=(10, 10))
for images, _ in train_ds.take(1):
    for i in range(9):
        augmented_images = data_augmentation(images)
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(augmented_images[0].numpy().astype("uint8"))
        plt.axis("off")

 

Standardizing the data

현재 우리가 사용할 이미지는 픽셀 type float32, 크기는 180x180으로 이미 어느정도 표준화가 되어 있습니다. 하지만 RGB channel의 값들이 [0, 255] 값을 가지고 있어, 신경망을 사용하기엔 적합하지 않습니다. 따라서 Rescaling layer를 활용해서 [0, 1] 범위로 변환해줄 것입니다.

 

Two options to preprocess the data

augmentation을 적용할 수 있는 두 가지 방법이 있습니다.

>> Option 1: Make it part of the model like this:

inputs = keras.Input(shape=input_shape)
x = data_augmentation(inputs)
x = layers.experimental.preprocessing.Rescaling(1./255)(x)
...  # Rest of the model

이 방법은 모델 실행에서 동시에 사용될 수 있기 때문에 GPU를 사용한다는 큰 장점을 활용할 수 있습니다.

이 방법은 test time에는 동작하지 않기 때문에, evaluate(), predict()에서는 활용되지 않고, 오로지 fit() 동안에만 사용됩니다. 또한, GPU를 보유하고 있다면 이 방법을 긍정적으로 고려해볼 수 있습니다.

>> Option 2: apply it to the dataset, so as to obtain a dataset that yields batches of augmented images, like this:

augmented_train_ds = train_ds.map(
  lambda x, y: (data_augmentation(x, training=True), y))

이 방법은 CPU에서 주로 사용되고, 버퍼를 통해 모델에 입력됩니다.

CPU를 활용하여 학습하는 경우라면 이 방법이 더 효과적입니다.

 

Configure the dataset for performance

prefetch 옵션을 활용하면 더욱 효율적으로 데이터를 생성하여 모델에 입력할 수 있습니다.

train_ds = train_ds.prefetch(buffer_size=32)
val_ds = val_ds.prefetch(buffer_size=32)

 

Build a model

Xception의 small version을 작성해보겠습니다. 이 모델에 대해서는 최적화를 진행하지 않을 것이고, 만약 최적화에 관심이 있다면 Keras Tuner를 고려해보세요.

강조 사항:

  • data_augmentation을 사용하고, 추가로 Rescaling layer를 활용합니다.
  • 마지막 classification layer에 Dropout을 추가합니다.
def make_model(input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)
    # 본격적으로 모델에 입력하기 전에 여기서 augmentation이 진행됩니다.
    # inference time에는 동작하지 않습니다.
    x = data_augmentation(inputs)

    # [0, 1] 변환을 위해 Rescaling Layer를 활용합니다.
    x = layers.experimental.preprocessing.Rescaling(1.0 / 255)(x)
    x = layers.Conv2D(32, 3, strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.Conv2D(64, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    for size in [128, 256, 512, 728]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation)
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    x = layers.SeparableConv2D(1024, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.GlobalAveragePooling2D()(x)
    if num_classes == 2:
        activation = "sigmoid"
        units = 1
    else:
        activation = "softmax"
        units = num_classes

    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(units, activation=activation)(x)
    
    return keras.Model(inputs, outputs)

# image size에 (3, ) 채널을 추가합니다.
model = make_model(input_shape=image_size + (3,), num_classes=2)

 

Train the model

epochs = 50

callbacks = [
    keras.callbacks.ModelCheckpoint("save_at_{epoch}.h5"),
]
model.compile(optimizer=keras.optimizers.Adam(1e-3),
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.fit(train_ds, epochs=epochs, 
          callbacks=callbacks, validation_data=val_ds)

학습을 진행해보세요.

 

Run inference on new data

inference time에는 Dropout과 data augmentation이 비활성화 상태가 됩니다.
하지만 Rescaling layer는 그대로 활용되기 때문에 테스트용 데이터로 추론을 진행할 때 올바른 결과를 얻을 수 있습니다.

img = keras.preprocessing.image.load_img("PetImages/Cat/6779.jpg", 
                                         target_size=image_size)
img_array = keras.preprocessing.image.img_to_array(img)
img_array = tf.expand_dims(img_array, 0)  # Create batch axis

predictions = model.predict(img_array)
score = predictions[0]
print(
    "This image is %.2f percent cat and %.2f percent dog."
    % (100 * (1 - score), 100 * score)
)