识别Polymarket上的套利机会
你可以在不同平台之间进行跨市场套利,或者在Polymarket内部,搜索跨事件找到不同的市场,从而产生套利机会。

一键发币: Aptos | X Layer | SUI | SOL | BNB | ETH | BASE | ARB | OP | Polygon | Avalanche | 用AI学区块链开发
如你所知,在现实世界中没有绝对无风险的套利,但有些方法可以利用概率理论来限制风险——荷兰赌注套利肯定就是其中之一:
荷兰赌注套利发生在市场定价不正确时。基本思路是:如果你能以低于1美元的价格购买一组互斥的结果,那么你就能获得保证的利润。
这是来自Polymarket的一个真实例子。假设有一个事件“候选人X会赢得选举吗?”有以下市场:
- “X获胜”交易价格为0.45美元
- “X失败”交易价格为0.50美元
总成本:0.95美元。因为一个结果必须发生,你会得到1美元。这就是0.05美元的保证利润。
但在Polymarket上有一个陷阱。你不能做空或使用杠杆。你只能购买股份并管理你的退出。因此,你在寻找概率篮子,其总和小于100%,而不是更多。这改变了你寻找机会和管理头寸的方式。
然而,在一个有效的市场中,这些简单的套利机会最多只是短暂存在。高频交易机器人和复杂的套利算法不断扫描这些差异,在它们出现的毫秒内捕捉利润。这创造了一场军备竞赛,只有最快和最先进技术的参与者才能受益,迅速消除任何定价低效率,并在零售交易者甚至发现它们之前就平仓这些机会。
因此,你需要更深入地扫描事件和市场,或者更好,跨事件寻找满足套利条件的市场对。你可以在不同平台之间进行跨市场套利,或者在Polymarket内部,搜索跨事件找到不同的市场,从而产生套利机会(在Polymarket的Market API中,事件和市场被结构化为嵌套数组,彼此相互关联)
1、构建扫描器:首先获取市场数据,其次获取价格
与其手动检查市场,我构建了一个扫描器。方法很简单:首先获取所有活跃的事件和市场,然后一次性获取所有价格。
我使用Polymarket的Gamma API来获取事件和市场。这里是核心流程:
# importer/events.py
async def import_pipeline(logger):
"""Main import pipeline with fallback strategies"""
importer = EventsImporter(logger)
# fetch events with end date max 3 months from now in format: YYYY-MM-DD
end_date_min_after_7_days = datetime.now() + timedelta(days=7)
end_date_max_after_49_days = datetime.now() + timedelta(days=49)
params = {
"active": "true",
"order": "volume",
"ascending": "false",
"end_date_min": end_date_min_after_7_days.strftime("%Y-%m-%d"),
"end_date_max": end_date_max_after_49_days.strftime("%Y-%m-%d"),
}
# I use the parameters to filter out the most volumn market within 7-49 days
一旦获取了数据,你可以像这样导入到PostgreSQL表中:
# Schema:
# Each events contains mutiple markets
class Event(Base):
__tablename__ = "events"
id = Column(String, primary_key=True)
ticker = Column(String)
slug = Column(String)
title = Column(String)
start_date = Column(DateTime(timezone=True))
creation_date = Column(DateTime(timezone=True))
end_date = Column(DateTime(timezone=True))
volume = Column(NUMERIC(20, 6))
markets = Column(ARRAY(String))
raw = Column(JSONB)
@classmethod
def from_api_data(cls, event_data: Dict) -> "Event":
markets = event_data.get("markets", [])
market_ids = [market["id"] for market in markets if market.get("id")]
return cls(
id=event_data["id"],
ticker=event_data.get("ticker"),
slug=event_data.get("slug"),
title=event_data.get("title"),
start_date=cls._parse_datetime(event_data.get("startDate")),
creation_date=cls._parse_datetime(event_data.get("creationDate")),
end_date=cls._parse_datetime(event_data.get("endDate")),
volume=Decimal(str(event_data.get("volume", 0))),
markets=market_ids,
raw=event_data,
)
@staticmethod
def _parse_datetime(date_str: str) -> Optional[datetime.datetime]:
if not date_str:
return None
try:
if date_str.endswith("Z"):
return datetime.datetime.fromisoformat(date_str.replace("Z", "+00:00"))
return datetime.datetime.fromisoformat(date_str)
except ValueError:
return None
@classmethod
def extract_markets_from_event(cls, event_data: Dict) -> List[Dict]:
markets = event_data.get("markets", [])
for market in markets:
market["event_id"] = event_data["id"]
return markets
@classmethod
def batch_from_api_data(
cls, events_data: List[Dict]
) -> Tuple[List["Event"], List["Market"]]:
events = []
all_markets = []
for event_data in events_data:
event = cls.from_api_data(event_data)
events.append(event)
markets_data = cls.extract_markets_from_event(event_data)
for market_data in markets_data:
market = Market.from_api_data(market_data)
all_markets.append(market)
return events, all_markets
2、为什么嵌入和向量搜索对于套利至关重要
问题在于:Polymarket有数千个市场。其中许多是近似重复的,或者用略有不同的措辞覆盖相同的底层事件。
例子:
- “特朗普赢得2024年选举”
- “唐纳德·特朗普当选总统2024年”
- “特朗普2024年11月胜利”
这些应该被分组在一起。如果你在一个中发现套利,你希望立即检查其他市场。
这些应该被分组在一起。如果你在一个中发现套利,你希望立即检查其他市场。
但是扫描所有市场与所有其他市场是O(n²)。如果有10,000个市场,那就是1亿次比较。向量搜索使这变得可行。
我选择了Chroma作为向量数据库。它轻量级,可在本地运行,并且在Mac M3上运行良好。对于嵌入,我使用了sentence-transformers的e5-large-v2
。这是一个质量和速度的良好平衡。在Apple Silicon上,该模型运行高效,不需要MLX优化。
设计从事件开始,而不是市场。每个事件通常有2-10个市场。因此,与其单独分类10,000个市场,不如先分类大约2,000个事件,然后扩展到它们的市场。快得多。
这里是嵌入工作的方式:
def batch_embedding(self, data: List[Tuple], collection_name: str):
"""Process data in batches with progress tracking"""
print(f"Starting embedding process for {len(data)} {collection_name}...")
# Get existing collection and filter duplicates
client = self._get_chroma_client()
collection = client.get_or_create_collection(
name=collection_name,
metadata={"description": f"Collection for {collection_name} embeddings"},
)
existing_ids = set(collection.get()["ids"])
filtered_data = [item for item in data if item[0] not in existing_ids]
if len(existing_ids) > 0:
print(f"Found {len(existing_ids)} existing items in collection")
if not filtered_data:
print(f"All {len(data)} {collection_name} already embedded, skipping.")
return
print(
f"Processing {len(filtered_data)} new {collection_name} (skipping {len(data) - len(filtered_data)} duplicates)..."
)
# Process in batches with progress bar
total_batches = (
len(filtered_data) + self.embedding_batch_size - 1
) // self.embedding_batch_size
with tqdm(
total=len(filtered_data), desc=f"Embedding {collection_name}", unit="items"
) as pbar:
for i in range(0, len(filtered_data), self.embedding_batch_size):
batch = filtered_data[i : i + self.embedding_batch_size]
batch_num = (i // self.embedding_batch_size) + 1
try:
ids = [item[0] for item in batch]
texts = [item[1] for item in batch]
metadatas = [item[2] for item in batch]
embeddings = self._generate_embeddings(texts)
collection.add(
embeddings=embeddings.tolist(),
documents=texts,
metadatas=metadatas,
ids=ids,
)
pbar.update(len(batch))
pbar.set_postfix({"batch": f"{batch_num}/{total_batches}"})
except Exception as e:
self.logger.error(f"Error in batch {batch_num}: {e}")
print(f"Error in batch {batch_num}: {e}")
raise
print(
f"Successfully embedded {len(filtered_data)} new {collection_name} into collection '{collection_name}'"
)
def _generate_embeddings(self, texts: List[str]) -> np.ndarray:
model = self._get_model()
prefixed_texts = [f"query: {text}" for text in texts]
return model.encode(prefixed_texts, normalize_embeddings=True)
3、寻找相似的事件
一旦所有内容都被嵌入,我就可以将相似的事件分组。算法很简单:
- 获取每个事件的嵌入
- 找出其他在相似性阈值内的事件(我使用0.8)
- 将它们分组
- 从Postgres中提取每组的市场细节(因为获取市场细节需要时间),所以我宁愿在事件分组后丰富细节。
4、执行的现实
找到套利机会是容易的部分。有利可图地执行它们要困难得多。
滑点会吞噬小边缘:当你发现机会和你的交易执行之间市场移动。对于多个步骤的套利交易,这种风险会叠加。
流动性很薄:热门市场有不错的成交量,但小众事件可能有宽价差或低深度。你的1000美元套利交易可能只在好价格下成交100美元。
速度很重要:其他机器人也在扫描同样的机会。好的机会会在几秒钟内消失。
这就像:
在蒸汽机前捡硬币
5、我实际构建的是什么
这是两部分系列的第一部分。下周我将分解复合投注推荐引擎的完整设计:
第二篇文章将涵盖LLM集成、凯利 sizing 算法和风险管理框架,将原始市场数据转化为可操作的投注策略。
DefiPlot翻译整理,转载请标明出处
免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。