이글은 다음 문서를 참조합니다.

www.tensorflow.org/guide/keras/save_and_serialize

(번역은 자력 + 파파고 + 구글 번역기를 사용하였으니, 부자연스럽더라도 양해바랍니다.)


이번 가이드는 모델을 저장하는 방법과 serialization하는 방법을 다룹니다.

 

Part 1 : Saving Sequential models or Functional models

예제를 보곘습니다.

from tensorflow import keras
from tensorflow.keras import layers

inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)

model = keras.Model(inputs=inputs, outputs=outputs, name='3_layer_mlp')
model.summary()

훈련시킨 후 모델의 가중치와 optimizer state를 저장할 수 있습니다. 재미는 없지만 학습하지 않은 모델도 저장 가능합니다.

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255
x_test = x_test.reshape(10000, 784).astype('float32') / 255

model.compile(loss='sparse_categorical_crossentropy',
              optimizer=keras.optimizers.RMSprop())
history = model.fit(x_train, y_train,
                    batch_size=64,
                    epochs=1)
# Save predictions for future checks
predictions = model.predict(x_test)

 

Whole-model saving

빌드된 모델을 하나로 저장할 수 있습니다. 모델을 만든 코드가 더 이상 남아있지 않더라도 이 파일을 통해 다시 사용할 수 있습니다.

파일에는 다음 내용이 포함됩니다.

  • 모델 구조
  • weight values
  • training config(compile)
  • optimizer and state
# Save the model
model.save('path_to_my_model.h5')

# Recreate the exact same model purely from the file
new_model = keras.models.load_model('path_to_my_model.h5')

import numpy as np

# Check that the state is preserved
new_predictions = new_model.predict(x_test)
np.testing.assert_allclose(predictions, new_predictions, atol=1e-6)

# Note that the optimizer state is preserved as well:
# you can resume training where you left off.

 

Export to SavedModel

전체모델을 Tensorflow savedmodel format으로 추출할 수 있습니다. Tensorflow 객체용 독립형 serialization format이며 Python 이외의 Tensorflow 구현 뿐만아니라 Tensorflow Serving기능도 제공합니다.

# Export the model to a SavedModel
keras.experimental.export_saved_model(model, 'path_to_saved_model')

# Recreate the exact same model
new_model = keras.experimental.load_from_saved_model('path_to_saved_model')

# Check that the state is preserved
new_predictions = new_model.predict(x_test)
np.testing.assert_allclose(predictions, new_predictions, atol=1e-6)

# Note that the optimizer state is preserved as well:
# you can resume training where you left off.

SavedModel은 다음을 포함합니다.

  • model weights를 포함한 Tensorflow checkpoint
  • Tensorflow graph가 포함된 SavedModel 프로토타입. 예측(Serving), 학습, 평가를 위한 별도의 그래프. 이전에 compile하지 않았다면, inference 그래프만 추출됨.
  • 모델 구조

 

Architecture-only saving

weight나 optimizer는 관심이 없고, 오로지 모델 구조에만 관심이 가는 경우가 있습니다. 이러한 경우에 get_config()를 통해 모델의 config를 검색하면 됩니다. 같은 모델을 재생성할 수 있도록 파이썬 dict구조로 되어 있습니다. 이렇게 불러진 모델은 이전에 학습한 정보는 가지고 있지 않습니다.

config = model.get_config()
reinitialized_model = keras.Model.from_config(config)

# Note that the model state is not preserved! We only saved the architecture.
new_predictions = reinitialized_model.predict(x_test)
assert abs(np.sum(predictions - new_predictions)) > 0.

python dict대신에 to_json(), from_json()을 사용하여 JSON으로도 저장할 수 있습니다. disk에 저장할 때 유용하게 사용됩니다.

json_config = model.to_json()
reinitialized_model = keras.models.model_from_json(json_config)

 

Weights-only saving

weights에만 관심이 있는 경우에 get_weights()를 사용하여 Numpy array로 이루어진 weights를 찾을 수 있습니다. 또한, 모델의 상태를 set_weights로 변경할 수 있습니다.

weights = model.get_weights()  # Retrieves the state of the model.
model.set_weights(weights)  # Sets the state of the model.

get_config() / from_config() 와 get_weights() / set_weights()를 적절히 섞어 사용할 수 있습니다. 하지만, model.save()와는 다르게 이들은 학습 config와 optimizer를 포함하지 않습니다. 따라서 compile을 다시 설정해주어야 합니다.

config = model.get_config()
weights = model.get_weights()

new_model = keras.Model.from_config(config)
new_model.set_weights(weights)

# Check that the state is preserved
new_predictions = new_model.predict(x_test)
np.testing.assert_allclose(predictions, new_predictions, atol=1e-6)

# Note that the optimizer was not preserved,
# so the model should be compiled anew before training
# (and the optimizer will start from a blank state).

get_weights() / set_weights()와 같은 의미이면서 디스크에 저장하려면 save_weighs(fpath) / load_weights(fpath)를 사용하면 됩니다.

# Save JSON config to disk
json_config = model.to_json()
with open('model_config.json', 'w') as json_file:
    json_file.write(json_config)
# Save weights to disk
model.save_weights('path_to_my_weights.h5')

# Reload the model from the 2 files we saved
with open('model_config.json') as json_file:
    json_config = json_file.read()
new_model = keras.models.model_from_json(json_config)
new_model.load_weights('path_to_my_weights.h5')

# Check that the state is preserved
new_predictions = new_model.predict(x_test)
np.testing.assert_allclose(predictions, new_predictions, atol=1e-6)

# Note that the optimizer was not preserved.

이는 너무 복잡하니 다음과 같은 방법을 추천드립니다.

model.save('path_to_my_model.h5')
del model
model = keras.models.load_model('path_to_my_model.h5')

 

Weights-only saving in SavedModel format

save_weights는 keras에서 HDF5 format, Tensorflow에서 SavedModel format 형태를 만들어 냅니다. 이러한 형식은 사용자가 제공하는 되로 선택되어집니다. ".h5", "keras"인 경우는 Keras HDF5 형식을 사용합니다. 다른 모든 항목은 SavedModel로 기본 설정 됩니다.

model.save_weights('path_to_my_tf_savedmodel')

명시적으로 save_format인자("tf" or "h5")를 전달하여 설정할 수 있습니다. 

model.save_weights('path_to_my_tf_savedmodel', save_format='tf')

 

Saving Subclassed Models

Sequential, Functional Model은 DAG of layers를 나타내는 데이터구조입니다. 이들은 안전하게 serialized, deserialized될 수 있습니다.

subclassed되어진 모델은 데이터 구조가 아닌 그저 코드의 일부분일 뿐입니다. 이러한 모델의 구조는 call method에서 정의됩니다. 이는 모델이 안전하게 serialized될 수 없다는 것을 의미합니다.

모델을 불러오기 위해, 모델을 생성한 코드(the code of the model subclass)에 접근할 수 있어야합니다. 대신, 이러한 코드를 바이트코드(pickling)로 serializing할 수 있지만 안전하지 않습니다.

class ThreeLayerMLP(keras.Model):

  def __init__(self, name=None):
    super(ThreeLayerMLP, self).__init__(name=name)
    self.dense_1 = layers.Dense(64, activation='relu', name='dense_1')
    self.dense_2 = layers.Dense(64, activation='relu', name='dense_2')
    self.pred_layer = layers.Dense(10, activation='softmax', name='predictions')

  def call(self, inputs):
    x = self.dense_1(inputs)
    x = self.dense_2(x)
    return self.pred_layer(x)

def get_model():
  return ThreeLayerMLP(name='3_layer_mlp')

model = get_model()

우선, 한번도 사용되지 않은 모델은 저장할 수 없습니다. 

subclassed model은 weights를 생성하기 위해 한번쯤은 데이터가 필요하기 때문입니다. 

모델이 호출될 때 까지, input dtype, shape를 알 수 없고 weight variable또한 마찬가지입니다. Functional Model은 가장 처음에 keras.Input을 통해 input dtype, shape를 정의하기 때문에 이러한 문제가 없습니다. 

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255
x_test = x_test.reshape(10000, 784).astype('float32') / 255

model.compile(loss='sparse_categorical_crossentropy',
              optimizer=keras.optimizers.RMSprop())
history = model.fit(x_train, y_train,
                    batch_size=64,
                    epochs=1)

subclassed model을 저장하는데에 추천하는 방법은 model과 연관된 모든 variable을 담고있는 Tensorflow SavedModel checkpoint를 만드는 save_weights()를 사용하는 것입니다.(weights, optimizer state, etc.)

model.save_weights('path_to_my_weights', save_format='tf')

# Save predictions for future checks
predictions = model.predict(x_test)
# Also save the loss on the first batch
# to later assert that the optimizer state was preserved
first_batch_loss = model.train_on_batch(x_train[:64], y_train[:64])

모델을 복원하기 위해선 만들어진 모델 객체에 접근할 수 있어야합니다.

metric, optimizer의 상태를 복원하려면 model을 compile하고 load_weights를 호출하기 전에 일부 데이터를 호출해야합니다.

# Recreate the model
new_model = get_model()
new_model.compile(loss='sparse_categorical_crossentropy',
                  optimizer=keras.optimizers.RMSprop())

# This initializes the variables used by the optimizers,
# as well as any stateful metric variables
new_model.train_on_batch(x_train[:1], y_train[:1])

# Load the state of the old model
new_model.load_weights('path_to_my_weights')

# Check that the model state has been preserved
new_predictions = new_model.predict(x_test)
np.testing.assert_allclose(predictions, new_predictions, atol=1e-6)

# The optimizer state is preserved as well,
# so you can resume training where you left off
new_first_batch_loss = new_model.train_on_batch(x_train[:64], y_train[:64])
assert first_batch_loss == new_first_batch_loss

이글은 다음 문서를 참조합니다.

www.tensorflow.org/guide/keras/custom_layers_and_models

(번역은 자력 + 파파고 + 구글 번역기를 사용하였으니, 부자연스럽더라도 양해바랍니다.)


You can optionally enable serialization on your layers

Functional model의 기능 중 하나로서 get_config를 사용하여 layer의 정보를 serialization할 수 있습니다. (serialization은 unit과 같은 정보를 송수신 할 수 있게 하는 것입니다.)

class Linear(layers.Layer):

  def __init__(self, units=32):
    super(Linear, self).__init__()
    self.units = units

  def build(self, input_shape):
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

  def get_config(self):
    return {'units': self.units}


# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

base layer class의 __init__ method가 name, dtype과 같은 keyword arguments를 가지고 있습니다. 이러한 것을 __init__의 상위 클래스에 전달하고 layer 구성에 포함시키는 것이 좋습니다.

class Linear(layers.Layer):

  def __init__(self, units=32, **kwargs):
    super(Linear, self).__init__(**kwargs)
    self.units = units

  def build(self, input_shape):
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

  def get_config(self):
    config = super(Linear, self).get_config()
    config.update({'units': self.units})
    return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

이러한 config에서 deserializing을 하고 싶다면 from_config를 이용하면 좋습니다.

def from_config(cls, config):
  return cls(**config)

이 내용은 다음 글에서 더 자세히 다룹니다.

 

Privileged training argument in the call method

몇몇 BatchNormalization, Dropout과 같은 layer는 학습, 추론을 하는 동안 다른 기능을 가집니다. 이러한 layer의 경우 학습에 대한 여부를 boolean의 형태로 노출시키는 것이 표준 관례입니다.

이러한 인자는 fit()에서 사용하기 더 쉽습니다.

class CustomDropout(layers.Layer):

  def __init__(self, rate, **kwargs):
    super(CustomDropout, self).__init__(**kwargs)
    self.rate = rate

  def call(self, inputs, training=None):
    if training:
        return tf.nn.dropout(inputs, rate=self.rate)
    return inputs

 

Building Models

The Model class

일반적으로 block을 쌓기 위해 Layer 클래스를 사용하고, 학습을 위해 Model 객체를 정의합니다.

예를 들어 ResNet50에서는 여러 Layer로 subclassing된 ResNet block이 존재하고 Model에 통과시킵니다.

Model 클래스는 Layer와 같은 API를 가지고 있으며 다음과 같은 차이점을 가집니다.

  • model.fit(), model.evaluate(), model.predict()를 가집니다. 
  • model.layers 속성을 통해 내부 계층을 리스트로 접근할 수 있습니다. 
  • saving and serialization API를 가집니다.

사실 Layer 클래스는 "layer"(conv layer or recurrent layer)과 "block"(ResNet block or Inception block)과 의미를 공유합니다.

한편, Model 클래스는 문맥적으로 "model"(deep learning model) 과 "network"(deep neural network)와 의미를 공유합니다.

Model을 정의하여 fit()을 통해 학습하고, save_weights를 통해 weight를 저장할 수 있습니다.

class ResNet(tf.keras.Model):

    def __init__(self):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save_weights(filepath)

 

Putting it all together: an end-to-end example

지금까지 다음과 같은 내용을 보았습니다.

  • Layer__init__ or build에서 만들어진 상태를 캡슐화하고 call을 통한 계산을 행합니다. 
  • 더 새롭고 더 큰 계산 블록으로 만들어질 수 있습니다.
  • (regularization)losses에 접근할 수 있습니다.
  • 외부 컨테이너에 해당하는 Model로 학습시킬 수 있습니다.
  • ModelLayer을 override하고 있으나 학습과 serialization 측면에서 차이점을 가집니다.

여태까지 배운 모든 내용을 포함한 예제를 보겠습니다. MNIST를 학습시키는 Variational AutoEncoder예제입니다. VAE는 하위 클래스 계층의 중첩된 구성으로 구축된 모델의 하위 클래스입니다. 또한, regularization loss(KL divergence)에 대해서도 다룹니다.

(개념에 대한 부분은 넘어가도 좋습니다. VAE의 개념은 처음에 이해하기엔 어렵습니다. 하지만 코드가 어떤 구조를 가지고 있는지는 관찰하고 넘어가시는 것을 추천드립니다.)

class Sampling(layers.Layer):
  """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

  def call(self, inputs):
    z_mean, z_log_var = inputs
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
  """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

  def __init__(self,
               latent_dim=32,
               intermediate_dim=64,
               name='encoder',
               **kwargs):
    super(Encoder, self).__init__(name=name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
    self.dense_mean = layers.Dense(latent_dim)
    self.dense_log_var = layers.Dense(latent_dim)
    self.sampling = Sampling()

  def call(self, inputs):
    x = self.dense_proj(inputs)
    z_mean = self.dense_mean(x)
    z_log_var = self.dense_log_var(x)
    z = self.sampling((z_mean, z_log_var))
    return z_mean, z_log_var, z


class Decoder(layers.Layer):
  """Converts z, the encoded digit vector, back into a readable digit."""

  def __init__(self,
               original_dim,
               intermediate_dim=64,
               name='decoder',
               **kwargs):
    super(Decoder, self).__init__(name=name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
    self.dense_output = layers.Dense(original_dim, activation='sigmoid')

  def call(self, inputs):
    x = self.dense_proj(inputs)
    return self.dense_output(x)


class VariationalAutoEncoder(tf.keras.Model):
  """Combines the encoder and decoder into an end-to-end model for training."""

  def __init__(self,
               original_dim,
               intermediate_dim=64,
               latent_dim=32,
               name='autoencoder',
               **kwargs):
    super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
    self.original_dim = original_dim
    self.encoder = Encoder(latent_dim=latent_dim,
                           intermediate_dim=intermediate_dim)
    self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

  def call(self, inputs):
    z_mean, z_log_var, z = self.encoder(inputs)
    reconstructed = self.decoder(z)
    # Add KL divergence regularization loss.
    kl_loss = - 0.5 * tf.reduce_mean(
        z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
    self.add_loss(kl_loss)
    return reconstructed


original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

# Iterate over epochs.
for epoch in range(3):
  print('Start of epoch %d' % (epoch,))

  # Iterate over the batches of the dataset.
  for step, x_batch_train in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      reconstructed = vae(x_batch_train)
      # Compute reconstruction loss
      loss = mse_loss_fn(x_batch_train, reconstructed)
      loss += sum(vae.losses)  # Add KLD regularization loss

    grads = tape.gradient(loss, vae.trainable_variables)
    optimizer.apply_gradients(zip(grads, vae.trainable_variables))

    loss_metric(loss)

    if step % 100 == 0:
      print('step %s: mean loss = %s' % (step, loss_metric.result()))

VAE는 Model의 하위클래스이기 때문에 다음과 같이 학습시킬수도 있습니다.
(데이터가 model에 어떻게 흘러가는지도 잘 관찰하는게 좋습니다)

vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)

 

Beyond object-oriented development: the Functional API

Functional API를 사용해서 구현할 수도 있습니다. 어떤 방식을 쓰던 문제가 되지 않습니다. 섞어 사용하여도 무방합니다.

original_dim = 784
intermediate_dim = 64
latent_dim = 32

# Define encoder model.
original_inputs = tf.keras.Input(shape=(original_dim,), name='encoder_input')
x = layers.Dense(intermediate_dim, activation='relu')(original_inputs)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name='encoder')

# Define decoder model.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = layers.Dense(original_dim, activation='sigmoid')(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name='decoder')

# Define VAE model.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name='vae')

# Add KL divergence regularization loss.
kl_loss = - 0.5 * tf.reduce_mean(
    z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

# Train.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)

 

이글은 다음 문서를 참조합니다.

www.tensorflow.org/guide/keras/custom_layers_and_models

(번역은 자력 + 파파고 + 구글 번역기를 사용하였으니, 부자연스럽더라도 양해바랍니다.)


Writing layers and models with Tensorflow Keras

 

The Layer class

Layers encapsulate a state (weights) and some computation

우리가 주로 작업할 데이터 구조는 layer입니다. 레이어는 상태(가중치)와 입력에서 출력으로의 변환을 동시에 캡슐화시킵니다.

예제를 살펴보겠습니다.

from tensorflow.keras import layers


class Linear(layers.Layer):

  def __init__(self, units=32, input_dim=32):
    super(Linear, self).__init__()
    w_init = tf.random_normal_initializer()
    self.w = tf.Variable(initial_value=w_init(shape=(input_dim, units),
                                              dtype='float32'),
                         trainable=True)
    b_init = tf.zeros_initializer()
    self.b = tf.Variable(initial_value=b_init(shape=(units,),
                                              dtype='float32'),
                         trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[ 0.04076247  0.12488913 -0.09827997 -0.00854541]
 [ 0.04076247  0.12488913 -0.09827997 -0.00854541]], shape=(2, 4), dtype=float32)

레이어 속성으로 설정된 경우 레이어별로 가중치 w와 b가 자동으로 추적된다는 점에 유의해야 합니다. (쉽게 말해서 그냥 레이어의 가중치와 bias에 접근할 수 있다.. 요정도?)

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

add_weight를 사용하면 위의 예보다 코드도 짧고 더 빠릅니다.

class Linear(layers.Layer):

  def __init__(self, units=32, input_dim=32):
    super(Linear, self).__init__()
    self.w = self.add_weight(shape=(input_dim, units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(units,),
                             initializer='zeros',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)

 

Layers can have non-trainable weights

layer는 학습가능한 가중치와 동시에 학습불가능한 가중치를 가질 수 있습니다. 이러한 가중치는 layer를 학습시킬 때 backprop을 진행하는 동안 학습 대상에서 고려되지 않습니다.

class ComputeSum(layers.Layer):

  def __init__(self, input_dim):
    super(ComputeSum, self).__init__()
    self.total = tf.Variable(initial_value=tf.zeros((input_dim,)),
                             trainable=False)

  def call(self, inputs):
    self.total.assign_add(tf.reduce_sum(inputs, axis=0))
    return self.total

x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())

tf.Variable(~, trainable = False) 는 backprop을 진행하지 않겠다는 것입니다.

layer.weights에서 non-trainable weight로 카테고리화 되어 있습니다.

print('weights:', len(my_sum.weights))
print('non-trainable weights:', len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print('trainable_weights:', my_sum.trainable_weights)

 

Best practice: deferring weight creation until the shape of the inputs is known

위의 로지스틱 회귀 예에서, Linear 클래스는 __init__에서 wb의 shape을 결정짓는 input_dim의 인자를 받습니다.

class Linear(layers.Layer):

  def __init__(self, units=32, input_dim=32):
      super(Linear, self).__init__()
      self.w = self.add_weight(shape=(input_dim, units),
                               initializer='random_normal',
                               trainable=True)
      self.b = self.add_weight(shape=(units,),
                               initializer='random_normal',
                               trainable=True)

많은 경우에, 우리는 입력의 크기를 미리 알지 못할 수 있으며, 그 값이 알려질 때, 어떤 때는 레이어를 인스턴스화한 후에서야 가중치를 만들 수도 있습니다.

Keras API에서 다음 예와 같이 build(inputs_shape)에서 layer weights를 만드는 것을 추천합니다.

class Linear(layers.Layer):

  def __init__(self, units=32):
    super(Linear, self).__init__()
    self.units = units

  def build(self, input_shape):
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

__call__은 자동으로 처음 불렸을 때 build를 실행시킵니다. 좀 더 사용하기가 쉬워졌습니다.

linear_layer = Linear(32)  # At instantiation, we don't know on what inputs this is going to get called
y = linear_layer(x)  # The layer's weights are created dynamically the first time the layer is called

 

Layers are recursively composable

계층 인스턴스를 다른 계층의 속성으로 할당하면 외부 계층이 내부 계층의 가중치를 사용할 수 있게 됩니다.

우리는 이러한 sublayer들을 __init__ method에서 사용하길 추천합니다(서브레이어는 build가 호출되면 사용될 것입니다)

# Let's assume we are reusing the Linear class
# with a `build` method that we defined above.

class MLPBlock(layers.Layer):

  def __init__(self):
    super(MLPBlock, self).__init__()
    self.linear_1 = Linear(32)
    self.linear_2 = Linear(32)
    self.linear_3 = Linear(1)

  def call(self, inputs):
    x = self.linear_1(inputs)
    x = tf.nn.relu(x)
    x = self.linear_2(x)
    x = tf.nn.relu(x)
    return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print('weights:', len(mlp.weights))
print('trainable weights:', len(mlp.trainable_weights))

 

Layers recursively collect losses created during the forward pass

call method를 호출한 후에, 우리는 loss tensor를 다룰 수 있습니다.  이는 self.add_loss(value)를 통해 할 수 있습니다.

# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(layers.Layer):

  def __init__(self, rate=1e-2):
    super(ActivityRegularizationLayer, self).__init__()
    self.rate = rate

  def call(self, inputs):
    self.add_loss(self.rate * tf.reduce_sum(inputs))
    return inputs

 

이 loss(내부 층에 의해 만들어져 포함된)는 layer.losses를 통해 접근할 수 있습니다. layer.losses는 항상 마지막 forward pass를 하는 동안에 계산된 loss값을 포함하고, 최상위층에서 모든 __call__의 시작부분에서 초기화됩니다.

class OuterLayer(layers.Layer):

  def __init__(self):
    super(OuterLayer, self).__init__()
    self.activity_reg = ActivityRegularizationLayer(1e-2)

  def call(self, inputs):
    return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # This is the loss created during the call above

또한 손실 속성은 내부 layer의 가중치에 대해 생성된 regularization losses도 포함합니다.

class OuterLayer(layers.Layer):

  def __init__(self):
    super(OuterLayer, self).__init__()
    self.dense = layers.Dense(32, kernel_regularizer=tf.keras.regularizers.l2(1e-3))

  def call(self, inputs):
    return self.dense(inputs)


layer = OuterLayer()
_ = layer(tf.zeros((1, 1)))

# This is `1e-3 * sum(layer.dense.kernel)`,
# created by the `kernel_regularizer` above.
print(layer.losses)

이러한 losses는 다음 예와 같이 학습 loops동안에 계산되어집니다.

# Instantiate an optimizer.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
  with tf.GradientTape() as tape:
    logits = layer(x_batch_train)  # Logits for this minibatch
    # Loss value for this minibatch
    loss_value = loss_fn(y_batch_train, logits))
    # Add extra losses created during this forward pass:
    loss_value += sum(model.losses)

    grads = tape.gradient(loss_value, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

 

이글은 다음 문서를 참조합니다.

www.tensorflow.org/guide/keras/train_and_evaluate

(번역은 자력 + 파파고 + 구글 번역기를 사용하였으니, 부자연스럽더라도 양해바랍니다.)


Part 2: Writing your own training & evaluation loops from scratch

fit(), evaluate() 함수를 통한 학습&평가 방식이 아닌 좀 더 low-level을 다루고 싶다면, 매우 간단하게 커스터마이징할 수 있습니다. 그러나 디버깅할때 수많은 노력이 필요할 것입니다.

 

Using the GradientTape: a first end-to-end example

loss에 대하여 layer의 학습가능한 weight의 gradient를 알고싶다면 GradientTape scope를 정의해야 합니다. optimizer객체를 사용하여 model.trainable_variables를 통해 업데이트되는 gradient를 사용할 수 있습니다.

예제를 살펴보겠습니다.

# Get the model.
inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy()

# Prepare the training dataset.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Iterate over epochs.
for epoch in range(3):
  print('Start of epoch %d' % (epoch,))

  # Iterate over the batches of the dataset.
  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

    # Open a GradientTape to record the operations run
    # during the forward pass, which enables autodifferentiation.
    with tf.GradientTape() as tape:

      # Run the forward pass of the layer.
      # The operations that the layer applies
      # to its inputs are going to be recorded
      # on the GradientTape.
      logits = model(x_batch_train)  # Logits for this minibatch

      # Compute the loss value for this minibatch.
      loss_value = loss_fn(y_batch_train, logits)

    # Use the gradient tape to automatically retrieve
    # the gradients of the trainable variables with respect to the loss.
    grads = tape.gradient(loss_value, model.trainable_variables)

    # Run one step of gradient descent by updating
    # the value of the variables to minimize the loss.
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Log every 200 batches.
    if step % 200 == 0:
        print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
        print('Seen so far: %s samples' % ((step + 1) * 64))

 

Low-level handling of metrics

우리들이 직접 정의한 metric이나 내장된 metric을 쉽게 사용(재사용)할 수 있습니다.

  • 첫 루프에서 메트릭을 초기화합니다.
  • 각 batch 후에 metric.update_state()를 호출합니다.
  • metric의 현재 값을 알고싶다면 metric.result()를 호출합니다.
  • (보통 the end of epoch에) metric의 상태를 clear하고 싶을 때 metric.reset_states()를 호출합니다.

SparseCategoricalAccuracy예제를 살펴보겠습니다.

# Get model
inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy()

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

# Prepare the training dataset.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)


# Iterate over epochs.
for epoch in range(3):
  print('Start of epoch %d' % (epoch,))

  # Iterate over the batches of the dataset.
  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      logits = model(x_batch_train)
      loss_value = loss_fn(y_batch_train, logits)
    grads = tape.gradient(loss_value, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Update training metric.
    train_acc_metric(y_batch_train, logits)

    # Log every 200 batches.
    if step % 200 == 0:
        print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
        print('Seen so far: %s samples' % ((step + 1) * 64))

  # Display metrics at the end of each epoch.
  train_acc = train_acc_metric.result()
  print('Training acc over epoch: %s' % (float(train_acc),))
  # Reset training metrics at the end of each epoch
  train_acc_metric.reset_states()

  # Run a validation loop at the end of each epoch.
  for x_batch_val, y_batch_val in val_dataset:
    val_logits = model(x_batch_val)
    # Update val metrics
    val_acc_metric(y_batch_val, val_logits)
  val_acc = val_acc_metric.result()
  val_acc_metric.reset_states()
  print('Validation acc: %s' % (float(val_acc),))

 

Low-level handling of extra losses

이전에 우리는 call method에서 self.add_loss(value)를 통하여 regularization loss를 사용할 수 있다는 것을 보았습니다.

일반적으로, custom 학습 루프에 이러한 loss를 고려하게 될 것입니다.(모델을 직접 작성하여 나만의 loss를 만들고 싶은 경우)

class ActivityRegularizationLayer(layers.Layer):

  def call(self, inputs):
    self.add_loss(1e-2 * tf.reduce_sum(inputs))
    return inputs

inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)

model = keras.Model(inputs=inputs, outputs=outputs)
logits = model(x_train)

모델을 만들고,

logits = model(x_train[:64])
print(model.losses)

forward pass동안 만들어지는 loss는 model.losses에 저장됩니다.

추적된 loss는 모델의 __call__을 시작할 때 clear되기 떄문에, 한 번의 forward pass 동안 발생한 손실만 볼 수 있을 것이다. 예를 들어 모델을 반복적으로 호출한 다음 loss를 쿼리하면 마지막에 발생한 최신 loss만 알 수 있습니다.

logits = model(x_train[:64])
logits = model(x_train[64: 128])
logits = model(x_train[128: 192])
print(model.losses)
[<tf.Tensor: id=999851, shape=(), dtype=float32, numpy=6.88884>]

 

학습하는 동안 발생하는 모든 loss를 고려하고 싶다면, 학습 loop에서 total_loss에 sum(model.losses)를 추가해야합니다.

optimizer = keras.optimizers.SGD(learning_rate=1e-3)

for epoch in range(3):
  print('Start of epoch %d' % (epoch,))

  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      logits = model(x_batch_train)
      loss_value = loss_fn(y_batch_train, logits)

      # Add extra losses created during this forward pass:
      loss_value += sum(model.losses)

    grads = tape.gradient(loss_value, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Log every 200 batches.
    if step % 200 == 0:
        print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
        print('Seen so far: %s samples' % ((step + 1) * 64))