狙击机器人Python开发教程

在本教程中,我们将使用Python和Web3.py构建一个狙击机器人(Sniper Bot)来监控和交易流动性对,当 Uniswap 上创建新的流动性池时,狙击机器人将进行交易。

狙击机器人Python开发教程
一键发币: SOL | BNB | ETH | BASE | Blast | ARB | OP | POLYGON | AVAX | FTM | OK

构建一个狙击机器人(Sniper Bot)来监控和交易去中心化交易所的流动性对。当在去中心化交易所(例如 Uniswap)上创建新的流动性池时,如果交易者是第一个购买新代币然后将其出售给新买家的人,他们就可以获利。这种策略称为狙击(Sniping),关键是当流动性池添加到去中心化交易所时,第一个购买代币。最好在同一个区块或接下来的几个区块中购买这些代币,作为流动性池的创建和融资。

当你想购买 IDO(初始 DEX“去中心化交易所”产品)时,狙击是一种有效的策略。 IDO 是一种筹款方法,其中代币在去中心化交易所发行并出售给新买家。当流动性池添加到 DEX 时,狙击机器人可以进行交易以立即购买代币。

在本教程中,我们将构建一个狙击机器人来监控和交易流动性对。 Python 和 Web3.py 监控以太坊区块链。当 Uniswap 上创建新的流动性池时,狙击机器人将进行交易。

1、去中心化交易所狙击交易机器人流程

狙击交易机器人将执行以下功能:

  • 首先,Python 进程侦听新创建的代币对的 Uniswap 事件
  • 然后系统识别并处理新的代币对
  • 最后,机器人提交交易以使用智能合约购买新代币

2、在 DEX 上交换代币的智能合约

首先我们需要在以太坊区块链上创建一个智能合约。 snipe 机器人将使用此智能合约在 Uniswap 上交换代币。单击此处,了解如何使用下面的智能合约在 Uniswap 上交换代币的详细说明。阅读代码中的注释以更好地理解合约的功能。

// SPDX-License-Identifier: MIT
pragma solidity ^0.7.0;


//import the ERC20 interface

interface IERC20 {
    function totalSupply() external view returns (uint);
    function balanceOf(address account) external view returns (uint);
    function transfer(address recipient, uint amount) external returns (bool);
    function allowance(address owner, address spender) external view returns (uint);
    function approve(address spender, uint amount) external returns (bool);
    function transferFrom(
        address sender,
        address recipient,
        uint amount
    ) external returns (bool);
    event Transfer(address indexed from, address indexed to, uint value);
    event Approval(address indexed owner, address indexed spender, uint value);
}


//import the Uniswap router
//the contract needs to use swapExactTokensForTokens
//this will allow us to import swapExactTokensForTokens into our contract

interface IUniswapV2Router {
  function getAmountsOut(uint256 amountIn, address[] memory path)
    external
    view
    returns (uint256[] memory amounts);
  
  function swapExactTokensForTokens(
  
    //amount of tokens we are sending in
    uint256 amountIn,
    //the minimum amount of tokens we want out of the trade
    uint256 amountOutMin,
    //list of token addresses we are going to trade in.  this is necessary to calculate amounts
    address[] calldata path,
    //this is the address we are going to send the output tokens to
    address to,
    //the last time that the trade is valid for
    uint256 deadline
  ) external returns (uint256[] memory amounts);
}

interface IUniswapV2Pair {
  function token0() external view returns (address);
  function token1() external view returns (address);
  function swap(
    uint256 amount0Out,
    uint256 amount1Out,
    address to,
    bytes calldata data
  ) external;
}

interface IUniswapV2Factory {
  function getPair(address token0, address token1) external returns (address);
}



contract tokenSwap {
    
    //address of the Uniswap v2 router
    address private constant UNISWAP_V2_ROUTER = 0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D;
    
    //address of WETH token.  This is needed because some times it is better to trade through WETH.  
    //you might get a better price using WETH.  
    //example trading from token A to WETH then WETH to token B might result in a better price
    address private constant WETH = 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2;
    

    //this swap function is used to trade from one token to another
    //the inputs are self explainatory
    //token in = the token address you want to trade out of
    //token out = the token address you want as the output of this trade
    //amount in = the amount of tokens you are sending in
    //amount out Min = the minimum amount of tokens you want out of the trade
    //to = the address you want the tokens to be sent to
    
   function swap(address _tokenIn, address _tokenOut, uint256 _amountIn, uint256 _amountOutMin, address _to) external {
      
    //first we need to transfer the amount in tokens from the msg.sender to this contract
    //this contract will have the amount of in tokens
    IERC20(_tokenIn).transferFrom(msg.sender, address(this), _amountIn);
    
    //next we need to allow the uniswapv2 router to spend the token we just sent to this contract
    //by calling IERC20 approve you allow the uniswap contract to spend the tokens in this contract 
    IERC20(_tokenIn).approve(UNISWAP_V2_ROUTER, _amountIn);

    //path is an array of addresses.
    //this path array will have 3 addresses [tokenIn, WETH, tokenOut]
    //the if statement below takes into account if token in or token out is WETH.  then the path is only 2 addresses
    address[] memory path;
    if (_tokenIn == WETH || _tokenOut == WETH) {
      path = new address[](2);
      path[0] = _tokenIn;
      path[1] = _tokenOut;
    } else {
      path = new address[](3);
      path[0] = _tokenIn;
      path[1] = WETH;
      path[2] = _tokenOut;
    }
        //then we will call swapExactTokensForTokens
        //for the deadline we will pass in block.timestamp
        //the deadline is the latest time the trade is valid for
        IUniswapV2Router(UNISWAP_V2_ROUTER).swapExactTokensForTokens(_amountIn, _amountOutMin, path, _to, block.timestamp);
    }
    
       //this function will return the minimum amount from a swap
       //input the 3 parameters below and it will return the minimum amount out
       //this is needed for the swap function above
     function getAmountOutMin(address _tokenIn, address _tokenOut, uint256 _amountIn) external view returns (uint256) {

       //path is an array of addresses.
       //this path array will have 3 addresses [tokenIn, WETH, tokenOut]
       //the if statement below takes into account if token in or token out is WETH.  then the path is only 2 addresses
        address[] memory path;
        if (_tokenIn == WETH || _tokenOut == WETH) {
            path = new address[](2);
            path[0] = _tokenIn;
            path[1] = _tokenOut;
        } else {
            path = new address[](3);
            path[0] = _tokenIn;
            path[1] = WETH;
            path[2] = _tokenOut;
        }
        
        uint256[] memory amountOutMins = IUniswapV2Router(UNISWAP_V2_ROUTER).getAmountsOut(_amountIn, path);
        return amountOutMins[path.length -1];
    
    }
    
}

3、构建 snipe bot来监控流动性对

接下来,构建一个 Python 程序,用于监听 Uniswap 去中心化交易所上创建的新流动性对。该程序将循环运行并每 2 秒检查 Uniswap 是否有新的流动性对。该程序将新的 Uniswap 流动性对信息打印到控制台。

请阅读下面 Python 代码中的注释,以更好地理解此过程的工作原理:

# import the following dependencies
import json
from web3 import Web3
import asyncio

# add your blockchain connection information
infura_url = 'ADDYOURINFURAURL'
web3 = Web3(Web3.HTTPProvider(infura_url))

# uniswap address and abi
uniswap_router = '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D'
uniswap_factory = '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f'
uniswap_factory_abi = json.loads('[{"inputs":[{"internalType":"address","name":"_feeToSetter","type":"address"}],"payable":false,"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"token0","type":"address"},{"indexed":true,"internalType":"address","name":"token1","type":"address"},{"indexed":false,"internalType":"address","name":"pair","type":"address"},{"indexed":false,"internalType":"uint256","name":"","type":"uint256"}],"name":"PairCreated","type":"event"},{"constant":true,"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"allPairs","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"allPairsLength","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"internalType":"address","name":"tokenA","type":"address"},{"internalType":"address","name":"tokenB","type":"address"}],"name":"createPair","outputs":[{"internalType":"address","name":"pair","type":"address"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"feeTo","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"feeToSetter","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"internalType":"address","name":"","type":"address"},{"internalType":"address","name":"","type":"address"}],"name":"getPair","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"internalType":"address","name":"_feeTo","type":"address"}],"name":"setFeeTo","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"internalType":"address","name":"_feeToSetter","type":"address"}],"name":"setFeeToSetter","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"}]')

contract = web3.eth.contract(address=uniswap_factory, abi=uniswap_factory_abi)


# define function to handle events and print to the console
def handle_event(event):
    print(Web3.toJSON(event))
    # and whatever


# asynchronous defined function to loop
# this loop sets up an event filter and is looking for new entires for the "PairCreated" event
# this loop runs on a poll interval
async def log_loop(event_filter, poll_interval):
    while True:
        for PairCreated in event_filter.get_new_entries():
            handle_event(PairCreated)
        await asyncio.sleep(poll_interval)


# when main is called
# create a filter for the latest block and look for the "PairCreated" event for the uniswap factory contract
# run an async loop
# try to run the log_loop function above every 2 seconds
def main():
    event_filter = contract.events.PairCreated.createFilter(fromBlock='latest')
    #block_filter = web3.eth.filter('latest')
    # tx_filter = web3.eth.filter('pending')
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(
            asyncio.gather(
                log_loop(event_filter, 2)))
                # log_loop(block_filter, 2),
                # log_loop(tx_filter, 2)))
    finally:
        # close loop to free up system resources
        loop.close()


if __name__ == "__main__":
    main()

上面的代码会将新的流动性对事件打印到您的 IDE 控制台。以下是全天添加到 Uniswap 的 11 个流动性对的示例:

Uniswap 流动性对创建日志

在机器人识别出新创建的流动性对后,将交易发送到智能合约以执行代币交换。

4、集成到 snipe 交易机器人中

要完成此机器人,需要将以下步骤集成到上面的 Python 代码中。

  • 查看上面的 Uniswap 流动性对创建日志屏幕截图。你需要将 token0、token1 和配对地址保存为代码中的返回值。你将需要这些值来执行代币交换。
  • 确定哪个地址是新的代币地址。大多数货币对都是用 WETH 与新代币设置的。
  • 阅读这个教程,了解如何使用 Python 中的 Web3.py 将 ETH 交易发送到区块链
  • 使用新的代币地址(token0、token1)从 Python 代码中调用智能合约中的 getAmountOutMin 函数。这将返回作为交换输入所需的 amountOutMin。
  • 使用 getAmountOutMin 的返回值并从 Python 代码调用智能合约中的函数 swap。

完成这些步骤后,你就拥有了一个可以监控和交易流动性对的有效狙击交易机器人。

可以修改 Solidity 智能合约以支持狙击其他 Uniswap 克隆交易所上的代币。例如,币安智能链、Polygon、Ubiq、CheapEth、Fantom 都是以太坊区块链克隆。每个区块链都有 Uniswap 去中心化交易所的副本。考虑将下面的去中心化交易所添加到你的狙击机器人中。

  • Uniswap
  • Mooniswap
  • 1Inch Exchange
  • Sushiswap
  • Sashimiswap
  • Binance Smart Chain Pancake Swap
  • CheapEth CheapSwap
  • Ubiq Shinobi
  • Polygon SwapMatic

此代码仅用于学习和娱乐目的。该代码未经审核,使用时需自行承担风险。请记住,智能合约是实验性的,可能包含错误。


原文链接:Build a snipe bot to monitor and trade liquidity pairs

DefiPlot翻译整理,转载请标明出处

通过 NowPayments 打赏