이 글은 다음 예제를 번역한 것입니다.


Introduction

이 예제는 JPEG 파일을 직접 저장하고, 미리 학습된 모델을 사용하지 않으면서 이미지 분류를 어떻게 행하는지 알아보는 예제입니다. 'Cats vs Dogs' 데이터셋을 케라스를 활용하여 분류해봅니다.

데이터셋 생성을 위해 image_dataset_from_directory를 사용하고, 표준화와 augmentation을 위해 Keras Preprocessing Layer를 사용해봅니다.

Setup

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

 

Load the data: the Cats vs Dogs dataset

Raw data download

먼저, 765M 크기의 데이터를 다운로드합니다.
예제와 다르게 그냥 쥬피터 노트북 상에서 zip파일을 다운받고 싶으면, 아래 코드를 활용하면 됩니다.

import requests

target_path = 'datasets.zip'

url = 'https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip'
response = requests.get(url, stream = True)
handle = open(target_path, 'wb')

for chunk in response.iter_content(chunk_size = 512):
    if chunk:
        handle.write(chunk)
        
handle.close()   

압축을 풀었다면, 최상위 폴더에는 Cat, Dog 하위 폴더가 존재합니다.

Filter out corrupted images

손상된 이미지는 어디에나 존재합니다. 여기서는 "JFIF" 헤더를 가진 이미지를 필터링해보겠습니다.
필터링 시, 해당 헤더가 아니면 손상된 이미지이므로 제거합니다.

import os

num_skipped = 0
for folder_name in ("Cat", "Dog"):
    folder_path = os.path.join("PetImages", folder_name)
    for fname in os.listdir(folder_path):
        fpath = os.path.join(folder_path, fname) # PetImages/Cat/63.jpg
        try:
            fobj = open(fpath, "rb")
            # fobj.peek(10)은 10바이트를 가져오는데, 여기서는 보통
            # 헤더를 읽기 위해 전체를 가져온다고 생각해도 무방합니다.
            is_jfif = tf.compat.as_bytes("JFIF") in fobj.peek(10)
        finally:
            fobj.close()

        if not is_jfif:
            num_skipped += 1
            # Delete corrupted image
            os.remove(fpath)

print("Deleted %d images" % num_skipped)

 

Generate a Dataset

image_size = (180, 180)
batch_size = 32

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "PetImages",
    validation_split=0.2,
    subset="training",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    "PetImages",
    validation_split=0.2,
    subset="validation",
    seed=1337,
    image_size=image_size,
    batch_size=batch_size,
)

Visualize the data

9개 이미지를 그려봅니다. '1'은 Dog이고, '0'은 Cat에 해당합니다.

import matplotlib.pyplot as plt

# figure 크기를 조절합니다.
plt.figure(figsize=(10, 10))

# 배치 하나를 가져옵니다.
for images, labels in train_ds.take(1):
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(int(labels[i]))
        plt.axis("off")

Using Image data augmentation

데이터가 다양하지 않거나, 데이터 개수가 충분하지 않을때 랜덤으로 회전시키거나 fliping하는 등 augmentation을 활용하면 이를 해결할 수 있습니다. 이는 모델이 다양한 데이터를 볼 수 있게 도와줌과 동시에 이를 통해 Overfitting을 (어느정도) 극복할 수 있습니다.

data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomRotation(0.1),
    ]
)

data_augmentation을 통해 변형된 이미지를 그려봅니다.

plt.figure(figsize=(10, 10))
for images, _ in train_ds.take(1):
    for i in range(9):
        augmented_images = data_augmentation(images)
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(augmented_images[0].numpy().astype("uint8"))
        plt.axis("off")

 

Standardizing the data

현재 우리가 사용할 이미지는 픽셀 type float32, 크기는 180x180으로 이미 어느정도 표준화가 되어 있습니다. 하지만 RGB channel의 값들이 [0, 255] 값을 가지고 있어, 신경망을 사용하기엔 적합하지 않습니다. 따라서 Rescaling layer를 활용해서 [0, 1] 범위로 변환해줄 것입니다.

 

Two options to preprocess the data

augmentation을 적용할 수 있는 두 가지 방법이 있습니다.

>> Option 1: Make it part of the model like this:

inputs = keras.Input(shape=input_shape)
x = data_augmentation(inputs)
x = layers.experimental.preprocessing.Rescaling(1./255)(x)
...  # Rest of the model

이 방법은 모델 실행에서 동시에 사용될 수 있기 때문에 GPU를 사용한다는 큰 장점을 활용할 수 있습니다.

이 방법은 test time에는 동작하지 않기 때문에, evaluate(), predict()에서는 활용되지 않고, 오로지 fit() 동안에만 사용됩니다. 또한, GPU를 보유하고 있다면 이 방법을 긍정적으로 고려해볼 수 있습니다.

>> Option 2: apply it to the dataset, so as to obtain a dataset that yields batches of augmented images, like this:

augmented_train_ds = train_ds.map(
  lambda x, y: (data_augmentation(x, training=True), y))

이 방법은 CPU에서 주로 사용되고, 버퍼를 통해 모델에 입력됩니다.

CPU를 활용하여 학습하는 경우라면 이 방법이 더 효과적입니다.

 

Configure the dataset for performance

prefetch 옵션을 활용하면 더욱 효율적으로 데이터를 생성하여 모델에 입력할 수 있습니다.

train_ds = train_ds.prefetch(buffer_size=32)
val_ds = val_ds.prefetch(buffer_size=32)

 

Build a model

Xception의 small version을 작성해보겠습니다. 이 모델에 대해서는 최적화를 진행하지 않을 것이고, 만약 최적화에 관심이 있다면 Keras Tuner를 고려해보세요.

강조 사항:

  • data_augmentation을 사용하고, 추가로 Rescaling layer를 활용합니다.
  • 마지막 classification layer에 Dropout을 추가합니다.
def make_model(input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)
    # 본격적으로 모델에 입력하기 전에 여기서 augmentation이 진행됩니다.
    # inference time에는 동작하지 않습니다.
    x = data_augmentation(inputs)

    # [0, 1] 변환을 위해 Rescaling Layer를 활용합니다.
    x = layers.experimental.preprocessing.Rescaling(1.0 / 255)(x)
    x = layers.Conv2D(32, 3, strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.Conv2D(64, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    for size in [128, 256, 512, 728]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation)
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    x = layers.SeparableConv2D(1024, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.GlobalAveragePooling2D()(x)
    if num_classes == 2:
        activation = "sigmoid"
        units = 1
    else:
        activation = "softmax"
        units = num_classes

    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(units, activation=activation)(x)
    
    return keras.Model(inputs, outputs)

# image size에 (3, ) 채널을 추가합니다.
model = make_model(input_shape=image_size + (3,), num_classes=2)

 

Train the model

epochs = 50

callbacks = [
    keras.callbacks.ModelCheckpoint("save_at_{epoch}.h5"),
]
model.compile(optimizer=keras.optimizers.Adam(1e-3),
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.fit(train_ds, epochs=epochs, 
          callbacks=callbacks, validation_data=val_ds)

학습을 진행해보세요.

 

Run inference on new data

inference time에는 Dropout과 data augmentation이 비활성화 상태가 됩니다.
하지만 Rescaling layer는 그대로 활용되기 때문에 테스트용 데이터로 추론을 진행할 때 올바른 결과를 얻을 수 있습니다.

img = keras.preprocessing.image.load_img("PetImages/Cat/6779.jpg", 
                                         target_size=image_size)
img_array = keras.preprocessing.image.img_to_array(img)
img_array = tf.expand_dims(img_array, 0)  # Create batch axis

predictions = model.predict(img_array)
score = predictions[0]
print(
    "This image is %.2f percent cat and %.2f percent dog."
    % (100 * (1 - score), 100 * score)
)