构建实时Pump/Dump检测器

在这篇文章中,我们将通过仅使用 SQL 来构建一个强大的加密货币实时Pump/Dump检测系统。

构建实时Pump/Dump检测器
一键发币: x402兼容 | Aptos | X Layer | SUI | SOL | BNB | ETH | BASE | ARB | OP | Polygon | Avalanche

加密货币市场以其波动性而闻名,不幸的是,这可能被利用来“Pump-n-Dump”。这些操纵行为,其中一种代币的价格被人为地推高(“Pump”)然后大量抛售(“Dump”),可能在几分钟内发生,使不知情的交易者遭受重大损失。

实时检测这些事件是一个经典的密集数据挑战。你需要处理大量的交易数据流,在线计算复杂的分析,并立即触发警报。

在这篇文章中,我们将通过仅使用 RisingWave 中的 SQL 来构建一个强大的实时Pump/Dump检测系统。我们将构建一个系统,可以:

  • 接收实时的加密货币交易流。
  • 将数据聚合为标准化的每分钟条形图。
  • 计算关键异常信号:快速的价格变化、不寻常的成交量激增以及单边买卖压力。
  • 将这些信号组合成一个明确的规则来触发警报。
  • 以几秒的延迟将这些警报传递给下游系统,如 Kafka 或 webhook。

让我们开始吧。

1、建立基础 - 接收和结构化数据

在我们可以进行任何分析之前,我们需要将数据引入我们的系统并为其赋予结构。我们的原始数据是各种市场对的个别交易流。

1.1 创建交易源

首先,我们定义与数据源的连接——在这种情况下,是一个名为 trades 的 Kafka 主题。CREATE SOURCE 语句声明了传入交易数据的模式。

这里的一个关键元素是 WATERMARK。像任何现实世界的数据流一样,交易数据可能会稍微无序到达。WATERMARK FOR ts AS ts - INTERVAL '5 seconds' 子句告诉 RisingWave 预期事件最多晚到 5 秒。这对于在流式环境中确保正确和及时的计算至关重要。

CREATE SOURCE trades_src (
  pair_id   BIGINT,
  symbol    VARCHAR,
  ts        TIMESTAMP,           -- 事件时间
  side      VARCHAR,             -- 'BUY'/'SELL'
  price     DOUBLE PRECISION,
  qty       DOUBLE PRECISION,
    -- 允许轻微的无序;根据您的 P95 延迟进行调整
  WATERMARK FOR ts AS ts - INTERVAL '5 seconds'
) WITH (
  connector = 'kafka',
  topic = 'trades',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

1.2 聚合为每分钟条形图

分析每一笔交易效率低下。金融分析的标准方法是将交易聚合为基于时间的“条形图”或“蜡烛图”(例如,开盘价、最高价、最低价、收盘价、成交量)。我们将创建一个 1 分钟的条形图,使用一个物化视图。

RisingWave 中的物化视图是一个查询,其结果被存储并随着新数据的到来自动且增量地保持更新。这是我们分析的基础。

CREATE MATERIALIZED VIEW bar_1m AS
SELECT
  t.pair_id,
  window_start AS bucket_start,
  FIRST_VALUE(price ORDER BY ts) AS open,
  MAX(price)         AS high,
  MIN(price)         AS low,
  LAST_VALUE(price ORDER BY ts)  AS close,
  SUM(qty)           AS vol,
  SUM(CASE WHEN UPPER(side)='BUY'  THEN qty ELSE 0 END) AS buy_vol,
  SUM(CASE WHEN UPPER(side)='SELL' THEN qty ELSE 0 END) AS sell_vol,
  COUNT(*) AS trades
FROM TUMBLE(trades_src, ts, INTERVAL '1 minute') t
GROUP BY t.pair_id, window_start;

在这里,TUMBLE 函数将交易分组为固定、非重叠的 1 分钟窗口。然后我们计算该分钟的开盘价、最高价、最低价和收盘价,以及总成交量、买入量和卖出量。

提示: 为了节省计算资源,您可以预先过滤活跃市场。这个视图确保您只对过去 24 小时内有交易的对进行计算。

CREATE MATERIALIZED VIEW active_pairs_24h AS
SELECT pair_id
FROM (
  SELECT pair_id, MAX(ts) AS last_ts
  FROM trades_src
  GROUP BY pair_id
) t
WHERE last_ts >= NOW() - INTERVAL '24 hours';

然后,您只需将 bar_1m 视图与 active_pairs_24h 进行 JOIN。

2、开发信号 - 核心检测特征

有了我们的 1 分钟条形图,我们现在可以构建检测器的“感官”。经典的泵-抛售有三个关键特征:快速的价格上涨、成交量的激增以及单边的买方压力。

信号 #1:快速价格变化(收益)

我们需要衡量价格最近的变化有多大。我们可以使用 LAG 窗口函数计算 1 分钟和 5 分钟的收益,它允许我们查看前一行的收盘价。

CREATE MATERIALIZED VIEW bar_1m_with_returns AS
SELECT
  pair_id,
  bucket_start,
  open, high, low, close, vol, buy_vol, sell_vol, trades,
  (close / NULLIF(LAG(close, 1)  OVER (PARTITION BY pair_id ORDER BY bucket_start), 0) - 1) AS ret_1m,
  (close / NULLIF(LAG(close, 5)  OVER (PARTITION BY pair_id ORDER BY bucket_start), 0) - 1) AS ret_5m
FROM bar_1m;

信号 #2:异常成交量(成交量激增)

价格变动只有在有成交量支持的情况下才重要。但什么是“高成交量”?这相对于市场的近期活动而言。我们可以使用 Z 分数来量化这种“异常性”,它测量当前成交量与近期平均值的偏离程度(以标准差为单位)。

首先,我们计算 30 分钟滚动基线的平均成交量和标准差。

CREATE MATERIALIZED VIEW vol_baseline_30m AS
SELECT
  pair_id,
  bucket_start,
  AVG(vol)  OVER (PARTITION BY pair_id ORDER BY bucket_start ROWS BETWEEN 30 PRECEDING AND 1 PRECEDING) AS vol_mean_30m,  
  STDDEV_POP(vol) OVER (PARTITION BY pair_id ORDER BY bucket_start ROWS BETWEEN 30 PRECEDING AND 1 PRECEDING) AS vol_std_30m
FROM bar_1m;

信号 #3:单边压力(买入/卖出比率)

泵是由积极的买入驱动的,而抛售是由卖出驱动的。buy_ratio(买入量 / 总成交量)给我们一个清晰的指标。接近 1.0 的比率表明购买狂潮,而接近 0 的比率表明抛售。

综合特征

最后,我们将这些信号整合成一个综合特征视图。此视图计算成交量的最终 Z 分数和买入比率,为我们提供了规则引擎所需的全部要素。

CREATE MATERIALIZED VIEW flow_features AS
SELECT
  b.pair_id,
  b.bucket_start,
  b.ret_1m, b.ret_5m,
  b.vol, b.buy_vol, b.sell_vol,
  CASE WHEN (b.buy_vol + b.sell_vol) > 0
       THEN b.buy_vol / (b.buy_vol + b.sell_vol) ELSE NULL END AS buy_ratio,
  v.vol_mean_30m, v.vol_std_30m,
  CASE
    WHEN v.vol_std_30m IS NULL OR v.vol_std_30m = 0 THEN NULL
    ELSE (b.vol - v.vol_mean_30m) / v.vol_std_30m
  END AS z_vol
FROM bar_1m_with_returns b
LEFT JOIN vol_baseline_30m v
  ON v.pair_id = b.pair_id AND v.bucket_start = b.bucket_start;

3、做出判断 - 评分和触发警报

现在我们有了特征,可以定义一个简单且可解释的规则来标记可疑活动。

Pump/Dump规则

我们的规则简单透明:

  • 如果 1 分钟回报率 ≥ 2% 并且成交量 Z 分数 ≥ 3 并且买入比率 ≥ 0.65,则可能是泵。
  • 如果 1 分钟回报率 ≤ -2% 并且成交量 Z 分数 ≥ 3 并且买入比率 ≤ 0.35,则可能是抛售。

这些阈值(2%,3,0.65)是可调参数,可以根据市场状况进行调整。我们可以使用 CASE WHEN 语句实现此逻辑。

CREATE MATERIALIZED VIEW pump_dump_signals AS
SELECT
  pair_id,
  bucket_start,
  ret_1m, ret_5m, vol, z_vol, buy_ratio,
  CASE WHEN ret_1m IS NOT NULL AND z_vol IS NOT NULL AND buy_ratio IS NOT NULL
            AND ret_1m >= 0.02 AND z_vol >= 3 AND buy_ratio >= 0.65
       THEN 1 ELSE 0 END AS is_pump,
  CASE WHEN ret_1m IS NOT NULL AND z_vol IS NOT NULL AND buy_ratio IS NOT NULL
            AND ret_1m <= -0.02 AND z_vol >= 3 AND buy_ratio <= 0.35
       THEN 1 ELSE 0 END AS is_dump
FROM flow_features;

防止警报疲劳(冷却期)

一次泵事件可能会在连续几小时内触发我们的规则。为了避免向用户或系统发送重复警报,我们必须实施“冷却期”或“去抖动”机制。如果某个市场在过去 15 分钟内没有警报,我们才会发出警报。

这个查询更高级,但优雅地处理了每个对的最后警报时间的状态逻辑。

CREATE MATERIALIZED VIEW pump_dump_alerts AS
WITH raw AS (
  SELECT * FROM pump_dump_signals
  WHERE is_pump = 1 OR is_dump = 1
),
ranked AS (
  SELECT
    pair_id,
    bucket_start,
    is_pump, is_dump,
    ROW_NUMBER() OVER (PARTITION BY pair_id ORDER BY bucket_start DESC) AS rn
  FROM raw
)
SELECT r.*
FROM ranked r
LEFT JOIN LATERAL (
  -- 此对的最后警报时间
  SELECT MAX(bucket_start) AS last_ts
  FROM raw r2
  WHERE r2.pair_id = r.pair_id AND r2.bucket_start < r.bucket_start
) prev ON TRUE
WHERE r.rn = 1 AND (prev.last_ts IS NULL OR r.bucket_start >= prev.last_ts + INTERVAL '15 minutes');

4、采取行动 - 传递最终警报

一旦潜在的泵或抛售事件被确认并通过我们的 pump_dump_alerts 视图去抖动,下一步就是将此信息传递给下游系统,以便采取行动。RisingWave 提供了两种强大的机制:通过 订阅 的直接推送模型和通过 Sink 的数据集成模型。

  • 使用 订阅 用于紧密耦合、低延迟、事件驱动的服务,您希望立即响应警报。
  • 使用 Sink 用于更广泛的数据集成,当您需要可靠地将警报传递给多个系统,或者当您需要 Kafka 等消息队列的持久性和重放功能时。

选项 1:使用 RisingWave 订阅进行直接推送

对于最低延迟和最简单的架构,您可以使用 订阅。此功能允许您的下游应用程序(如通知服务、Telegram 机器人或实时仪表板)直接“订阅”物化视图中的更改。当我们的 pump_dump_alerts 视图中生成新的警报时,RisingWave 会直接将更改推送到您的连接应用程序。

这种方法非常适合构建事件驱动的服务,因为它消除了中介消息队列的需要,从而降低了延迟和运营开销。您的应用程序将使用标准的 PostgreSQL 驱动程序连接到 RisingWave 并监听新的警报行。

要启用此功能,您首先需要在最终的警报视图上创建一个订阅:

CREATE SUBSCRIPTION alert_sub FROM alerts_payload;

您的应用程序可以连接并获取实时发生的警报。

选项 2:将数据下沉到消息队列

更传统的方法是使用 Sink,它将 RisingWave 中的数据更改推送到外部系统,如 Apache Kafka。这种方法非常适合解耦系统,提供消息队列中的持久存储,并将警报扇出到多个独立的消费者应用程序。

首先,我们为警报创建一个干净、丰富的负载。

CREATE MATERIALIZED VIEW alerts_payload AS
SELECT
  a.pair_id, a.bucket_start,
  a.is_pump, a.is_dump,
  f.ret_1m, f.ret_5m, f.vol, f.z_vol, f.buy_ratio
FROM pump_dump_alerts a
JOIN flow_features f
  ON f.pair_id = a.pair_id AND f.bucket_start = a.bucket_start;

接下来,我们创建一个 sink 将数据发送到 Kafka 主题。这也可以轻松配置为 webhook、对象存储或另一个数据库。

CREATE SINK pump_dump_alerts_sink
FROM alerts_payload
WITH (
  connector = 'kafka',
  topic = 'alerts.pump_dump',
  properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);

5、从代码到系统:生产就绪的考虑事项

虽然这提供了一个完整且可工作的系统,但在生产环境中还需要考虑以下几点:

  • 处理数据缺陷: 水印对于管理迟到的数据和确保正确性至关重要。根据您的源的典型延迟调整水印间隔。
  • 调整和回测: 评分规则中使用的阈值并非普遍适用。它们应根据历史数据进行调整。您可以将所有 flow_features 数据下沉到分析数据库中,以找到平衡不同市场精度和召回率的最佳参数。
  • 稀疏市场: 对于流动性差的对,成交量和回报指标可能非常嘈杂。考虑使用我们讨论过的 active_pairs 过滤器,或者在 Z 分数计算中使用更稳健的统计方法,如中位数和平均绝对偏差(MAD)而不是均值和标准差。

6、结束语

仅通过几个声明性的 SQL 查询,我们就构建了一个复杂且实时的事件检测系统。我们接收了一个高容量的数据流,进行了状态化、时间窗口化的计算,并将可操作的警报推送到外部系统。这展示了像 RisingWave 这样的流数据库处理复杂、实时分析任务的能力,这些任务曾经是复杂定制代码的领域。

从这里,这个系统可以进一步扩展:

  • 结合订单簿数据: 分析诸如买卖价差扩大、市场深度和价格滑点等特征,以获得更稳健的信号。
  • 集成外部信号: 将价格/成交量动作与社交媒体情绪或新闻公告流结合起来。
  • 应用机器学习: 使用计算出的特征训练一个轻量级分类模型(如逻辑回归或梯度提升树)以产生更细致的泵-抛售评分。

原文链接:Building a Real-Time Crypto Pump-and-Dump Detector with SQL

DefiPlot翻译整理,转载请标明出处

免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。