이 글은 다음 튜토리얼을 번역한 것입니다.

 

Keras documentation: Classification with Gated Residual and Variable Selection Networks

Classification with Gated Residual and Variable Selection Networks Author: Khalid Salama Date created: 2021/02/10 Last modified: 2021/02/10 Description: Using Gated Residual and Variable Selection Networks for income level prediction. View in Colab • Git

keras.io


Introduction

이 예제에선 Structured data classification을 위해 Bryan Lim et al.(arxiv.org/abs/1912.09363)에서 제안된 Gated Residual NetworksVariable Selection Networks를 사용해봅니다. GRN은 필요에 따라 비선형성을 제공할 수 있는 유연성을 제공하고, VSN은 성능에 부정적인 영향을 주는 불필요한 feature를 제거하는 역할을 수행합니다. 또한, 이 두 가지를 함께 사용했을 때 모델의 학습 능력을 주요하게 향상시킬 수 있습니다.

이 예제는 논문에서 제안된 전체 모델을 구현하진 않고, 부분적으로 GRN과 VSN만 구현하게 됩니다.

※ TensorFlow 2.3 이상의 버전에서 정상 작동합니다.


The Dataset

데이터셋은 UC Irvine Machine Learning Repo에서 제공하는 United States Census Income Dataset을 사용합니다. 이진 분류이며, 연소득 50K가 넘는 사람인지를 구분하는 문제입니다.

데이터셋은 41개의 feature(7 numerical + 34 categorical)과 ~300K의 인스턴스로 이루어져 있습니다.


Setup

import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

Prepare Data

데이터를 로드합니다.

# Column names.
CSV_HEADER = [
    "age", "class_of_worker", "detailed_industry_recode",
    "detailed_occupation_recode", "education", "wage_per_hour",
    "enroll_in_edu_inst_last_wk", "marital_stat", "major_industry_code",
    "major_occupation_code", "race", "hispanic_origin",
    "sex", "member_of_a_labor_union", "reason_for_unemployment",
    "full_or_part_time_employment_stat", "capital_gains", "capital_losses",
    "dividends_from_stocks", "tax_filer_stat", "region_of_previous_residence",
    "state_of_previous_residence", "detailed_household_and_family_stat",
    "detailed_household_summary_in_household", "instance_weight",
    "migration_code-change_in_msa", "migration_code-change_in_reg",
    "migration_code-move_within_reg", "live_in_this_house_1_year_ago",
    "migration_prev_res_in_sunbelt", "num_persons_worked_for_employer",
    "family_members_under_18", "country_of_birth_father",
    "country_of_birth_mother", "country_of_birth_self",
    "citizenship", "own_business_or_self_employed",
    "fill_inc_questionnaire_for_veteran's_admin", "veterans_benefits",
    "weeks_worked_in_year", "year", "income_level"
]

data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.data.gz"
data = pd.read_csv(data_url, header=None, names=CSV_HEADER)

test_data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.test.gz"
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Data shape: {data.shape}") # (199523, 42)
print(f"Test data shape: {test_data.shape}") # (99762, 42)

target은 ' - 50000.', ' 50000+.' 두 가지로 이루어져 있습니다.

따라서 이 두 가지를 Integer 형태로 바꿔줍니다.

data["income_level"] = data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

test_data["income_level"] = test_data["income_level"].apply(
    lambda x: 0 if x == " - 50000." else 1
)

훈련/검증 데이터로 분할합니다.

random_selection = np.random.rand(len(data.index)) <= 0.85
train_data = data[random_selection]
valid_data = data[~random_selection]

분할한 데이터를 csv 파일로 저장해둡니다.

train_data_file = "train_data.csv"
valid_data_file = "valid_data.csv"
test_data_file = "test_data.csv"

train_data.to_csv(train_data_file, index=False, header=False)
valid_data.to_csv(valid_data_file, index=False, header=False)
test_data.to_csv(test_data_file, index=False, header=False)

Define dataset metadata

reading, parsing, encoding이 원할하게 수행될 수 있도록 metadata를 정의하겠습니다.

# Target feature name.
TARGET_FEATURE_NAME = "income_level"

# Weight column name.
WEIGHT_COLUMN_NAME = "instance_weight"

# Numeric feature names.
NUMERIC_FEATURE_NAMES = [
    "age", "wage_per_hour",
    "capital_gains", "capital_losses",
    "dividends_from_stocks", "num_persons_worked_for_employer",
    "weeks_worked_in_year"
]

# Categorical feature와 feature가 가지고 있는 unique label을 구합니다.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    feature_name: sorted([str(value) for value in list(data[feature_name].unique())])
    for feature_name in CSV_HEADER
    if feature_name
    not in list(NUMERIC_FEATURE_NAMES + [WEIGHT_COLUMN_NAME, TARGET_FEATURE_NAME])
}

# All features names.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + list(
    CATEGORICAL_FEATURES_WITH_VOCABULARY.keys()
)

# default 값 처리를 위해 categorical은 ["NA"], 나머지는 [0.0]으로 초기화합니다.
COLUMN_DEFAULTS = [
    [0.0]
    if feature_name in NUMERIC_FEATURE_NAMES + [TARGET_FEATURE_NAME, WEIGHT_COLUMN_NAME]
    else ["NA"]
    for feature_name in CSV_HEADER
]

Create a tf.data.Dataset for training and evaluation

file을 read and parsing하고 feature, label 변환을 수행하기 위해 tf.data.Dataset을 사용합니다.

from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def process(features, target):
    for feature_name in features:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            # Categorical feature이면 string으로 cast합니다.
            features[feature_name] = tf.cast(features[feature_name], tf.dtypes.string)
    # Get the instance weight.
    weight = features.pop(WEIGHT_COLUMN_NAME)
    
    return features, target, weight

def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    # csv 파일을 읽어오는 method
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        shuffle=shuffle,
    ).map(process)

    return dataset

Create model inputs

def create_model_inputs():
    inputs = {}
    
    for feature_name in FEATURE_NAMES:
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
            
    return inputs

Encode input features

Categorical featureencoding_size dimension을 가지는 Embedding Layer를 사용해서 인코딩합니다. Numerical featurelayers.Dense를 사용해서 encoding_size dimension을 가지도록 선형 변환합니다. 따라서, 모든 인코딩된 feature는 동일한 dimension을 가지도록 구성됩니다.

embedding representation으로 변환하는 작업은 아래 테스트 코드로 확인해보세요.

index = StringLookup(
                vocabulary=CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father'], 
                mask_token=None, num_oov_indices=0
                )
test = train_data['country_of_birth_father'][:32]

# print(index(test))

embedding_ecoder = layers.Embedding(
                input_dim=len(CATEGORICAL_FEATURES_WITH_VOCABULARY['country_of_birth_father']),
                output_dim=200
            )

print(embedding_ecoder(index(test)))

 

from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

def encode_inputs(inputs, encoding_size):
    encoded_features = []
    
    for feature_name in inputs:
        if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # Create a lookup to convert a string values to an integer indices.
            # String value를 Integer index로 변환합니다.
            # 따라서, 동일한 label은 동일한 index로 변환됩니다.
            index = StringLookup(
                vocabulary=vocabulary, mask_token=None, num_oov_indices=0
            )
            # String -> Integer index
            value_index = index(inputs[feature_name])
            # encoding_size dimension을 가지는 Embedding layer를 정의합니다.
            embedding_ecoder = layers.Embedding(
                input_dim=len(vocabulary), output_dim=encoding_size
            )
            # index value를 embedding representation으로 변환합니다.
            encoded_feature = embedding_ecoder(value_index)
        else:
            # encoding_size unit을 가지는 Dense layer를 활용해서 numeric feature를 인코딩합니다.
            encoded_feature = tf.expand_dims(inputs[feature_name], -1)
            encoded_feature = layers.Dense(units=encoding_size)(encoded_feature)
        encoded_features.append(encoded_feature)
        
    return encoded_features

Implement the Gated Linear Unit

GLU는 주어진 작업에서 연관없는 input을 사용하지 않게끔 하는 유연성을 제공합니다.

class GatedLinearUnit(layers.Layer):
    def __init__(self, units):
        super(GatedLinearUnit, self).__init__()
        self.linear = layers.Dense(units)
        self.sigmoid = layers.Dense(units, activation="sigmoid")

    def call(self, inputs):
        return self.linear(inputs) * self.sigmoid(inputs)

Implement the Gated Residual Network

GRN은 다음과 같이 구성됩니다.

  1. ELU Activation을 적용합니다.
  2. Dropout을 적용합니다.
  3. GLU를 적용하고, original input과 Adding합니다. dimension이 다르면 project layer를 활용합니다.
  4. normalization을 적용하고, output을 반환합니다.
class GatedResidualNetwork(layers.Layer):
    def __init__(self, units, dropout_rate):
        super(GatedResidualNetwork, self).__init__()
        self.units = units
        self.elu_dense = layers.Dense(units, activation="elu")
        self.linear_dense = layers.Dense(units)
        self.dropout = layers.Dropout(dropout_rate)
        self.gated_linear_unit = GatedLinearUnit(units)
        self.layer_norm = layers.LayerNormalization()
        self.project = layers.Dense(units)

    def call(self, inputs):
        # 1.
        x = self.elu_dense(inputs)
        # 2.
        x = self.linear_dense(x)
        x = self.dropout(x)
        # 3.
        if inputs.shape[-1] != self.units:
            inputs = self.project(inputs)
        x = inputs + self.gated_linear_unit(x)
        # 4.
        x = self.layer_norm(x)
        
        return x

Implement the Variable Selection Network

VSN은 다음과 같이 구성됩니다.

  1. 각 feature에 개별적으로 GRN을 적용합니다.
  2. GRN을 적용한 feature를 concat하고, softmax 함수를 적용합니다.
  3. 개별 GRN의 가중합을 구합니다.

VSN은 input feature 개수와 상관없이 [batch_size, encoding_size] 형태를 가지는 output을 반환합니다.

class VariableSelection(layers.Layer):
    def __init__(self, num_features, units, dropout_rate):
        super(VariableSelection, self).__init__()
        self.grns = list()
        # Create a GRN for each feature independently
        for idx in range(num_features):
            grn = GatedResidualNetwork(units, dropout_rate)
            self.grns.append(grn)
        # Create a GRN for the concatenation of all the features
        self.grn_concat = GatedResidualNetwork(units, dropout_rate)
        self.softmax = layers.Dense(units=num_features, activation="softmax")

    def call(self, inputs):
        v = layers.concatenate(inputs) # (batch_size, embedding_size * num_features)
        v = self.grn_concat(v) # (batch_size, units)
        v = tf.expand_dims(self.softmax(v), axis=-1) # (batch_size, num_features, 1)

        x = []
        for idx, input in enumerate(inputs):
            x.append(self.grns[idx](input))
        x = tf.stack(x, axis=1) # (batch_size, num_features, units)

        # (batch_size, units)
        # (1, num_features) by (num_features, units) -> (1, units)
        outputs = tf.squeeze(tf.matmul(v, x, transpose_a=True), axis=1)

        return outputs

# test용 코드
# VariableSelection(num_features = 2, units = 100, dropout_rate = 0.1)([embedding_ecoder(index(test)), embedding_ecoder(index(test))])

Create Gated Residual and Variable Selection Networks model

def create_model(encoding_size):
    inputs = create_model_inputs()
    
    feature_list = encode_inputs(inputs, encoding_size)
    num_features = len(feature_list)

    features = VariableSelection(num_features, encoding_size, dropout_rate)(
        feature_list
    )

    outputs = layers.Dense(units=1, activation="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)

    return model

Compile, train, and evaluate the model

learning_rate = 0.001
dropout_rate = 0.15
batch_size = 265
num_epochs = 20
encoding_size = 16

model = create_model(encoding_size)
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[keras.metrics.BinaryAccuracy(name="accuracy")],
)


# Create an early stopping callback.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)

print("Start training the model...")
train_dataset = get_dataset_from_csv(
    train_data_file, shuffle=True, batch_size=batch_size
)
valid_dataset = get_dataset_from_csv(valid_data_file, batch_size=batch_size)
model.fit(
    train_dataset,
    epochs=num_epochs,
    validation_data=valid_dataset,
    callbacks=[early_stopping],
)
print("Model training finished.")

print("Evaluating model performance...")
test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)
_, accuracy = model.evaluate(test_dataset)

print(f"Test accuracy: {round(accuracy * 100, 2)}%")

예제에서는 95% acc를 달성합니다. 

acc 향상을 위해 encoding_size를 조절하거나 여러 개의 GRN 또는 VSN을 사용해볼 수 있습니다. 또, dropout_rate 조절은 overfitting을 피할 수 있도록 도울겁니다.