构建实时股票筛选器
本教程将向你展示如何构建一个实时的股票筛选器,它会不断分析流式实时市场数据馈送,以检测所有美国股票——约9,000只NMS股票和ETF的变化。

一键发币: SUI | SOL | BNB | ETH | BASE | ARB | OP | POLYGON | AVAX | FTM | OK
本教程将向你展示如何构建一个实时的股票筛选器(或扫描器),它会不断分析流式实时市场数据馈送,以检测所有美国股票——约9,000只NMS股票和ETF的变化。
在本教程结束时,你将能够获得时间戳到纳秒级别的输出,如下所示:
[2025-04-23T04:00:01.688717194-04:00] NVDA 涨幅为 4.40%(当前:98.1900,之前:102.7100)
[2025-04-23T04:00:02.121450448-04:00] BABA 涨幅为 10.34%(当前:131.2750,之前:118.9700)
[2025-04-23T06:36:23.507827237-04:00] AMZN 涨幅为 7.36%(当前:193.8850,之前:180.6000)
你也可以直接跳到GitHub获取完整代码 这里。
1、现有股票筛选工具的问题是什么?
实现股票扫描器的一个主要挑战是它需要监听市场上所有股票。很少有市场数据馈送能够持续流式传输所有~9,000只美国股票的实时价格,尤其是在通过互联网进行广域网(WAN)传输的情况下。
许多零售公司提供此类服务作为商业服务,但它们通常专注于零售交易场景,并且对触发警报的程序化定制较少。它们在交付保证和时间戳准确性方面也没有很强的保障。
另一个问题是大多数零售工具在处理盘前价格方面表现不佳,尤其是在上午9:30开盘之前。

特斯拉股份有限公司(TSLA)股票价格来自Google财经——盘前走势未显示,且难以与前一天的价格进行比较。
2、股票筛选示例
我们的股票筛选器应该:
- 高效地处理整个美国股票市场约9,000个符号。
- 具有中位数小于毫秒级的馈送延迟至纽约/东部时间(广域网形状)。
- 在不到5秒的时间内完成历史数据获取、订阅和开始扫描。
- 监控所有美国股票和ETF的价格变动超过可配置阈值。
- 将当前价格与前一天的收盘价进行比较。
- 当检测到显著波动时显示警报。
3、状态管理
首先,我们将编写一个类来保存此股票筛选器的所有状态。
我们需要几个常量:
PX_SCALE
用于以小数单位表示价格。默认情况下,Databento 使用固定整数表示法来避免精度损失,每个单位代表 1“纳美元”或1e-9
美元。PX_NULL
用于处理空价格。当订单簿的一侧为空时会发生这种情况——在盘前时段或在消费整个美国股票和ETF集合时,这种情况比您想象的更频繁。PCT_MOVE_THRESHOLD
将是我们触发警报的任意阈值。在这种情况下,我们将编写一个简单的警报,当股票在任一方向上移动 3% 或更多时触发。
此外,我们将创建 3 个字典以便我们可以查找每个符号:
self.symbol_directory
将跟踪符号的数字指标ID,像Nasdaq Basic这样的原始交易所馈送每天都会更改。self.last_day_lookup
将查找股票的最后收盘价。self.is_signal_lit
帮助我们指示是否已经触发了警报,因此当阈值首次超过时,我们只会打印一次。
from datetime import datetime, timedelta
from typing import Dict, Any
import databento as db
import pandas as pd
import pytz
class PriceMovementScanner:
"""用于检测所有美国股票大幅价格变动的扫描器。"""
# 常量
PX_SCALE: float = 1e-9
PX_NULL: int = 2**63 - 1
PCT_MOVE_THRESHOLD: float = 0.03
def __init__(self, pct_threshold: float = None, today: str = TODAY) -> None:
"""初始化扫描器,使用可配置的阈值和日期。"""
self.pct_threshold = pct_threshold or self.PCT_MOVE_THRESHOLD
self.today = today
self.today_midnight_ns = int(pd.Timestamp(today).timestamp() * 1e9)
self.symbol_directory: Dict[int, str] = {}
self.last_day_lookup: Dict[str, float] = self.get_last_day_lookup()
self.is_signal_lit: Dict[str, bool] = {symbol: False for symbol in self.last_day_lookup}
4、获取前一天的股票和ETF价格
我们将添加一个类方法,使用Databento的历史客户端填充self.last_day_lookup
。让我们理解它在做什么:
dataset="EQUS.SUMMARY"
— 这使用Databento的美国股票汇总馈送,该馈送提供了跨所有NMS交易所和ATS的综合日终价格(OHLCVs)。其设计目的是最大化CTA/UTP SIP覆盖范围,而无需许可费用,并提供适合每日日终使用的官方价格和成交量。schema="ohlcv-1d"
— 这获取每日OHLCV聚合。symbols="ALL_SYMBOLS"
— 这获取数据集中的所有股票代码。
def get_last_day_lookup(self) -> Dict[str, float]:
"""获取所有股票昨天的收盘价。"""
client = db.Historical()
now = pd.Timestamp(self.today).date()
yesterday = (pd.Timestamp(self.today) - timedelta(days=1)).date()
# 获取昨天的收盘价
data = client.timeseries.get_range(
dataset="EQUS.SUMMARY",
schema="ohlcv-1d",
symbols="ALL_SYMBOLS",
start=yesterday,
end=now,
)
# 请求符号映射:这是必需的,因为ALL_SYMBOLS请求不会自动将仪器ID映射到原始股票代码
symbology_json = data.request_symbology(client)
data.insert_symbology_json(symbology_json, clear_existing=True)
df = data.to_df()
# TODO: 在此处调整隔夜拆分,例如,使用Databento公司行为API
return dict(zip(df["symbol"], df["close"]))
如内联注释中所述,更完整的实现将使用公司行为数据来调整隔夜拆分后的收盘价。此方法灵活,您可以集成其他重大事件或基本面,如股息、市值、EPS和P/E比率更新。
您可能还想在书籍稳定后开始筛选,这可以通过使用交易价格而不是中间价、要求价差达到一定数量的基点,或者在某个固定时间之后来实现。
5、添加扫描和警报
首先,这会填充符号目录,用于符号映射消息,这些消息是在您订阅Databento上的实时馈送时最初接收到的。
然后,我们在每个MBP-1消息(顶级订单簿更新)上计算中间价,并将其与前一天的收盘价进行比较。
def scan(self, event: Any) -> None:
"""
扫描市场数据事件中的大幅价格变动。
"""
if isinstance(event, db.SymbolMappingMsg):
self.symbol_directory[event.hd.instrument_id] = event.stype_out_symbol
return
if not isinstance(event, db.MBP1Msg):
return
# 如果事件是从今天之前的回放订阅参数`start=...`,则跳过
#if event.hd.ts_event < self.today_midnight_ns:
# return
symbol = self.symbol_directory[event.instrument_id]
bid = event.levels[0].bid_px
ask = event.levels[0].ask_px
if bid == self.PX_NULL or ask == self.PX_NULL:
# 处理订单簿一侧为空的情况
return
mid = (event.levels[0].bid_px + event.levels[0].ask_px) * self.PX_SCALE * 0.5
last = self.last_day_lookup[symbol]
abs_r = abs(mid - last) / last
if abs_r > self.pct_threshold and not self.is_signal_lit[symbol]:
ts = pd.Timestamp(event.hd.ts_event, unit='ns').tz_localize('UTC').tz_convert('US/Eastern')
print(
f"[{ts.isoformat()}] {symbol} 移动了 {abs_r * 100:.2f}% "
f"(当前:{mid:.4f},之前:{last:.4f})"
)
self.is_signal_lit[symbol] = True
6、运行你的股票筛选器
现在,我们只需要运行代码。观察我们对这些参数的使用:
dataset="EQUS.MINI"
— 这使用Databento股票迷你馈送,这是一种成本效益高的综合馈送。您还可以通过交换此数据集ID来修改此代码以扫描所有期权或期货。schema="mbp-1"
— MBP-1 提供每条更新最佳出价或出价(BBO)的订单簿事件,并包括每笔交易以及书深度的变化,同时包含BBO处的总规模和订单数量。symbols="ALL_SYMBOLS"
— 这指示客户端订阅馈送中的每个符号。如果您只有特定的股票代码在您的观察列表中,您也可以在这里指定它们,例如["AMZN", "AAPL", "GOOG", "MSFT", "NVDA"]
表示亚马逊、苹果、谷歌、微软和英伟达。默认情况下,Databento 使用纳斯达克符号表示美国股票。start=0
— 日内回放 对于在非市场时间内调试此示例很有帮助。
def main() -> None:
scanner = PriceMovementScanner()
live = db.Live()
live.subscribe(
dataset="EQUS.MINI",
schema="mbp-1",
symbols="ALL_SYMBOLS",
start=0,
)
live.add_callback(scanner.scan)
live.start()
live.block_for_close()
if __name__ == "__main__":
main()
7、结果
[2025-04-24T04:00:00.007704938-04:00] TSLA 涨幅为 5.43%(当前:245.4300,之前:259.5100)
[2025-04-24T04:00:00.008339140-04:00] TQQQ 涨幅为 11.78%(当前:46.0000,之前:52.1400)
[2025-04-24T04:00:00.009881258-04:00] DBC 涨幅为 5.84%(当前:22.5650,之前:21.3200)
[2025-04-24T04:00:00.011338853-04:00] DBA 涨幅为 4.81%(当前:25.9400,之前:27.2500)
[2025-04-24T04:00:00.019717308-04:00] WEAT 涨幅为 3.35%(当前:4.4650,之前:4.6200)
[2025-04-24T04:00:00.048734686-04:00] RSST 涨幅为 4.37%(当前:19.5950,之前:20.4900)
[2025-04-24T04:00:00.052540506-04:00] BCI 涨幅为 17.21%(当前:24.3800,之前:20.8000)
[2025-04-24T04:00:00.111035469-04:00] SOYB 涨幅为 4.26%(当前:22.8100,之前:21.8777)
[2025-04-24T04:00:00.158550097-04:00] FTGC 涨幅为 3.49%(当前:25.6250,之前:24.7600)
[2025-04-24T04:00:00.158736420-04:00] CMDY 涨幅为 3.03%(当前:51.7200,之前:50.2000)
您可以看到,我们的筛选器在4月24日东部时间凌晨4点7.7毫秒后准确捕捉到了特斯拉股份有限公司(TSLA)的盘前波动,远早于当天早上5:30东部时间发布的彭博新闻的第一篇报道。


8、完整代码
你可以在GitHub上获取此示例的完整脚本 这里。
如果有兴趣仅在历史数据上扫描股票,请参阅我们的文档中关于寻找盘前最大动量者的示例。
或者了解更多有关Databento美国股票覆盖的信息,自成立以来已涵盖超过20,000+只股票和ETF,跨越所有美国交易所和ATS。我们是所有主要专有馈送的官方分销商,如纳斯达克TotalView、NYSE Integrated、NYSE Arca Integrated等。
原文链接:How to build a blazing fast real-time stock screener with Python and Databento
DefiPlot翻译整理,转载请标明出处
免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。