이 글은 다음 튜토리얼을 번역한 것입니다.


Introduction

이 예제는 Structured Data Classification을 위해 P. Kontschieder et al에서 제안된 Deep neural Dicision Forest 구현에 관한 내용입니다. 확률적이고 미분가능한 dicision tree 구성, end-to-end 학습 방법, deep representation learning을 통한 decision tree 통합을 어떻게 하는지를 다룹니다.

 

The Dataset

이 예제는  UC Irvine Machine Learning Repository에서 제공하고 있는 United states Census Income Dataset을 활용합니다. 한 개인이 연간 50,000 USD 이상 버는지, 벌지 못하는지에 관한 이진 분류 문제입니다.

5개 numerical, 9개 categorical로 이루어진 총 14개 특징(나이, 직업 수준, 교육, 직업 등)과 함께 48,842개의 데이터를 포함하고 있습니다.

 

Setup

import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
import math

 

Prepare the data

CSV_HEADER = [
    "age", "workclass", "fnlwgt",
    "education", "education_num", "marital_status",
    "occupation", "relationship", "race",
    "gender", "capital_gain", "capital_loss",
    "hours_per_week", "native_country","income_bracket"
]

train_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
)
train_data = pd.read_csv(train_data_url, header=None, names=CSV_HEADER)

test_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test"
)
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Train dataset shape: {train_data.shape}")
print(f"Test dataset shape: {test_data.shape}")

테스트 데이터에서 첫 번째 데이터는 유효하지 않으므로 삭제하고, 레이블 데이터에 포함되어 있는 '.'을 없애줍니다.

test_data = test_data[1:]
# <=50K. -> <=50K
test_data.income_bracket = test_data.income_bracket.apply(
    lambda value: value.replace(".", "")
)

학습 데이터와 트레이닝 데이터를 csv로 저장합니다.

 

Define dataset metadata

이제 데이터에 포함되어 있는 특징들에 대해 분석과 encoding이 용이하도록 메타데이터를 정의합니다.

# numerical feature 입니다.
NUMERIC_FEATURE_NAMES = [
    "age", "education_num",
    "capital_gain","capital_loss",
    "hours_per_week"
]

# categorical feature과 unique한 값만 모아놓은 vocabulary 입니다.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "workclass": sorted(list(train_data["workclass"].unique())),
    "education": sorted(list(train_data["education"].unique())),
    "marital_status": sorted(list(train_data["marital_status"].unique())),
    "occupation": sorted(list(train_data["occupation"].unique())),
    "relationship": sorted(list(train_data["relationship"].unique())),
    "race": sorted(list(train_data["race"].unique())),
    "gender": sorted(list(train_data["gender"].unique())),
    "native_country": sorted(list(train_data["native_country"].unique())),
}
# fnlwgt는 특징으로 사용하지 않습니다.
IGNORE_COLUMN_NAMES = ["fnlwgt"]
# categorical feature 입니다.
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())
# 모든 input feature 입니다.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
# 각 특징별로 default value를 지정해놓습니다.
COLUMN_DEFAULTS = [
    [0.0] if feature_name in NUMERIC_FEATURE_NAMES + IGNORE_COLUMN_NAMES else ["NA"]
    for feature_name in CSV_HEADER
]
# target feature 입니다.
TARGET_FEATURE_NAME = "income_bracket"
# target feature에 포함되어 있는 label입니다.
TARGET_LABELS = [" <=50K", " >50K"]

Create tf.data.Dataset objects for training and validation

tf.data.Dataset을 활용해서 파일, feature과 label을 읽고 분석할 수 있는 Dataset 객체로 변환합니다. 또, label은 각각 index로 매핑합니다.

from tensorflow.keras.layers.experimental.preprocessing import StringLookup

target_label_lookup = StringLookup(
    vocabulary=TARGET_LABELS, mask_token=None, num_oov_indices=0
)


def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        na_value="?",
        shuffle=shuffle,
    ).map(lambda features, target: (features, target_label_lookup(target)))
    return dataset.cache()

Create model inputs

def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        # Numerical과 Categorical을 구분합니다.
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
    return inputs

Encode input features

from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def encode_inputs(inputs, use_embedding=False):
    encoded_features = []
    for feature_name in inputs:
        # Categorical일 경우
        if feature_name in CATEGORICAL_FEATURE_NAMES:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # vocabulary에 포함되어 있는 unique value를 integer indice로 변환합니다.
            # (oov), mask token은 사용하지 않으므로
            # mask_token = None, num_oov_indices = 0으로 설정합니다.
            index = StringLookup(
                vocabulary=vocabulary, mask_token=None, num_oov_indices=0
            )
            # String value -> Integer value
            value_index = index(inputs[feature_name])

            # Embedding을 사용하는 경우
            if use_embedding:
                # embedding_dims를 정의하고, Embedding을 만듭니다.
                embedding_dims = int(math.sqrt(len(vocabulary)))
                embedding_ecoder = layers.Embedding(
                    input_dim=len(vocabulary), output_dim=embedding_dims
                )
                # 해당 feature에 포함된 값을 embedding representation으로 변환합니다.
                encoded_feature = embedding_ecoder(value_index)
            # Embedding을 사용하지 않는 경우
            else:
                # one-hot encoder를 선언하고, vocab을 adapt한 뒤, 인덱스로 변환합니다.
                onehot_encoder = CategoryEncoding(output_mode="binary")
                onehot_encoder.adapt(index(vocabulary))
                encoded_feature = onehot_encoder(value_index)
        # Numerical일 경우
        else:
            # Numerical Feature는 shape이 None이므로
            # Concat을 위해 dim을 늘려줍니다.
            encoded_feature = inputs[feature_name]
            if inputs[feature_name].shape[-1] is None:
                print(feature_name)
                encoded_feature = tf.expand_dims(encoded_feature, -1)

        encoded_features.append(encoded_feature)

    encoded_features = layers.concatenate(encoded_features)
    return encoded_features

Deep Neural Decision Tree

neural decision tree를 학습시키기 위해 두 가지 가중치 집합을 사용합니다. 첫 번째 집합인 pi는 tree leaves에서 클래스 확률 분포를 표현합니다. 두 번째 집합인 가중치를 포함한 layer decision_fn은 어떤 leave로 향할지에 대한 확률을 표현하는 집합입니다. forward pass는 아래 순서로 진행됩니다.

  1. 모델은 배치에서 instance의 모든 특성이 single vector로 인코딩된 형태의 input feature를 받습니다. 여기서 single vector는 CNN이나 dense transformation을 통해 만들어집니다. 여기서는 structured data를 사용하므로 dense layer를 사용합니다.
  2. used_features_mask를 사용해서 input feature 중 사용할 subset feature를 랜덤하게 선택합니다.
  3. 그리고 나서 전체 트리를 순회하면서 해당 instance가 leaves에 도달할 확률을 담을 mu를 계산합니다.
  4. 마지막으로 leaves에 도달할 확률과 leaves가 가지고 있는 class 확률이 합쳐져 최종 outputs을 생성하게 됩니다.
class NeuralDecisionTree(keras.Model):
    def __init__(self, depth, num_features, used_features_rate, num_classes):
        super(NeuralDecisionTree, self).__init__()
        self.depth = depth
        self.num_leaves = 2 ** depth
        self.num_classes = num_classes

        # input feature 중 subset feature를 랜덤으로 선택하기 위한 mask를 생성합니다.
        num_used_features = int(num_features * used_features_rate)
        one_hot = np.eye(num_features)
        sampled_feature_indicies = np.random.choice(
            np.arange(num_features), num_used_features, replace=False
        )
        self.used_features_mask = one_hot[sampled_feature_indicies]

        # leaves가 가지고 있는 class weights를 초기화합니다.
        self.pi = tf.Variable(
            initial_value=tf.random_normal_initializer()(
                shape=[self.num_leaves, self.num_classes]
            ),
            dtype="float32",
            trainable=True,
        )

        # 어떤 leave로 향할지에 대한 확률을 담은 decision_fn을 선언합니다.
        self.decision_fn = layers.Dense(
            units=self.num_leaves, activation="sigmoid", name="decision"
        )

    def call(self, features):
        batch_size = tf.shape(features)[0]

        # 마스크를 통해 subset feature를 선택합니다.
        # [batch_size, num_used_features]
        features = tf.matmul(
            features, self.used_features_mask, transpose_b=True
        )  
        # leave로 향할 확률을 계산합니다.
        # [batch_size, num_leaves, 1]
        decisions = tf.expand_dims(
            self.decision_fn(features), axis=2
        )  
        # 향하지 않을 확률을 계산하고, concat합니다.
        # [batch_size, num_leaves, 2]
        decisions = layers.concatenate(
            [decisions, 1 - decisions], axis=2
        )  

        mu = tf.ones([batch_size, 1, 1])

        begin_idx = 1
        end_idx = 2
        # 트리를 순회하면서 instance가 각 leaves에 도달할 확률을 mu에 담습니다.
        for level in range(self.depth):
            mu = tf.reshape(mu, [batch_size, -1, 1])  # [batch_size, 2 ** level, 1]
            mu = tf.tile(mu, (1, 1, 2))  # [batch_size, 2 ** level, 2]
            level_decisions = decisions[
                :, begin_idx:end_idx, :
            ]  # [batch_size, 2 ** level, 2]
            mu = mu * level_decisions  # [batch_size, 2**level, 2]
            begin_idx = end_idx
            end_idx = begin_idx + 2 ** (level + 1)

        mu = tf.reshape(mu, [batch_size, self.num_leaves])  # [batch_size, num_leaves]
        probabilities = keras.activations.softmax(self.pi)  # [num_leaves, num_classes]
        outputs = tf.matmul(mu, probabilities)  # [batch_size, num_classes]
        
        return outputs

Deep Neural Decision Forest

Forest 모델은 동시에 학습되는 여러 개의 Decision Tree로 이루어집니다. 마치 앙상블처럼, Forest 모델의 output은 Tree model output의 평균값입니다. 

class NeuralDecisionForest(keras.Model):
    def __init__(self, num_trees, depth, num_features, used_features_rate, num_classes):
        super(NeuralDecisionForest, self).__init__()
        self.ensemble = []
        # num_trees만큼 DecisionTree 모델을 ensemble에 담습니다.
        # 트리는 각각 input feature를 랜덤하게 선택하기 때문에 서로 다르게 학습됩니다.
        for _ in range(num_trees):
            self.ensemble.append(
                NeuralDecisionTree(depth, num_features, used_features_rate, num_classes)
            )

    def call(self, inputs):
        # Initialize the outputs: a [batch_size, num_classes] matrix of zeros.
        batch_size = tf.shape(inputs)[0]
        outputs = tf.zeros([batch_size, num_classes])

        # 각 트리 output을 모두 더해주고,
        # 평균을 구한 뒤 반환합니다.
        for tree in self.ensemble:
            outputs += tree(inputs)
        outputs /= len(self.ensemble)
        return outputs

마지막으로 학습과 평가를 위한 코드는 아래와 같습니다.

learning_rate = 0.01
batch_size = 265
num_epochs = 10
hidden_units = [64, 64]


def run_experiment(model):

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    print("Start training the model...")
    train_dataset = get_dataset_from_csv(
        train_data_file, shuffle=True, batch_size=batch_size
    )

    model.fit(train_dataset, epochs=num_epochs)
    print("Model training finished")

    print("Evaluating the model on the test data...")
    test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)

    _, accuracy = model.evaluate(test_dataset)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Experiment 1: train a decision tree model

전체 input feature를 활용해서 Decision tree model을 학습시켜 봅니다.

num_trees = 10
depth = 10
used_features_rate = 1.0 # 전체 feature를 사용합니다.
num_classes = len(TARGET_LABELS)


def create_tree_model():
    inputs = create_model_inputs()
    features = encode_inputs(inputs, use_embedding=True)
    features = layers.BatchNormalization()(features)
    num_features = features.shape[1]

    tree = NeuralDecisionTree(depth, num_features, used_features_rate, num_classes)

    outputs = tree(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


tree_model = create_tree_model()
run_experiment(tree_model)

Experiment 2: train a forest model

이 실험에서는 input feature의 50%만 사용하는 num_trees만큼의 트리를 사용하여 forest model을 학습합니다. used_features_rate를 활용해서 각 트리가 얼만큼의 input feature를 활용할지 정할 수 있습니다. 또, 이전 실험과 다르게 depth는 10에서 5로 조정합니다.

num_trees = 25
depth = 5
used_features_rate = 0.5


def create_forest_model():
    inputs = create_model_inputs()
    features = encode_inputs(inputs, use_embedding=True)
    features = layers.BatchNormalization()(features)
    num_features = features.shape[1]

    forest_model = NeuralDecisionForest(
        num_trees, depth, num_features, used_features_rate, num_classes
    )

    outputs = forest_model(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


forest_model = create_forest_model()

run_experiment(forest_model)