加密货币价格预测AI模型

飞行和预测股价:人类最伟大的两大梦想。虽然长出翅膀似乎是不可能的,但让我们专注于用深度学习预测“加密货币”的价格!

加密货币价格预测AI模型
一键发币: SOL | BNB | ETH | BASE | Blast | ARB | OP | POLYGON | AVAX | FTM | OK

本教程将介绍如何将学习如何利用机器学习库/框架 PyTorch 来构建学习算法。我们的重点将放在加密货币 ADA 上,它在 Cardano 区块链上运行。

与大多数仅将价格视为模型输入特征的教程不同,我们将结合交易量和交易量。通过实施滑动窗口方法,我们将引入展望差距(outlook gap),这是一种不太常用的技术。探索各种模型架构和优化器,我将粗略地解释它们的工作原理。

此外,我们的目标是提高模型的性能。最后,我将谈谈为什么简单的脚本和数值数据可能不足以准确预测价格:股票、加密货币或黄油。随便。

我的目的不是专门推广加密货币。访问它们的历史价格很方便。主要关注的是 PyTorch 和机器学习。

如果你想跳过讲解直接看代码,请查看这个 Colab笔记本

1、让我们获取一些数据

我们将利用 ADA 的股票价格,ADA 是一种在 Cardano 区块链上运行的加密货币

Kraken 提供了一个广泛的历史数据档案,涵盖了不同时间范围内的数十种货币。我选择了 60 分钟的时间分辨率。将这些数据加载到 Pandas 数据框中并不是一个重大挑战:

df = pd.read_csv("data/ADAEUR_60.csv")
df['date'] = pd.to_datetime(df['timestamp'], unit='s', errors='coerce')
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
df
从 2018 年开始,ADA @ Cardano 的几种价格

2、可视化,可视化,可视化

这是 Daniel Bourke 最重要的教导之一——他是一位出色的 PyTorch 和机器学习导师,可能还有更多。作为可视化的爱好者,让我们通过将收盘价和交易量绘制到一张图表上来开始,以深入了解我们的数据。

# Downsample the data to not crash the plotting mechanism, we don't need to plot everything in the dataset
downsampled_df = df.resample('1D').mean()

# close price to the left y axis
plt.plot(downsampled_df.index, downsampled_df['close'], label='Close', color='blue')
plt.ylabel('Close', color='blue')
plt.tick_params(axis='y', labelcolor='blue')

# duplicate to get a second y axis on the right and plot the volune
ax2 = plt.twinx()
ax2.plot(downsampled_df.index, downsampled_df['volume'], label='Volume', color='red')
ax2.set_ylabel('Volume', color='red')
ax2.tick_params(axis='y', labelcolor='red')

# Title and legend
plt.title('Close Price vs. Volume')
plt.show()py

输出结果如下:

ADA 每小时收盘价和交易量

我可能不是图表分析师,但我的看法是:即使我在 2020 年购买了 100 万 ADA,并在 2021 年卖出,我可能仍会在这里写这篇文章。写作给我带来快乐。

3、准备

让我们深入研究编写机器学习代码的领域。我将从设置一些超参数开始。你可能还没有掌握其中的大部分,但我们稍后会详细介绍它们。就目前而言,直接承认它们的存在是有益的。

hidden_units = 64
num_layers = 4
learning_rate = 0.001
learning_rate_step_size=5
learning_rate_gamma=0.9
num_epochs = 100
batch_size = 32
window_size = 14
prediction_steps = 7
dropout_rate = 0.2

# stats and metrics
epoch_count = []
train_loss_values = []
test_loss_values = []
train_rmse_values = []
test_rmse_values = []
duration = 0

features = ['close', 'volume', 'trades']
target = 'close'
  • hidden_units:隐藏单元,我们模型中的内部记忆单元的数量 — 将它们可视化为辅助学习过程的小算法。
  • num_layers:表示模型中的层数,其中每层都有自己的记忆单元。
  • learning_rate :学习率,表示模型在每个步骤后调整其权重和偏差(参数)的幅度。
  • learning_rate_step_size :学习率步长,确定学习率应增加或减少的频率。
  • learning_rate_gamma :学习率伽玛,用作调节学习率的乘数。
  • num_epochs:表示我们的主循环在训练过程中将经历的迭代次数。
  • batch_size :批次大小,指定在给定实例中处理的数据量。
  • window_size:窗口大小,定义我们预测时考虑的过去数据点的数量。
  • prediction_steps :预测步骤,表示在到达我们要预测的数据点之前要跳过的数据点数。
  • dropout_rate:确定模型中设置为零的节点比例。

最后,我们将指定用于模型输入的列:收盘价、交易量、交易量,当然还有包含我们想要预测的价格的列:收盘价。

4、数据标准化

另一个重要步骤是将数据分成训练数据集和测试数据集。首先,我们对数据进行标准化,这是一个关键过程,不仅可以加速计算,还可以提高模型的整体质量。

# right now start with a small sample, as soon as we have enough computing power, we can skip this step
df_sampled = df[features].head(1000).copy()

#scaler = MinMaxScaler() # MinMax would work, too, but in fact a stock price has not really "min/max values", except the 0 ;)
scaler = StandardScaler()

# Extract the selected features and transform them
selected_features = df_sampled[features].values.reshape(-1, len(features))
scaled_features = scaler.fit_transform(selected_features)

# Replace the original features with the scaled features in the DataFrame
df_sampled[features] = scaled_features

5、滑动窗口法

仅考虑当前价格来预测后续数据点可能会引入偏差,并由于缺乏足够的背景信息而限制模型的学习能力。仅依靠前一个数据点可能会导致模型过度适应最近的变化,从而可能阻碍其对未来值的预测能力。

为了缓解这种情况,我们实施了滑动窗口法,将多个数据点收集在一起。此外,我们引入了预测差距。我们不再预测下一小时的价格,因为这需要快速响应并可能导致过度拟合,而是将重点转移到预测十小时后的价格。这种长期预测可以做出明智的买卖决策。

因此,我们生成两个 NumPy 数组:一个包含特征 (X),另一个 (y) 包含所有标签/目标,代表我们要预测的价格。

大小写混合可能会造成混淆。通常,表示标量和向量的变量使用小写名称,而矩阵和张量则用大写名称表示。
def create_sequences(data, window_size, prediction_steps, features, label):
    X = []
    y = []
    for i in range(len(data) - window_size - prediction_steps + 1):
        sequence = data.iloc[i:i + window_size][features]
        target = data.iloc[i + window_size + prediction_steps - 1][label]
        X.append(sequence)
        y.append(target)
    return np.array(X), np.array(y)

X, y = create_sequences(df_sampled, window_size, prediction_steps, features, target)

6、拆分和批处理

现在,我们将数据分为训练数据集和测试数据集,这是机器学习中的常见步骤。接下来,我们将通过将数据组织到辅助对象中来满足一些技术要求。然后,该对象将为训练过程提供批量数据点。

# SPLITTING
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# BATCHING
X_train_tensor = torch.Tensor(X_train)
y_train_tensor = torch.Tensor(y_train)
X_test_tensor = torch.Tensor(X_test)
y_test_tensor = torch.Tensor(y_test)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

7、验证

为了确保在此过程中不会丢失任何数据,我们可以轻松地逆转该过程。我不会在这里发布代码;你可以在笔记本中找到它。此外,我还添加了一个小部分,用于生成合成数据以复制整个数据整理过程。我们稍后会重新讨论这一点,所以请记住这一点。

设置了虚拟数据的DataFrame

最初的 14 行包含我们从“收盘价”、“成交量”和“交易”列中得出的特征。还记得我们如何将滑动窗口大小设置为 14 吗?值得注意的是,7 步之后,在位置 21,根据预测步骤,我们的收盘价标记为 21——这是我们的目标!

此外,我还实现了几个函数来恢复规范化、拆分和批处理过程。这些有助于我们验证数据准备是否准确执行。此处的 DataFrame 展示了将提供给 DataLoader的输入。

包含虚拟特征和标签的一个批次

前三列代表特征,第四列标记为“收盘价”,用作我们的模型试图预测的目标收盘价或“标签”。虽然由于舍入误差,它读起来是 2.999 左右而不是精确的 3,但看起来我们正朝着正确的方向前进。

8、初始化模型、损失函数和优化器

要训练模型,我们需要三个主要组件:模型、损失函数和优化器:

  • 模型在第一层循环(又称 epoch)中执行预测工作,通常它使用完全随机的参数。还记得上面的 X 吗?它获取 X(也称为输入或特征)并计算 y,也称为输出、目标或标签。
  • 损失函数计算预测输出与实际输出之间的差异,实际输出也是训练数据集的一部分。
  • 优化器获取损失并决定如何调整模型参数。
  • 然后一切重新开始,直到我们达到循环限制(num_epochs)
优化过程的简化可视化

9、模型类

我们将开始尝试 LSTM(长短期记忆)方法,这是一种循环神经网络 (RNN)。与通过层处理数据以生成输出的典型神经网络不同,循环神经网络可以整合过去的信息。这种能力由其记忆单元实现。以下是模型类:

class StockPriceLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size=1):
        super(StockPriceLSTM, self).__init__()
        self.hidden_size = hidden_size  # Size of the hidden state in the LSTM
        self.num_layers = num_layers    # Number of LSTM layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)  # LSTM layer
        self.fc = nn.Linear(hidden_size, output_size)  # Fully connected layer for output prediction

    def forward(self, input_data):
        # Initialize hidden and cell states for LSTM
        initial_hidden = torch.zeros(self.num_layers, input_data.size(0), self.hidden_size).to(input_data.device)
        initial_cell = torch.zeros(self.num_layers, input_data.size(0), self.hidden_size).to(input_data.device)
        
        # Forward propagate through LSTM
        lstm_output, _ = self.lstm(input_data, (initial_hidden, initial_cell))  # Output shape: (batch_size, seq_length, hidden_size)
        
        # Pass the output of the last time step through the fully connected layer
        last_time_step_output = lstm_output[:, -1, :]  # Extract the output from the last time step
        output = self.fc(last_time_step_output)  # Output shape: (batch_size, output_size)
        return output

nn.Module 继承时,必须定义两个方法: __init__forward。`init` 方法设置模型的结构。在本例中,它配置 LSTM 部分和用于输出的线性层。 forward 方法处理计算。它接收输入数据(我们的特征),通过我们模型的模块迭代处理它,并返回输出。

10、损失函数

为了测量损失,我们计算均方误差 (MSE)。听起来很复杂,但基本上它只是模型输出和实际值之间的差,平方并除以数据点的数量:

有几种可用于计算损失的选项。选择适当的损失函数是微调建模过程的关键方面,取决于目标和数据的性质。例如,在图像分类中,通常使用交叉熵损失。

11、优化器

优化器确定如何调整模型的参数。根据具体用例,有多种优化器可供选择。简而言之,优化器的作用如下:它根据学习率和损失修改模型的参数。

12、运行循环

在开始循环之前,我们是这样初始化它们:

model_1 = PricePredictionLSTM(input_size=len(features), hidden_size=hidden_units, num_layers=num_layers)
loss_fn = nn.MSELoss()
optimizer = torch.optim.AdamW(model_1.parameters(), lr=learning_rate)

当模型是大脑时,这就是我们工作的核心“循环”。根据“非官方优化循环歌曲”,步骤始终相同:

  • 激活训练模式:如果你的模型包含在训练和推理期间表现不同的组件(如 dropout 或批量规范化),请将模型设置为训练模式。
  • 计算预测:通过模型传递输入数据以获得预测。
  • 计算损失:计算预测和实际目标之间的损失。
  • 将梯度重置为零:使用 optimizer.zero_grad() 清除上一次迭代的梯度。
  • 反向传播(计算梯度):通过调用 loss.backward()执行反向传播,以计算相对于模型参数的损失梯度。
  • 更新参数:通过调用 optimizer.step()根据计算出的梯度更新模型的参数。
  • 重新开始……

以下代码还包含几个附加功能,用于跟踪损失值、时间测量以及最后但并非最不重要的一点:根据我们的测试数据集验证模型!

这里的关键方面是使用 torch.inference_mode()。当你的模型不在训练模式时,它应该在此上下文中运行以避免干扰梯度。有时,你可能会遇到 torch.no_grad(),它本质上具有相同的用途,但是一种较旧的实现。

我们还计算了准确度指标。与损失类似,它涉及计算均方误差 (MSE),然后取平方根。但是,这并不是完全必要的,因为它可能不会提供损失值以外的其他见解。

start = time.time()

start_epoch = 0 if len(epoch_count) == 0 else epoch_count[-1] # helpful if you start over this particular cell

for epoch in tqdm(range(start_epoch, start_epoch + num_epochs)): # tqdm is our progress bar wrapper

    model_1.train() # activate training mode

    # handle loss monitoring
    total_train_loss = 0.0
    all_train_targets = []
    all_train_outputs = []
    
    # process batches in the training dataloader
    for batch_idx, (inputs, targets) in enumerate(train_dataloader):
        
        optimizer.zero_grad() # reset gradients
        outputs = model_1(inputs) # calculate predictions
        loss = loss_fn(outputs.squeeze(), targets) # calculat the loss
        loss.backward() # backward propagation
        optimizer.step() # update parameters
        
        total_train_loss += loss.item()

        all_train_targets.extend(targets.numpy())
        all_train_outputs.extend(outputs.detach().numpy())

    # scheduler.step()

    model_1.eval() # activate eval mode

    # handle loss monitoring
    total_test_loss = 0.0
    all_test_targets = []
    all_test_outputs = []

    # process batches in the testing dataloader
    for i, (inputs, targets) in enumerate(test_dataloader):
        with torch.inference_mode(): # activate inference mode/no grad
            outputs = model_1(inputs) # calculate predictions
            loss = loss_fn(outputs.squeeze(), targets) # calculate loss

            # monitor loss
            total_test_loss += loss.item()
            all_test_targets.extend(targets.numpy())
            all_test_outputs.extend(outputs.detach().numpy())

    # calculate average epoch losses
    average_epoch_train_loss = total_train_loss / len(train_dataloader)
    average_epoch_test_loss = total_test_loss / len(test_dataloader)
    
    # caculate accuracy
    train_rmse = math.sqrt(mean_squared_error(all_train_targets, all_train_outputs))
    test_rmse = math.sqrt(mean_squared_error(all_test_targets, all_test_outputs))

    # VISUALIZE
    epoch_count.append(epoch)
    train_loss_values.append(average_epoch_train_loss)
    test_loss_values.append(average_epoch_test_loss)
    train_rmse_values.append(train_rmse)
    test_rmse_values.append(test_rmse)

    # LOG
    if epoch % int(num_epochs / 10) == 0 or epoch == num_epochs - 1:
        current_lr = scheduler.get_last_lr()[0]
        print(f"Epoch [{epoch + 1}/{start_epoch + num_epochs}], "
        f"Train Loss: {average_epoch_train_loss:.4f} | "
        f"Test Loss: {average_epoch_test_loss:.4f} | "
        f"Train RMSE: {train_rmse:.4f} | "
        f"Test RMSE: {test_rmse:.4f} | "
        f"Current LR: {current_lr:.8f} | "
        f"Duration: {time.time() - start:.0f} seconds")

duration += time.time() - start

13、等等……停……那是什么?学习率调度程序?

如果你仔细阅读了循环,你可能会注意到一个注释掉的命令: scheduler.step()。它有什么用?

学习率通常保持不变,并决定算法如何在每一步中调整模型的参数。调度程序有助于动态改变这个步长。虽然并非总是必要的,但尝试使用调度程序可能会有所帮助,有可能在较短的时间内改善结果。

这是初始化它的方式:

scheduler = lr_scheduler.StepLR(
	optimizer, 
    step_size=learning_rate_step_size, 
    gamma=learning_rate_gamma           # Adjust step_size and gamma as needed
)  

14、运行

现在我们已准备好运行所有内容并查看训练/优化过程的实际效果。为了获得第一印象,一切运行得如何,我建议从我们上面定义的虚拟数据集开始。我不会在这里发布绘图代码。

你的绘图应该是这样的:

结果还不错,而且是在 27 秒内完成的。但考虑到源数据,这并不令人意外。

让我们通过模型运行我们的现实世界数据。这不会花费太长时间, 100 个 epoch 之后的结果看起来……有点不对劲:

使用 LSTM 进行 100 次训练后的结果

让我们面对现实:在第一次尝试中,你不能指望在像这样的小数据集上仅进行 4 分钟的训练后就能获得实质性的结果。

此过程涉及广泛的超参数调整和选择网络架构。现在,让我们探索另一种方法:GRU,代表门控循环单元,是另一种循环神经网络。与 LSTM 相反,GRU 具有简化的架构。具体而言,GRU 使用两个门来管理数据流(重置和更新),而 LSTM 使用三个门(输入、遗忘和输出)。LSTM 非常适合处理复杂的依赖关系,而 GRU 则降低了计算复杂度。这是我们的模型类:

class PricePredictionGRU(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size=1):
        super(PricePredictionGRU, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # Forward propagate GRU
        out, _ = self.gru(x, h0)  # out: tensor of shape (batch_size, seq_length, hidden_size)
        
        # Pass the output of the last time step through the fully connected layer
        out = self.fc(out[:, -1, :])  # Output shape: (batch_size, output_size)
        return out

如你所见,它的结构与之前完全相同。这是因为 PyTorch 为模型提供了“模板”,我们只需正确地输入它们即可。

这是 200 个 epoch 之后的结果:

使用 GRU 进行 100 次训练后的结果

尽管运行时间几乎是预期时间的两倍,与承诺的计算要求减少相矛盾,但结果似乎明显改善。然而,它们仍然不足以准确预测股价。

15、未来的想法

首先,让我举一个例子来展示许多“价格预测”讨论中的常见缺陷:它们通常专注于预测下一个数据点,而不考虑未来的数据点。

这种方法大大简化了模型的任务。例如,如果给出今天的收盘价,预测明天的收盘价会变得相对容易,因为价格波动的典型范围。

下图显示了将预测差距设置为 1 时的预测,这意味着预测下一个数据点:

这里我将验证数据的样本量增加到 10.000:

使用 10,000 个数据点预测未来的第 7 个数据点

这并不可怕;至少曲线在某种程度上与总体趋势一致。最终,模型的质量取决于分配的计算能力和选择的超参数。


原文链接:How to predict stock or crypto prices. In 2024. With Python. And PyTorch.

DefiPlot翻译整理,转载请标明出处

通过 NowPayments 打赏