介绍
信不信由你,人类不断地被动地预测事物——即使是最微小或看似琐碎的事情。在过马路时,我们预测汽车会在哪里,以便安全地过马路,或者我们试图预测在我们试图接住球时球会在哪里。我们不需要知道汽车的确切速度或影响球的精确风向,就能执行这些任务——它们对我们来说更加自然和明显。这些能力是通过少数事件进行调整的,在多年的经验和实践中,它们使我们能够在不可预测的现实中导航。我们在这方面失败的地方,是当我们在积极预测大规模现象,如天气或经济在一年内的表现时,有太多因素需要考虑。
这就是计算机的能力发挥作用的地方——填补我们无法将即使是最看似随机的事件与未来事件联系起来的能力差距。众所周知,计算机非常擅长在多次迭代中执行特定的任务——我们可以利用这一点来预测未来。
什么是“时间序列”?
时间序列是指在一段时间内发生的任何可量化的度量或事件。尽管这听起来很琐碎,但几乎任何事情都可以被认为是时间序列。一个月内每小时的平均心率、一年内某只股票的每日收盘价或一年内某个城市每周的车辆事故数量。在任何统一的时间段内记录这些信息都被视为时间序列。敏锐的人会注意到,在这些例子中,每个例子都有一个频率(每天、每周、每小时等)和一个时间长度(一个月、一年、一天等)。
对于时间序列,指标应该在我们观察指标的时间长度内以统一的频率记录。换句话说,每个记录之间的时间应该是相同的。
在本教程中,我们将探讨如何使用过去的时间序列数据来预测未来可能发生的情况。
目标
该算法的目标是能够接受一系列值,并预测序列中的下一个值。最简单的方法是使用一个自回归模型,然而,这已经被其他作者广泛介绍了,因此我们将专注于一种更深度学习的方法,使用循环神经网络。我已经在这里链接了实现笔记本。本教程中使用的数据集是Kaggle竞赛中使用的数据集,可以在此处找到。
数据准备
让我们看一下样本时间序列。下图显示了2013年至2018年油价的一些数据。
图片由作者提供
这只是在日期轴上绘制的一系列数字。下表显示了此时间序列的前10个条目。仅从日期列看,很明显我们有每日频率的价格数据。
date dcoilwtico
2013-01-01 NaN
2013-01-02 93.14
2013-01-03 92.97
2013-01-04 93.12
2013-01-07 93.20
2013-01-08 93.21
2013-01-09 93.08
2013-01-10 93.81
2013-01-11 93.60
2013-01-14 94.27
许多机器学习模型在规范化数据上表现更好。规范化数据的标准方法是将其转换为每列的平均值为0,标准差为1。下面的代码提供了一种使用scikit-learn库实现这一点的方法。
from sklearn.preprocessing import StandardScaler
# Fit scalers
scalers = {}
for x in df.columns:
scalers[x] = StandardScaler().fit(df[x].values.reshape(-1, 1))
# Transform data via scalers
norm_df = df.copy()
for i, key in enumerate(scalers.keys()):
norm = scalers[key].transform(norm_df.iloc[:, i].values.reshape(-1, 1))
norm_df.iloc[:, i] = norm
我们还要确保我们的数据具有统一的频率——在这个例子中,我们有这5年中每天的油价,所以这很好。如果对于你的数据来说不是这种情况,Pandas有几种不同的方法来重新采样数据以适应统一的频率。
排序
完成这个步骤后,我们将使用时间序列并生成固定长度的片段或序列。在记录这些序列的同时,我们还将记录紧随该序列发生的值。例如:假设我们有一个序列:[1,2,3,4,5,6]。
通过选择长度为3的序列,我们可以生成以下序列及其相关的目标:
[序列]:目标
[1,2,3] → 4
[2,3,4] → 5
[3,4,5] → 6
另一种看待这个问题的方法是,我们定义了多少步来预测下一个值。我们将称这个值为训练窗口,要预测的值的数量为预测窗口。在这个例子中,它们分别是3和1。下面的函数详细说明了如何完成这一任务。
# Defining a function that creates sequences and targets as shown above
def generate_sequences(df: pd.DataFrame, tw: int, pw: int, target_columns, drop_targets=False):
'''
df: Pandas DataFrame of the univariate time-series
tw: Training Window - Integer defining how many steps to look back
pw: Prediction Window - Integer defining how many steps forward to predict
returns: dictionary of sequences and targets for all sequences
'''
data = dict() # Store results into a dictionary
L = len(df)
for i in range(L-tw):
# Option to drop target from dataframe
if drop_targets:
df.drop(target_columns, axis=1, inplace=True)
# Get current sequence
sequence = df[i:i+tw].values
# Get values right after the current sequence
target = df[i+tw:i+tw+pw][target_columns].values
data[i] = {'sequence': sequence, 'target': target}
return data
PyTorch 要求我们以以下方式将数据存储在数据集类中:
class SequenceDataset(Dataset):
def __init__(self, df):
self.data = df
def __getitem__(self, idx):
sample = self.data[idx]
return torch.Tensor(sample['sequence']), torch.Tensor(sample['target'])
def __len__(self):
return len(self.data)
然后,我们可以使用 PyTorch DataLoader 迭代数据。使用 DataLoader 的好处是它在内部处理批处理和洗牌,因此我们不必为自己实现这些功能而担心。
经过以下代码,训练批次最终准备好了:
# Here we are defining properties for our model
BATCH_SIZE = 16 # Training batch size
split = 0.8 # Train/Test Split ratio
sequences = generate_sequences(norm_df.dcoilwtico.to_frame(), sequence_len, nout, 'dcoilwtico')
dataset = SequenceDataset(sequences)
# Split the data according to our split ratio and load each subset into a
# separate DataLoader object
train_len = int(len(dataset)*split)
lens = [train_len, len(dataset)-train_len]
train_ds, test_ds = random_split(dataset, lens)
trainloader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
testloader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
在每次迭代中,DataLoader 将产生16(批量大小)个序列及其相关的目标,我们将将其传递到模型中。
模型架构
下面的类在_PyTorch_中定义了这个架构。我们将使用单个LSTM层,然后使用一些密集层来进行模型的回归部分,并在它们之间使用了一些丢弃层。该模型将为每个训练输入输出单个值。
class LSTMForecaster(nn.Module):
def __init__(self, n_features, n_hidden, n_outputs, sequence_len, n_lstm_layers=1, n_deep_layers=10, use_cuda=False, dropout=0.2):
'''
n_features: number of input features (1 for univariate forecasting)
n_hidden: number of neurons in each hidden layer
n_outputs: number of outputs to predict for each training example
n_deep_layers: number of hidden dense layers after the lstm layer
sequence_len: number of steps to look back at for prediction
dropout: float (0 < dropout < 1) dropout ratio between dense layers
'''
super().__init__()
self.n_lstm_layers = n_lstm_layers
self.nhid = n_hidden
self.use_cuda = use_cuda # set option for device selection
# LSTM Layer
self.lstm = nn.LSTM(n_features,
n_hidden,
num_layers=n_lstm_layers,
batch_first=True) # As we have transformed our data in this way
# first dense after lstm
self.fc1 = nn.Linear(n_hidden * sequence_len, n_hidden)
# Dropout layer
self.dropout = nn.Dropout(p=dropout)
# Create fully connected layers (n_hidden x n_deep_layers)
dnn_layers = []
for i in range(n_deep_layers):
# Last layer (n_hidden x n_outputs)
if i == n_deep_layers - 1:
dnn_layers.append(nn.ReLU())
dnn_layers.append(nn.Linear(nhid, n_outputs))
# All other layers (n_hidden x n_hidden) with dropout option
else:
dnn_layers.append(nn.ReLU())
dnn_layers.append(nn.Linear(nhid, nhid))
if dropout:
dnn_layers.append(nn.Dropout(p=dropout))
# compile DNN layers
self.dnn = nn.Sequential(*dnn_layers)
def forward(self, x):
# Initialize hidden state
hidden_state = torch.zeros(self.n_lstm_layers, x.shape[0], self.nhid)
cell_state = torch.zeros(self.n_lstm_layers, x.shape[0], self.nhid)
# move hidden state to device
if self.use_cuda:
hidden_state = hidden_state.to(device)
cell_state = cell_state.to(device)
self.hidden = (hidden_state, cell_state)
# Forward Pass
x, h = self.lstm(x, self.hidden) # LSTM
x = self.dropout(x.contiguous().view(x.shape[0], -1)) # Flatten lstm out
x = self.fc1(x) # First Dense
return self.dnn(x) # Pass forward through fully connected DNN.
这个类是我构建的一个插件式 Python 类,可以根据我们选择的参数动态构建一个神经网络(这种类型的),从而添加或删除模型参数。更多的参数意味着更多的模型复杂性和更长的训练时间,因此请务必参考你的用例,确定对于你的数据最好的是什么。作为一个随意选择,让我们创建一个具有 5 个完全连接的层和每个层有 50 个神经元的长短期记忆模型,每个批次中的每个训练示例都以单个输出值结束。这里,“sequence_len”是训练窗口,“nout”定义了要预测的步骤数;将“sequence_len”设置为 180,将“nout”设置为 1,意味着模型将查看 180 天(半年)的历史数据来预测明天会发生什么。
nhid = 50 # Number of nodes in the hidden layer
n_dnn_layers = 5 # Number of hidden fully connected layers
nout = 1 # Prediction Window
sequence_len = 180 # Training Window
# Number of features (since this is a univariate timeseries we'll set
# this to 1 -- multivariate analysis is coming in the future)
ninp = 1
# Device selection (CPU | GPU)
USE_CUDA = torch.cuda.is_available()
device = 'cuda' if USE_CUDA else 'cpu'
# Initialize the model
model = LSTMForecaster(ninp, nhid, nout, sequence_len, n_deep_layers=n_dnn_layers, use_cuda=USE_CUDA).to(device)
模型训练
定义了模型后,我们可以选择损失函数和优化器,设置学习率和迭代次数,并开始训练循环。由于这是一个回归问题(即我们试图预测一个连续值),因此损失函数的一个安全选择是均方误差。这提供了一种强大的方法来计算实际值和模型预测值之间的误差。具体为:
来自 Google 的图片。
优化器对象存储和计算所有需要进行反向传播的梯度。
# Set learning rate and number of epochs to train over
lr = 4e-4
n_epochs = 20
# Initialize the loss function and optimizer
criterion = nn.MSELoss().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
以下是训练循环。在每次训练迭代中,我们将计算先前创建的训练集和验证集上的损失:
# Lists to store training and validation losses
t_losses, v_losses = [], []
# Loop over epochs
for epoch in range(n_epochs):
train_loss, valid_loss = 0.0, 0.0
# train step
model.train()
# Loop over train dataset
for x, y in trainloader:
optimizer.zero_grad()
# move inputs to device
x = x.to(device)
y = y.squeeze().to(device)
# Forward Pass
preds = model(x).squeeze()
loss = criterion(preds, y) # compute batch loss
train_loss += loss.item()
loss.backward()
optimizer.step()
epoch_loss = train_loss / len(trainloader)
t_losses.append(epoch_loss)
# validation step
model.eval()
# Loop over validation dataset
for x, y in testloader:
with torch.no_grad():
x, y = x.to(device), y.squeeze().to(device)
preds = model(x).squeeze()
error = criterion(preds, y)
valid_loss += error.item()
valid_loss = valid_loss / len(testloader)
v_losses.append(valid_loss)
print(f'{epoch} - train: {epoch_loss}, valid: {valid_loss}')
plot_losses(t_losses, v_losses)
显示每个 epoch 中的训练和验证损失的训练循环的示例输出。
现在,模型已经训练好了,我们可以评估我们的预测结果。
推理
在这里,我们只需调用训练好的模型来预测我们的未洗牌数据,并查看预测结果与真实观测值的差异。
def make_predictions_from_dataloader(model, unshuffled_dataloader):
model.eval()
predictions, actuals = [], []
for x, y in unshuffled_dataloader:
with torch.no_grad():
p = model(x)
predictions.append(p)
actuals.append(y.squeeze())
predictions = torch.cat(predictions).numpy()
actuals = torch.cat(actuals).numpy()
return predictions.squeeze(), actuals
历史上规范化的预测价格与实际价格的比较。图片由作者提供。
对于初次尝试,我们的预测结果看起来还不错!并且我们的验证损失和训练损失一样低,表明我们没有过度拟合模型,因此模型可以被认为是具有良好泛化能力的——这对于任何预测系统都是重要的。
有了对于这段时间内油价的一个相当不错的估计器,让我们看看我们是否可以使用它来预测未来会发生什么。
预测
如果我们将历史数据定义为预测时刻之前的序列,那么算法就很简单:
- 从历史数据中获取最新的有效序列(长度为训练窗口)。
- 将该序列输入模型并预测下一个值。
- 将预测值附加到历史数据中。
- 重复执行步骤 1,直到预测所需的步骤数量。 这里的一个注意点是,根据训练模型时选择的参数,预测的时间越长,模型受到自身偏差的影响就越大,开始预测平均值。因此,如果不必要,我们不希望总是预测太远,因为这会影响预测的准确性。
以下是实现此功能的函数:
def one_step_forecast(model, history):
'''
model: PyTorch model object
history: a sequence of values representing the latest values of the time
series, requirement -> len(history.shape) == 2
outputs a single value which is the prediction of the next value in the
sequence.
'''
model.cpu()
model.eval()
with torch.no_grad():
pre = torch.Tensor(history).unsqueeze(0)
pred = self.model(pre)
return pred.detach().numpy().reshape(-1)
def n_step_forecast(data: pd.DataFrame, target: str, tw: int, n: int, forecast_from: int=None, plot=False):
'''
n: integer defining how many steps to forecast
forecast_from: integer defining which index to forecast from. None if
you want to forecast from the end.
plot: True if you want to output a plot of the forecast, False if not.
'''
history = data[target].copy().to_frame()
# Create initial sequence input based on where in the series to forecast
# from.
if forecast_from:
pre = list(history[forecast_from - tw : forecast_from][target].values)
else:
pre = list(history[self.target])[-tw:]
# Call one_step_forecast n times and append prediction to history
for i, step in enumerate(range(n)):
pre_ = np.array(pre[-tw:]).reshape(-1, 1)
forecast = self.one_step_forecast(pre_).squeeze()
pre.append(forecast)
# The rest of this is just to add the forecast to the correct time of
# the history series
res = history.copy()
ls = [np.nan for i in range(len(history))]
# Note: I have not handled the edge case where the start index + n is
# before the end of the dataset and crosses past it.
if forecast_from:
ls[forecast_from : forecast_from + n] = list(np.array(pre[-n:]))
res['forecast'] = ls
res.columns = ['actual', 'forecast']
else:
fc = ls + list(np.array(pre[-n:]))
ls = ls + [np.nan for i in range(len(pre[-n:]))]
ls[:len(history)] = history[self.target].values
res = pd.DataFrame([ls, fc], index=['actual', 'forecast']).T
return res
让我们尝试一些情况。
让我们从系列中的不同位置进行预测,以便我们可以将预测与实际发生的情况进行比较。我们编码预测程序的方式使我们可以从任何地方进行预测并预测任何合理数量的步骤。红线显示预测结果。请记住,图表显示了 y 轴上的规范化价格。
从 2013 年 Q3 预测 200 天。图片由作者提供。
从 2014/15 年度末预测 200 天。图片由作者提供。
从 2016 年 Q1 预测 200 天。图片由作者提供。
从数据的最后一天预测 200 天。图片由作者提供。
这只是我们尝试的第一个模型配置!通过更多地尝试架构和实现,肯定可以让模型训练得更好,预测更准确。
结论
我们有了一个可以预测单变量时间序列中接下来会发生什么的模型。想想这可以应用的所有方式和场合,真是太酷了。是的,本文仅处理了单变量时间序列,其中只有一系列值。但是,有方法可以将多个测量不同事物的系列结合起来进行预测。这称为多变量时间序列预测,它主要只需要对模型架构进行一些微调,我将在未来的文章中进行介绍。
这种预测模型的真正魔力在于模型的 LSTM 层,以及它如何作为神经网络的循环层处理和记忆序列。有关不同类型的神经网络的更多信息,我强烈推荐观看 3blue1browns 视频。他有一个很棒的系列详细介绍这些算法的内部工作方式,非常直观。
感谢阅读,请务必查看我的其他文章!
参考文献:
时间序列数据 — https://www.kaggle.com/competitions/store-sales-time-series-forecasting/data?select=oil.csv抱歉,你提供的信息太少了,无法进行翻译。请提供具体的技术文章或内容。
评论(0)