识别Polymarket上的套利机会

你可以在不同平台之间进行跨市场套利,或者在Polymarket内部,搜索跨事件找到不同的市场,从而产生套利机会。

识别Polymarket上的套利机会
一键发币: Aptos | X Layer | SUI | SOL | BNB | ETH | BASE | ARB | OP | Polygon | Avalanche | 用AI学区块链开发

如你所知,在现实世界中没有绝对无风险的套利,但有些方法可以利用概率理论来限制风险——荷兰赌注套利肯定就是其中之一:

荷兰赌注套利发生在市场定价不正确时。基本思路是:如果你能以低于1美元的价格购买一组互斥的结果,那么你就能获得保证的利润。

这是来自Polymarket的一个真实例子。假设有一个事件“候选人X会赢得选举吗?”有以下市场:

  • “X获胜”交易价格为0.45美元
  • “X失败”交易价格为0.50美元

总成本:0.95美元。因为一个结果必须发生,你会得到1美元。这就是0.05美元的保证利润。

但在Polymarket上有一个陷阱。你不能做空或使用杠杆。你只能购买股份并管理你的退出。因此,你在寻找概率篮子,其总和小于100%,而不是更多。这改变了你寻找机会和管理头寸的方式。

然而,在一个有效的市场中,这些简单的套利机会最多只是短暂存在。高频交易机器人和复杂的套利算法不断扫描这些差异,在它们出现的毫秒内捕捉利润。这创造了一场军备竞赛,只有最快和最先进技术的参与者才能受益,迅速消除任何定价低效率,并在零售交易者甚至发现它们之前就平仓这些机会。

因此,你需要更深入地扫描事件和市场,或者更好,跨事件寻找满足套利条件的市场对。你可以在不同平台之间进行跨市场套利,或者在Polymarket内部,搜索跨事件找到不同的市场,从而产生套利机会(在Polymarket的Market API中,事件和市场被结构化为嵌套数组,彼此相互关联)

1、构建扫描器:首先获取市场数据,其次获取价格

与其手动检查市场,我构建了一个扫描器。方法很简单:首先获取所有活跃的事件和市场,然后一次性获取所有价格。

我使用Polymarket的Gamma API来获取事件和市场。这里是核心流程:

# importer/events.py  
async def import_pipeline(logger):  
    """Main import pipeline with fallback strategies"""  
    importer = EventsImporter(logger)  

    # fetch events with end date max 3 months from now in format: YYYY-MM-DD  
    end_date_min_after_7_days = datetime.now() + timedelta(days=7)  
    end_date_max_after_49_days = datetime.now() + timedelta(days=49)  
    params = {  
        "active": "true",  
        "order": "volume",  
        "ascending": "false",  
        "end_date_min": end_date_min_after_7_days.strftime("%Y-%m-%d"),  
        "end_date_max": end_date_max_after_49_days.strftime("%Y-%m-%d"),  
    }  
# I use the parameters to filter out the most volumn market within 7-49 days   

一旦获取了数据,你可以像这样导入到PostgreSQL表中:

# Schema:  
# Each events contains mutiple markets  
class Event(Base):  
    __tablename__ = "events"  

    id = Column(String, primary_key=True)  
    ticker = Column(String)  
    slug = Column(String)  
    title = Column(String)  
    start_date = Column(DateTime(timezone=True))  
    creation_date = Column(DateTime(timezone=True))  
    end_date = Column(DateTime(timezone=True))  
    volume = Column(NUMERIC(20, 6))  
    markets = Column(ARRAY(String))  
    raw = Column(JSONB)  

    @classmethod  
    def from_api_data(cls, event_data: Dict) -> "Event":  
        markets = event_data.get("markets", [])  
        market_ids = [market["id"] for market in markets if market.get("id")]  

        return cls(  
            id=event_data["id"],  
            ticker=event_data.get("ticker"),  
            slug=event_data.get("slug"),  
            title=event_data.get("title"),  
            start_date=cls._parse_datetime(event_data.get("startDate")),  
            creation_date=cls._parse_datetime(event_data.get("creationDate")),  
            end_date=cls._parse_datetime(event_data.get("endDate")),  
            volume=Decimal(str(event_data.get("volume", 0))),  
            markets=market_ids,  
            raw=event_data,  
        )  

    @staticmethod  
    def _parse_datetime(date_str: str) -> Optional[datetime.datetime]:  
        if not date_str:  
            return None  
        try:  
            if date_str.endswith("Z"):  
                return datetime.datetime.fromisoformat(date_str.replace("Z", "+00:00"))  
            return datetime.datetime.fromisoformat(date_str)  
        except ValueError:  
            return None  

    @classmethod  
    def extract_markets_from_event(cls, event_data: Dict) -> List[Dict]:  
        markets = event_data.get("markets", [])  
        for market in markets:  
            market["event_id"] = event_data["id"]  
        return markets  

    @classmethod  
    def batch_from_api_data(  
        cls, events_data: List[Dict]  
    ) -> Tuple[List["Event"], List["Market"]]:  
        events = []  
        all_markets = []  

        for event_data in events_data:  
            event = cls.from_api_data(event_data)  
            events.append(event)  

            markets_data = cls.extract_markets_from_event(event_data)  
            for market_data in markets_data:  
                market = Market.from_api_data(market_data)  
                all_markets.append(market)  

        return events, all_markets  

2、为什么嵌入和向量搜索对于套利至关重要

问题在于:Polymarket有数千个市场。其中许多是近似重复的,或者用略有不同的措辞覆盖相同的底层事件。

例子:

  • “特朗普赢得2024年选举”
  • “唐纳德·特朗普当选总统2024年”
  • “特朗普2024年11月胜利”

这些应该被分组在一起。如果你在一个中发现套利,你希望立即检查其他市场。

这些应该被分组在一起。如果你在一个中发现套利,你希望立即检查其他市场。

但是扫描所有市场与所有其他市场是O(n²)。如果有10,000个市场,那就是1亿次比较。向量搜索使这变得可行。

我选择了Chroma作为向量数据库。它轻量级,可在本地运行,并且在Mac M3上运行良好。对于嵌入,我使用了sentence-transformers的e5-large-v2。这是一个质量和速度的良好平衡。在Apple Silicon上,该模型运行高效,不需要MLX优化。

设计从事件开始,而不是市场。每个事件通常有2-10个市场。因此,与其单独分类10,000个市场,不如先分类大约2,000个事件,然后扩展到它们的市场。快得多。

这里是嵌入工作的方式:


def batch_embedding(self, data: List[Tuple], collection_name: str):  
    """Process data in batches with progress tracking"""  
    print(f"Starting embedding process for {len(data)} {collection_name}...")  

    # Get existing collection and filter duplicates  
    client = self._get_chroma_client()  
    collection = client.get_or_create_collection(  
        name=collection_name,  
        metadata={"description": f"Collection for {collection_name} embeddings"},  
    )  

    existing_ids = set(collection.get()["ids"])  
    filtered_data = [item for item in data if item[0] not in existing_ids]  

    if len(existing_ids) > 0:  
        print(f"Found {len(existing_ids)} existing items in collection")  

    if not filtered_data:  
        print(f"All {len(data)} {collection_name} already embedded, skipping.")  
        return  

    print(  
        f"Processing {len(filtered_data)} new {collection_name} (skipping {len(data) - len(filtered_data)} duplicates)..."  
    )  

    # Process in batches with progress bar  
    total_batches = (  
        len(filtered_data) + self.embedding_batch_size - 1  
    ) // self.embedding_batch_size  

    with tqdm(  
        total=len(filtered_data), desc=f"Embedding {collection_name}", unit="items"  
    ) as pbar:  
        for i in range(0, len(filtered_data), self.embedding_batch_size):  
            batch = filtered_data[i : i + self.embedding_batch_size]  
            batch_num = (i // self.embedding_batch_size) + 1  

            try:  
                ids = [item[0] for item in batch]  
                texts = [item[1] for item in batch]  
                metadatas = [item[2] for item in batch]  

                embeddings = self._generate_embeddings(texts)  

                collection.add(  
                    embeddings=embeddings.tolist(),  
                    documents=texts,  
                    metadatas=metadatas,  
                    ids=ids,  
                )  

                pbar.update(len(batch))  
                pbar.set_postfix({"batch": f"{batch_num}/{total_batches}"})  

            except Exception as e:  
                self.logger.error(f"Error in batch {batch_num}: {e}")  
                print(f"Error in batch {batch_num}: {e}")  
                raise  

    print(  
        f"Successfully embedded {len(filtered_data)} new {collection_name} into collection '{collection_name}'"  
    )  

def _generate_embeddings(self, texts: List[str]) -> np.ndarray:  
    model = self._get_model()  
    prefixed_texts = [f"query: {text}" for text in texts]  
    return model.encode(prefixed_texts, normalize_embeddings=True)

3、寻找相似的事件

一旦所有内容都被嵌入,我就可以将相似的事件分组。算法很简单:

  1. 获取每个事件的嵌入
  2. 找出其他在相似性阈值内的事件(我使用0.8)
  3. 将它们分组
  4. 从Postgres中提取每组的市场细节(因为获取市场细节需要时间),所以我宁愿在事件分组后丰富细节。

4、执行的现实

找到套利机会是容易的部分。有利可图地执行它们要困难得多。

滑点会吞噬小边缘:当你发现机会和你的交易执行之间市场移动。对于多个步骤的套利交易,这种风险会叠加。

流动性很薄:热门市场有不错的成交量,但小众事件可能有宽价差或低深度。你的1000美元套利交易可能只在好价格下成交100美元。

速度很重要:其他机器人也在扫描同样的机会。好的机会会在几秒钟内消失。

这就像:

在蒸汽机前捡硬币

5、我实际构建的是什么

这是两部分系列的第一部分。下周我将分解复合投注推荐引擎的完整设计:

第二篇文章将涵盖LLM集成、凯利 sizing 算法和风险管理框架,将原始市场数据转化为可操作的投注策略。


原文链接:How to Programmatically Identify Arbitrage Opportunities on Polymarket (And Why I Built a Portfolio Betting Agent Instead)

DefiPlot翻译整理,转载请标明出处

免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。