이 글은 다음 Keras Example을 번역합니다.

https://keras.io/examples/timeseries/timeseries_traffic_forecasting/

 

Keras documentation: Traffic forecasting using graph neural networks and LSTM

Traffic forecasting using graph neural networks and LSTM Author: Arash Khodadadi Date created: 2021/12/28 Last modified: 2021/12/28 Description: This example demonstrates how to do timeseries forecasting over graphs. View in Colab • GitHub source Introdu

keras.io


Introduction

이 예제는 GNN과 LSTM을 활용하여 교통 상태를 예측해봅니다. 특히, 도로 구역별 체증(traffic speed) 히스토리를 활용해 향후 체증을 예측합니다.

문제를 해결하기 위해 사용되는 주요한 방법은 도로 구역별 체증 상태를 시계열 형태로 두고, 과거 상태를 이용해 미래를 예측하는 것입니다.

하지만 단순 시게열 방법은 인접 도로간 체증 상태를 활용하지 못합니다. 따라서 이웃한(인접한) 도로 체증의 복잡한 연관관계를 활용할 수 있도록 graph로 표현되는 traffic network를 구성하고, 그래프로 표현되는 체증을 활용합니다. 이 예제는 그래프로 이루어진 시계열 데이터를 입력받을 수 있는 모델을 구성합니다. 첫 번째로 어떻게 데이터를 가공하는지 살펴보고, 그래프를 예측하기 위해 tf.data.Dataset을 만듭니다. 그 후, GCN(Graph Convolution Network)와 LSTM으로 구성된 모델을 구현합니다.

데이터 전처리와 모델 구조는 다음 논문을 참조합니다.

Yu, Bing, Haoteng Yin, and Zhanxing Zhu. "Spatio-temporal graph convolutional networks: a deep learning framework for traffic forecasting." Proceedings of the 27th International Joint Conference on Artificial Intelligence, 2018. (github) 


Setup

import pandas as pd
import numpy as np
import os
import typing
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

Data preparation

Data description

실세계 교통 체증 데이터셋 PeMSD7을 활용합니다. 이 데이터셋은 Yu et al., 2018에서 수집 및 준비되었고, 여기서 활용할 수 있습니다. 데이터셋에 대한 구체적인 설명은 논문을 참조하세요.

이 데이터셋은 두 개의 파일로 구성되어 있습니다.

  • W_228.csv: 캘리포니아 7구역의 228개 거리
  • V_228.csv: 2012년 5, 6월 평일의 교통 체증 데이터

Loading Data

url = "https://github.com/VeritasYin/STGCN_IJCAI-18/raw/master/data_loader/PeMS-M.zip"
data_dir = keras.utils.get_file(origin=url, extract=True, archive_format="zip")
data_dir = data_dir.rstrip(".zip")

route_distances = pd.read_csv(
    os.path.join(data_dir, "W_228.csv"), header=None
).to_numpy()
speeds_array = pd.read_csv(os.path.join(data_dir, "V_228.csv"), header=None).to_numpy()

print(f"route_distances shape={route_distances.shape}") # (228, 228)
print(f"speeds_array shape={speeds_array.shape}") # (12672, 228)

sub-sampling roads

크기 문제를 감소시키고, 학습 속도를 증가시키기 위해 228개 도로 중 26개 도로만 사용합니다. 0번 도로부터 시작해 총 25개 도로를 선택합니다. sample_routes에 포함되어 있는 도로들은 각 도로별 상관성을 가지고 있습니다.

sample_routes = [
    0, 1, 4,
    7, 8, 11,
    15, 108, 109,
    114, 115, 118,
    120, 123, 124,
    126, 127, 129,
    130, 132, 133,
    136, 139, 144,
    147, 216,
]
route_distances = route_distances[np.ix_(sample_routes, sample_routes)]
speeds_array = speeds_array[:, sample_routes]

print(f"route_distances shape={route_distances.shape}") # (26, 26)
print(f"speeds_array shape={speeds_array.shape}") # (12672, 26)

Data Visualization

두 개 도로를 선택하여 시각화해봅니다.

plt.figure(figsize=(18, 6))
plt.plot(speeds_array[:, [0, -1]])
plt.legend(["route_0", "route_25"])

각 도로별 상관관계도 시각화할 수 있습니다.

plt.figure(figsize=(8, 8))
plt.matshow(np.corrcoef(speeds_array.T), 0)
plt.xlabel("road number")
plt.ylabel("road number")

Splitting and normalizing data

train_size, val_size = 0.5, 0.2


def preprocess(data_array: np.ndarray, train_size: float, val_size: float):
    """Splits data into train/val/test sets and normalizes the data.

    Args:
        data_array: ndarray of shape `(num_time_steps, num_routes)`
        train_size: A float value between 0.0 and 1.0 that represent the proportion of the dataset
            to include in the train split.
        val_size: A float value between 0.0 and 1.0 that represent the proportion of the dataset
            to include in the validation split.

    Returns:
        `train_array`, `val_array`, `test_array`
    """

    # Split 수행
    num_time_steps = data_array.shape[0]
    num_train, num_val = (
        int(num_time_steps * train_size),
        int(num_time_steps * val_size),
    )
    train_array = data_array[:num_train]
    mean, std = train_array.mean(axis=0), train_array.std(axis=0)

    # Normalize 수행
    train_array = (train_array - mean) / std
    val_array = (data_array[num_train : (num_train + num_val)] - mean) / std
    test_array = (data_array[(num_train + num_val) :] - mean) / std

    return train_array, val_array, test_array


train_array, val_array, test_array = preprocess(speeds_array, train_size, val_size)

print(f"train set size: {train_array.shape}") # (6336, 26)
print(f"validation set size: {val_array.shape}") # (2534, 26)
print(f"test set size: {test_array.shape}") # (3802, 26)

Creating TensorFlow Datasets

예측 문제를 해결하기 위한 Dataset을 만듭니다. 이 예제에서 다룰 데이터는 t+1, t+2, ..., t+T 형태의 시퀀스를 활용해 t+T+1, ..., t+T+h 시점의 미래 체증을 예측합니다. t 시점의 입력값은 N 크기의 T 벡터이며, 예측(또는 타겟)값은 N 크기의 h 벡터입니다. 여기서 N은 도로 수 입니다. 

데이터셋을 만들기 위해 Keras에서 제공하는 timeseries_dataset_from_array() 함수를 활용합니다. 또, 아래의 create_tf_dataset() 함수는 numpy.ndarray을 입력받고, tf.data.Dataset을 반환합니다. 이 함수 안에서 input_sequence_lengthforcast_horizon은 각각 위의 Th를 의미합니다.

multi_horizon 인자를 추가적으로 설명합니다. forcast_horizon=3으로 가정하고, multi_horizon=True로 설정되어 있으면, 모델은 t+T+1, t+T+2, t+T+3 시점을 예측하고, Target 값의 형태는 (T, 3)이 됩니다. 반대로 multi_horizon=False로 설정되어 있으면, 마지막 시점인 t+T+3 시점만 예측하고 Target 값 형태는 (T, 1)이 됩니다.

여기서 사용되는 input tensor의 형태는 (batch_size, input_sequence_length, num_routes, 1)입니다. 마지막 차원은 모델 입력을 일반화하기 위해 사용했습니다.
예를 들어, 각 도로별 온도를 추가 특성으로서 활용하고 싶다면, 마지막 차원에서 각 도로(num_routes)는 speed, temperature 두 가지 특성을 가질 것이므로 (batch_size, input_sequence_length, num_routes, 2)가 됩니다. 하지만 이번 예제에서는 추가로 다루지 않으며, 항상 1을 유지합니다.

과거 12개 값을 활용해 미래 3개 값을 예측합니다.

from tensorflow.keras.preprocessing import timeseries_dataset_from_array

batch_size = 64
input_sequence_length = 12
forecast_horizon = 3
multi_horizon = False


def create_tf_dataset(
    data_array: np.ndarray,
    input_sequence_length: int,
    forecast_horizon: int,
    batch_size: int = 128,
    shuffle=True,
    multi_horizon=True,
):
    """Creates tensorflow dataset from numpy array.

    This function creates a dataset where each element is a tuple `(inputs, targets)`.
    `inputs` is a Tensor
    of shape `(batch_size, input_sequence_length, num_routes, 1)` containing
    the `input_sequence_length` past values of the timeseries for each node.
    `targets` is a Tensor of shape `(batch_size, forecast_horizon, num_routes)`
    containing the `forecast_horizon`
    future values of the timeseries for each node.

    Args:
        data_array: np.ndarray with shape `(num_time_steps, num_routes)`
        input_sequence_length: Length of the input sequence (in number of timesteps).
        forecast_horizon: If `multi_horizon=True`, the target will be the values of the timeseries for 1 to
            `forecast_horizon` timesteps ahead. If `multi_horizon=False`, the target will be the value of the
            timeseries `forecast_horizon` steps ahead (only one value).
        batch_size: Number of timeseries samples in each batch.
        shuffle: Whether to shuffle output samples, or instead draw them in chronological order.
        multi_horizon: See `forecast_horizon`.

    Returns:
        A tf.data.Dataset instance.
    """

    inputs = timeseries_dataset_from_array(
        np.expand_dims(data_array[:-forecast_horizon], axis=-1),
        None,
        sequence_length=input_sequence_length,
        shuffle=False,
        batch_size=batch_size,
    ) # 개별 입력 형태 = (64, 12, 26, 1)

    target_offset = (
        input_sequence_length
        if multi_horizon
        else input_sequence_length + forecast_horizon - 1
    )
    # multi_horizon이 True이면 forcast_horizon 크기 만큼 Target으로 활용
    target_seq_length = forecast_horizon if multi_horizon else 1
    targets = timeseries_dataset_from_array(
        data_array[target_offset:], # input_sequence_length 이후부터
        None,
        sequence_length=target_seq_length, # target_seq_length 크기만큼
        shuffle=False,
        batch_size=batch_size,
    ) # (64, 3, 26)

    dataset = tf.data.Dataset.zip((inputs, targets))
    if shuffle:
        dataset = dataset.shuffle(100)

    return dataset.prefetch(16).cache()


# (64, 12, 26, 1), (64, 3, 26)을 반환
train_dataset, val_dataset = (
    create_tf_dataset(data_array, input_sequence_length, forecast_horizon, batch_size)
    for data_array in [train_array, val_array]
)

test_dataset = create_tf_dataset(
    test_array,
    input_sequence_length,
    forecast_horizon,
    batch_size=test_array.shape[0],
    shuffle=False,
    multi_horizon=multi_horizon,
)

Roads Graph

PeMSD7 데이터셋은 도로 구역별 거리를 포함하고 있습니다. 이 거리를 활용하여 인접 매트릭스를 만듭니다. 논문에서 사용된 것처럼 두 도로의 거리가 threshold보다 낮다면 그래프의 edge가 존재하는 것으로 구성합니다.

def compute_adjacency_matrix(
    route_distances: np.ndarray, sigma2: float, epsilon: float
):
    """Computes the adjacency matrix from distances matrix.

    It uses the formula in https://github.com/VeritasYin/STGCN_IJCAI-18#data-preprocessing to
    compute an adjacency matrix from the distance matrix.
    The implementation follows that paper.

    Args:
        route_distances: np.ndarray of shape `(num_routes, num_routes)`. Entry `i,j` of this array is the
            distance between roads `i,j`.
        sigma2: Determines the width of the Gaussian kernel applied to the square distances matrix.
        epsilon: A threshold specifying if there is an edge between two nodes. Specifically, `A[i,j]=1`
            if `np.exp(-w2[i,j] / sigma2) >= epsilon` and `A[i,j]=0` otherwise, where `A` is the adjacency
            matrix and `w2=route_distances * route_distances`

    Returns:
        A boolean graph adjacency matrix.
    """
    num_routes = route_distances.shape[0]
    route_distances = route_distances / 10000.0
    w2, w_mask = (
        route_distances * route_distances,
        np.ones([num_routes, num_routes]) - np.identity(num_routes),
    )
    return (np.exp(-w2 / sigma2) >= epsilon) * w_mask

compute_adjacency_matrix() 함수는 boolean 인접 매트릭스를 반환합니다. 이 매트릭스에서 1은 두 node간 edge가 존재, 0은 존재하지 않음을 의미합니다.

class GraphInfo:
    def __init__(self, edges: typing.Tuple[list, list], num_nodes: int):
        self.edges = edges
        self.num_nodes = num_nodes


sigma2 = 0.1
epsilon = 0.5
adjacency_matrix = compute_adjacency_matrix(route_distances, sigma2, epsilon)
node_indices, neighbor_indices = np.where(adjacency_matrix == 1)
graph = GraphInfo(
    edges=(node_indices.tolist(), neighbor_indices.tolist()),
    num_nodes=adjacency_matrix.shape[0],
)
# 26, 150
print(f"number of nodes: {graph.num_nodes}, number of edges: {len(graph.edges[0])}")

Network architecture

모델은 그래프를 예측하기 위해 Graph Convolution Layer와 LSTM Layer로 구성됩니다.

Graph convolution layer

여기서 사용되는 Graph convolution layer 구조는 이 예제와 비슷합니다. 다른 점은 예제에서는 (num_nodes, in_feat) 2D tensor를 입력하지만, 이번 예제에서는 (num_nodes, batch_size, input_seq_length, in_feat) 4D tensor를 입력으로 사용합니다. graph convolution layer는 다음 단계로 계산이 수행됩니다.

  • 노드들의 표현은 input feature에 self.weight가 곱해지면서 self.compute_nodes_representation()에서 계산됩니다.
  • aggregated neighbors' messages는 집계된 neighbors' 표현에 self.weight를 곱하여 self.compute_aggregated_messages()에서 계산됩니다.
  • 층의 마지막에서 노드 표현과 neighbors' aggregated messages가 결합되면서 self.update()에서 계산됩니다.

위 설명은 아래 코드에서 주석을 참고하여 읽어보면 쉽게 이해됩니다.

class GraphConv(layers.Layer):
    def __init__(
        self,
        in_feat,
        out_feat,
        graph_info: GraphInfo,
        aggregation_type="mean",
        combination_type="concat",
        activation: typing.Optional[str] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.in_feat = in_feat
        self.out_feat = out_feat
        self.graph_info = graph_info
        self.aggregation_type = aggregation_type
        self.combination_type = combination_type
        self.weight = tf.Variable(
            initial_value=keras.initializers.glorot_uniform()(
                shape=(in_feat, out_feat), dtype="float32"
            ),
            trainable=True,
        )
        self.activation = layers.Activation(activation)

    def aggregate(self, neighbour_representations: tf.Tensor):
      # aggregation 방법에 따라 쓰이는 함수가 달라집니다
        aggregation_func = {
            "sum": tf.math.unsorted_segment_sum,
            "mean": tf.math.unsorted_segment_mean,
            "max": tf.math.unsorted_segment_max,
        }.get(self.aggregation_type)

        if aggregation_func:
            return aggregation_func(
                neighbour_representations,
                self.graph_info.edges[0],
                num_segments=self.graph_info.num_nodes,
            )

        raise ValueError(f"Invalid aggregation type: {self.aggregation_type}")

    def compute_nodes_representation(self, features: tf.Tensor):
        """각 노드 표현을 계산합니다.

        노드 표현은 feature tensor에 self.weight가 곱해지면서 만들어집니다.
        self.weight shape는 (in_feat, out_feat).

        Args:
            features: Tensor of shape `(num_nodes, batch_size, input_seq_len, in_feat)`

        Returns:
            A tensor of shape `(num_nodes, batch_size, input_seq_len, out_feat)`
        """
        return tf.matmul(features, self.weight)

    def compute_aggregated_messages(self, features: tf.Tensor):
        neighbour_representations = tf.gather(features, self.graph_info.edges[1])
        aggregated_messages = self.aggregate(neighbour_representations)
        return tf.matmul(aggregated_messages, self.weight)

    def update(self, nodes_representation: tf.Tensor, aggregated_messages: tf.Tensor):
        if self.combination_type == "concat":
            h = tf.concat([nodes_representation, aggregated_messages], axis=-1)
        elif self.combination_type == "add":
            h = nodes_representation + aggregated_messages
        else:
            raise ValueError(f"Invalid combination type: {self.combination_type}.")

        return self.activation(h)

    def call(self, features: tf.Tensor):
        """Forward pass.

        Args:
            features: tensor of shape `(num_nodes, batch_size, input_seq_len, in_feat)`

        Returns:
            A tensor of shape `(num_nodes, batch_size, input_seq_len, out_feat)`
        """
        nodes_representation = self.compute_nodes_representation(features)
        aggregated_messages = self.compute_aggregated_messages(features)
        return self.update(nodes_representation, aggregated_messages)

LSTM plus graph convolution

graph convolution layer를 통과시키면, 노드 표현을 포함한 4D tensor를 output으로 얻습니다. 여기서 각 timestep은 이웃 노드의 정보가 집계(aggregate)된 노드 표현입니다.

좋은 예측을 위해선 이웃 노드의 정보뿐만 아니라 시간에 따른 정보를 처리할 수 있어야 합니다. 이를 위해 node tensor를 recurrent layer에 통과시킵니다. 아래 코드에서 첫 번째로 graph convolution layer에 입력을 넣고나서 LSTM layer를 통과시키는 LSTMGC layer를 볼 수 있습니다.

class LSTMGC(layers.Layer):
    """Layer comprising a convolution layer followed by LSTM and dense layers."""

    def __init__(
        self,
        in_feat,
        out_feat,
        lstm_units: int,
        input_seq_len: int,
        output_seq_len: int,
        graph_info: GraphInfo,
        graph_conv_params: typing.Optional[dict] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # graph conv layer
        if graph_conv_params is None:
            graph_conv_params = {
                "aggregation_type": "mean",
                "combination_type": "concat",
                "activation": None,
            }
        self.graph_conv = GraphConv(in_feat, out_feat, graph_info, **graph_conv_params)

        self.lstm = layers.LSTM(lstm_units, activation="relu")
        self.dense = layers.Dense(output_seq_len)

        self.input_seq_len, self.output_seq_len = input_seq_len, output_seq_len

    def call(self, inputs):
        """Forward pass.

        Args:
            inputs: tf.Tensor of shape `(batch_size, input_seq_len, num_nodes, in_feat)`

        Returns:
            A tensor of shape `(batch_size, output_seq_len, num_nodes)`.
        """

        # convert shape to  (num_nodes, batch_size, input_seq_len, in_feat)
        inputs = tf.transpose(inputs, [2, 0, 1, 3])

        gcn_out = self.graph_conv(
            inputs
        )  # gcn_out has shape: (num_nodes, batch_size, input_seq_len, out_feat)
        shape = tf.shape(gcn_out)
        num_nodes, batch_size, input_seq_len, out_feat = (
            shape[0],
            shape[1],
            shape[2],
            shape[3],
        )

        # LSTM takes only 3D tensors as input
        gcn_out = tf.reshape(gcn_out, (batch_size * num_nodes, input_seq_len, out_feat))
        lstm_out = self.lstm(
            gcn_out
        )  # lstm_out has shape: (batch_size * num_nodes, lstm_units)

        dense_output = self.dense(
            lstm_out
        )  # dense_output has shape: (batch_size * num_nodes, output_seq_len)
        output = tf.reshape(dense_output, (num_nodes, batch_size, self.output_seq_len))
        return tf.transpose(
            output, [1, 2, 0]
        )  # returns Tensor of shape (batch_size, output_seq_len, num_nodes)

Model training

데이터를 다시 살펴보면 아래와 같습니다.

  • X 데이터는 (64, 12, 26, 1) ~ (batch_size, input_sequence_length(각 도로별 과거 체증값), 도로 노드, 도로 특성)
  • Y 데이터는 (64, 3, 26) ~ (batch_size, 예측 시점 수, 도로 노드)
in_feat = 1
batch_size = 64
epochs = 20
input_sequence_length = 12
forecast_horizon = 3
multi_horizon = False
out_feat = 10
lstm_units = 64
graph_conv_params = {
    "aggregation_type": "mean",
    "combination_type": "concat",
    "activation": None,
}

st_gcn = LSTMGC(
    in_feat,
    out_feat,
    lstm_units,
    input_sequence_length,
    forecast_horizon,
    graph,
    graph_conv_params,
)
inputs = layers.Input((input_sequence_length, graph.num_nodes, in_feat))
outputs = st_gcn(inputs)

model = keras.models.Model(inputs, outputs)
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=0.0002),
    loss=keras.losses.MeanSquaredError(),
)
model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    callbacks=[keras.callbacks.EarlyStopping(patience=10)],
)

Making forecasts on test set

학습된 모델로 테스트셋을 활용하여 예측 해봅니다. 아래에서 모델 예측값으로 얻은 MAE와 단순(naive) MAE를 비교합니다. 단순 예측는 각 노드의 마지막 값이 사용됩니다.

# (3788, 12, 26, 1), (3788, 1, 26)
x_test, y = next(test_dataset.as_numpy_iterator())
y_pred = model.predict(x_test)
plt.figure(figsize=(18, 6))
plt.plot(y[:, 0, 0])
plt.plot(y_pred[:, 0, 0])
plt.legend(["actual", "forecast"])

# naive는 마지막 값과 비교
naive_mse, model_mse = (
    np.square(x_test[:, -1, :, 0] - y[:, 0, :]).mean(),
    np.square(y_pred[:, 0, :] - y[:, 0, :]).mean(),
)
print(f"naive MAE: {naive_mse}, model MAE: {model_mse}")

LSTMGC block을 쌓으면 좀 더 좋은 결과를 얻을 수 있습니다.

이 글은 다음 Keras Example을 번역합니다.

https://keras.io/examples/structured_data/tabtransformer/

 

Keras documentation: Structured data learning with TabTransformer

Structured data learning with TabTransformer Author: Khalid Salama Date created: 2022/01/18 Last modified: 2022/01/18 Description: Using contextual embeddings for structured data classification. View in Colab • GitHub source Introduction This example dem

keras.io


Introduction

이 예제는 suvervised, semi-supervised로 활용할 수 있는 TabTransformer를 다룹니다. TabTransformer는 self-attention의 Transformer로 이루어지며, 범주형 특성을 임베딩하는 일반적인 층이 아닌 문맥을 고려할 수 있는 임베딩 층을 사용하여 더 높은 정확도를 달성할 수 있습니다.

이 예제는 TensorFlow 2.7 이상, TensorFlow Addons가 필요합니다.

pip install -U tensorflow-addons

Setup

import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import matplotlib.pyplot as plt

Prepare the data

이 예제에서는 UC Irvine Machine Learning Repository에서 제공하는 United States Census Income Dataset을 사용합니다. 이 데이터셋은 한 사람이 연간 USD 50,000 이상 벌 가능성이 있는지 여부를 판단하는 이진 분류 문제입니다.

5 numerical feature, 9 categorical feature로 이루어진 48,842 데이터를 포함하고 있습니다.

먼저, 데이터셋을 로드합니다.

CSV_HEADER = [
    "age",
    "workclass",
    "fnlwgt",
    "education",
    "education_num",
    "marital_status",
    "occupation",
    "relationship",
    "race",
    "gender",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
    "native_country",
    "income_bracket",
]

train_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
)
train_data = pd.read_csv(train_data_url, header=None, names=CSV_HEADER)

test_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test"
)
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Train dataset shape: {train_data.shape}") # (32561, 15)
print(f"Test dataset shape: {test_data.shape}") # (16282, 15)

test_data의 첫 번째 행은 검증되지 않은 데이터이므로 제거하고, 레이블에 포함되어 있는 '.'을 제거합니다.

test_data = test_data[1:]
test_data.income_bracket = test_data.income_bracket.apply(
    lambda value: value.replace(".", "")
)

CSV 파일로 저장합니다.

train_data_file = "train_data.csv"
test_data_file = "test_data.csv"

train_data.to_csv(train_data_file, index=False, header=False)
test_data.to_csv(test_data_file, index=False, header=False)

Define dataset metadata

다음은 input feature를 인코딩하고, 처리하기 유용하도록 데이터셋의 메타데이터를 정의합니다.

# NUMERICAL FEATURE 목록입니다
NUMERIC_FEATURE_NAMES = [
    "age",
    "education_num",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
]
# CATEGORICAL FEATURES, VOCABULARY를 모아놓은 DICT입니다
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "workclass": sorted(list(train_data["workclass"].unique())),
    "education": sorted(list(train_data["education"].unique())),
    "marital_status": sorted(list(train_data["marital_status"].unique())),
    "occupation": sorted(list(train_data["occupation"].unique())),
    "relationship": sorted(list(train_data["relationship"].unique())),
    "race": sorted(list(train_data["race"].unique())),
    "gender": sorted(list(train_data["gender"].unique())),
    "native_country": sorted(list(train_data["native_country"].unique())),
}
# WEIGHT COLUMN 이름을 정의합니다
WEIGHT_COLUMN_NAME = "fnlwgt"
# CATEGORICAL FEATURE 이름 목록입니다
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())
# INPUT FEATURE의 모든 목록입니다
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
# CSV_HEADER에 있는 값이면 [0], 아니면 ['NA']로 채웁니다
COLUMN_DEFAULTS = [
    [0.0] if feature_name in NUMERIC_FEATURE_NAMES + [WEIGHT_COLUMN_NAME] else ["NA"]
    for feature_name in CSV_HEADER
]
# TARGET FEATURE 이름입니다
TARGET_FEATURE_NAME = "income_bracket"
# TARGET FEATURE LABEL 목록입니다
TARGET_LABELS = [" <=50K", " >50K"]

Configure the hyperparameters

모델 구조와 트레이닝 옵션 관련 하이퍼파라미터를 정의합니다.

LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.0001
DROPOUT_RATE = 0.2
BATCH_SIZE = 265
NUM_EPOCHS = 15

NUM_TRANSFORMER_BLOCKS = 3  # transformer block 갯수
NUM_HEADS = 4  # attention head 갯수
EMBEDDING_DIMS = 16  # 임베딩 차원
MLP_HIDDEN_UNITS_FACTORS = [
    2,
    1,
]  # MLP hidden layer unit 갯수
NUM_MLP_BLOCKS = 2  # MLP block 갯수

Implement data reading pipeline

파일을 읽고 처리하는 함수를 정의하고, 훈련 및 평가를 위해 feature와 label을 tf.data.Dataset으로 변환합니다.

target_label_lookup = layers.StringLookup(
    vocabulary=TARGET_LABELS, mask_token=None, num_oov_indices=0
)

# target(label)을 StringLookup 함수에 통과시킵니다
def prepare_example(features, target):
    target_index = target_label_lookup(target)
    weights = features.pop(WEIGHT_COLUMN_NAME)
    return features, target_index, weights


def get_dataset_from_csv(csv_file_path, batch_size=128, shuffle=False):
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        na_value="?",
        shuffle=shuffle,
    ).map(prepare_example, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
    return dataset.cache()

Implement a training and evaluation procedure

def run_experiment(
    model,
    train_data_file,
    test_data_file,
    num_epochs,
    learning_rate,
    weight_decay,
    batch_size,
):

    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss=keras.losses.BinaryCrossentropy(),
        metrics=[keras.metrics.BinaryAccuracy(name="accuracy")],
    )

    train_dataset = get_dataset_from_csv(train_data_file, batch_size, shuffle=True)
    validation_dataset = get_dataset_from_csv(test_data_file, batch_size)

    print("Start training the model...")
    history = model.fit(
        train_dataset, epochs=num_epochs, validation_data=validation_dataset
    )
    print("Model training finished")

    _, accuracy = model.evaluate(validation_dataset, verbose=0)

    print(f"Validation accuracy: {round(accuracy * 100, 2)}%")

    return history

Create model inputs

Dictionary 형태로 model input을 구성합니다.

def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
    return inputs

Encode features

encode_inputs method는 numerical_feature_list와 embedding_dims로 categorical feature를 임베딩한 encoded_categorical_feature_list를 반환합니다.

def encode_inputs(inputs, embedding_dims):

    encoded_categorical_feature_list = []
    numerical_feature_list = []

    for feature_name in inputs:
        if feature_name in CATEGORICAL_FEATURE_NAMES:

            # categorical feature의 vocabulary를 받아옵니다.
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]

            # vocabulary의 string value를 integer 형태로 변환하고,
            # mask token은 사용하지 않기 떄문에 mask_token은 None으로
            # num_oov_indices는 0으로 설정합니다.
            lookup = layers.StringLookup(
                vocabulary=vocabulary,
                mask_token=None,
                num_oov_indices=0,
                output_mode="int",
            )

            # string input value를 interger 형태로 변환합니다.
            encoded_feature = lookup(inputs[feature_name])

            # Embedding Layer를 정의합니다.
            embedding = layers.Embedding(
                input_dim=len(vocabulary), output_dim=embedding_dims
            )

            # Embedding Layer에 통과시켜 임베딩된 value를 얻습니다.
            encoded_categorical_feature = embedding(encoded_feature)
            encoded_categorical_feature_list.append(encoded_categorical_feature)

        else:

            # numerical feature는 별도의 처리없이 다음과 같이 list에 담습니다.
            numerical_feature = tf.expand_dims(inputs[feature_name], -1)
            numerical_feature_list.append(numerical_feature)

    return encoded_categorical_feature_list, numerical_feature_list

Implement an MLP block

def create_mlp(hidden_units, dropout_rate, activation, normalization_layer, name=None):

    mlp_layers = []
    for units in hidden_units:
        mlp_layers.append(normalization_layer),
        mlp_layers.append(layers.Dense(units, activation=activation))
        mlp_layers.append(layers.Dropout(dropout_rate))

    return keras.Sequential(mlp_layers, name=name)

Experiment 1: a baseline model

첫 번째 실험으로, 간단한 multi-layer feed-forward network를 만듭니다.

def create_baseline_model(
    embedding_dims, num_mlp_blocks, mlp_hidden_units_factors, dropout_rate
):

    # model inputs를 생성합니다.
    inputs = create_model_inputs()
    # categorical, numerical feature를 인코딩합니다.
    encoded_categorical_feature_list, numerical_feature_list = encode_inputs(
        inputs, embedding_dims
    )
    # 모든 feature를 합칩니다.
    features = layers.concatenate(
        encoded_categorical_feature_list + numerical_feature_list
    )
    # features 마지막 차원을 hidden_units 하이퍼파라미터로 사용합니다.
    feedforward_units = [features.shape[-1]]

    # Create several feedforwad layers with skip connections.
    for layer_idx in range(num_mlp_blocks):
        features = create_mlp(
            hidden_units=feedforward_units,
            dropout_rate=dropout_rate,
            activation=keras.activations.gelu,
            normalization_layer=layers.LayerNormalization(epsilon=1e-6),
            name=f"feedforward_{layer_idx}",
        )(features)

    # Compute MLP hidden_units.
    mlp_hidden_units = [
        factor * features.shape[-1] for factor in mlp_hidden_units_factors
    ]
    # Create final MLP.
    features = create_mlp(
        hidden_units=mlp_hidden_units,
        dropout_rate=dropout_rate,
        activation=keras.activations.selu,
        normalization_layer=layers.BatchNormalization(),
        name="MLP",
    )(features)

    # Add a sigmoid as a binary classifer.
    outputs = layers.Dense(units=1, activation="sigmoid", name="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


baseline_model = create_baseline_model(
    embedding_dims=EMBEDDING_DIMS,
    num_mlp_blocks=NUM_MLP_BLOCKS,
    mlp_hidden_units_factors=MLP_HIDDEN_UNITS_FACTORS,
    dropout_rate=DROPOUT_RATE,
)

print("Total model weights:", baseline_model.count_params())
keras.utils.plot_model(baseline_model, show_shapes=True, rankdir="LR")

훈련 및 평가를 수행합니다.

history = run_experiment(
    model=baseline_model,
    train_data_file=train_data_file,
    test_data_file=test_data_file,
    num_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    batch_size=BATCH_SIZE,
)

Experiment 2: TabTransformer

Tabtransformer 구조는 다음과 같습니다.

  1. 모든 categorical feature는 동일한 embedding_dims로 category feature embedding됩니다. 각 categorical feature가 고유한 임베딩 벡터를 가지게 됩니다.
  2. categorical feature인 column에 대해 column embedding이 추가됩니다. 예제 모델은 각 column을 표현할 수 있는 Embedding Layer를 추가해서 1번의 categorical feature 임베딩 벡터와 더해줍니다.
  3. 임베딩된 categorical feature는 트랜스포머에 입력됩니다. 각 트랜스포머 블럭은 multi-head self-attention layer와 feed-forward layer로 구성됩니다.
  4. categorical feature의 contextual embedding을 담당하는 마지막 Transformer layer에서 numerical feature와 concat을 수행한 뒤, MLP block에 입력됩니다.
  5. 1번 실험과 다르게 softmax classifier가 사용됩니다.(?)

논문에서 column embedding에 대한 내용을 자세하게 다루며, 모델 구조를 볼 수 있습니다.

모델 구성 순서입니다.

categorical, numerical encoding&embedding → categorical column embedding →
categorical embedding vector + column embedding vector → (Multi-head attention → skip connection →
MLP block → skip connection) → concat with numerical features → MLP block → Classifier

def create_tabtransformer_classifier(
    num_transformer_blocks,
    num_heads,
    embedding_dims,
    mlp_hidden_units_factors,
    dropout_rate,
    use_column_embedding=False,
):

    # model inputs를 생성합니다.
    inputs = create_model_inputs()
    # 각 feature를 인코딩합니다.
    encoded_categorical_feature_list, numerical_feature_list = encode_inputs(
        inputs, embedding_dims
    )
    # categorical feature는 Transformer에 입력하기 위해 stack 합니다.
    # (None, 8, 16)이 됩니다.
    encoded_categorical_features = tf.stack(encoded_categorical_feature_list, axis=1)
    # (None, 5)가 됩니다.
    numerical_features = layers.concatenate(numerical_feature_list)

    # categorical feature embedding에 column embedding을 추가합니다.
    if use_column_embedding:
        num_columns = encoded_categorical_features.shape[1]
        column_embedding = layers.Embedding(
            input_dim=num_columns, output_dim=embedding_dims
        )
        column_indices = tf.range(start=0, limit=num_columns, delta=1)
        # (None, 8, 16) + (8, 16)
        encoded_categorical_features = encoded_categorical_features + column_embedding(
            column_indices
        )

    # Create multiple layers of the Transformer block.
    for block_idx in range(num_transformer_blocks):
        # Create a multi-head attention layer.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embedding_dims,
            dropout=dropout_rate,
            name=f"multihead_attention_{block_idx}",
        )(encoded_categorical_features, encoded_categorical_features)
        # Skip connection 1.
        x = layers.Add(name=f"skip_connection1_{block_idx}")(
            [attention_output, encoded_categorical_features]
        )
        # Layer normalization 1.
        x = layers.LayerNormalization(name=f"layer_norm1_{block_idx}", epsilon=1e-6)(x)
        # Feedforward.
        feedforward_output = create_mlp(
            hidden_units=[embedding_dims],
            dropout_rate=dropout_rate,
            activation=keras.activations.gelu,
            normalization_layer=layers.LayerNormalization(epsilon=1e-6),
            name=f"feedforward_{block_idx}",
        )(x)
        # Skip connection 2.
        x = layers.Add(name=f"skip_connection2_{block_idx}")([feedforward_output, x])
        # Layer normalization 2.
        encoded_categorical_features = layers.LayerNormalization(
            name=f"layer_norm2_{block_idx}", epsilon=1e-6
        )(x)

    # Flatten the "contextualized" embeddings of the categorical features.
    categorical_features = layers.Flatten()(encoded_categorical_features)
    # Apply layer normalization to the numerical features.
    numerical_features = layers.LayerNormalization(epsilon=1e-6)(numerical_features)
    # Prepare the input for the final MLP block.
    features = layers.concatenate([categorical_features, numerical_features])

    # Compute MLP hidden_units.
    mlp_hidden_units = [
        factor * features.shape[-1] for factor in mlp_hidden_units_factors
    ]
    # Create final MLP.
    features = create_mlp(
        hidden_units=mlp_hidden_units,
        dropout_rate=dropout_rate,
        activation=keras.activations.selu,
        normalization_layer=layers.BatchNormalization(),
        name="MLP",
    )(features)

    # Add a sigmoid as a binary classifer.
    outputs = layers.Dense(units=1, activation="sigmoid", name="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


tabtransformer_model = create_tabtransformer_classifier(
    num_transformer_blocks=NUM_TRANSFORMER_BLOCKS,
    num_heads=NUM_HEADS,
    embedding_dims=EMBEDDING_DIMS,
    mlp_hidden_units_factors=MLP_HIDDEN_UNITS_FACTORS,
    dropout_rate=DROPOUT_RATE,
)

print("Total model weights:", tabtransformer_model.count_params())
keras.utils.plot_model(tabtransformer_model, show_shapes=True, rankdir="LR")

훈련 및 평가를 진행합니다.

history = run_experiment(
    model=tabtransformer_model,
    train_data_file=train_data_file,
    test_data_file=test_data_file,
    num_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    batch_size=BATCH_SIZE,
)

추가로 아래 예제 마지막 결론을 간단히 해석해보면
TabTransformer는 Embedding이 핵심 아이디어이기 때문에 unlabeled 데이터를 pre-train에 활용할 수 있다고 합니다. 아마 semi-supervised를 표현하는 것 같아 보입니다.

TabTransformer significantly outperforms MLP and recent deep networks for tabular data while matching the performance of tree-based ensemble models. TabTransformer can be learned in end-to-end supervised training using labeled examples. For a scenario where there are a few labeled examples and a large number of unlabeled examples, a pre-training procedure can be employed to train the Transformer layers using unlabeled data. This is followed by fine-tuning of the pre-trained Transformer layers along with the top MLP layer using the labeled data.

이 글은 다음 튜토리얼을 번역한 것입니다.

https://keras.io/examples/vision/mlp_image_classification/

 

Keras documentation: Image classification with modern MLP models

Image classification with modern MLP models Author: Khalid Salama Date created: 2021/05/30 Last modified: 2021/05/30 Description: Implementing the MLP-Mixer, FNet, and gMLP models for CIFAR-100 image classification. View in Colab • GitHub source Introduc

keras.io

 


Introduction

이 예제는 최근 많이 사용되고 있는 Attention 방법을 사용하지 않고 MLP(multi-layer perceptron)을 사용하는 세 가지 모델에 대해 CIFAR-100 데이터셋을 활용하여 이미지 분류 문제를 해결해봅니다.

  1. 두 가지 타입의 MLP를 사용하는 MLP Mixer model(by Ilya Tolstikhin et al.)을 구현합니다
  2. based on unparameterized Fourier Transform, FNet model을 구현합니다
  3. gating 방법을 사용하는 gMLP model을 구현합니다

이 예제의 목적은 다른 데이터셋에서 서로 다른 성능을 낼 수 있기 때문에 각 모델의 성능을 비교하는 것이 아닙니다. 모델을 구성하는 main block들을 간단히 구현해보는 예제입니다.

+ 이 예제에서 살펴볼 세 개 모델은 다른 방법에 비해 꽤 간단하지만, 성능은 좋기 때문에 논문을 한번 찾아볼 필요가 있을 것 같습니다.
+ Attention mechnism도 여전히 강력하고, 훌륭한 방법이지만, 최근 연구 트렌드는 MLP인 것 같습니다.

tensorflow version 2.4 또는 그 이상에서 실행할 수 있으며, tf-addons 모듈 설치가 필요합니다.

pip install -U tensorflow-addons

Setup

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

Prepare the data

num_classes = 100
input_shape = (32, 32, 3)

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

Configure the hyperparameters

weight_decay = 0.0001
batch_size = 128
num_epochs = 50
dropout_rate = 0.2
image_size = 64  # 이미지 resize 크기
patch_size = 8  # 이미지 패치 크기
num_patches = (image_size // patch_size) ** 2  # 패치로 이루어진 data array 크기
embedding_dim = 256  # Number of hidden units.
num_blocks = 4  # Number of blocks.

print(f"Image size: {image_size} X {image_size} = {image_size ** 2}")
print(f"Patch size: {patch_size} X {patch_size} = {patch_size ** 2} ")
print(f"Patches per image: {num_patches}")
print(f"Elements per patch (3 channels): {(patch_size ** 2) * 3}")

Build a classification model

processing block을 포함한 classifier를 구현합니다.

def build_classifier(blocks, positional_encoding=False):
    inputs = layers.Input(shape=input_shape) # inputs: [batch_size, 32, 32, 3]
    # Augment data.
    # augmented: [batch_size, 64, 64, 3]
    augmented = data_augmentation(inputs)

    # Create patches.
    # patches: [batch_size, 64, 192]
    patches = Patches(patch_size, num_patches)(augmented)
    
    # [batch_size, num_patches, embedding_dim] tensor를 생성하기 위한 패치 인코딩 부분입니다.
    x = layers.Dense(units=embedding_dim)(patches)
    
    if positional_encoding:
        positions = tf.range(start=0, limit=num_patches, delta=1)
        position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=embedding_dim
        )(positions)
        x = x + position_embedding

    # Process x using the module blocks.
    x = blocks(x)
    
    # global average pooling을 사용
    # [batch_size, embedding_dim] 형태의 representation tensor를 만듭니다.
    representation = layers.GlobalAveragePooling1D()(x)
    representation = layers.Dropout(rate=dropout_rate)(representation)
    
    # Compute logits outputs.
    logits = layers.Dense(num_classes)(representation)
    
    # Create the Keras model.
    return keras.Model(inputs=inputs, outputs=logits)

Define an experiment

compile, train, evaluate를 위한 구현입니다.

def run_experiment(model):
    # Create Adam optimizer with weight decay.
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay,
    )
    # Compile the model.
    # top5-acc -> name = "top5-acc"
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="acc"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top5-acc"),
        ],
    )
    # learning rate 스케쥴러
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=5
    )
    # early stopping callback
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=10, restore_best_weights=True
    )
    # Fit the model.
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[early_stopping, reduce_lr],
    )

    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    # Return history to plot learning curves.
    return history

Use data augmentation

data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.Normalization(),
        layers.experimental.preprocessing.Resizing(image_size, image_size),
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# normalization을 위해 training data의 mean, varfmf 계산합니다.
data_augmentation.layers[0].adapt(x_train)

Implement patch extraction as a layer

Patches class를 이해하려면 tf.image.extract_patches를 이해할 필요가 있습니다.

class Patches(layers.Layer):
    def __init__(self, patch_size, num_patches):
        super(Patches, self).__init__()
        self.patch_size = patch_size
        self.num_patches = num_patches

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, self.num_patches, patch_dims])
        return patches

The MLP-Mixer model

MLP-Mixer model은 MLP만 사용하는 아키텍처이며, 두 가지 유형의 MLP layer를 포함합니다.

  1. 각 이미지 패치를 독립적으로 사용함으로써 per-location feature를 혼합합니다
  2. 채널축으로 적용하여 spatial information을 혼합합니다.

이 방법은 어떻게 보면 Xception model에서 대표적으로 사용한 depthwise separable convolution 구조와 유사해보이지만, 차이점으로는 two chained dense transforms, no max pooling, layer normalization instead of batch normalization 사용에 있습니다.

Implement the MLP-Mixer model

class MLPMixerLayer(layers.Layer):
    def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs):
        super(MLPMixerLayer, self).__init__(*args, **kwargs)

        self.mlp1 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=num_patches),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.mlp2 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=embedding_dim),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.normalize = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize(inputs)

        # [num_batches, num_patches, hidden_units] -> [num_batches, hidden_units, num_patches]
        x_channels = tf.linalg.matrix_transpose(x)
        
        # mlp1을 채널 독립적으로 적용합니다.
        # Dense Layer는 2-D 이상일 경우, 마지막 차원에서 가중치 연산이 일어납니다 -> 일종의 trick으로 사용
        mlp1_outputs = self.mlp1(x_channels)
        
        # [num_batches, hidden_dim, num_patches] -> [num_batches, num_patches, hidden_units]
        mlp1_outputs = tf.linalg.matrix_transpose(mlp1_outputs)
        
        # Add skip connection.
        x = mlp1_outputs + inputs
        
        # Apply layer normalization.
        x_patches = self.normalize(x)
        
        # mlp2를 각 패치에 독립적으로 적용합니다.
        mlp2_outputs = self.mlp2(x_patches)
        
        # Add skip connection.
        x = x + mlp2_outputs
        
        return x

Build, train, and evaluate the MLP-Mixer model

mlpmixer_blocks = keras.Sequential(
    [MLPMixerLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.005
mlpmixer_classifier = build_classifier(mlpmixer_blocks)
history = run_experiment(mlpmixer_classifier)

MLP-Mixer 모델은 다른 모델에 비해 사용되는 파라미터 수가 적습니다. 

MLP-Mixer 논문에 언급된 바에 따르면, large-dataset에 pre-trained하고 modern regularization 방법을 함께 사용한다면 SOTA 모델들의 score와 경쟁할 수 있는 수준의 score를 얻을 수 있다고 합니다. 임베딩 차원, 블록 수, 입력 이미지 크기를 늘리거나 다른 패치 크기를 사용하거나 모델을 더 오랫동안 학습시키면 더 좋은 성능을 얻을 수 있습니다.


The FNet model

FNet 모델은 Transformer block과 유사한 구조의 블록을 사용합니다. 하지만 FNet 모델은 self-attention layer 대신, 파라미터에서 자유로운 2D Fourier transformation layer를 사용합니다.

위 모델과 동일하게 패치, 채널 독립적 연산이 사용됩니다.

Implement the FNet module

class FNetLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(FNetLayer, self).__init__(*args, **kwargs)

        self.ffn = keras.Sequential(
            [
                layers.Dense(units=embedding_dim),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
                layers.Dense(units=embedding_dim),
            ]
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        # Apply fourier transformations.
        x = tf.cast(
            tf.signal.fft2d(tf.cast(inputs, dtype=tf.dtypes.complex64)),
            dtype=tf.dtypes.float32,
        )
        # Add skip connection.
        x = x + inputs
        # Apply layer normalization.
        x = self.normalize1(x)
        # Apply Feedfowrad network.
        x_ffn = self.ffn(x)
        # Add skip connection.
        x = x + x_ffn
        # Apply layer normalization.
        return self.normalize2(x)

Build, train, and evaluate the FNet model

fnet_blocks = keras.Sequential(
    [FNetLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.001
fnet_classifier = build_classifier(fnet_blocks, positional_encoding=True)
history = run_experiment(fnet_classifier)

The gMLP model

gMLP 모델은 Spatial Gating Unit(SGU)를 사용하는 MLP 아키텍처입니다. SGU는 spatial(channel) dimention을 따라 각 패치가 상호작용할 수 있도록 합니다.

  1.  각 패치를 따라 linear projection을 적용해서 input을 spatially transform합니다
  2. inputs와 spatially transform input을 element-wise multiplication합니다

Implement the gMLP module

class gMLPLayer(layers.Layer):
    def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
        super(gMLPLayer, self).__init__(*args, **kwargs)

        self.channel_projection1 = keras.Sequential(
            [
                layers.Dense(units=embedding_dim * 2),
                tfa.layers.GELU(),
                layers.Dropout(rate=dropout_rate),
            ]
        )

        self.channel_projection2 = layers.Dense(units=embedding_dim)

        self.spatial_projection = layers.Dense(
            units=num_patches, bias_initializer="Ones"
        )

        self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
        self.normalize2 = layers.LayerNormalization(epsilon=1e-6)

    def spatial_gating_unit(self, x):
        # Split x along the channel dimensions.
        # Tensors u and v will in th shape of [batch_size, num_patchs, embedding_dim].
        u, v = tf.split(x, num_or_size_splits=2, axis=2)
        # Apply layer normalization.
        v = self.normalize2(v)
        # Apply spatial projection.
        # [batch_size, num_patches, embedding_dim] -> [batch_size, embedding_dim, num_patches]
        v_channels = tf.linalg.matrix_transpose(v)
        v_projected = self.spatial_projection(v_channels)
        v_projected = tf.linalg.matrix_transpose(v_projected)
        
        # Apply element-wise multiplication.
        return u * v_projected

    def call(self, inputs):
        # Apply layer normalization.
        x = self.normalize1(inputs)
        
        # 여기서 embedding_dim을 2배로 만들어주고,
        # Apply the first channel projection. x_projected shape: [batch_size, num_patches, embedding_dim * 2].
        x_projected = self.channel_projection1(x)
        
        # 2배로 만들어진 channel을 두 개로 쪼개서 하나는 projection을 적용, 하나는 기존걸 유지한 뒤,
        # element-wise multiplication을 적용함. 연산은 skip-connection과 비슷한 원리
        # Apply the spatial gating unit. x_spatial shape: [batch_size, num_patches, embedding_dim].
        x_spatial = self.spatial_gating_unit(x_projected)
        
        # Apply the second channel projection. x_projected shape: [batch_size, num_patches, embedding_dim].
        x_projected = self.channel_projection2(x_spatial)
        
        # Add skip connection.
        return x + x_projected

Build, train, and evaluate the gMLP model

gmlp_blocks = keras.Sequential(
    [gMLPLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.003
gmlp_classifier = build_classifier(gmlp_blocks)
history = run_experiment(gmlp_classifier)

gMLP 논문에 따르면, embedding dimension, gMLP blocks을 늘리거나 더 오랫동안 학습시키면 더 나은 결과를 얻을 수 있다고 합니다. 또한, 입력 이미지 크기나 패치 크기를 조절해가면서 학습시켜보세요.

또, gMLP 논문에서의 구현은 advanced regularization strategies나 MixUp, CutMix 뿐만 아니라 AutoAugmentation을 사용했습니다.

이 글은 다음 튜토리얼을 번역한 것입니다.

 

Keras documentation: Classification with Gated Residual and Variable Selection Networks

Classification with Gated Residual and Variable Selection Networks Author: Khalid Salama Date created: 2021/02/10 Last modified: 2021/02/10 Description: Using Gated Residual and Variable Selection Networks for income level prediction. View in Colab • Git

keras.io


Introduction

이 예제에선 Structured data classification을 위해 Bryan Lim et al.(arxiv.org/abs/1912.09363)에서 제안된 Gated Residual NetworksVariable Selection Networks를 사용해봅니다. GRN은 필요에 따라 비선형성을 제공할 수 있는 유연성을 제공하고, VSN은 성능에 부정적인 영향을 주는 불필요한 feature를 제거하는 역할을 수행합니다. 또한, 이 두 가지를 함께 사용했을 때 모델의 학습 능력을 주요하게 향상시킬 수 있습니다.

이 예제는 논문에서 제안된 전체 모델을 구현하진 않고, 부분적으로 GRN과 VSN만 구현하게 됩니다.

※ TensorFlow 2.3 이상의 버전에서 정상 작동합니다.


The Dataset

데이터셋은 UC Irvine Machine Learning Repo에서 제공하는 United States Census Income Dataset을 사용합니다. 이진 분류이며, 연소득 50K가 넘는 사람인지를 구분하는 문제입니다.

데이터셋은 41개의 feature(7 numerical + 34 categorical)과 ~300K의 인스턴스로 이루어져 있습니다.


Setup

import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

Prepare Data

데이터를 로드합니다.

# Column names.
CSV_HEADER = [
    "age", "class_of_worker", "detailed_industry_recode",
    "detailed_occupation_recode", "education", "wage_per_hour",
    "enroll_in_edu_inst_last_wk", "marital_stat", "major_industry_code",
    "major_occupation_code", "race", "hispanic_origin",
    "sex", "member_of_a_labor_union", "reason_for_unemployment",
    "full_or_part_time_employment_stat", "capital_gains", "capital_losses",
    "dividends_from_stocks", "tax_filer_stat", "region_of_previous_residence",
    "state_of_previous_residence", "detailed_household_and_family_stat",
    "detailed_household_summary_in_household", "instance_weight",
    "migration_code-change_in_msa", "migration_code-change_in_reg",
    "migration_code-move_within_reg", "live_in_this_house_1_year_ago",
    "migration_prev_res_in_sunbelt", "num_persons_worked_for_employer",
    "family_members_under_18", "country_of_birth_father",
    "country_of_birth_mother", "country_of_birth_self",
    "citizenship", "own_business_or_self_employed",
    "fill_inc_questionnaire_for_veteran's_admin", "veterans_benefits",
    "weeks_worked_in_year", "year", "income_level"
]

data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.data.gz"
data = pd.read_csv(data_url, header=None, names=CSV_HEADER)

test_data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.test.gz"
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Data shape: {data.shape}") # (199523, 42)
print(f"Test data shape: {test_data.shape}") # (99762, 42)

target은 ' - 50000.', ' 50000+.' 두 가지로 이루어져 있습니다.

따라서 이 두 가지를 Integer 형태로 바꿔줍니다.

data["income_level"] = data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

test_data["income_level"] = test_data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

훈련/검증 데이터로 분할합니다.

random_selection = np.random.rand(len(data.index)) <= 0.85
train_data = data[random_selection]
valid_data = data[~random_selection]

분할한 데이터를 csv 파일로 저장해둡니다.

train_data_file = "train_data.csv"
valid_data_file = "valid_data.csv"
test_data_file = "test_data.csv"

train_data.to_csv(train_data_file, index=False, header=False)
valid_data.to_csv(valid_data_file, index=False, header=False)
test_data.to_csv(test_data_file, index=False, header=False)

Define dataset metadata

reading, parsing, encoding이 원할하게 수행될 수 있도록 metadata를 정의하겠습니다.

# Target feature name.
TARGET_FEATURE_NAME = "income_level"

# Weight column name.
WEIGHT_COLUMN_NAME = "instance_weight"

# Numeric feature names.
NUMERIC_FEATURE_NAMES = [
    "age", "wage_per_hour",
    "capital_gains", "capital_losses",
    "dividends_from_stocks", "num_persons_worked_for_employer",
    "weeks_worked_in_year"
]

# Categorical feature와 feature가 가지고 있는 unique label을 구합니다.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    feature_name: sorted([str(value) for value in list(data[feature_name].unique())])
    for feature_name in CSV_HEADER
    if feature_name
    not in list(NUMERIC_FEATURE_NAMES + [WEIGHT_COLUMN_NAME, TARGET_FEATURE_NAME])
}

# All features names.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + list(
    CATEGORICAL_FEATURES_WITH_VOCABULARY.keys()
)

# default 값 처리를 위해 categorical은 ["NA"], 나머지는 [0.0]으로 초기화합니다.
COLUMN_DEFAULTS = [
    [0.0]
    if feature_name in NUMERIC_FEATURE_NAMES + [TARGET_FEATURE_NAME, WEIGHT_COLUMN_NAME]
    else ["NA"]
    for feature_name in CSV_HEADER
]

Create a tf.data.Dataset for training and evaluation

file을 read and parsing하고 feature, label 변환을 수행하기 위해 tf.data.Dataset을 사용합니다.

from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def process(features, target):
    for feature_name in features:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            # Categorical feature이면 string으로 cast합니다.
            features[feature_name] = tf.cast(features[feature_name], tf.dtypes.string)
    # Get the instance weight.
    weight = features.pop(WEIGHT_COLUMN_NAME)
    
    return features, target, weight

def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    # csv 파일을 읽어오는 method
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        shuffle=shuffle,
    ).map(process)

    return dataset

Create model inputs

def create_model_inputs():
    inputs = {}
    
    for feature_name in FEATURE_NAMES:
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
            
    return inputs

Encode input features

Categorical featureencoding_size dimension을 가지는 Embedding Layer를 사용해서 인코딩합니다. Numerical featurelayers.Dense를 사용해서 encoding_size dimension을 가지도록 선형 변환합니다. 따라서, 모든 인코딩된 feature는 동일한 dimension을 가지도록 구성됩니다.

embedding representation으로 변환하는 작업은 아래 테스트 코드로 확인해보세요.

index = StringLookup(
                vocabulary=CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father'], 
                mask_token=None, num_oov_indices=0
                )
test = train_data['country_of_birth_father'][:32]

# print(index(test))

embedding_ecoder = layers.Embedding(
                input_dim=len(CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father']),
                output_dim=200
            )

print(embedding_ecoder(index(test)))

 

from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def encode_inputs(inputs, encoding_size):
    encoded_features = []
    
    for feature_name in inputs:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # Create a lookup to convert a string values to an integer indices.
            # String value를 Integer index로 변환합니다.
            # 따라서, 동일한 label은 동일한 index로 변환됩니다.
            index = StringLookup(
                vocabulary=vocabulary, mask_token=None, num_oov_indices=0
            )
            # String -> Integer index
            value_index = index(inputs[feature_name])
            # encoding_size dimension을 가지는 Embedding layer를 정의합니다.
            embedding_ecoder = layers.Embedding(
                input_dim=len(vocabulary), output_dim=encoding_size
            )
            # index value를 embedding representation으로 변환합니다.
            encoded_feature = embedding_ecoder(value_index)
        else:
            # encoding_size unit을 가지는 Dense layer를 활용해서 numeric feature를 인코딩합니다.
            encoded_feature = tf.expand_dims(inputs[feature_name], -1)
            encoded_feature = layers.Dense(units=encoding_size)(encoded_feature)
        encoded_features.append(encoded_feature)
        
    return encoded_features

Implement the Gated Linear Unit

GLU는 주어진 작업에서 연관없는 input을 사용하지 않게끔 하는 유연성을 제공합니다.

class GatedLinearUnit(layers.Layer):
    def __init__(self, units):
        super(GatedLinearUnit, self).__init__()
        self.linear = layers.Dense(units)
        self.sigmoid = layers.Dense(units, activation="sigmoid")

    def call(self, inputs):
        return self.linear(inputs) * self.sigmoid(inputs)

Implement the Gated Residual Network

GRN은 다음과 같이 구성됩니다.

  1. ELU Activation을 적용합니다.
  2. Dropout을 적용합니다.
  3. GLU를 적용하고, original input과 Adding합니다. dimension이 다르면 project layer를 활용합니다.
  4. normalization을 적용하고, output을 반환합니다.
class GatedResidualNetwork(layers.Layer):
    def __init__(self, units, dropout_rate):
        super(GatedResidualNetwork, self).__init__()
        self.units = units
        self.elu_dense = layers.Dense(units, activation="elu")
        self.linear_dense = layers.Dense(units)
        self.dropout = layers.Dropout(dropout_rate)
        self.gated_linear_unit = GatedLinearUnit(units)
        self.layer_norm = layers.LayerNormalization()
        self.project = layers.Dense(units)

    def call(self, inputs):
        # 1.
        x = self.elu_dense(inputs)
        # 2.
        x = self.linear_dense(x)
        x = self.dropout(x)
        # 3.
        if inputs.shape[-1] != self.units:
            inputs = self.project(inputs)
        x = inputs + self.gated_linear_unit(x)
        # 4.
        x = self.layer_norm(x)
        
        return x

Implement the Variable Selection Network

VSN은 다음과 같이 구성됩니다.

  1. 각 feature에 개별적으로 GRN을 적용합니다.
  2. GRN을 적용한 feature를 concat하고, softmax 함수를 적용합니다.
  3. 개별 GRN의 가중합을 구합니다.

VSN은 input feature 개수와 상관없이 [batch_size, encoding_size] 형태를 가지는 output을 반환합니다.

class VariableSelection(layers.Layer):
    def __init__(self, num_features, units, dropout_rate):
        super(VariableSelection, self).__init__()
        self.grns = list()
        # Create a GRN for each feature independently
        for idx in range(num_features):
            grn = GatedResidualNetwork(units, dropout_rate)
            self.grns.append(grn)
        # Create a GRN for the concatenation of all the features
        self.grn_concat = GatedResidualNetwork(units, dropout_rate)
        self.softmax = layers.Dense(units=num_features, activation="softmax")

    def call(self, inputs):
        v = layers.concatenate(inputs) # (batch_size, embedding_size * num_features)
        v = self.grn_concat(v) # (batch_size, units)
        v = tf.expand_dims(self.softmax(v), axis=-1) # (batch_size, num_features, 1)

        x = []
        for idx, input in enumerate(inputs):
            x.append(self.grns[idx](input))
        x = tf.stack(x, axis=1) # (batch_size, num_features, units)

        # (batch_size, units)
        # (1, num_features) by (num_features, units) -> (1, units)
        outputs = tf.squeeze(tf.matmul(v, x, transpose_a=True), axis=1)

        return outputs

# test용 코드
# VariableSelection(num_features = 2, units = 100, dropout_rate = 0.1)([embedding_ecoder(index(test)), embedding_ecoder(index(test))])

Create Gated Residual and Variable Selection Networks model

def create_model(encoding_size):
    inputs = create_model_inputs()
    
    feature_list = encode_inputs(inputs, encoding_size)
    num_features = len(feature_list)

    features = VariableSelection(num_features, encoding_size, dropout_rate)(
        feature_list
    )

    outputs = layers.Dense(units=1, activation="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)

    return model

Compile, train, and evaluate the model

learning_rate = 0.001
dropout_rate = 0.15
batch_size = 265
num_epochs = 20
encoding_size = 16

model = create_model(encoding_size)
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[keras.metrics.BinaryAccuracy(name="accuracy")],
)


# Create an early stopping callback.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)

print("Start training the model...")
train_dataset = get_dataset_from_csv(
    train_data_file, shuffle=True, batch_size=batch_size
)
valid_dataset = get_dataset_from_csv(valid_data_file, batch_size=batch_size)
model.fit(
    train_dataset,
    epochs=num_epochs,
    validation_data=valid_dataset,
    callbacks=[early_stopping],
)
print("Model training finished.")

print("Evaluating model performance...")
test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)
_, accuracy = model.evaluate(test_dataset)

print(f"Test accuracy: {round(accuracy * 100, 2)}%")

예제에서는 95% acc를 달성합니다. 

acc 향상을 위해 encoding_size를 조절하거나 여러 개의 GRN 또는 VSN을 사용해볼 수 있습니다. 또, dropout_rate 조절은 overfitting을 피할 수 있도록 도울겁니다.

이 글은 다음 튜토리얼을 번역한 것입니다.


Introduction

이 예제는 Structured Data Classification을 위해 P. Kontschieder et al에서 제안된 Deep neural Dicision Forest 구현에 관한 내용입니다. 확률적이고 미분가능한 dicision tree 구성, end-to-end 학습 방법, deep representation learning을 통한 decision tree 통합을 어떻게 하는지를 다룹니다.

 

The Dataset

이 예제는  UC Irvine Machine Learning Repository에서 제공하고 있는 United states Census Income Dataset을 활용합니다. 한 개인이 연간 50,000 USD 이상 버는지, 벌지 못하는지에 관한 이진 분류 문제입니다.

5개 numerical, 9개 categorical로 이루어진 총 14개 특징(나이, 직업 수준, 교육, 직업 등)과 함께 48,842개의 데이터를 포함하고 있습니다.

 

Setup

import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
import math

 

Prepare the data

CSV_HEADER = [
    "age", "workclass", "fnlwgt",
    "education", "education_num", "marital_status",
    "occupation", "relationship", "race",
    "gender", "capital_gain", "capital_loss",
    "hours_per_week", "native_country","income_bracket"
]

train_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
)
train_data = pd.read_csv(train_data_url, header=None, names=CSV_HEADER)

test_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test"
)
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Train dataset shape: {train_data.shape}")
print(f"Test dataset shape: {test_data.shape}")

테스트 데이터에서 첫 번째 데이터는 유효하지 않으므로 삭제하고, 레이블 데이터에 포함되어 있는 '.'을 없애줍니다.

test_data = test_data[1:]
# <=50K. -> <=50K
test_data.income_bracket = test_data.income_bracket.apply(
    lambda value: value.replace(".", "")
)

학습 데이터와 트레이닝 데이터를 csv로 저장합니다.

 

Define dataset metadata

이제 데이터에 포함되어 있는 특징들에 대해 분석과 encoding이 용이하도록 메타데이터를 정의합니다.

# numerical feature 입니다.
NUMERIC_FEATURE_NAMES = [
    "age", "education_num",
    "capital_gain","capital_loss",
    "hours_per_week"
]

# categorical feature과 unique한 값만 모아놓은 vocabulary 입니다.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "workclass": sorted(list(train_data["workclass"].unique())),
    "education": sorted(list(train_data["education"].unique())),
    "marital_status": sorted(list(train_data["marital_status"].unique())),
    "occupation": sorted(list(train_data["occupation"].unique())),
    "relationship": sorted(list(train_data["relationship"].unique())),
    "race": sorted(list(train_data["race"].unique())),
    "gender": sorted(list(train_data["gender"].unique())),
    "native_country": sorted(list(train_data["native_country"].unique())),
}
# fnlwgt는 특징으로 사용하지 않습니다.
IGNORE_COLUMN_NAMES = ["fnlwgt"]
# categorical feature 입니다.
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())
# 모든 input feature 입니다.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
# 각 특징별로 default value를 지정해놓습니다.
COLUMN_DEFAULTS = [
    [0.0] if feature_name in NUMERIC_FEATURE_NAMES + IGNORE_COLUMN_NAMES else ["NA"]
    for feature_name in CSV_HEADER
]
# target feature 입니다.
TARGET_FEATURE_NAME = "income_bracket"
# target feature에 포함되어 있는 label입니다.
TARGET_LABELS = [" <=50K", " >50K"]

Create tf.data.Dataset objects for training and validation

tf.data.Dataset을 활용해서 파일, feature과 label을 읽고 분석할 수 있는 Dataset 객체로 변환합니다. 또, label은 각각 index로 매핑합니다.

from tensorflow.keras.layers.experimental.preprocessing import StringLookup

target_label_lookup = StringLookup(
    vocabulary=TARGET_LABELS, mask_token=None, num_oov_indices=0
)


def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        na_value="?",
        shuffle=shuffle,
    ).map(lambda features, target: (features, target_label_lookup(target)))
    return dataset.cache()

Create model inputs

def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        # Numerical과 Categorical을 구분합니다.
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
    return inputs

Encode input features

from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def encode_inputs(inputs, use_embedding=False):
    encoded_features = []
    for feature_name in inputs:
        # Categorical일 경우
        if feature_name in CATEGORICAL_FEATURE_NAMES:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # vocabulary에 포함되어 있는 unique value를 integer indice로 변환합니다.
            # (oov), mask token은 사용하지 않으므로
            # mask_token = None, num_oov_indices = 0으로 설정합니다.
            index = StringLookup(
                vocabulary=vocabulary, mask_token=None, num_oov_indices=0
            )
            # String value -> Integer value
            value_index = index(inputs[feature_name])

            # Embedding을 사용하는 경우
            if use_embedding:
                # embedding_dims를 정의하고, Embedding을 만듭니다.
                embedding_dims = int(math.sqrt(len(vocabulary)))
                embedding_ecoder = layers.Embedding(
                    input_dim=len(vocabulary), output_dim=embedding_dims
                )
                # 해당 feature에 포함된 값을 embedding representation으로 변환합니다.
                encoded_feature = embedding_ecoder(value_index)
            # Embedding을 사용하지 않는 경우
            else:
                # one-hot encoder를 선언하고, vocab을 adapt한 뒤, 인덱스로 변환합니다.
                onehot_encoder = CategoryEncoding(output_mode="binary")
                onehot_encoder.adapt(index(vocabulary))
                encoded_feature = onehot_encoder(value_index)
        # Numerical일 경우
        else:
            # Numerical Feature는 shape이 None이므로
            # Concat을 위해 dim을 늘려줍니다.
            encoded_feature = inputs[feature_name]
            if inputs[feature_name].shape[-1] is None:
                print(feature_name)
                encoded_feature = tf.expand_dims(encoded_feature, -1)

        encoded_features.append(encoded_feature)

    encoded_features = layers.concatenate(encoded_features)
    return encoded_features

Deep Neural Decision Tree

neural decision tree를 학습시키기 위해 두 가지 가중치 집합을 사용합니다. 첫 번째 집합인 pi는 tree leaves에서 클래스 확률 분포를 표현합니다. 두 번째 집합인 가중치를 포함한 layer decision_fn은 어떤 leave로 향할지에 대한 확률을 표현하는 집합입니다. forward pass는 아래 순서로 진행됩니다.

  1. 모델은 배치에서 instance의 모든 특성이 single vector로 인코딩된 형태의 input feature를 받습니다. 여기서 single vector는 CNN이나 dense transformation을 통해 만들어집니다. 여기서는 structured data를 사용하므로 dense layer를 사용합니다.
  2. used_features_mask를 사용해서 input feature 중 사용할 subset feature를 랜덤하게 선택합니다.
  3. 그리고 나서 전체 트리를 순회하면서 해당 instance가 leaves에 도달할 확률을 담을 mu를 계산합니다.
  4. 마지막으로 leaves에 도달할 확률과 leaves가 가지고 있는 class 확률이 합쳐져 최종 outputs을 생성하게 됩니다.
class NeuralDecisionTree(keras.Model):
    def __init__(self, depth, num_features, used_features_rate, num_classes):
        super(NeuralDecisionTree, self).__init__()
        self.depth = depth
        self.num_leaves = 2 ** depth
        self.num_classes = num_classes

        # input feature 중 subset feature를 랜덤으로 선택하기 위한 mask를 생성합니다.
        num_used_features = int(num_features * used_features_rate)
        one_hot = np.eye(num_features)
        sampled_feature_indicies = np.random.choice(
            np.arange(num_features), num_used_features, replace=False
        )
        self.used_features_mask = one_hot[sampled_feature_indicies]

        # leaves가 가지고 있는 class weights를 초기화합니다.
        self.pi = tf.Variable(
            initial_value=tf.random_normal_initializer()(
                shape=[self.num_leaves, self.num_classes]
            ),
            dtype="float32",
            trainable=True,
        )

        # 어떤 leave로 향할지에 대한 확률을 담은 decision_fn을 선언합니다.
        self.decision_fn = layers.Dense(
            units=self.num_leaves, activation="sigmoid", name="decision"
        )

    def call(self, features):
        batch_size = tf.shape(features)[0]

        # 마스크를 통해 subset feature를 선택합니다.
        # [batch_size, num_used_features]
        features = tf.matmul(
            features, self.used_features_mask, transpose_b=True
        )  
        # leave로 향할 확률을 계산합니다.
        # [batch_size, num_leaves, 1]
        decisions = tf.expand_dims(
            self.decision_fn(features), axis=2
        )  
        # 향하지 않을 확률을 계산하고, concat합니다.
        # [batch_size, num_leaves, 2]
        decisions = layers.concatenate(
            [decisions, 1 - decisions], axis=2
        )  

        mu = tf.ones([batch_size, 1, 1])

        begin_idx = 1
        end_idx = 2
        # 트리를 순회하면서 instance가 각 leaves에 도달할 확률을 mu에 담습니다.
        for level in range(self.depth):
            mu = tf.reshape(mu, [batch_size, -1, 1])  # [batch_size, 2 ** level, 1]
            mu = tf.tile(mu, (1, 1, 2))  # [batch_size, 2 ** level, 2]
            level_decisions = decisions[
                :, begin_idx:end_idx, :
            ]  # [batch_size, 2 ** level, 2]
            mu = mu * level_decisions  # [batch_size, 2**level, 2]
            begin_idx = end_idx
            end_idx = begin_idx + 2 ** (level + 1)

        mu = tf.reshape(mu, [batch_size, self.num_leaves])  # [batch_size, num_leaves]
        probabilities = keras.activations.softmax(self.pi)  # [num_leaves, num_classes]
        outputs = tf.matmul(mu, probabilities)  # [batch_size, num_classes]
        
        return outputs

Deep Neural Decision Forest

Forest 모델은 동시에 학습되는 여러 개의 Decision Tree로 이루어집니다. 마치 앙상블처럼, Forest 모델의 output은 Tree model output의 평균값입니다. 

class NeuralDecisionForest(keras.Model):
    def __init__(self, num_trees, depth, num_features, used_features_rate, num_classes):
        super(NeuralDecisionForest, self).__init__()
        self.ensemble = []
        # num_trees만큼 DecisionTree 모델을 ensemble에 담습니다.
        # 트리는 각각 input feature를 랜덤하게 선택하기 때문에 서로 다르게 학습됩니다.
        for _ in range(num_trees):
            self.ensemble.append(
                NeuralDecisionTree(depth, num_features, used_features_rate, num_classes)
            )

    def call(self, inputs):
        # Initialize the outputs: a [batch_size, num_classes] matrix of zeros.
        batch_size = tf.shape(inputs)[0]
        outputs = tf.zeros([batch_size, num_classes])

        # 각 트리 output을 모두 더해주고,
        # 평균을 구한 뒤 반환합니다.
        for tree in self.ensemble:
            outputs += tree(inputs)
        outputs /= len(self.ensemble)
        return outputs

마지막으로 학습과 평가를 위한 코드는 아래와 같습니다.

learning_rate = 0.01
batch_size = 265
num_epochs = 10
hidden_units = [64, 64]


def run_experiment(model):

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    print("Start training the model...")
    train_dataset = get_dataset_from_csv(
        train_data_file, shuffle=True, batch_size=batch_size
    )

    model.fit(train_dataset, epochs=num_epochs)
    print("Model training finished")

    print("Evaluating the model on the test data...")
    test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)

    _, accuracy = model.evaluate(test_dataset)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Experiment 1: train a decision tree model

전체 input feature를 활용해서 Decision tree model을 학습시켜 봅니다.

num_trees = 10
depth = 10
used_features_rate = 1.0 # 전체 feature를 사용합니다.
num_classes = len(TARGET_LABELS)


def create_tree_model():
    inputs = create_model_inputs()
    features = encode_inputs(inputs, use_embedding=True)
    features = layers.BatchNormalization()(features)
    num_features = features.shape[1]

    tree = NeuralDecisionTree(depth, num_features, used_features_rate, num_classes)

    outputs = tree(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


tree_model = create_tree_model()
run_experiment(tree_model)

Experiment 2: train a forest model

이 실험에서는 input feature의 50%만 사용하는 num_trees만큼의 트리를 사용하여 forest model을 학습합니다. used_features_rate를 활용해서 각 트리가 얼만큼의 input feature를 활용할지 정할 수 있습니다. 또, 이전 실험과 다르게 depth는 10에서 5로 조정합니다.

num_trees = 25
depth = 5
used_features_rate = 0.5


def create_forest_model():
    inputs = create_model_inputs()
    features = encode_inputs(inputs, use_embedding=True)
    features = layers.BatchNormalization()(features)
    num_features = features.shape[1]

    forest_model = NeuralDecisionForest(
        num_trees, depth, num_features, used_features_rate, num_classes
    )

    outputs = forest_model(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


forest_model = create_forest_model()

run_experiment(forest_model)