用Python实现算法交易 (DEX)

在本教程中,我们将学习如何利监控各种池子上的交易活动。此外,我们将监控我们的加密货币钱包余额,并在DEX上执行交换交易,例如Uniswap。

用Python实现算法交易 (DEX)
一键发币: Aptos | X Layer | SUI | SOL | BNB | ETH | BASE | ARB | OP | Polygon | Avalanche | 用AI学区块链开发

在去中心化交易所(DEX)如Uniswap上进行交易与在中心化交易所(CEX)如Coinbase或Binance上的交易本质上有所不同。DEX上的用户通过他们的加密货币钱包连接,并直接与智能合约互动,以便在给定的网络上交换不同的代币。最常用的DEX基于自动做市商(AMM)模型,用户可以通过一个池子访问流动性。代币的交换发生在池子中,并根据智能合约中定义的费用进行收费。因此,对于相同的交易对,可能会存在多个高流动性池,有时仅在费用上有所不同。

尽管DEX近年来变得非常流行,但它们通常缺乏CEX那样用户友好的监控界面。用户还需要手动使用自己的钱包连接到DEX,这在执行多笔交易时会增加额外的复杂性。因此,使用DEX API来自动化监控和执行交易是有用的。

在本教程中,我们将学习如何利用CoinGecko API中的链上DEX端点来监控各种池子上的交易活动。此外,我们将使用Python来监控我们的加密货币钱包余额,并在DEX上执行交换交易,例如Uniswap。

让我们开始吧!

1、先决条件

我们将使用Jupyter笔记本作为主要的开发环境。此外,需要安装以下包:

pip install uniswap

要在新的笔记本中开始编码,请在终端中执行以下命令:

jupyter lab

这应该会在浏览器中打开一个新标签页。如果你想使用通过GitHub仓库共享的笔记本,请先克隆仓库,然后在Jupyter环境中打开它。确保将API密钥文件的路径替换为你的路径。

💡 小贴士: 不要将API密钥直接插入笔记本中。这是一种不良的安全实践,如果你将笔记本通过公共GitHub仓库共享,可能会暴露密钥。

你还需要一个支持ERC-20(以太坊)代币的加密货币钱包。MetaMask易于设置为浏览器扩展,因此推荐使用。我们稍后将使用从MetaMask中生成的账户地址和主密钥。要查看主密钥,请点击账户摘要页面右上角的三个点(...)。选择“账户详情”,然后点击“显示主密钥”。你将被提示输入钱包密码,然后按住显示按钮。

一旦主密钥可见,将其复制到一个文件中,并以以下JSON格式保存:

{ “primary_key” : “XXYYZZ”}

💡 小贴士: 钱包账户的主密钥提供了对该地址关联的所有资金的完全访问权限,因此你应极其小心地处理它。不要与任何人分享主密钥,并避免将JSON文件上传到仓库或云备份中。如果你想测试本教程中的代码,最好创建一个新的钱包账户/地址,并向其中转入少量加密货币。

最后,我们需要一个RPC端点来与以太坊区块链交互。我们可以使用Ankr提供的免费RPC端点:https://rpc.ankr.com/eth

虽然这个端点有速率限制,但对于我们的演示用例来说已经足够。如果你需要更高的速率限制,可以考虑切换到他们的付费计划。另一个选项是使用Infura的免费计划,它提供了一个API密钥和大量的每日信用额度。

2、CoinGecko API链上端点

我们将使用以下链上DEX端点,这些端点在所有CoinGecko API付费计划中都可用:

我在之前的文章中概述了如何设置项目环境并保护API访问。

3、如何监控趋势DEX池

由于DEX上的交易是基于由加密货币池提供的流动性执行的,因此能够跟踪不同池中的代币活动以制定明智的交易策略至关重要。我们现在将深入探讨一些示例,使用CoinGecko API的链上DEX数据结合Python。

让我们首先编写一个方便函数,帮助我们进行API请求。

def get_response(endpoint, headers, params, URL):
	url = "".join((URL, endpoint))
	response = rq.get(url, headers = headers, params = params)
	if response.status_code == 200:
               data = response.json()
    	    return data
	else:
    	    print(f"Failed to fetch data, check status code {response.status_code}")

接下来,我们将使用以下辅助函数,该函数将根据所需的API端点生成正确的URL。

def get_url(url_type,
        	network,
        	dex = "",
        	pool_address = "",
        	token_address = ""):   

	url_dict = {
    	"trending_pools": f"/onchain/networks/{network}/trending_pools",
    	"top_pools": f"/onchain/networks/{network}/pools",
    	"top_pools_dex": f"/onchain/networks/{network}/dexes/{dex}/pools",
    	"specific_pool_dex": f"/onchain/networks/{network}/pools/{pool_address}",
    	"top_pools_add": f"/onchain/networks/{network}/tokens/{token_address}/pools"
	}

	return url_dict[url_type]

虽然许多网络得到支持,但我们主要专注于“ETH”。根据调用的API,可以将其他输入参数传递给此函数。

从CoinGecko API收到的响应将通过以下函数解析为pandas DataFrame的列:

def collect_response(list_response):    

	response_all = []

	for response in list_response["data"]:
    	all_attributes = response["attributes"]
    	daily_tx = all_attributes["transactions"]["h24"]
    	rel = response["relationships"]
   	 
    	temp_dict = dict(
        	pair = all_attributes["name"],
        	dex = rel["dex"]["data"]["id"],
        	add = all_attributes["address"],
        	fdv_usd = all_attributes["fdv_usd"],
        	market_cap_usd = all_attributes["market_cap_usd"],
        	daily_volume = all_attributes["volume_usd"]["h24"],
        	daily_price_change = all_attributes["price_change_percentage"]["h24"],
        	daily_buys = daily_tx["buys"],
        	daily_sells = daily_tx["sells"],
        	daily_buyers = daily_tx["buyers"],
        	daily_sellers = daily_tx["sellers"]
    	)
   	 
    	response_all.append(temp_dict)

	return response_all

现在可以将上述所有内容组合起来,列出趋势池,并按所需列排序。

def get_trending_pools(network, sort_by_col):

	target_url = get_url("trending_pools", network)

	trendpool_list_response = get_response(target_url,
                                       	use_pro,
                                       	"",
                                       	PRO_URL)
    
	trendpool_all = collect_response(trendpool_list_response)    

	return pd.DataFrame(trendpool_all).sort_values(by =
                                                [f"{sort_by_col}"],
                                                ascending = False)

让我们按“daily_volume”降序列出趋势池。

4、如何跟踪顶级DEX池

与前面的示例类似,我们也可以跟踪特定网络的顶级池。

def get_top_pools_network(network, sort_by_col):

	target_url = get_url("top_pools", network)

	toppool_list_response = get_response(target_url,
                                     	use_pro,
                                     	"",
                                     	PRO_URL)

	toppool_all = collect_response(toppool_list_response)   

	return pd.DataFrame(toppool_all).sort_values(by = [f"{sort_by_col}"],
                                             	ascending = False)

让我们按美元市值降序列出它们。

如上面的例子所示,Uniswap似乎是一个受欢迎的DEX。对于同一交易对,池子可能因实施的费用而有所不同,通常在配对名称旁边显示为百分比。看到Uniswap的顶级池会很有帮助。幸运的是,CoinGecko API有一个这样的端点!我们在下一个例子中使用这个端点,其中可以将特定的DEX作为输入参数提供。

def get_top_pools_dex(network, dex, sort_by_col):

	target_url = get_url("top_pools_dex", network, dex)

	toppool_list_response = get_response(target_url,
                                     	use_pro,
                                     	"",
                                     	PRO_URL)

	toppool_all = collect_response(toppool_list_response)   

	return pd.DataFrame(toppool_all).sort_values(by = [f"{sort_by_col}"],
                                             	ascending = False)

现在我们可以轻松比较“uniswap_v2”和“uniswap_v3”之间的顶级池(按市值降序排列)。

5、如何获取特定池地址的数据

有时,调查特定池以确定与其日常活动相关的更有用的指标会很有用。使用池地址作为额外输入,可以使用以下函数来获得进一步的见解:

def collect_pool_response(list_response):    

	response = list_response["data"]
	all_attributes = response["attributes"]
	daily_tx = all_attributes["transactions"]["h24"]
	rel = response["relationships"]
   	 
	response_dict = dict(
    	pair = all_attributes["name"],
    	dex = rel["dex"]["data"]["id"],
    	add = all_attributes["address"],
    	fdv_usd = all_attributes["fdv_usd"],
    	market_cap_usd = all_attributes["market_cap_usd"],
    	daily_volume = all_attributes["volume_usd"]["h24"],
    	daily_price_change =  
                         all_attributes["price_change_percentage"]["h24"],
    	daily_buys = daily_tx["buys"],
    	daily_sells = daily_tx["sells"],
    	daily_buyers = daily_tx["buyers"],
    	daily_sellers = daily_tx["sellers"]
	)

	return response_dict
def get_pool_data(network, dex, pool_address):

	target_url = get_url("specific_pool_dex", network, dex, pool_address)

	pool_list_response = get_response(target_url,
                                  	use_pro,
                                  	"",
                                  	PRO_URL)

	pool_all = collect_pool_response(pool_list_response)   

	return pool_all

6、如何监控给定代币的价格

到目前为止,我们已经查看并比较了不同池的各种聚合指标。然而,我们也需要比较代币价格数据。这可以让用户在不同池之间开发和测试套利策略。

代币通常分为基础代币和报价代币。以ETH/USDT池为例,ETH将是基础代币,USDT将是报价代币。我们感兴趣的是ETH到USDT的转换价格,这反映在下面的“base_token_price_quote_token”字段中:

def collect_response_token(list_response):    

	response_all = []

	for response in list_response["data"]:
    	all_attributes = response["attributes"]
    	daily_tx = all_attributes["transactions"]["h24"]
    	rel = response["relationships"]
   	 
    	temp_dict = dict(
        	pair = all_attributes["name"],
        	dex = rel["dex"]["data"]["id"],
        	add = all_attributes["address"],
        	base_token_price_quote_token = 
                all_attributes["base_token_price_quote_token"],
        	fdv_usd = all_attributes["fdv_usd"],
        	market_cap_usd = all_attributes["market_cap_usd"],
        	daily_volume = all_attributes["volume_usd"]["h24"],
        	daily_price_change = 
                all_attributes["price_change_percentage"]["h24"],
        	daily_buys = daily_tx["buys"],
        	daily_sells = daily_tx["sells"],
        	daily_buyers = daily_tx["buyers"],
        	daily_sellers = daily_tx["sellers"]
    	)
   	 
    	response_all.append(temp_dict)

	return response_all

本文的其余部分还将使用代币合同地址,该地址唯一地代表其原生区块链上的代币。这可以通过访问DEX上的池摘要页面找到。例如,我们可以在Uniswap 池摘要页面的左下角(“链接”面板)找到USDT代币合同地址,如下所示:

使用代币合同地址作为输入,我们可以使用以下函数来获取给定网络和代币地址的顶级池列表。

def get_top_pools_token(network,
                    	  token_address,
                    	  sort_by_col):

	target_url = get_url("top_pools_add",
                     	   network,
                     	   "",
                     	   "",
                     	   token_address)

	toppool_list_response = get_response(target_url,
                                     	 use_pro,
                                     	"",
                                     	PRO_URL)
    
	toppool_all = collect_response_token(toppool_list_response)    

	return pd.DataFrame(toppool_all).sort_values(by = [f"{sort_by_col}"],
                                               ascending = False)

请注意,Uniswap使用WETH(包装以太坊),这是用于与dApps交互的ETH的代币化形式。因此,交易对名称包含WETH。我们将过滤上述函数的输出,只列出名称中包含“WETH /”的行。此外,可以根据“base_token_price_quote_token”对行进行排序。


usdt = "0xdAC17F958D2ee523a2206206994597C13D831ec7"

df_token = get_top_pools_token("eth", usdt, "fdv_usd")

(
    df_token[df_token["pair"]
    .str.contains("WETH /")]
    .sort_values(by = ["base_token_price_quote_token"], ascending = True)
)

上述列表提供了一个清晰的套利机会概览——不仅在不同的DEX之间,而且在同一个DEX的不同版本之间(如Uniswap v2、v3等)。接下来,我们将学习如何使用Python在Uniswap上执行交换交易。为此,我们将针对突出显示的池(红色)卖出一些ETH并获得USDT。然后,我们将针对价格较低的池(蓝色)来潜在地以较少的USDT购买相同数量的ETH。

7、算法交易:如何在Uniswap上交换代币

我们将使用Uniswap Python库,它提供了各种方便的功能来检查价格和执行交换交易。这对于抽象掉最终用户的大部分web3实现非常有用。此外,它还可以用来监控用户钱包地址中的余额。

正如本文前面讨论的,为了使用钱包地址进行交易,需要提供关联的主密钥。这可以保存在一个本地文件中,然后如下所示读取:

# Get wallet private key
def get_private_key():
    f = open("/home/vikas/Documents/MetaMask_private_key.json")
    key_dict = json.load(f)
    return key_dict["private_key"]

接下来,需要设置Uniswap类:

from uniswap import Uniswap

# Use None for address and private_key when not doing a transaction
address = "0xAf418C54351BA8a0Aa15Ba4A5C99C46C122B3DBC" 	 
private_key = get_private_key()    
version = 3
provider = "https://rpc.ankr.com/eth"

uniswap = Uniswap(address = address,
              	  private_key = private_key,
              	  version = version,
              	  provider = provider)

如前所述,相关代币的合同地址也需要。

# Token contract address
https://support.uniswap.org/hc/en-us/articles/26757826138637-What-is-a-token-contract-address
eth = "0x0000000000000000000000000000000000000000"
usdt = "0xdAC17F958D2ee523a2206206994597C13D831ec7"

函数 get_price_output 返回在交换后您需要获得 X 数量的 USDT 所需的 ETH 金额(以 wei 为单位)。请注意,由于 1 ETH = 10¹⁸ wei,我们需要相应地进行除法运算,以便将结果转换回 ETH。在下面的例子中,我们想知道为了获得 3 USDT 需要交换多少 ETH。参数是以代币的最小单位给出的,这就是为什么 USDT 输入将是 3 x 10⁶。fee = 100 确保使用 0.01% 费用的池。对于 Uniswap v3,建议首先研究各种池,并选择费用低且流动性高的池。

uniswap.get_price_output(eth, usdt, 3 * 106, fee = 100) / (1018)

💡 小贴士: 预览交换价格是非常推荐的,如上所示。这将有助于验证代币的输入金额是否符合预期,从而防止意外订单。

常见代币的小数位数:

  • ETH、DAI、UNI、BAT、LINK 使用 18 位小数
  • WBTC 使用 8 位小数
  • USDC、USDT 使用 6 位小数

小数位数始终可以在 Etherscan 上查找。

💡 小贴士: Uniswap Python 库中的函数使用常见的小数格式,这意味着代币输入或输出都是最小单位。

要执行上述交换,我们使用与上述相同的一组参数调用 make_trade_output 函数。

uniswap.make_trade_output(eth, usdt, 3 * 10**6, fee = 100)

要检查钱包余额,使用以下函数:

上面的数据与 MetaMask 钱包显示的视图一致。

此外,我们还可以确认已收到预期数量的 USDT 代币(= 3)。交易也会出现在 Uniswap 的网络界面上的 目标池 中。

通过 钱包地址在 Etherscan 上查找,我们可以看到交易的更多细节。请记住,Uniswap 使用 WETH。然而,我们最初钱包里有 ETH。因此,到 USDT 的交换路径首先包括转移到 WETH。

接下来,我们将尝试使用 USDT 以相同的金额(约 0.001135)买回 ETH。这次,我们的目标是突出显示的池。

请注意,这个池是在 Uniswap 协议的 v2 上,所以我们需要在 Uniswap 类的定义中更新版本 (= 2)。我们现在可以检查为此交换所需的 USDT 数量。输入的 ETH 金额(以 wei 为单位)需要是整数。由于这是唯一一个在 v2 上的池,所以不需要使用 fee 参数。

与之前的交换相比,这次买回相同数量的 ETH 所需的 USDT 更少。这在意料之中,因为该池中 ETH 的价格(以 USDT 计)较低。然而,重要的是要记住,这些交换是链上交易,它们也会消耗 gas。gas 价格会根据网络拥堵情况而变化。因此,任何潜在的收益都需要仔细考虑交易成本的变化。

要执行交换,我们应用之前相同的函数。

输出是交易哈希,可以在 Etherscan 上验证。在“交易操作”部分可以找到简要摘要。

通过进一步滚动可以找到有关交换路径的更多细节:

该交易也出现在 池汇总页面 上的 Uniswap 上。

8、结束语

在本文中,我们学习了如何使用 CoinGecko API 的 链上 DEX 数据 和 Python 监控各种 DEX 池的交易活动。以 Uniswap 为例,我们演示了如何在不同的池中编程交换代币,可能针对套利机会。更进一步,我们还在 Uniswap 和 Etherscan 上验证了交易,从而确保交换路径符合我们的预期。

这里提供的脚本可以帮助交易者简化和自动化他们的交易策略,从跟踪钱包余额开始,扩展到在 DEX 上执行代币交换。


原文链接:Algorithmic Trading with Python: How to Execute Trades on a DEX

DefiPlot翻译整理,转载请标明出处

免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。