` 1

이 글은 다음 Keras Example을 번역합니다.

https://keras.io/examples/structured_data/tabtransformer/

 

Keras documentation: Structured data learning with TabTransformer

Structured data learning with TabTransformer Author: Khalid Salama Date created: 2022/01/18 Last modified: 2022/01/18 Description: Using contextual embeddings for structured data classification. View in Colab • GitHub source Introduction This example dem

keras.io


Introduction

이 예제는 suvervised, semi-supervised로 활용할 수 있는 TabTransformer를 다룹니다. TabTransformer는 self-attention의 Transformer로 이루어지며, 범주형 특성을 임베딩하는 일반적인 층이 아닌 문맥을 고려할 수 있는 임베딩 층을 사용하여 더 높은 정확도를 달성할 수 있습니다.

이 예제는 TensorFlow 2.7 이상, TensorFlow Addons가 필요합니다.

pip install -U tensorflow-addons

Setup

import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import matplotlib.pyplot as plt

Prepare the data

이 예제에서는 UC Irvine Machine Learning Repository에서 제공하는 United States Census Income Dataset을 사용합니다. 이 데이터셋은 한 사람이 연간 USD 50,000 이상 벌 가능성이 있는지 여부를 판단하는 이진 분류 문제입니다.

5 numerical feature, 9 categorical feature로 이루어진 48,842 데이터를 포함하고 있습니다.

먼저, 데이터셋을 로드합니다.

CSV_HEADER = [
    "age",
    "workclass",
    "fnlwgt",
    "education",
    "education_num",
    "marital_status",
    "occupation",
    "relationship",
    "race",
    "gender",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
    "native_country",
    "income_bracket",
]

train_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
)
train_data = pd.read_csv(train_data_url, header=None, names=CSV_HEADER)

test_data_url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test"
)
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)

print(f"Train dataset shape: {train_data.shape}") # (32561, 15)
print(f"Test dataset shape: {test_data.shape}") # (16282, 15)

test_data의 첫 번째 행은 검증되지 않은 데이터이므로 제거하고, 레이블에 포함되어 있는 '.'을 제거합니다.

test_data = test_data[1:]
test_data.income_bracket = test_data.income_bracket.apply(
    lambda value: value.replace(".", "")
)

CSV 파일로 저장합니다.

train_data_file = "train_data.csv"
test_data_file = "test_data.csv"

train_data.to_csv(train_data_file, index=False, header=False)
test_data.to_csv(test_data_file, index=False, header=False)

Define dataset metadata

다음은 input feature를 인코딩하고, 처리하기 유용하도록 데이터셋의 메타데이터를 정의합니다.

# NUMERICAL FEATURE 목록입니다
NUMERIC_FEATURE_NAMES = [
    "age",
    "education_num",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
]
# CATEGORICAL FEATURES, VOCABULARY를 모아놓은 DICT입니다
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "workclass": sorted(list(train_data["workclass"].unique())),
    "education": sorted(list(train_data["education"].unique())),
    "marital_status": sorted(list(train_data["marital_status"].unique())),
    "occupation": sorted(list(train_data["occupation"].unique())),
    "relationship": sorted(list(train_data["relationship"].unique())),
    "race": sorted(list(train_data["race"].unique())),
    "gender": sorted(list(train_data["gender"].unique())),
    "native_country": sorted(list(train_data["native_country"].unique())),
}
# WEIGHT COLUMN 이름을 정의합니다
WEIGHT_COLUMN_NAME = "fnlwgt"
# CATEGORICAL FEATURE 이름 목록입니다
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())
# INPUT FEATURE의 모든 목록입니다
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
# CSV_HEADER에 있는 값이면 [0], 아니면 ['NA']로 채웁니다
COLUMN_DEFAULTS = [
    [0.0] if feature_name in NUMERIC_FEATURE_NAMES + [WEIGHT_COLUMN_NAME] else ["NA"]
    for feature_name in CSV_HEADER
]
# TARGET FEATURE 이름입니다
TARGET_FEATURE_NAME = "income_bracket"
# TARGET FEATURE LABEL 목록입니다
TARGET_LABELS = [" <=50K", " >50K"]

Configure the hyperparameters

모델 구조와 트레이닝 옵션 관련 하이퍼파라미터를 정의합니다.

LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.0001
DROPOUT_RATE = 0.2
BATCH_SIZE = 265
NUM_EPOCHS = 15

NUM_TRANSFORMER_BLOCKS = 3  # transformer block 갯수
NUM_HEADS = 4  # attention head 갯수
EMBEDDING_DIMS = 16  # 임베딩 차원
MLP_HIDDEN_UNITS_FACTORS = [
    2,
    1,
]  # MLP hidden layer unit 갯수
NUM_MLP_BLOCKS = 2  # MLP block 갯수

Implement data reading pipeline

파일을 읽고 처리하는 함수를 정의하고, 훈련 및 평가를 위해 feature와 label을 tf.data.Dataset으로 변환합니다.

target_label_lookup = layers.StringLookup(
    vocabulary=TARGET_LABELS, mask_token=None, num_oov_indices=0
)

# target(label)을 StringLookup 함수에 통과시킵니다
def prepare_example(features, target):
    target_index = target_label_lookup(target)
    weights = features.pop(WEIGHT_COLUMN_NAME)
    return features, target_index, weights


def get_dataset_from_csv(csv_file_path, batch_size=128, shuffle=False):
    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=False,
        na_value="?",
        shuffle=shuffle,
    ).map(prepare_example, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
    return dataset.cache()

Implement a training and evaluation procedure

def run_experiment(
    model,
    train_data_file,
    test_data_file,
    num_epochs,
    learning_rate,
    weight_decay,
    batch_size,
):

    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss=keras.losses.BinaryCrossentropy(),
        metrics=[keras.metrics.BinaryAccuracy(name="accuracy")],
    )

    train_dataset = get_dataset_from_csv(train_data_file, batch_size, shuffle=True)
    validation_dataset = get_dataset_from_csv(test_data_file, batch_size)

    print("Start training the model...")
    history = model.fit(
        train_dataset, epochs=num_epochs, validation_data=validation_dataset
    )
    print("Model training finished")

    _, accuracy = model.evaluate(validation_dataset, verbose=0)

    print(f"Validation accuracy: {round(accuracy * 100, 2)}%")

    return history

Create model inputs

Dictionary 형태로 model input을 구성합니다.

def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
    return inputs

Encode features

encode_inputs method는 numerical_feature_list와 embedding_dims로 categorical feature를 임베딩한 encoded_categorical_feature_list를 반환합니다.

def encode_inputs(inputs, embedding_dims):

    encoded_categorical_feature_list = []
    numerical_feature_list = []

    for feature_name in inputs:
        if feature_name in CATEGORICAL_FEATURE_NAMES:

            # categorical feature의 vocabulary를 받아옵니다.
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]

            # vocabulary의 string value를 integer 형태로 변환하고,
            # mask token은 사용하지 않기 떄문에 mask_token은 None으로
            # num_oov_indices는 0으로 설정합니다.
            lookup = layers.StringLookup(
                vocabulary=vocabulary,
                mask_token=None,
                num_oov_indices=0,
                output_mode="int",
            )

            # string input value를 interger 형태로 변환합니다.
            encoded_feature = lookup(inputs[feature_name])

            # Embedding Layer를 정의합니다.
            embedding = layers.Embedding(
                input_dim=len(vocabulary), output_dim=embedding_dims
            )

            # Embedding Layer에 통과시켜 임베딩된 value를 얻습니다.
            encoded_categorical_feature = embedding(encoded_feature)
            encoded_categorical_feature_list.append(encoded_categorical_feature)

        else:

            # numerical feature는 별도의 처리없이 다음과 같이 list에 담습니다.
            numerical_feature = tf.expand_dims(inputs[feature_name], -1)
            numerical_feature_list.append(numerical_feature)

    return encoded_categorical_feature_list, numerical_feature_list

Implement an MLP block

def create_mlp(hidden_units, dropout_rate, activation, normalization_layer, name=None):

    mlp_layers = []
    for units in hidden_units:
        mlp_layers.append(normalization_layer),
        mlp_layers.append(layers.Dense(units, activation=activation))
        mlp_layers.append(layers.Dropout(dropout_rate))

    return keras.Sequential(mlp_layers, name=name)

Experiment 1: a baseline model

첫 번째 실험으로, 간단한 multi-layer feed-forward network를 만듭니다.

def create_baseline_model(
    embedding_dims, num_mlp_blocks, mlp_hidden_units_factors, dropout_rate
):

    # model inputs를 생성합니다.
    inputs = create_model_inputs()
    # categorical, numerical feature를 인코딩합니다.
    encoded_categorical_feature_list, numerical_feature_list = encode_inputs(
        inputs, embedding_dims
    )
    # 모든 feature를 합칩니다.
    features = layers.concatenate(
        encoded_categorical_feature_list + numerical_feature_list
    )
    # features 마지막 차원을 hidden_units 하이퍼파라미터로 사용합니다.
    feedforward_units = [features.shape[-1]]

    # Create several feedforwad layers with skip connections.
    for layer_idx in range(num_mlp_blocks):
        features = create_mlp(
            hidden_units=feedforward_units,
            dropout_rate=dropout_rate,
            activation=keras.activations.gelu,
            normalization_layer=layers.LayerNormalization(epsilon=1e-6),
            name=f"feedforward_{layer_idx}",
        )(features)

    # Compute MLP hidden_units.
    mlp_hidden_units = [
        factor * features.shape[-1] for factor in mlp_hidden_units_factors
    ]
    # Create final MLP.
    features = create_mlp(
        hidden_units=mlp_hidden_units,
        dropout_rate=dropout_rate,
        activation=keras.activations.selu,
        normalization_layer=layers.BatchNormalization(),
        name="MLP",
    )(features)

    # Add a sigmoid as a binary classifer.
    outputs = layers.Dense(units=1, activation="sigmoid", name="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


baseline_model = create_baseline_model(
    embedding_dims=EMBEDDING_DIMS,
    num_mlp_blocks=NUM_MLP_BLOCKS,
    mlp_hidden_units_factors=MLP_HIDDEN_UNITS_FACTORS,
    dropout_rate=DROPOUT_RATE,
)

print("Total model weights:", baseline_model.count_params())
keras.utils.plot_model(baseline_model, show_shapes=True, rankdir="LR")

훈련 및 평가를 수행합니다.

history = run_experiment(
    model=baseline_model,
    train_data_file=train_data_file,
    test_data_file=test_data_file,
    num_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    batch_size=BATCH_SIZE,
)

Experiment 2: TabTransformer

Tabtransformer 구조는 다음과 같습니다.

  1. 모든 categorical feature는 동일한 embedding_dims로 category feature embedding됩니다. 각 categorical feature가 고유한 임베딩 벡터를 가지게 됩니다.
  2. categorical feature인 column에 대해 column embedding이 추가됩니다. 예제 모델은 각 column을 표현할 수 있는 Embedding Layer를 추가해서 1번의 categorical feature 임베딩 벡터와 더해줍니다.
  3. 임베딩된 categorical feature는 트랜스포머에 입력됩니다. 각 트랜스포머 블럭은 multi-head self-attention layer와 feed-forward layer로 구성됩니다.
  4. categorical feature의 contextual embedding을 담당하는 마지막 Transformer layer에서 numerical feature와 concat을 수행한 뒤, MLP block에 입력됩니다.
  5. 1번 실험과 다르게 softmax classifier가 사용됩니다.(?)

논문에서 column embedding에 대한 내용을 자세하게 다루며, 모델 구조를 볼 수 있습니다.

모델 구성 순서입니다.

categorical, numerical encoding&embedding → categorical column embedding →
categorical embedding vector + column embedding vector → (Multi-head attention → skip connection →
MLP block → skip connection) → concat with numerical features → MLP block → Classifier

def create_tabtransformer_classifier(
    num_transformer_blocks,
    num_heads,
    embedding_dims,
    mlp_hidden_units_factors,
    dropout_rate,
    use_column_embedding=False,
):

    # model inputs를 생성합니다.
    inputs = create_model_inputs()
    # 각 feature를 인코딩합니다.
    encoded_categorical_feature_list, numerical_feature_list = encode_inputs(
        inputs, embedding_dims
    )
    # categorical feature는 Transformer에 입력하기 위해 stack 합니다.
    # (None, 8, 16)이 됩니다.
    encoded_categorical_features = tf.stack(encoded_categorical_feature_list, axis=1)
    # (None, 5)가 됩니다.
    numerical_features = layers.concatenate(numerical_feature_list)

    # categorical feature embedding에 column embedding을 추가합니다.
    if use_column_embedding:
        num_columns = encoded_categorical_features.shape[1]
        column_embedding = layers.Embedding(
            input_dim=num_columns, output_dim=embedding_dims
        )
        column_indices = tf.range(start=0, limit=num_columns, delta=1)
        # (None, 8, 16) + (8, 16)
        encoded_categorical_features = encoded_categorical_features + column_embedding(
            column_indices
        )

    # Create multiple layers of the Transformer block.
    for block_idx in range(num_transformer_blocks):
        # Create a multi-head attention layer.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embedding_dims,
            dropout=dropout_rate,
            name=f"multihead_attention_{block_idx}",
        )(encoded_categorical_features, encoded_categorical_features)
        # Skip connection 1.
        x = layers.Add(name=f"skip_connection1_{block_idx}")(
            [attention_output, encoded_categorical_features]
        )
        # Layer normalization 1.
        x = layers.LayerNormalization(name=f"layer_norm1_{block_idx}", epsilon=1e-6)(x)
        # Feedforward.
        feedforward_output = create_mlp(
            hidden_units=[embedding_dims],
            dropout_rate=dropout_rate,
            activation=keras.activations.gelu,
            normalization_layer=layers.LayerNormalization(epsilon=1e-6),
            name=f"feedforward_{block_idx}",
        )(x)
        # Skip connection 2.
        x = layers.Add(name=f"skip_connection2_{block_idx}")([feedforward_output, x])
        # Layer normalization 2.
        encoded_categorical_features = layers.LayerNormalization(
            name=f"layer_norm2_{block_idx}", epsilon=1e-6
        )(x)

    # Flatten the "contextualized" embeddings of the categorical features.
    categorical_features = layers.Flatten()(encoded_categorical_features)
    # Apply layer normalization to the numerical features.
    numerical_features = layers.LayerNormalization(epsilon=1e-6)(numerical_features)
    # Prepare the input for the final MLP block.
    features = layers.concatenate([categorical_features, numerical_features])

    # Compute MLP hidden_units.
    mlp_hidden_units = [
        factor * features.shape[-1] for factor in mlp_hidden_units_factors
    ]
    # Create final MLP.
    features = create_mlp(
        hidden_units=mlp_hidden_units,
        dropout_rate=dropout_rate,
        activation=keras.activations.selu,
        normalization_layer=layers.BatchNormalization(),
        name="MLP",
    )(features)

    # Add a sigmoid as a binary classifer.
    outputs = layers.Dense(units=1, activation="sigmoid", name="sigmoid")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


tabtransformer_model = create_tabtransformer_classifier(
    num_transformer_blocks=NUM_TRANSFORMER_BLOCKS,
    num_heads=NUM_HEADS,
    embedding_dims=EMBEDDING_DIMS,
    mlp_hidden_units_factors=MLP_HIDDEN_UNITS_FACTORS,
    dropout_rate=DROPOUT_RATE,
)

print("Total model weights:", tabtransformer_model.count_params())
keras.utils.plot_model(tabtransformer_model, show_shapes=True, rankdir="LR")

훈련 및 평가를 진행합니다.

history = run_experiment(
    model=tabtransformer_model,
    train_data_file=train_data_file,
    test_data_file=test_data_file,
    num_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    batch_size=BATCH_SIZE,
)

추가로 아래 예제 마지막 결론을 간단히 해석해보면
TabTransformer는 Embedding이 핵심 아이디어이기 때문에 unlabeled 데이터를 pre-train에 활용할 수 있다고 합니다. 아마 semi-supervised를 표현하는 것 같아 보입니다.

TabTransformer significantly outperforms MLP and recent deep networks for tabular data while matching the performance of tree-based ensemble models. TabTransformer can be learned in end-to-end supervised training using labeled examples. For a scenario where there are a few labeled examples and a large number of unlabeled examples, a pre-training procedure can be employed to train the Transformer layers using unlabeled data. This is followed by fine-tuning of the pre-trained Transformer layers along with the top MLP layer using the labeled data.

이 글은 다음 예제를 번역한 것입니다.


Introduction

이번 예제는 CSV 파일에 포함되어 있는 구조 데이터(structured data)를 활용하여 분류 문제를 해결합니다. 데이터는 numerical과 categorical 특성들로 구성되어 있습니다. 우리는 Keras Preprocessing Layer를 사용해서 numerical 특성을 정규화하고 categorical 특성을 벡터화할 것입니다.

주의! 이번 예제는 TensorFlow 2.3 이상의 버전에서 실행해야 합니다.

The Dataset

사용할 데이터셋은 Cleveland Clinic Foundation에서 제공한 심장병 관련 데이터셋입니다. 환자 정보를 포함하고 있는 303개의 데이터(sample)로 이루어져 있으며, 각 열에서 해당 속성(feature)을 살펴볼 수 있습니다. 이를 활용하여 심장병의 여부(binary classification)를 예측해 볼 것입니다.

Age Age in years Numerical
Sex (1 = male; 0 = female) Categorical
CP Chest pain type (0, 1, 2, 3, 4) Categorical
Trestbpd Resting blood pressure (in mm Hg on admission) Numerical
Chol Serum cholesterol in mg/dl Numerical
FBS fasting blood sugar in 120 mg/dl (1 = true; 0 = false) Categorical
RestECG Resting electrocardiogram results (0, 1, 2) Categorical
Thalach Maximum heart rate achieved Numerical
Exang Exercise induced angina (1 = yes; 0 = no) Categorical
Oldpeak ST depression induced by exercise relative to rest Numerical
Slope Slope of the peak exercise ST segment Numerical
CA Number of major vessels (0-3) colored by fluoroscopy Both numerical & categorical
Thal 3 = normal; 6 = fixed defect; 7 = reversible defect Categorical
Target Diagnosis of heart disease (1 = true; 0 = false) Target

 

Setup

import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers

Preparing the data

데이터를 다운로드받습니다.

file_url = "http://storage.googleapis.com/download.tensorflow.org/data/heart.csv"
dataframe = pd.read_csv(file_url)

303개 데이터와 14개 column(13 feature + 1 target label)으로 이루어져 있습니다.

dataframe.shape
# (303, 14)

head() 함수로 간단히 살펴봅니다.

dataframe.head()

마지막 속성은 "target"으로 '1'이면 심장병, '0'이면 심장병이 아닌 환자를 의미합니다.

training, validation set으로 나누겠습니다.

# 20% 샘플을 뽑고,
val_dataframe = dataframe.sample(frac=0.2, random_state=1337)
# 위에서 뽑힌 샘플을 제거해서 훈련 데이터셋으로 활용합니다.
train_dataframe = dataframe.drop(val_dataframe.index)

print(
    "Using %d samples for training and %d for validation"
    % (len(train_dataframe), len(val_dataframe))
)

tf.data.Dataset 객체를 만들어줍니다.

def dataframe_to_dataset(dataframe):
    dataframe = dataframe.copy()
    labels = dataframe.pop("target")
    ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
    ds = ds.shuffle(buffer_size=len(dataframe))
    
    return ds


train_ds = dataframe_to_dataset(train_dataframe)
val_ds = dataframe_to_dataset(val_dataframe)

각 Dataset은 (input, target)을 반환합니다.

  • input: dictionary of feature
  • target: 1 or 0 value
for x, y in train_ds.take(1):
    print("Input:", x)
    print("Target:", y)
    
# Input: {'age': <tf.Tensor: shape=(), dtype=int64, numpy=60>, 'sex': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'cp': <tf.Tensor: shape=(), dtype=int64, numpy=4>, 'trestbps': <tf.Tensor: shape=(), dtype=int64, numpy=117>, 'chol': <tf.Tensor: shape=(), dtype=int64, numpy=230>, 'fbs': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'restecg': <tf.Tensor: shape=(), dtype=int64, numpy=0>, 'thalach': <tf.Tensor: shape=(), dtype=int64, numpy=160>, 'exang': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'oldpeak': <tf.Tensor: shape=(), dtype=float64, numpy=1.4>, 'slope': <tf.Tensor: shape=(), dtype=int64, numpy=1>, 'ca': <tf.Tensor: shape=(), dtype=int64, numpy=2>, 'thal': <tf.Tensor: shape=(), dtype=string, numpy=b'reversible'>}
# Target: tf.Tensor(1, shape=(), dtype=int64)

Dataset에 배치 크기를 설정합니다.

train_ds = train_ds.batch(32)
val_ds = val_ds.batch(32)

 

Feature preprocessing with Keras layers

정수형태로 인코딩되어 있는 categorical feature은 다음과 같습니다.

  • sex, cp, fbs, restecg, exang, ca

CategoryEncoding() layer를 사용해서 one-hot encoding 형태로 바꿔줄 것입니다. 또, thal은 문자형태로 인코딩되어 있는 categorical feature입니다. 먼저 StringLookup() layer를 사용해서 해당 속성이 가지고 있는 값들을 전부 index 형태로 변환하고, CategoryEncoding() layer를 사용해서 one-hot 인코딩을 동일하게 진행해줄 것입니다.

마지막으로, 연속적인 정수 형태인 feature는 다음과 같습니다.

  • age, trestbps, chol, thalach, oldpeak, slope

이들에게는 Normalization() layer를 사용해서 평균 0, 표준 편차를 1로 만들어 줄 것입니다.

이 작업을 수행하기 위해 세 가지 유틸리티 함수를 정의해서 사용합니다.

  • encode_numerical_feature: Normalization을 적용하기 위한 함수
  • encode_string_categorical_feature: 문자 형태로 된 열에 대해 one-hot encoding을 적용
  • encode_integer_categorical_feature: 숫자 형태로 된 열에 대해 one-hot encoding을 적용
from tensorflow.keras.layers.experimental.preprocessing import Normalization
from tensorflow.keras.layers.experimental.preprocessing import CategoryEncoding
from tensorflow.keras.layers.experimental.preprocessing import StringLookup


def encode_numerical_feature(feature, name, dataset):
    # Normalization layer 정의
    normalizer = Normalization()

    # 적용할 열만 가져옵니다.
    feature_ds = dataset.map(lambda x, y: x[name])
    feature_ds = feature_ds.map(lambda x: tf.expand_dims(x, -1))

    # 데이터의 통계치 학습
    normalizer.adapt(feature_ds)

    # 정규화 진행
    encoded_feature = normalizer(feature)
    
    return encoded_feature


def encode_string_categorical_feature(feature, name, dataset):
    # 문자열로 되어 있는 값을 integer 형태로 변환하기 위한 StringLookup() 함수 정의
    index = StringLookup()
    # one-hot encoding 진행을 위한 함수 정의
    encoder = CategoryEncoding(output_mode="binary")
    
    # 적용할 열만 가져옵니다.
    feature_ds = dataset.map(lambda x, y: x[name])
    feature_ds = feature_ds.map(lambda x: tf.expand_dims(x, -1))
    
    # 전체 순서: index adapt -> index 형태로 변환 -> categorical encoding adapt -> categorical 형태로 변환
    
    # feature_ds가 반환하는 문자열 데이터가 어떤 것이 있는지 확인합니다.
    # 문자열로 된 값과, integer index를 부여하기 위한 작업입니다.
    index.adapt(feature_ds)

    # (실제 반환 데이터) integer 형태로 변환합니다.
    encoded_feature = index(feature)
    
    # Dataset 객체가 index 변환 작업을 수행하도록 적용해줍니다.
    feature_ds = feature_ds.map(index)
    
    # feature_ds가 반환하는 index 데이터가 어떤 것이 있는지 확인합니다.
    encoder.adapt(feature_ds)

    # (실제 반환 데이터) one-hot encoding을 적용하고 return합니다.
    # 데이터셋 객체에 다시 map 해줄 필요는 없습니다.
    encoded_feature = encoder(encoded_feature)
    
    return encoded_feature

# 위와 동일
def encode_integer_categorical_feature(feature, name, dataset):
    # Create a CategoryEncoding for our integer indices
    encoder = CategoryEncoding(output_mode="binary")

    # Prepare a Dataset that only yields our feature
    feature_ds = dataset.map(lambda x, y: x[name])
    feature_ds = feature_ds.map(lambda x: tf.expand_dims(x, -1))

    # Learn the space of possible indices
    encoder.adapt(feature_ds)

    # Apply one-hot encoding to our indices
    encoded_feature = encoder(feature)
    return encoded_feature

 

Build a model

end-to-end 모델을 만들어봅니다.

# Numerical -> one-hot encoding
sex = keras.Input(shape=(1,), name="sex", dtype="int64")
cp = keras.Input(shape=(1,), name="cp", dtype="int64")
fbs = keras.Input(shape=(1,), name="fbs", dtype="int64")
restecg = keras.Input(shape=(1,), name="restecg", dtype="int64")
exang = keras.Input(shape=(1,), name="exang", dtype="int64")
ca = keras.Input(shape=(1,), name="ca", dtype="int64")

# String -> one-hot encoding
thal = keras.Input(shape=(1,), name="thal", dtype="string")

# Numerical features
age = keras.Input(shape=(1,), name="age")
trestbps = keras.Input(shape=(1,), name="trestbps")
chol = keras.Input(shape=(1,), name="chol")
thalach = keras.Input(shape=(1,), name="thalach")
oldpeak = keras.Input(shape=(1,), name="oldpeak")
slope = keras.Input(shape=(1,), name="slope")

all_inputs = [sex, cp, fbs,
                restecg, exang, ca,
                thal, age, trestbps,
                chol, thalach, oldpeak,slope]

# Integer categorical features
sex_encoded = encode_integer_categorical_feature(sex, "sex", train_ds)
cp_encoded = encode_integer_categorical_feature(cp, "cp", train_ds)
fbs_encoded = encode_integer_categorical_feature(fbs, "fbs", train_ds)
restecg_encoded = encode_integer_categorical_feature(restecg, "restecg", train_ds)
exang_encoded = encode_integer_categorical_feature(exang, "exang", train_ds)
ca_encoded = encode_integer_categorical_feature(ca, "ca", train_ds)

# String categorical features
thal_encoded = encode_string_categorical_feature(thal, "thal", train_ds)

# Numerical features
age_encoded = encode_numerical_feature(age, "age", train_ds)
trestbps_encoded = encode_numerical_feature(trestbps, "trestbps", train_ds)
chol_encoded = encode_numerical_feature(chol, "chol", train_ds)
thalach_encoded = encode_numerical_feature(thalach, "thalach", train_ds)
oldpeak_encoded = encode_numerical_feature(oldpeak, "oldpeak", train_ds)
slope_encoded = encode_numerical_feature(slope, "slope", train_ds)

all_features = layers.concatenate(
    [
        sex_encoded,
        cp_encoded,
        fbs_encoded,
        restecg_encoded,
        exang_encoded,
        slope_encoded,
        ca_encoded,
        thal_encoded,
        age_encoded,
        trestbps_encoded,
        chol_encoded,
        thalach_encoded,
        oldpeak_encoded,
    ]
)

x = layers.Dense(32, activation="relu")(all_features)
x = layers.Dropout(0.5)(x)

output = layers.Dense(1, activation="sigmoid")(x)

# 모델 생성
model = keras.Model(all_inputs, output)
model.compile("adam", "binary_crossentropy", metrics=["accuracy"])

 

Train the model

model.fit(train_ds, epochs=50, validation_data=val_ds)

약 82% 정확도를 얻을 수 있었습니다.

Inference on new data

model.predict()를 활용해서 새로운 환자의 심장병 여부를 확인해보죠.

  1. batch dimension을 까먹지 않도록 주의해야 합니다. 모델은 batch_dimension에서 과정에 수행됩니다.
  2. 새로운 샘플에서 값을 tensor 형태로 변환하기 위해 convert_to_tensor 활용
sample = {
    "age": 60,
    "sex": 1,
    "cp": 1,
    "trestbps": 145,
    "chol": 233,
    "fbs": 1,
    "restecg": 2,
    "thalach": 150,
    "exang": 0,
    "oldpeak": 2.3,
    "slope": 3,
    "ca": 0,
    "thal": "fixed",
}

input_dict = {name: tf.convert_to_tensor([value]) for name, value in sample.items()}
predictions = model.predict(input_dict)

print(
    "This particular patient had a %.1f percent probability "
    "of having a heart disease, as evaluated by our model." % (100 * predictions[0][0],)
)

# This particular patient had a 20.8 percent probability of having a heart disease, 
# as evaluated by our model.