Рассматривается нейронная сеть, выполняющая прогнозирование на временном ряду.
Временной ряд формируется как арифметическая прогрессия:
an = an – 1 + d
Временной ряд генерируется в виде списка функцией makeListData. В частности, может быть сформирован следующий список из 29 элементов:
[5, 15, 25, 35, 45, 55, 65, 75, 85, 95, 105, 115, 125, 135, 145, 155, 165, 175, 185, 195, 205, 215, 225, 235, 245, 255, 265, 275, 285]
Серия данных (тип pandas.core.frame.DataFrame) формируется функцией series_to_supervised.
Получив вышеприведенный список, функция вернет следующую серию:
var1(t-3) var1(t-2) var1(t-1) var1(t)
3 5.0 15.0 25.0 35
4 15.0 25.0 35.0 45
5 25.0 35.0 45.0 55
...
28 255.0 265.0 275.0 285
Задача сети научится прогнозировать по фрейму var1(t-3), var1(t-2), var1(t-1), обнаруженному в точках временной оси t-3, t-2, t-1, значение в точке t.
Так, если обучить сеть по приведенной выше серии и задать для проверки фрейм 27, 37, 47, то правильным прогнозом будет число 57.
Используется рекуррентная сеть. Ее структура показана рис. 1.
Рис. 1. Структура сети
Ниже приводится описание сети.
Layer (type) Output Shape Param #
==========================================
input_1 (InputLayer) (None, 3, 1) 0
______________________________________________
lstm_1 (LSTM) (None, 40) 6720
______________________________________________
dense_1 (Dense) (None, 1) 41
==========================================
Total params: 6,761
Trainable params: 6,761
Non-trainable params: 0
Исходная серия данных делится на обучающую и тестовую:
train, test = train_test_split(dataFrame, test_size = 0.2)
После чего в полученных частях выделяются фреймы var1(t-3), var1(t-2), var1(t-1) и серии var1(t):
train_x, train_y = train.iloc[:, :n_in], train.iloc[:, n_in]
test_x, test_y = test.iloc[:, :n_in], test.iloc[:, n_in]
Тип train_x, test_x – pandas.core.frame.DataFrame, train_y, test_y – pandas.core.series.Series.
Перед обучением из фреймов извлекаются массивы типа numpy.ndarray:
arr_train_x = train_x.values
arr_test_x = test_x.values
Далее форма массивов приводится к виду, необходимому для слоя LSTM:
dim0 = arr_train_x.shape[0]
dim1 = arr_train_x.shape[1]
arr_train_x = arr_train_x.reshape((dim0, dim1, 1))
dim0 = arr_test_x.shape[0]
arr_test_x = arr_test_x.reshape((dim0, dim1, 1))
Массив arr_test_x до изменения своей формы:
[[115. 125. 135.] [ 25. 35. 45.] [105. 115. 125.] [ 55. 65. 75.] [175. 185. 195.] [ 45. 55. 65.]]
Массив arr_test_x после изменения своей формы:
[[[115.] [125.] [135.]] [[ 25.] [ 35.] [ 45.]] [[105.] [115.] [125.]] [[ 55.] [ 65.] [ 75.]] [[175.] [185.] [195.]] [[ 45.] [ 55.] [ 65.]]]
После обучения проверка сети на тестовой выборке показала следующий результат (жирно выделен самый неудачный прогноз):
Прогноз – точное значение:
145.25986 – 145
57.204433 – 55
135.0096 – 135
84.314 – 85
205.25085 – 205
75.30037 - 75
Среднеквадратичная ошибка: 0.9251
Код на Pyton:
# Прогнозирование одномерного временного ряда рекуррентной нейронной сетью
import numpy as np
from numpy import array
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input, LSTM
from keras.layers import Dense
from sklearn.model_selection import train_test_split
from pandas import DataFrame
from pandas import concat
import sys # Для sys.exit()
#
def series_to_supervised(data, n_in = 1, n_out = 1, dropnan = True):
"""
Формирует серии данных.
Параметры:
data - наблюдения в виде списка или NumPy-массива;
n_in - длина входной серии;
n_out - длина выходной серии;
dropnan: если True, то будут удалены NaN-величины.
Возвращает серию данных, приспособленную для обучения (Pandas DataFrame).
"""
n_vars = 1 if type(data) is list else data.shape[1]
df = DataFrame(data)
cols, names = list(), list()
# Входная последовательность (t - n, ... t - 1)
for i in range(n_in, 0, -1):
cols.append(df.shift(i))
names += [('var%d(t-%d)' % (j + 1, i)) for j in range(n_vars)]
# Истинный прогноз (t, t + 1, ... t + n)
for i in range(0, n_out):
cols.append(df.shift(-i))
if i == 0:
names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
else:
names += [('var%d(t+%d)' % (j + 1, i)) for j in range(n_vars)]
# Создаем фрейм данных (class 'pandas.core.frame.DataFrame')
agg = concat(cols, axis = 1) # Столбцы
agg.columns = names # Имена столбцов
# Удаляем NaN-величины
if dropnan:
agg.dropna(inplace = True)
return agg
#
def makeListData(a1, d, n):
listData = [a1]
for i in range(2, n):
listData.append(a1 + (i - 1) * d)
return listData
#
def createModel(n_in):
visible = Input(shape = (n_in, 1))
hidden1 = LSTM(40, activation = 'relu')(visible)
output = Dense(1)(hidden1)
model = Model(inputs = visible, outputs = output)
model.compile(optimizer = 'adam', loss = 'mse')
model.summary()
## plot_model(model, to_file = 'lstm_graph.png')
return model
#
# Задание затравки датчика случайных чисел обеспечит повторяемость результата
np.random.seed(348)
#
# Формируем арифметическую прогрессию
listData = makeListData(5, 10, 30)
n_in = 3
dataFrame = series_to_supervised(listData, n_in)
#
# Делим dataFrame на обучающую и тестовую выборки
train, test = train_test_split(dataFrame, test_size = 0.2) # sklearn
# Выделяем в каждой выборке фреймы var1(t-3), var1(t-2), var1(t-1) и серию var1(t)
train_x, train_y = train.iloc[:, :n_in], train.iloc[:, n_in]
test_x, test_y = test.iloc[:, :n_in], test.iloc[:, n_in]
#
# Получаем массивы типа numpy.ndarray
arr_train_x = train_x.values
arr_test_x = test_x.values
#
# Изменяем форму [образец, интервал] на [образец, интервал, характеристика]
dim0 = arr_train_x.shape[0]
dim1 = arr_train_x.shape[1]
arr_train_x = arr_train_x.reshape((dim0, dim1, 1))
dim0 = arr_test_x.shape[0]
arr_test_x = arr_test_x.reshape((dim0, dim1, 1))
##sys.exit()
# Создаем модель
model = createModel(n_in)
#
print('\nОбучение')
model.fit(arr_train_x, train_y, epochs = 1500, verbose = 0)
print('\nТестирование')
scores = model.evaluate(arr_test_x, test_y, verbose = 2, )
print("%s: %.2f%%" % (model.metrics_names[0], scores)) # loss (потери)
#
print('\nПрогноз - точное значение:')
y_pred = model.predict(arr_test_x, verbose = 0)
err = 0.0
for i in range(len(y_pred)):
print(str(y_pred[i][0]) + ' - ' + str(test_y.iloc[i]))
dif = y_pred[i][0] - test_y.iloc[i]
err += dif * dif
err /= len(test)
print('Среднеквадратичная ошибка: ', round(err, 4))