이 글은 다음 Keras Example을 번역합니다.

https://keras.io/examples/timeseries/timeseries_traffic_forecasting/

 

Keras documentation: Traffic forecasting using graph neural networks and LSTM

Traffic forecasting using graph neural networks and LSTM Author: Arash Khodadadi Date created: 2021/12/28 Last modified: 2021/12/28 Description: This example demonstrates how to do timeseries forecasting over graphs. View in Colab • GitHub source Introdu

keras.io


Introduction

이 예제는 GNN과 LSTM을 활용하여 교통 상태를 예측해봅니다. 특히, 도로 구역별 체증(traffic speed) 히스토리를 활용해 향후 체증을 예측합니다.

문제를 해결하기 위해 사용되는 주요한 방법은 도로 구역별 체증 상태를 시계열 형태로 두고, 과거 상태를 이용해 미래를 예측하는 것입니다.

하지만 단순 시게열 방법은 인접 도로간 체증 상태를 활용하지 못합니다. 따라서 이웃한(인접한) 도로 체증의 복잡한 연관관계를 활용할 수 있도록 graph로 표현되는 traffic network를 구성하고, 그래프로 표현되는 체증을 활용합니다. 이 예제는 그래프로 이루어진 시계열 데이터를 입력받을 수 있는 모델을 구성합니다. 첫 번째로 어떻게 데이터를 가공하는지 살펴보고, 그래프를 예측하기 위해 tf.data.Dataset을 만듭니다. 그 후, GCN(Graph Convolution Network)와 LSTM으로 구성된 모델을 구현합니다.

데이터 전처리와 모델 구조는 다음 논문을 참조합니다.

Yu, Bing, Haoteng Yin, and Zhanxing Zhu. "Spatio-temporal graph convolutional networks: a deep learning framework for traffic forecasting." Proceedings of the 27th International Joint Conference on Artificial Intelligence, 2018. (github) 


Setup

import pandas as pd
import numpy as np
import os
import typing
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

Data preparation

Data description

실세계 교통 체증 데이터셋 PeMSD7을 활용합니다. 이 데이터셋은 Yu et al., 2018에서 수집 및 준비되었고, 여기서 활용할 수 있습니다. 데이터셋에 대한 구체적인 설명은 논문을 참조하세요.

이 데이터셋은 두 개의 파일로 구성되어 있습니다.

  • W_228.csv: 캘리포니아 7구역의 228개 거리
  • V_228.csv: 2012년 5, 6월 평일의 교통 체증 데이터

Loading Data

url = "https://github.com/VeritasYin/STGCN_IJCAI-18/raw/master/data_loader/PeMS-M.zip"
data_dir = keras.utils.get_file(origin=url, extract=True, archive_format="zip")
data_dir = data_dir.rstrip(".zip")

route_distances = pd.read_csv(
    os.path.join(data_dir, "W_228.csv"), header=None
).to_numpy()
speeds_array = pd.read_csv(os.path.join(data_dir, "V_228.csv"), header=None).to_numpy()

print(f"route_distances shape={route_distances.shape}") # (228, 228)
print(f"speeds_array shape={speeds_array.shape}") # (12672, 228)

sub-sampling roads

크기 문제를 감소시키고, 학습 속도를 증가시키기 위해 228개 도로 중 26개 도로만 사용합니다. 0번 도로부터 시작해 총 25개 도로를 선택합니다. sample_routes에 포함되어 있는 도로들은 각 도로별 상관성을 가지고 있습니다.

sample_routes = [
    0, 1, 4,
    7, 8, 11,
    15, 108, 109,
    114, 115, 118,
    120, 123, 124,
    126, 127, 129,
    130, 132, 133,
    136, 139, 144,
    147, 216,
]
route_distances = route_distances[np.ix_(sample_routes, sample_routes)]
speeds_array = speeds_array[:, sample_routes]

print(f"route_distances shape={route_distances.shape}") # (26, 26)
print(f"speeds_array shape={speeds_array.shape}") # (12672, 26)

Data Visualization

두 개 도로를 선택하여 시각화해봅니다.

plt.figure(figsize=(18, 6))
plt.plot(speeds_array[:, [0, -1]])
plt.legend(["route_0", "route_25"])

각 도로별 상관관계도 시각화할 수 있습니다.

plt.figure(figsize=(8, 8))
plt.matshow(np.corrcoef(speeds_array.T), 0)
plt.xlabel("road number")
plt.ylabel("road number")

Splitting and normalizing data

train_size, val_size = 0.5, 0.2


def preprocess(data_array: np.ndarray, train_size: float, val_size: float):
    """Splits data into train/val/test sets and normalizes the data.

    Args:
        data_array: ndarray of shape `(num_time_steps, num_routes)`
        train_size: A float value between 0.0 and 1.0 that represent the proportion of the dataset
            to include in the train split.
        val_size: A float value between 0.0 and 1.0 that represent the proportion of the dataset
            to include in the validation split.

    Returns:
        `train_array`, `val_array`, `test_array`
    """

    # Split 수행
    num_time_steps = data_array.shape[0]
    num_train, num_val = (
        int(num_time_steps * train_size),
        int(num_time_steps * val_size),
    )
    train_array = data_array[:num_train]
    mean, std = train_array.mean(axis=0), train_array.std(axis=0)

    # Normalize 수행
    train_array = (train_array - mean) / std
    val_array = (data_array[num_train : (num_train + num_val)] - mean) / std
    test_array = (data_array[(num_train + num_val) :] - mean) / std

    return train_array, val_array, test_array


train_array, val_array, test_array = preprocess(speeds_array, train_size, val_size)

print(f"train set size: {train_array.shape}") # (6336, 26)
print(f"validation set size: {val_array.shape}") # (2534, 26)
print(f"test set size: {test_array.shape}") # (3802, 26)

Creating TensorFlow Datasets

예측 문제를 해결하기 위한 Dataset을 만듭니다. 이 예제에서 다룰 데이터는 t+1, t+2, ..., t+T 형태의 시퀀스를 활용해 t+T+1, ..., t+T+h 시점의 미래 체증을 예측합니다. t 시점의 입력값은 N 크기의 T 벡터이며, 예측(또는 타겟)값은 N 크기의 h 벡터입니다. 여기서 N은 도로 수 입니다. 

데이터셋을 만들기 위해 Keras에서 제공하는 timeseries_dataset_from_array() 함수를 활용합니다. 또, 아래의 create_tf_dataset() 함수는 numpy.ndarray을 입력받고, tf.data.Dataset을 반환합니다. 이 함수 안에서 input_sequence_lengthforcast_horizon은 각각 위의 Th를 의미합니다.

multi_horizon 인자를 추가적으로 설명합니다. forcast_horizon=3으로 가정하고, multi_horizon=True로 설정되어 있으면, 모델은 t+T+1, t+T+2, t+T+3 시점을 예측하고, Target 값의 형태는 (T, 3)이 됩니다. 반대로 multi_horizon=False로 설정되어 있으면, 마지막 시점인 t+T+3 시점만 예측하고 Target 값 형태는 (T, 1)이 됩니다.

여기서 사용되는 input tensor의 형태는 (batch_size, input_sequence_length, num_routes, 1)입니다. 마지막 차원은 모델 입력을 일반화하기 위해 사용했습니다.
예를 들어, 각 도로별 온도를 추가 특성으로서 활용하고 싶다면, 마지막 차원에서 각 도로(num_routes)는 speed, temperature 두 가지 특성을 가질 것이므로 (batch_size, input_sequence_length, num_routes, 2)가 됩니다. 하지만 이번 예제에서는 추가로 다루지 않으며, 항상 1을 유지합니다.

과거 12개 값을 활용해 미래 3개 값을 예측합니다.

from tensorflow.keras.preprocessing import timeseries_dataset_from_array

batch_size = 64
input_sequence_length = 12
forecast_horizon = 3
multi_horizon = False


def create_tf_dataset(
    data_array: np.ndarray,
    input_sequence_length: int,
    forecast_horizon: int,
    batch_size: int = 128,
    shuffle=True,
    multi_horizon=True,
):
    """Creates tensorflow dataset from numpy array.

    This function creates a dataset where each element is a tuple `(inputs, targets)`.
    `inputs` is a Tensor
    of shape `(batch_size, input_sequence_length, num_routes, 1)` containing
    the `input_sequence_length` past values of the timeseries for each node.
    `targets` is a Tensor of shape `(batch_size, forecast_horizon, num_routes)`
    containing the `forecast_horizon`
    future values of the timeseries for each node.

    Args:
        data_array: np.ndarray with shape `(num_time_steps, num_routes)`
        input_sequence_length: Length of the input sequence (in number of timesteps).
        forecast_horizon: If `multi_horizon=True`, the target will be the values of the timeseries for 1 to
            `forecast_horizon` timesteps ahead. If `multi_horizon=False`, the target will be the value of the
            timeseries `forecast_horizon` steps ahead (only one value).
        batch_size: Number of timeseries samples in each batch.
        shuffle: Whether to shuffle output samples, or instead draw them in chronological order.
        multi_horizon: See `forecast_horizon`.

    Returns:
        A tf.data.Dataset instance.
    """

    inputs = timeseries_dataset_from_array(
        np.expand_dims(data_array[:-forecast_horizon], axis=-1),
        None,
        sequence_length=input_sequence_length,
        shuffle=False,
        batch_size=batch_size,
    ) # 개별 입력 형태 = (64, 12, 26, 1)

    target_offset = (
        input_sequence_length
        if multi_horizon
        else input_sequence_length + forecast_horizon - 1
    )
    # multi_horizon이 True이면 forcast_horizon 크기 만큼 Target으로 활용
    target_seq_length = forecast_horizon if multi_horizon else 1
    targets = timeseries_dataset_from_array(
        data_array[target_offset:], # input_sequence_length 이후부터
        None,
        sequence_length=target_seq_length, # target_seq_length 크기만큼
        shuffle=False,
        batch_size=batch_size,
    ) # (64, 3, 26)

    dataset = tf.data.Dataset.zip((inputs, targets))
    if shuffle:
        dataset = dataset.shuffle(100)

    return dataset.prefetch(16).cache()


# (64, 12, 26, 1), (64, 3, 26)을 반환
train_dataset, val_dataset = (
    create_tf_dataset(data_array, input_sequence_length, forecast_horizon, batch_size)
    for data_array in [train_array, val_array]
)

test_dataset = create_tf_dataset(
    test_array,
    input_sequence_length,
    forecast_horizon,
    batch_size=test_array.shape[0],
    shuffle=False,
    multi_horizon=multi_horizon,
)

Roads Graph

PeMSD7 데이터셋은 도로 구역별 거리를 포함하고 있습니다. 이 거리를 활용하여 인접 매트릭스를 만듭니다. 논문에서 사용된 것처럼 두 도로의 거리가 threshold보다 낮다면 그래프의 edge가 존재하는 것으로 구성합니다.

def compute_adjacency_matrix(
    route_distances: np.ndarray, sigma2: float, epsilon: float
):
    """Computes the adjacency matrix from distances matrix.

    It uses the formula in https://github.com/VeritasYin/STGCN_IJCAI-18#data-preprocessing to
    compute an adjacency matrix from the distance matrix.
    The implementation follows that paper.

    Args:
        route_distances: np.ndarray of shape `(num_routes, num_routes)`. Entry `i,j` of this array is the
            distance between roads `i,j`.
        sigma2: Determines the width of the Gaussian kernel applied to the square distances matrix.
        epsilon: A threshold specifying if there is an edge between two nodes. Specifically, `A[i,j]=1`
            if `np.exp(-w2[i,j] / sigma2) >= epsilon` and `A[i,j]=0` otherwise, where `A` is the adjacency
            matrix and `w2=route_distances * route_distances`

    Returns:
        A boolean graph adjacency matrix.
    """
    num_routes = route_distances.shape[0]
    route_distances = route_distances / 10000.0
    w2, w_mask = (
        route_distances * route_distances,
        np.ones([num_routes, num_routes]) - np.identity(num_routes),
    )
    return (np.exp(-w2 / sigma2) >= epsilon) * w_mask

compute_adjacency_matrix() 함수는 boolean 인접 매트릭스를 반환합니다. 이 매트릭스에서 1은 두 node간 edge가 존재, 0은 존재하지 않음을 의미합니다.

class GraphInfo:
    def __init__(self, edges: typing.Tuple[list, list], num_nodes: int):
        self.edges = edges
        self.num_nodes = num_nodes


sigma2 = 0.1
epsilon = 0.5
adjacency_matrix = compute_adjacency_matrix(route_distances, sigma2, epsilon)
node_indices, neighbor_indices = np.where(adjacency_matrix == 1)
graph = GraphInfo(
    edges=(node_indices.tolist(), neighbor_indices.tolist()),
    num_nodes=adjacency_matrix.shape[0],
)
# 26, 150
print(f"number of nodes: {graph.num_nodes}, number of edges: {len(graph.edges[0])}")

Network architecture

모델은 그래프를 예측하기 위해 Graph Convolution Layer와 LSTM Layer로 구성됩니다.

Graph convolution layer

여기서 사용되는 Graph convolution layer 구조는 이 예제와 비슷합니다. 다른 점은 예제에서는 (num_nodes, in_feat) 2D tensor를 입력하지만, 이번 예제에서는 (num_nodes, batch_size, input_seq_length, in_feat) 4D tensor를 입력으로 사용합니다. graph convolution layer는 다음 단계로 계산이 수행됩니다.

  • 노드들의 표현은 input feature에 self.weight가 곱해지면서 self.compute_nodes_representation()에서 계산됩니다.
  • aggregated neighbors' messages는 집계된 neighbors' 표현에 self.weight를 곱하여 self.compute_aggregated_messages()에서 계산됩니다.
  • 층의 마지막에서 노드 표현과 neighbors' aggregated messages가 결합되면서 self.update()에서 계산됩니다.

위 설명은 아래 코드에서 주석을 참고하여 읽어보면 쉽게 이해됩니다.

class GraphConv(layers.Layer):
    def __init__(
        self,
        in_feat,
        out_feat,
        graph_info: GraphInfo,
        aggregation_type="mean",
        combination_type="concat",
        activation: typing.Optional[str] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.in_feat = in_feat
        self.out_feat = out_feat
        self.graph_info = graph_info
        self.aggregation_type = aggregation_type
        self.combination_type = combination_type
        self.weight = tf.Variable(
            initial_value=keras.initializers.glorot_uniform()(
                shape=(in_feat, out_feat), dtype="float32"
            ),
            trainable=True,
        )
        self.activation = layers.Activation(activation)

    def aggregate(self, neighbour_representations: tf.Tensor):
      # aggregation 방법에 따라 쓰이는 함수가 달라집니다
        aggregation_func = {
            "sum": tf.math.unsorted_segment_sum,
            "mean": tf.math.unsorted_segment_mean,
            "max": tf.math.unsorted_segment_max,
        }.get(self.aggregation_type)

        if aggregation_func:
            return aggregation_func(
                neighbour_representations,
                self.graph_info.edges[0],
                num_segments=self.graph_info.num_nodes,
            )

        raise ValueError(f"Invalid aggregation type: {self.aggregation_type}")

    def compute_nodes_representation(self, features: tf.Tensor):
        """각 노드 표현을 계산합니다.

        노드 표현은 feature tensor에 self.weight가 곱해지면서 만들어집니다.
        self.weight shape는 (in_feat, out_feat).

        Args:
            features: Tensor of shape `(num_nodes, batch_size, input_seq_len, in_feat)`

        Returns:
            A tensor of shape `(num_nodes, batch_size, input_seq_len, out_feat)`
        """
        return tf.matmul(features, self.weight)

    def compute_aggregated_messages(self, features: tf.Tensor):
        neighbour_representations = tf.gather(features, self.graph_info.edges[1])
        aggregated_messages = self.aggregate(neighbour_representations)
        return tf.matmul(aggregated_messages, self.weight)

    def update(self, nodes_representation: tf.Tensor, aggregated_messages: tf.Tensor):
        if self.combination_type == "concat":
            h = tf.concat([nodes_representation, aggregated_messages], axis=-1)
        elif self.combination_type == "add":
            h = nodes_representation + aggregated_messages
        else:
            raise ValueError(f"Invalid combination type: {self.combination_type}.")

        return self.activation(h)

    def call(self, features: tf.Tensor):
        """Forward pass.

        Args:
            features: tensor of shape `(num_nodes, batch_size, input_seq_len, in_feat)`

        Returns:
            A tensor of shape `(num_nodes, batch_size, input_seq_len, out_feat)`
        """
        nodes_representation = self.compute_nodes_representation(features)
        aggregated_messages = self.compute_aggregated_messages(features)
        return self.update(nodes_representation, aggregated_messages)

LSTM plus graph convolution

graph convolution layer를 통과시키면, 노드 표현을 포함한 4D tensor를 output으로 얻습니다. 여기서 각 timestep은 이웃 노드의 정보가 집계(aggregate)된 노드 표현입니다.

좋은 예측을 위해선 이웃 노드의 정보뿐만 아니라 시간에 따른 정보를 처리할 수 있어야 합니다. 이를 위해 node tensor를 recurrent layer에 통과시킵니다. 아래 코드에서 첫 번째로 graph convolution layer에 입력을 넣고나서 LSTM layer를 통과시키는 LSTMGC layer를 볼 수 있습니다.

class LSTMGC(layers.Layer):
    """Layer comprising a convolution layer followed by LSTM and dense layers."""

    def __init__(
        self,
        in_feat,
        out_feat,
        lstm_units: int,
        input_seq_len: int,
        output_seq_len: int,
        graph_info: GraphInfo,
        graph_conv_params: typing.Optional[dict] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # graph conv layer
        if graph_conv_params is None:
            graph_conv_params = {
                "aggregation_type": "mean",
                "combination_type": "concat",
                "activation": None,
            }
        self.graph_conv = GraphConv(in_feat, out_feat, graph_info, **graph_conv_params)

        self.lstm = layers.LSTM(lstm_units, activation="relu")
        self.dense = layers.Dense(output_seq_len)

        self.input_seq_len, self.output_seq_len = input_seq_len, output_seq_len

    def call(self, inputs):
        """Forward pass.

        Args:
            inputs: tf.Tensor of shape `(batch_size, input_seq_len, num_nodes, in_feat)`

        Returns:
            A tensor of shape `(batch_size, output_seq_len, num_nodes)`.
        """

        # convert shape to  (num_nodes, batch_size, input_seq_len, in_feat)
        inputs = tf.transpose(inputs, [2, 0, 1, 3])

        gcn_out = self.graph_conv(
            inputs
        )  # gcn_out has shape: (num_nodes, batch_size, input_seq_len, out_feat)
        shape = tf.shape(gcn_out)
        num_nodes, batch_size, input_seq_len, out_feat = (
            shape[0],
            shape[1],
            shape[2],
            shape[3],
        )

        # LSTM takes only 3D tensors as input
        gcn_out = tf.reshape(gcn_out, (batch_size * num_nodes, input_seq_len, out_feat))
        lstm_out = self.lstm(
            gcn_out
        )  # lstm_out has shape: (batch_size * num_nodes, lstm_units)

        dense_output = self.dense(
            lstm_out
        )  # dense_output has shape: (batch_size * num_nodes, output_seq_len)
        output = tf.reshape(dense_output, (num_nodes, batch_size, self.output_seq_len))
        return tf.transpose(
            output, [1, 2, 0]
        )  # returns Tensor of shape (batch_size, output_seq_len, num_nodes)

Model training

데이터를 다시 살펴보면 아래와 같습니다.

  • X 데이터는 (64, 12, 26, 1) ~ (batch_size, input_sequence_length(각 도로별 과거 체증값), 도로 노드, 도로 특성)
  • Y 데이터는 (64, 3, 26) ~ (batch_size, 예측 시점 수, 도로 노드)
in_feat = 1
batch_size = 64
epochs = 20
input_sequence_length = 12
forecast_horizon = 3
multi_horizon = False
out_feat = 10
lstm_units = 64
graph_conv_params = {
    "aggregation_type": "mean",
    "combination_type": "concat",
    "activation": None,
}

st_gcn = LSTMGC(
    in_feat,
    out_feat,
    lstm_units,
    input_sequence_length,
    forecast_horizon,
    graph,
    graph_conv_params,
)
inputs = layers.Input((input_sequence_length, graph.num_nodes, in_feat))
outputs = st_gcn(inputs)

model = keras.models.Model(inputs, outputs)
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=0.0002),
    loss=keras.losses.MeanSquaredError(),
)
model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    callbacks=[keras.callbacks.EarlyStopping(patience=10)],
)

Making forecasts on test set

학습된 모델로 테스트셋을 활용하여 예측 해봅니다. 아래에서 모델 예측값으로 얻은 MAE와 단순(naive) MAE를 비교합니다. 단순 예측는 각 노드의 마지막 값이 사용됩니다.

# (3788, 12, 26, 1), (3788, 1, 26)
x_test, y = next(test_dataset.as_numpy_iterator())
y_pred = model.predict(x_test)
plt.figure(figsize=(18, 6))
plt.plot(y[:, 0, 0])
plt.plot(y_pred[:, 0, 0])
plt.legend(["actual", "forecast"])

# naive는 마지막 값과 비교
naive_mse, model_mse = (
    np.square(x_test[:, -1, :, 0] - y[:, 0, :]).mean(),
    np.square(y_pred[:, 0, :] - y[:, 0, :]).mean(),
)
print(f"naive MAE: {naive_mse}, model MAE: {model_mse}")

LSTMGC block을 쌓으면 좀 더 좋은 결과를 얻을 수 있습니다.