Crypto套利机器人开发指南

在本教程中,我们将介绍如何构建基于 Python 的套利机器人,通过跟踪加密货币价格和其他有用的指标来识别套利机会

Crypto套利机器人开发指南
一键发币: SOL | BNB | ETH | BASE | Blast | ARB | OP | POLYGON | AVAX | FTM | OK

在金融世界中,套利是一种策略,即在一个市场购买证券、商品或货币,然后在另一个市场以更高的价格出售。两个市场之间的价格差异减去任何交易费即为利润。

对于加密货币,大多数交易活动都是 24/7 在众多交易所进行的。资产的价格由供需经济决定,而不同交易所的价格可能不一定相同。跨交易所套利策略可以利用这种差异,从而为交易者提供潜在获利的机会。

在本教程中,我们将介绍如何构建基于 Python 的套利机器人,通过跟踪加密货币价格和其他有用的指标来识别套利机会。请注意,我们将构建的机器人不会涵盖在交易所执行交易。

我们将使用 CoinGecko API 中的演示端点来检索加密货币价格数据。这些端点可以免费访问,但需要密钥身份验证。

  • /exchanges - 用于获取所有受支持的交易所列表以及相关数据(ID、名称、国家/地区等)的端点。请注意,只有在 CoinGecko 上有活跃交易量的交易所才会列出。
  • /exchanges/{id}/tickers - 用于根据给定交易所 ID 查询特定代码的端点。响应包含有用的数据,例如上次交易的时间、最新价格和交易量。
  • /exchange_rates - 获取从 BTC 到其他货币的汇率列表。
  • /exchanges/{id}/volume_chart - 获取选定交易所的历史总交易量数据(以 BTC 为单位),带有 unix 时间戳。

💡专业提示:任何人都可以注册 CoinGecko API 演示计划(免费!)并生成 API 密钥,每月调用次数上限为 10,000 次,调用速率限制为 30 次/分钟。

但是,由于我们的加密套利机器人旨在一直运行,因此可能会遇到速率限制。在这种情况下,专业 API 密钥可能会有所帮助。

1、先决条件

我们将使用 Jupyter 笔记本来创建和运行机器人。确保还安装了 Python 3 和以下附加软件包:

pip install jupyterlab
pip install pandas
pip install numpy
pip install pytz

要在新笔记本中开始编码,请执行以下命令:

jupyter lab

这应该会在浏览器窗口中打开一个新选项卡。如果你想使用通过 GitHub 存储库共享的笔记本,请先克隆存储库,然后在 Jupyter 环境中打开它。

2、设置项目环境和 API 访问

可以按如下所示加载 Python 包:

import requests as rq
import json
import pandas as pd
pd.set_option('display.precision', 4,
              'display.colheader_justify', 'center')
import numpy as np
import warnings
import pytz
import datetime
import time
from IPython.display import clear_output

请注意,我们已使用 pd.set_option 为 pandas 库设置了其他显示选项。这在以后会很有用,因为我们将使用 DataFrames 来可视化所有数据。此外,一旦我们开始运行机器人并希望我们的数据表在笔记本中定期刷新,就需要 clear_output。

可以从本地文件中读取演示 API 密钥。 use_demo 标头用于 get_response 函数,它将为我们发出请求。状态代码 200 表示请求成功。

def get_demo_key():
    f = open("/home/vikas/Documents/CG_demo_key.json")
    key_dict = json.load(f)
    return key_dict["key"]

use_demo = {
           "accept": "application/json",
           "x-cg-demo-api-key" : get_demo_key() 
}

def get_response(endpoint, headers, params, URL):
    url = "".join((URL, endpoint))
    response = rq.get(url, headers = headers, params = params)
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print(f"Failed to fetch data, check status code {response.status_code}")

可以在这里查看完整的响应代码和对应的状态。

3、如何获取所有加密货币交易所的列表

要获取所有加密货币交易所的完整列表,请对 CoinGecko API 中的 /exchanges 端点进行 API 调用。以下查询参数将确保我们在单个页面中获得所有结果。

# Valid values for results per page is between 1-250
exchange_params = {
            "per_page": 250,
            "page": 1
}

使用之前定义的函数,我们现在可以发出请求。然后可以将结果转换为 Pandas DataFrame 以便于可视化。


exchange_list_response = get_response("/exchanges", use_demo, exchange_params, PUB_URL)
df_ex = pd.DataFrame(exchange_list_response)

要按交易量列出交易所,可以轻松地根据“trade_volume_24h_btc”列对上述 DataFrame 进行排序。


df_ex_subset = df_ex[["id", "name", "country", "trade_volume_24h_btc"]]
df_ex_subset = df_ex_subset.sort_values(by = ["trade_volume_24h_btc"], ascending = False)

还可以根据注册国家/地区筛选交易所。这在以后会很有用,因为我们可以定位特定市场。

4、如何获取加密货币交易所代码

对于每个交易所,都有关于多个加密货币代码(或交易对)的数据。我们希望筛选列表并找到我们感兴趣的代码的最新交易。向相关端点发出请求后,我们可以循环遍历响应并找到相关的基础货币和目标货币。如果未找到匹配项,则返回空字符串。这种情况可能发生,因为并非所有交易所都提供所有交易对。

def get_trade_exchange(id, base_curr, target_curr):
    
    exchange_ticker_response = get_response(f"/exchanges/{id}/tickers",
                                            use_demo,
                                            {},
                                            PUB_URL)
    
    found_match = ""
    
    for ticker in exchange_ticker_response["tickers"]:
        if ticker["base"] == base_curr and ticker["target"] == target_curr:
            found_match = ticker
            break
            
    if found_match == "":
        warnings.warn(f"No data found for {base_curr}-{target_curr} pair in {id}")
    
    return found_match

以交易所“gdax”(Coinbase)的交易对(或代码)ETH-USD 为例:

5、如何将时间戳转换为本地时区 (Python)

如上所示,返回的数据带有不同时区的时间戳。在监控我们的机器人活动时,将其转换为我们自己的时区会很有帮助。这可以使用 pytz 库轻松实现,例如针对“欧洲/阿姆斯特丹”所做的那样。

def convert_to_local_tz(old_ts):
    
    new_tz = pytz.timezone("Europe/Amsterdam")
    old_tz = pytz.timezone("UTC")
    
    format = "%Y-%m-%dT%H:%M:%S+00:00"
    datetime_obj = datetime.datetime.strptime(old_ts, format)
    
    localized_ts = old_tz.localize(datetime_obj)
    new_ts = localized_ts.astimezone(new_tz)
    
    return new_ts

6、如何获取多个交易所的加密货币行情数据

现在我们知道如何获取单个交易所的行情数据,我们可以扩展相同的逻辑来收集来自多个交易所的数据。为了专注于某个市场,我们还可以添加针对该国家/地区的过滤器。从行情响应中,我们收集最后交易价格、最后交易量、价差和交易时间(转换为当地时区)。如果交易对未在给定交易所上市,则会显示警告。

def get_trade_exchange_per_country(country,
                                   base_curr,
                                   target_curr):
    
    df_all = df_ex_subset[(df_ex_subset["country"] == country)]    
    
    exchanges_list = df_all["id"]
    ex_all = []    
       
    for exchange_id in exchanges_list:
        found_match = get_trade_exchange(exchange_id, base_curr, target_curr)
        if found_match == "":
            continue
        else:
            temp_dict = dict(
                             exchange = exchange_id,
                             last_price = found_match["last"],
                             last_vol   = found_match["volume"],
                             spread     = found_match["bid_ask_spread_percentage"],
                             trade_time = convert_to_local_tz(found_match["last_traded_at"])
                             )
            ex_all.append(temp_dict)
            
    return pd.DataFrame(ex_all)

买卖价差百分比是卖方要求的资产最低价格与潜在买方的最高出价之间的差额。价差值越低,表示交易所中给定资产的流动性和交易量越高。相反,价差越大通常表示流动性越低。因此,可以使用此指标来判断是否应考虑在特定交易所执行套利交易。

7、如何获取多种货币的比特币汇率

各种端点的数据(例如,交易量)以 BTC 为单位报告。对于我们的机器人来说,另外确定给定代码的交易量占总交易量的百分比将会很有趣,这可以让我们进一步了解给定交易所的流动性。

为了将 BTC 转换为不同的目标货币,我们可以通过 CoinGecko API 获取汇率,如下所示:

def get_exchange_rate(base_curr):
    
    # This returns current BTC to base_curr exchange rate    
    exchange_rate_response = get_response(f"/exchange_rates",
                                          use_demo,
                                          {},
                                          PUB_URL)
    rate = ""
    try:
        rate = exchange_rate_response["rates"][base_curr.lower()]["value"]
    except KeyError as ke:
        print("Currency not found in the exchange rate API response:", ke)
        
    return rate  

8、如何获取历史交易量数据

使用给定时间段的历史交易量数据,我们可以使用 7 天窗口内的简单移动平均线确定最新交易量。然后可以使用上一节中确定的汇率将此交易量(默认为 BTC)转换为我们感兴趣的货币。一旦我们知道总交易量(所有股票代码的总和),就可以轻松确定我们股票代码交易量的百分比,稍后将显示。

def get_vol_exchange(id, days, base_curr):
    
    vol_params = {"days": days}
    
    exchange_vol_response = get_response(f"/exchanges/{id}/volume_chart",
                                         use_demo,
                                         vol_params,
                                         PUB_URL)
    
    time, volume = [], []
    
    # Get exchange rate when base_curr is not BTC
    ex_rate = 1.0
    if base_curr != "BTC":
        ex_rate = get_exchange_rate(base_curr)
        
        # Give a warning when exchange rate is not found
        if ex_rate == "":
            print(f"Unable to find exchange rate for {base_curr}, vol will be reported in BTC")
            ex_rate = 1.0
    
    for i in range(len(exchange_vol_response)):
        # Convert to seconds
        s = exchange_vol_response[i][0] / 1000
        time.append(datetime.datetime.fromtimestamp(s).strftime('%Y-%m-%d'))
        
        # Default unit for volume is BTC
        volume.append(float(exchange_vol_response[i][1]) * ex_rate)
                      
    df_vol = pd.DataFrame(list(zip(time, volume)), columns = ["date", "volume"])
    
    # Calculate SMA for a specific window
    df_vol["volume_SMA"] = df_vol["volume"].rolling(7).mean()
    
    return df_vol.sort_values(by = ["date"], ascending = False).reset_index(drop = True)

Kraken 上 30 天交易量的示例(为简洁起见,仅显示 15 行)如下所示:

9、如何汇总和显示加密货币交易所的交易

在运行我们的机器人之前,重要的是要考虑如何随时间汇总数据。本质上,我们的机器人将定期获取最新的交易数据。对于某些交易所,在此期间可能不会发生新的交易;对于其他交易所,可能会执行许多交易。因此,删除重复项至关重要。然后可以使用唯一值的数量来确定交易数量。此外,要建立可靠的套利策略,收集一段时间内的统计数据会有所帮助。因此,我们将对交易所 ID 执行分组操作,并计算所有相关列的平均值。

此外,还将添加一个包含总交易所交易量百分比的新列(如前几节所示)。

def display_agg_per_exchange(df_ex_all, base_curr):
    
    # Group data and calculate statistics per exchange    
    df_agg = (
        df_ex_all.groupby("exchange").agg
        (        
            trade_time_min = ("trade_time", 'min'),
            trade_time_latest = ("trade_time", 'max'),
            last_price_mean = ("last_price", 'mean'),
            last_vol_mean = ("last_vol", 'mean'),
            spread_mean = ("spread", 'mean'),
            num_trades = ("last_price", 'count')
        )
    )
    
    # Get time interval over which statistics have been calculated    
    df_agg["trade_time_duration"] = df_agg["trade_time_latest"] - df_agg["trade_time_min"]
    
    # Reset columns so that we can access exchanges below
    df_agg = df_agg.reset_index()
    
    # Calculate % of total volume for all exchanges
    last_vol_pert = []
    for i, row in df_agg.iterrows():
        try:
            df_vol = get_vol_exchange(row["exchange"], 30, base_curr)
            current_vol = df_vol["volume_SMA"][0]
            vol_pert = (row["last_vol_mean"] / current_vol) * 100
            last_vol_pert.append(vol_pert)
        except:
            last_vol_pert.append("")
            continue
            
    # Add % of total volume column
    df_agg["last_vol_pert"] = last_vol_pert
    
    # Remove redundant column
    df_agg = df_agg.drop(columns = ["trade_time_min"])
    
    # Round all float values
    # (seems to be overwritten by style below)
    df_agg = df_agg.round({"last_price_mean": 2,
                           "last_vol_mean": 2,
                           "spread_mean": 2
                          })
    
    display(df_agg.style.apply(highlight_max_min,
                               color = 'green',
                               subset = "last_price_mean")
           )
           
    return None

我们将使用以下功能进一步突出显示具有最高和最低价格的交易所,以便更好地了解情况:

def highlight_max_min(x, color):
    
    return np.where((x == np.nanmax(x.to_numpy())) |
                    (x == np.nanmin(x.to_numpy())),
                    f"color: {color};",
                    None)

10、运行加密货币交易所套利机器人

我们的机器人需要持续监控多个交易所的最新交易。因此,我们将使用 while 语句执行一个单元。这将使代码持续运行,直到用户停止。为了在更新之间引入一分钟的延迟,我们将使用 sleep 语句。由于 API 本身在演示计划中每分钟刷新一次,因此没有必要缩短延迟时间。

def run_bot(country,
            base_curr,
            target_curr):
    
    df_ex_all = get_trade_exchange_per_country(country, base_curr, target_curr)
    
    # Collect data every minute    
    while True:
        time.sleep(60)
        df_new = get_trade_exchange_per_country(country, base_curr, target_curr)
        
        # Merge to existing DataFrame
        df_ex_all = pd.concat([df_ex_all, df_new])
        
        # Remove duplicate rows based on all columns
        df_ex_all = df_ex_all.drop_duplicates()
        
        # Clear previous display once new one is available
        clear_output(wait = True)
        display_agg_per_exchange(df_ex_all, base_curr)        
        
    return None

举个例子,我们可以针对美国所有交易所的 ETH-USDT 对测试机器人。运行机器人约 2 小时后,我们可以看到以下内容:

绿色突出显示的是最低价格(Coinlist)和最高价格(Binance US)。因此,一个套利策略示例可能是在 Coinlist 购买 ETH,然后立即在 Binance US 上出售。

另一个值得注意的有趣点是买卖价差与交易数量之间的相关性。在 Gemini 上,价差相当高 - 表明该交易对的流动性较低。与其他交易所相比,同一时期的交易数量较低(只有 2 笔!)进一步证实了这一点。

要停止机器人,请导航到顶部的“内核”选项卡并选择“中断内核”

11、结束语

利用单个 Jupyter 笔记本中 CoinGecko 的 API 和 Python 编程的强大功能,我们能够持续监控来自各个加密货币交易所的最新交易,并突出跨交易所套利机会。


原文链接:How to Build a Crypto Arbitrage Bot (Python Guide)

DefiPlot翻译整理,转载请标明出处

通过 NowPayments 打赏