加密货币价格预测AI模型
飞行和预测股价:人类最伟大的两大梦想。虽然长出翅膀似乎是不可能的,但让我们专注于用深度学习预测“加密货币”的价格!
一键发币: SOL | BNB | ETH | BASE | Blast | ARB | OP | POLYGON | AVAX | FTM | OK
本教程将介绍如何将学习如何利用机器学习库/框架 PyTorch 来构建学习算法。我们的重点将放在加密货币 ADA 上,它在 Cardano 区块链上运行。
与大多数仅将价格视为模型输入特征的教程不同,我们将结合交易量和交易量。通过实施滑动窗口方法,我们将引入展望差距(outlook gap),这是一种不太常用的技术。探索各种模型架构和优化器,我将粗略地解释它们的工作原理。
此外,我们的目标是提高模型的性能。最后,我将谈谈为什么简单的脚本和数值数据可能不足以准确预测价格:股票、加密货币或黄油。随便。
我的目的不是专门推广加密货币。访问它们的历史价格很方便。主要关注的是 PyTorch 和机器学习。
如果你想跳过讲解直接看代码,请查看这个 Colab笔记本。
1、让我们获取一些数据
我们将利用 ADA 的股票价格,ADA 是一种在 Cardano 区块链上运行的加密货币
Kraken 提供了一个广泛的历史数据档案,涵盖了不同时间范围内的数十种货币。我选择了 60 分钟的时间分辨率。将这些数据加载到 Pandas 数据框中并不是一个重大挑战:
df = pd.read_csv("data/ADAEUR_60.csv")
df['date'] = pd.to_datetime(df['timestamp'], unit='s', errors='coerce')
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
df
2、可视化,可视化,可视化
这是 Daniel Bourke 最重要的教导之一——他是一位出色的 PyTorch 和机器学习导师,可能还有更多。作为可视化的爱好者,让我们通过将收盘价和交易量绘制到一张图表上来开始,以深入了解我们的数据。
# Downsample the data to not crash the plotting mechanism, we don't need to plot everything in the dataset
downsampled_df = df.resample('1D').mean()
# close price to the left y axis
plt.plot(downsampled_df.index, downsampled_df['close'], label='Close', color='blue')
plt.ylabel('Close', color='blue')
plt.tick_params(axis='y', labelcolor='blue')
# duplicate to get a second y axis on the right and plot the volune
ax2 = plt.twinx()
ax2.plot(downsampled_df.index, downsampled_df['volume'], label='Volume', color='red')
ax2.set_ylabel('Volume', color='red')
ax2.tick_params(axis='y', labelcolor='red')
# Title and legend
plt.title('Close Price vs. Volume')
plt.show()py
输出结果如下:
我可能不是图表分析师,但我的看法是:即使我在 2020 年购买了 100 万 ADA,并在 2021 年卖出,我可能仍会在这里写这篇文章。写作给我带来快乐。
3、准备
让我们深入研究编写机器学习代码的领域。我将从设置一些超参数开始。你可能还没有掌握其中的大部分,但我们稍后会详细介绍它们。就目前而言,直接承认它们的存在是有益的。
hidden_units = 64
num_layers = 4
learning_rate = 0.001
learning_rate_step_size=5
learning_rate_gamma=0.9
num_epochs = 100
batch_size = 32
window_size = 14
prediction_steps = 7
dropout_rate = 0.2
# stats and metrics
epoch_count = []
train_loss_values = []
test_loss_values = []
train_rmse_values = []
test_rmse_values = []
duration = 0
features = ['close', 'volume', 'trades']
target = 'close'
hidden_units
:隐藏单元,我们模型中的内部记忆单元的数量 — 将它们可视化为辅助学习过程的小算法。num_layers
:表示模型中的层数,其中每层都有自己的记忆单元。learning_rate
:学习率,表示模型在每个步骤后调整其权重和偏差(参数)的幅度。learning_rate_step_size
:学习率步长,确定学习率应增加或减少的频率。learning_rate_gamma
:学习率伽玛,用作调节学习率的乘数。num_epochs
:表示我们的主循环在训练过程中将经历的迭代次数。batch_size
:批次大小,指定在给定实例中处理的数据量。window_size
:窗口大小,定义我们预测时考虑的过去数据点的数量。prediction_steps
:预测步骤,表示在到达我们要预测的数据点之前要跳过的数据点数。dropout_rate
:确定模型中设置为零的节点比例。
最后,我们将指定用于模型输入的列:收盘价、交易量、交易量,当然还有包含我们想要预测的价格的列:收盘价。
4、数据标准化
另一个重要步骤是将数据分成训练数据集和测试数据集。首先,我们对数据进行标准化,这是一个关键过程,不仅可以加速计算,还可以提高模型的整体质量。
# right now start with a small sample, as soon as we have enough computing power, we can skip this step
df_sampled = df[features].head(1000).copy()
#scaler = MinMaxScaler() # MinMax would work, too, but in fact a stock price has not really "min/max values", except the 0 ;)
scaler = StandardScaler()
# Extract the selected features and transform them
selected_features = df_sampled[features].values.reshape(-1, len(features))
scaled_features = scaler.fit_transform(selected_features)
# Replace the original features with the scaled features in the DataFrame
df_sampled[features] = scaled_features
5、滑动窗口法
仅考虑当前价格来预测后续数据点可能会引入偏差,并由于缺乏足够的背景信息而限制模型的学习能力。仅依靠前一个数据点可能会导致模型过度适应最近的变化,从而可能阻碍其对未来值的预测能力。
为了缓解这种情况,我们实施了滑动窗口法,将多个数据点收集在一起。此外,我们引入了预测差距。我们不再预测下一小时的价格,因为这需要快速响应并可能导致过度拟合,而是将重点转移到预测十小时后的价格。这种长期预测可以做出明智的买卖决策。
因此,我们生成两个 NumPy 数组:一个包含特征 (X),另一个 (y) 包含所有标签/目标,代表我们要预测的价格。
大小写混合可能会造成混淆。通常,表示标量和向量的变量使用小写名称,而矩阵和张量则用大写名称表示。
def create_sequences(data, window_size, prediction_steps, features, label):
X = []
y = []
for i in range(len(data) - window_size - prediction_steps + 1):
sequence = data.iloc[i:i + window_size][features]
target = data.iloc[i + window_size + prediction_steps - 1][label]
X.append(sequence)
y.append(target)
return np.array(X), np.array(y)
X, y = create_sequences(df_sampled, window_size, prediction_steps, features, target)
6、拆分和批处理
现在,我们将数据分为训练数据集和测试数据集,这是机器学习中的常见步骤。接下来,我们将通过将数据组织到辅助对象中来满足一些技术要求。然后,该对象将为训练过程提供批量数据点。
# SPLITTING
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
# BATCHING
X_train_tensor = torch.Tensor(X_train)
y_train_tensor = torch.Tensor(y_train)
X_test_tensor = torch.Tensor(X_test)
y_test_tensor = torch.Tensor(y_test)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
7、验证
为了确保在此过程中不会丢失任何数据,我们可以轻松地逆转该过程。我不会在这里发布代码;你可以在笔记本中找到它。此外,我还添加了一个小部分,用于生成合成数据以复制整个数据整理过程。我们稍后会重新讨论这一点,所以请记住这一点。
最初的 14 行包含我们从“收盘价”、“成交量”和“交易”列中得出的特征。还记得我们如何将滑动窗口大小设置为 14 吗?值得注意的是,7 步之后,在位置 21,根据预测步骤,我们的收盘价标记为 21——这是我们的目标!
此外,我还实现了几个函数来恢复规范化、拆分和批处理过程。这些有助于我们验证数据准备是否准确执行。此处的 DataFrame
展示了将提供给 DataLoader
的输入。
前三列代表特征,第四列标记为“收盘价”,用作我们的模型试图预测的目标收盘价或“标签”。虽然由于舍入误差,它读起来是 2.999 左右而不是精确的 3,但看起来我们正朝着正确的方向前进。
8、初始化模型、损失函数和优化器
要训练模型,我们需要三个主要组件:模型、损失函数和优化器:
- 模型在第一层循环(又称 epoch)中执行预测工作,通常它使用完全随机的参数。还记得上面的 X 吗?它获取 X(也称为输入或特征)并计算 y,也称为输出、目标或标签。
- 损失函数计算预测输出与实际输出之间的差异,实际输出也是训练数据集的一部分。
- 优化器获取损失并决定如何调整模型参数。
- 然后一切重新开始,直到我们达到循环限制(num_epochs)
9、模型类
我们将开始尝试 LSTM(长短期记忆)方法,这是一种循环神经网络 (RNN)。与通过层处理数据以生成输出的典型神经网络不同,循环神经网络可以整合过去的信息。这种能力由其记忆单元实现。以下是模型类:
class StockPriceLSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size=1):
super(StockPriceLSTM, self).__init__()
self.hidden_size = hidden_size # Size of the hidden state in the LSTM
self.num_layers = num_layers # Number of LSTM layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) # LSTM layer
self.fc = nn.Linear(hidden_size, output_size) # Fully connected layer for output prediction
def forward(self, input_data):
# Initialize hidden and cell states for LSTM
initial_hidden = torch.zeros(self.num_layers, input_data.size(0), self.hidden_size).to(input_data.device)
initial_cell = torch.zeros(self.num_layers, input_data.size(0), self.hidden_size).to(input_data.device)
# Forward propagate through LSTM
lstm_output, _ = self.lstm(input_data, (initial_hidden, initial_cell)) # Output shape: (batch_size, seq_length, hidden_size)
# Pass the output of the last time step through the fully connected layer
last_time_step_output = lstm_output[:, -1, :] # Extract the output from the last time step
output = self.fc(last_time_step_output) # Output shape: (batch_size, output_size)
return output
从 nn.Module
继承时,必须定义两个方法: __init__
和 forward
。`init` 方法设置模型的结构。在本例中,它配置 LSTM 部分和用于输出的线性层。 forward
方法处理计算。它接收输入数据(我们的特征),通过我们模型的模块迭代处理它,并返回输出。
10、损失函数
为了测量损失,我们计算均方误差 (MSE)。听起来很复杂,但基本上它只是模型输出和实际值之间的差,平方并除以数据点的数量:
有几种可用于计算损失的选项。选择适当的损失函数是微调建模过程的关键方面,取决于目标和数据的性质。例如,在图像分类中,通常使用交叉熵损失。
11、优化器
优化器确定如何调整模型的参数。根据具体用例,有多种优化器可供选择。简而言之,优化器的作用如下:它根据学习率和损失修改模型的参数。
12、运行循环
在开始循环之前,我们是这样初始化它们:
model_1 = PricePredictionLSTM(input_size=len(features), hidden_size=hidden_units, num_layers=num_layers)
loss_fn = nn.MSELoss()
optimizer = torch.optim.AdamW(model_1.parameters(), lr=learning_rate)
当模型是大脑时,这就是我们工作的核心“循环”。根据“非官方优化循环歌曲”,步骤始终相同:
- 激活训练模式:如果你的模型包含在训练和推理期间表现不同的组件(如 dropout 或批量规范化),请将模型设置为训练模式。
- 计算预测:通过模型传递输入数据以获得预测。
- 计算损失:计算预测和实际目标之间的损失。
- 将梯度重置为零:使用
optimizer.zero_grad()
清除上一次迭代的梯度。 - 反向传播(计算梯度):通过调用
loss.backward()
执行反向传播,以计算相对于模型参数的损失梯度。 - 更新参数:通过调用
optimizer.step()
根据计算出的梯度更新模型的参数。 - 重新开始……
以下代码还包含几个附加功能,用于跟踪损失值、时间测量以及最后但并非最不重要的一点:根据我们的测试数据集验证模型!
这里的关键方面是使用 torch.inference_mode()
。当你的模型不在训练模式时,它应该在此上下文中运行以避免干扰梯度。有时,你可能会遇到 torch.no_grad()
,它本质上具有相同的用途,但是一种较旧的实现。
我们还计算了准确度指标。与损失类似,它涉及计算均方误差 (MSE),然后取平方根。但是,这并不是完全必要的,因为它可能不会提供损失值以外的其他见解。
start = time.time()
start_epoch = 0 if len(epoch_count) == 0 else epoch_count[-1] # helpful if you start over this particular cell
for epoch in tqdm(range(start_epoch, start_epoch + num_epochs)): # tqdm is our progress bar wrapper
model_1.train() # activate training mode
# handle loss monitoring
total_train_loss = 0.0
all_train_targets = []
all_train_outputs = []
# process batches in the training dataloader
for batch_idx, (inputs, targets) in enumerate(train_dataloader):
optimizer.zero_grad() # reset gradients
outputs = model_1(inputs) # calculate predictions
loss = loss_fn(outputs.squeeze(), targets) # calculat the loss
loss.backward() # backward propagation
optimizer.step() # update parameters
total_train_loss += loss.item()
all_train_targets.extend(targets.numpy())
all_train_outputs.extend(outputs.detach().numpy())
# scheduler.step()
model_1.eval() # activate eval mode
# handle loss monitoring
total_test_loss = 0.0
all_test_targets = []
all_test_outputs = []
# process batches in the testing dataloader
for i, (inputs, targets) in enumerate(test_dataloader):
with torch.inference_mode(): # activate inference mode/no grad
outputs = model_1(inputs) # calculate predictions
loss = loss_fn(outputs.squeeze(), targets) # calculate loss
# monitor loss
total_test_loss += loss.item()
all_test_targets.extend(targets.numpy())
all_test_outputs.extend(outputs.detach().numpy())
# calculate average epoch losses
average_epoch_train_loss = total_train_loss / len(train_dataloader)
average_epoch_test_loss = total_test_loss / len(test_dataloader)
# caculate accuracy
train_rmse = math.sqrt(mean_squared_error(all_train_targets, all_train_outputs))
test_rmse = math.sqrt(mean_squared_error(all_test_targets, all_test_outputs))
# VISUALIZE
epoch_count.append(epoch)
train_loss_values.append(average_epoch_train_loss)
test_loss_values.append(average_epoch_test_loss)
train_rmse_values.append(train_rmse)
test_rmse_values.append(test_rmse)
# LOG
if epoch % int(num_epochs / 10) == 0 or epoch == num_epochs - 1:
current_lr = scheduler.get_last_lr()[0]
print(f"Epoch [{epoch + 1}/{start_epoch + num_epochs}], "
f"Train Loss: {average_epoch_train_loss:.4f} | "
f"Test Loss: {average_epoch_test_loss:.4f} | "
f"Train RMSE: {train_rmse:.4f} | "
f"Test RMSE: {test_rmse:.4f} | "
f"Current LR: {current_lr:.8f} | "
f"Duration: {time.time() - start:.0f} seconds")
duration += time.time() - start
13、等等……停……那是什么?学习率调度程序?
如果你仔细阅读了循环,你可能会注意到一个注释掉的命令: scheduler.step()
。它有什么用?
学习率通常保持不变,并决定算法如何在每一步中调整模型的参数。调度程序有助于动态改变这个步长。虽然并非总是必要的,但尝试使用调度程序可能会有所帮助,有可能在较短的时间内改善结果。
这是初始化它的方式:
scheduler = lr_scheduler.StepLR(
optimizer,
step_size=learning_rate_step_size,
gamma=learning_rate_gamma # Adjust step_size and gamma as needed
)
14、运行
现在我们已准备好运行所有内容并查看训练/优化过程的实际效果。为了获得第一印象,一切运行得如何,我建议从我们上面定义的虚拟数据集开始。我不会在这里发布绘图代码。
你的绘图应该是这样的:
结果还不错,而且是在 27 秒内完成的。但考虑到源数据,这并不令人意外。
让我们通过模型运行我们的现实世界数据。这不会花费太长时间, 100 个 epoch 之后的结果看起来……有点不对劲:
让我们面对现实:在第一次尝试中,你不能指望在像这样的小数据集上仅进行 4 分钟的训练后就能获得实质性的结果。
此过程涉及广泛的超参数调整和选择网络架构。现在,让我们探索另一种方法:GRU,代表门控循环单元,是另一种循环神经网络。与 LSTM 相反,GRU 具有简化的架构。具体而言,GRU 使用两个门来管理数据流(重置和更新),而 LSTM 使用三个门(输入、遗忘和输出)。LSTM 非常适合处理复杂的依赖关系,而 GRU 则降低了计算复杂度。这是我们的模型类:
class PricePredictionGRU(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size=1):
super(PricePredictionGRU, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
# Initialize hidden state
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
# Forward propagate GRU
out, _ = self.gru(x, h0) # out: tensor of shape (batch_size, seq_length, hidden_size)
# Pass the output of the last time step through the fully connected layer
out = self.fc(out[:, -1, :]) # Output shape: (batch_size, output_size)
return out
如你所见,它的结构与之前完全相同。这是因为 PyTorch 为模型提供了“模板”,我们只需正确地输入它们即可。
这是 200 个 epoch 之后的结果:
尽管运行时间几乎是预期时间的两倍,与承诺的计算要求减少相矛盾,但结果似乎明显改善。然而,它们仍然不足以准确预测股价。
15、未来的想法
首先,让我举一个例子来展示许多“价格预测”讨论中的常见缺陷:它们通常专注于预测下一个数据点,而不考虑未来的数据点。
这种方法大大简化了模型的任务。例如,如果给出今天的收盘价,预测明天的收盘价会变得相对容易,因为价格波动的典型范围。
下图显示了将预测差距设置为 1 时的预测,这意味着预测下一个数据点:
这里我将验证数据的样本量增加到 10.000:
这并不可怕;至少曲线在某种程度上与总体趋势一致。最终,模型的质量取决于分配的计算能力和选择的超参数。
原文链接:How to predict stock or crypto prices. In 2024. With Python. And PyTorch.
DefiPlot翻译整理,转载请标明出处