https://www.youtube.com/watch?v=kVEOCfBy9uY&list=PLQY2H8rRoyvzIuB8rZXs7pfyjiSUs8Vza&index=2


tf.data

이 동영상은 tf.data에 관한 동영상입니다. 본 영상은 여러 언어적 관점에서 다루지만 글은 Python view에서 나오는 내용만 적었습니다.

dataset = tf.data.TFRecordDataset(files)
dataset = dataset.shuffle(buffer_size = X)
dataset = dataset.map(lambda record: parse(record))
dataset = dataset.batch(batch_size = Y)

for element in datset:
	...

기존 tf 1.x와 다르게 매우 간단하게 정리되었습니다. eager execution이 본격적으로 사용되는 것처럼 위에서도 for-loop를 사용하여 데이터를 전달받을 수 있습니다. 

기존의  tf 1.x에서는 iterator를 초기화 한 후 사용해야 하는 등 좀 더 복잡한 과정을 거쳤었습니다.

전체적으로는 다음의 프로세스를 가진다. 'tfrecord -> shuffle -> map -> batch'

C++의 요소를 넣어 performance 측면에서 이득을 취했고, 그 결과 다시 다음의 코드를 만나볼 수 있다.

dataset = tf.data.TFRecordDataset(files) 
dataset = dataset.shuffle(buffer_size = X) 
dataset = dataset.map(lambda record: parse(record), num_parallel_cells =…) 
dataset = dataset.batch(batch_size = Y) 
dataset = dataset.prefetch(buffer_size=…)

for element in datset: …

 

supporting non-tensor types

텐서 타입이 아닌 타입을 다루기 위해서는 다음 이미지에 나오는 코드로 변환하여 tf.data를 사용할 수 있게 된다.

map이나 tensor_slice와 같은 함수는 기본적으로 tf.data Structure을 거쳐서 output 또는 input으로 반환되는 것을 볼 수 있다.

 

Static Optimizations

이 부분은 자세히 설명해주지는 않았지만, 대충 최적화를 위한 함수들이 있는 곳이라고 생각하면 될 것 같다. 다음과 같이 여러개의 함수가 존재한다.

  • map + batch fusion
  • map + filter fusion
  • map + map fusion
  • map parallelization
  • map vectorization
  • noop elimination
  • shuffle + repeat fusion

그리고 다음과 같이 사용한다.

(이 다음으로 C++ 측면에서 내부적으로 어떻게 흘러가는지 보여주지만 글에서는 생략한다.)

Dynamic optimization

아래 퀴즈에서 시작한다. 얼마나 많은 시간이 소요되겠는가?

모든 작업이 순차적으로 이루어지기 때문에 그렇다고 한다. 
그렇다면 다음 작업은 얼마나 걸리겠는가에 대한 퀴즈를 또 한번 낸다.

첫 번째 질문과의 차이는 num_parallel_calls의 차이이다. background에서 서로 다른 커널에서 수행되는 것 같은데, 감이 잘 안온다. 아무튼 여러가지의 스레드를 만들어서 동시작업을 행한다고 생각하면 편하다. 
하지만 맨 처음 element를 처리하는데 걸리는 시간은 첫 번째 문제와 같다는 것.

num_parallel_calls의 수가 늘어날 수록 output latency도 줄어든다. 하지만 그 수가 늘어난다고 해서 매우 많은 수의 thread를 만든다는 의미는 아니고, 여러 작업의 schedule을 동시에 처리해서 그 시간을 줄인다는 의미인 것 같다.

이 글은 다음 텐서플로우 유튜브 동영상을 공부하면서 끄적끄적한 글입니다.

https://www.youtube.com/watch?v=IzKXEbpT9Lg&list=PLQY2H8rRoyvzIuB8rZXs7pfyjiSUs8Vza


일단 이 동영상은 전체적으로 텐서플로우 내부의 흐름을 다루고, 이를 예를 들어 설명하고 있습니다.

 

Base API에서 사용되던 tf.cond, tf.while_loop는 Higher-level이나 Low-level API에서 따로 통용되는 함수가 존재합니다.

하지만, tf 2.0으로 넘어오면서 "Functional" ops로 변경되어 If나 While문과 같이 파이썬 문법처럼 사용할 수 있게 되었습니다.
(Eager Execution, Autograph)

첫번쨰로 Control flow v1이라고 해서 low-ops에 대해 설명합니다. 

영상에서는 tf.cond(x < y, lambda: tf.add(x, z), lambda: tf.square(y))와 몇 개의 예를 들면서 low-ops에서의 흐름을 설명합니다. 쉽게 예에 대해서 그래프의 흐름을 설명해주고 있습니다. 이를 구성하기 위해 대표적으로 Switch와 Merge의 표현을 사용하게 됩니다.

이것의 장점은 Pruning이 가능하고, dataflow만 잘 만들면된다. 하지만 분석이 어렵고 매우 복잡하다. 이를 매우 nested하다 라고 표현하고 있습니다. 

결과적으로 tf 내부적으로 어떻게 control flow가 구성되는지 tf.cond(), tf.while_loop()를 통해 설명하고 있는겁니다. 다음 사이트가 이를 잘 설명해주고 있습니다.

https://towardsdatascience.com/tensorflow-control-flow-tf-cond-903e020e722a

두번째로는 Control flow v2를 설명합니다. 

여기에서는 그래프가 좀 더 함수처럼 동작하는 것을 보여주고 있습니다. 특별한 점은 없으며 텐서플로우의 기본적인 표현입니다. 각각의 함수가 텐서 인풋과 텐서 아웃풋을 별개로 가지고 있고, 매우 simple해 졌습니다. 또한, v1에서는 true와 false가 매치되어 있다면, v2에서는 별개의 함수로 작용하는 것처럼 구성되어 있습니다. 

세번째, Gradient입니다.

tf.cond, tf_while_loop를 예로 들며, 이에 대해 내부적으로 gradient가 어떻게 만들어지는지 설명합니다. 

Control flow v2의 장점은 v1의 단점을 보완한다는 것입니다. 단순, 디버깅, 통합 등등. 하지만 이 때문에 성능적으로 좀 떨어진다는 단점이 생깁니다. 따라서 이에 대한 해결방법은 표현을 v2로 쓰되, 내부적으로 v1의 기능을 사용하여 성능과 속도를 동시에 잡는 것입니다. 이를 Lowering이라고 표현하고 있습니다. 

또한, 향후에는 control flow v1과 같은 표현을 되도록이면 제거하면 좋겠다고 말하네요. 이를 위해 세션을 제거하고, TensorArray를 재구성했다고 합니다. 모든 것을 Simple하게 바꾸면서도 efficient하게 하려는 작업인 것 같습니다. 

 

클래스 불균형(Class Imbalanced)

위의 그림처럼 우리의 데이터에서 클래스가 불균형하게 분포되어 있을 때 나타납니다. 주로 특이한 경우(은행 거래 사기,희귀 질병, 기계 불량음 등)가 포함되어 있는 데이터에서 두드러집니다. 이러한 문제들을 비정상 탐지(Anomaly Detection)이라고 부릅니다. 우리가 학습시킬 모델은 균형이 잡힌 많고, 다양한 클래스를 보는 것을 좋아합니다.


과소표집(UnderSampling)과 과대표집(OverSampling)

과소표집은 다른 클래스에 비해 상대적으로 많이 나타나있는 클래스의 개수를 줄이는 것입니다. 이를 통해 균형을 유지할 수 있게 되지만, 제거하는 과정에서 유용한 정보가 버려지게 되는 것이 큰 단점입니다. 

과대표집은 데이터를 복제하는 것입니다. 무작위로 하는 경우도 있고, 기준을 미리 정해서 복제하는 방법도 있습니다. 정보를 잃지 않고, 훈련용 데이터에서 높은 성능을 보이지만 실험용 데이터에서의 성능은 낮아질 수 있습니다. 대부분의 과대표집 방법은 Overfitting의  문제를 포함하고 있습니다. 이를 피하기 위해 주로 SMOTE(Synthetic Minority Over-sampling Technique)를 사용합니다. 간단히 설명하자면, 데이터의 개수가 적은 클래스의 표본(Sample)을 가져온 뒤에 임의의 값을 추가하여 새로운 샘플을 만들어 데이터에 추가합니다. 이 과정에서 각 표본은 주변 데이터를 고려하기 때문에 과대적합의 가능성이 낮아지게 됩니다.

 실제 현업에서는 특정 클래스의 데이터의 개수가 적은 경우가 대부분입니다. 제품을 예로 들면, 한 제품이 정상 제품일 확률이 불량품일 확률보다 현저히 높기 때문에 불량품에 대한 데이터는 상대적으로 적을 것 입니다. 

 

Model.fit 후에 세션이 종료되고 램을 놔주지 않는 현상이 있다.

* 케라스 내부적으로 해결이 되지 않은 이슈라는 글을 본적이 있다. 


1. del model    * 해결되지 않음

2. gc.collect() + gc.collect() 여러번    * 해결되지 않음

3. tf.as_default_graph() 안에서 처리    * 해결되지 않음

4. K.clear_session()    * Case by Case

* 사용은 하고 있지만 죽는 경우도 있고, 안죽는 경우도 있었습니다. jupyter notebook의 경우 같은 cell 안에서 fit과 함께 실행시킨 경우에 될지도 모르겠네요. 

5. 확실한 방법. + 이 방법과 비슷하게 프로세스를 죽이는 방법도 있음. nvidia-smi를 통해 PID를 확인한 후 kill

from numba import cuda
cuda.select_device(num)
cuda.close()

 

Custom data generator를 만들 때는 keras.utils.Sequence 클래스를 상속하는 것으로 시작합니다.

Sequence는 __getitem__, __len__, on_epoch_end, __iter__를 sub method로서 가지고 있습니다.

따라서, 이들을 우리의 데이터에 맞게 변형하여 사용하게 됩니다.


MNIST는 예를 들기 위해 사용했습니다.

import tensorflow as tf
from tensorflow.keras.utils import Sequence
from tensorflow.keras.utils import to_categorical
import numpy as np

mnist = tf.keras.datasets.mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

class DataGenerator(Sequence):
    def __init__(self, X, y, batch_size, dim, n_channels, n_classes, shuffle = True):
        self.X = X
        self.y = y if y is not None else y
        self.batch_size = batch_size
        self.dim = dim
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()
        
    def on_epoch_end(self):
        self.indexes = np.arange(len(self.X))
        if self.shuffle:
            np.random.shuffle(self.indexes)
            
    def __len__(self):
        return int(np.floor(len(self.X) / self.batch_size))
    
    def __data_generation(self, X_list, y_list):
        X = np.empty((self.batch_size, *self.dim))
        y = np.empty((self.batch_size), dtype = int)
        
        if y is not None:
            # 지금 같은 경우는 MNIST를 로드해서 사용하기 때문에
            # 배열에 그냥 넣어주면 되는 식이지만,
            # custom image data를 사용하는 경우 
            # 이 부분에서 이미지를 batch_size만큼 불러오게 하면 됩니다. 
            for i, (img, label) in enumerate(zip(X_list, y_list)):
                X[i] = img
                y[i] = label
                
            return X, to_categorical(y, num_classes = self.n_classes)
        
        else:
            for i, img in enumerate(X_list):
                X[i] = img
                
            return X
        
    def __getitem__(self, index):
        indexes = self.indexes[index * self.batch_size : (index + 1) * self.batch_size]
        X_list = [self.X[k] for k in indexes]
        
        if self.y is not None:
            y_list = [self.y[k] for k in indexes]
            X, y = self.__data_generation(X_list, y_list)
            return X, y
        else:
            y_list = None
            X = self.__data_generation(X_list, y_list)
            return X

__data_generation부분을 수정해서 사용하면 됩니다. 원래 같은 경우는 X를 정의할 때, (batch_size, *dim, n_channel)로 정의하나 편의를 위해 지웠습니다. MNIST 데이터셋은 (28, 28, 1)이 아닌 (28, 28)로 인식되기 때문. 필요에 따라 수정하시면 됩니다.


dg = DataGenerator(x_train, y_train, 4, (28, 28), 1, 10)

import matplotlib.pyplot as plt

for i, (x, y) in enumerate(dg):
    if(i <= 1):
        x_first = x[0]
        plt.title(y[0])
        plt.imshow(x_first)