算法交易从零到壹
在本文中,我将分享目前支持我的算法交易业务的软件栈。我将介绍我的开源自动化交易引擎——alchemist,以及数据管道——trading-data,并带您了解我是如何在日常工作中使用它们的。

一键发币: SUI | SOL | BNB | ETH | BASE | ARB | OP | POLYGON | 跨链桥/跨链兑换
当我开始认真交易时,我尝试获取一些最基本的组件。我认同Jim Simons的观点,即消除情绪是金融交易中最重要的方面之一。此外,在寻找模式和回测时,我需要一个历史数据池以及能够从池中加载特定数据并运行回测的引擎。
为了满足我的交易需求,我确定了三个基本组件:
- 自动化交易引擎——无需手动干预或情感偏见即可部署策略。
- 数据管道——收集和管理历史数据以进行回测和策略分析。
- 回测引擎——在实盘部署之前严格测试策略。
1、构建自己的自动化交易引擎
对于任何交易业务来说,主要的重点应该是构建超越市场的策略。我最初使用Backtrader作为我的自动化交易引擎。然而,我发现Backtrader在构建策略时很难调试——也许这值得单独写一篇文章。
作为一名资深量化开发人员,我有信心构建自己的自动化交易引擎。我决定维护自己的自动化交易引擎。我可以确保我对系统有完全的控制权,并且系统可以满足我的要求。
2、Alchemist——自动化交易引擎
Alchemist是一个高性能、分布式的自动化交易系统,旨在无缝地将定量研究转化为全自动化的算法交易策略。
基于Ray构建,Alchemist提供了一个可扩展的基础设施,用于策略执行、实时订单管理以及与市场数据和经纪商连接的无缝集成。无论是定量研究人员、算法交易员还是对冲基金,Alchemist都能帮助您高效地部署、管理和扩展交易策略。
Alchemist弥合了我所说的“研究到执行差距”——从有前景的回测到实际部署之间痛苦的飞跃。
以下是一个简化的示例,展示我是如何使用Alchemist在Interactive Brokers上部署交易策略的。
from alchemist.account import Account
from alchemist.data_card import DataCard
from alchemist.products.stock_product import StockProduct
from alchemist.gateways.ib_gateway import IbGateway
accounts = [
Account(**account)
]
products = [
StockProduct(
name=products[i]['name'],
base_currency=products[i]['base_currency'],
exch=products[i]['exch']
) for i in range(len(products))
]
data_cards = [
DataCard(
product=products[i],
freq=data_cards[i]['freq'],
aggregation=data_cards[i]['aggregation']
) for i in range(len(data_cards))
]
monitor_actor = TelegramMonitor.remote(
bot_token=os.getenv('TELEGRAM_BOT_TOKEN'),
chat_id=os.getenv('TELEGRAM_CHAT_ID'),
flush_interval=60.0,
is_test=False,
)
strategy_actor = BBReversalStrategy.remote(
name=strategy['name'],
zmq_send_port=strategy['zmq_send_port'],
zmq_recv_ports=strategy['zmq_recv_ports'],
products=products,
data_cards=data_cards,
data_pipeline='IBDataPipeline', # REMINDER: this must be a class name. it will be imported inside the actor process
data_pipeline_kwargs={
'account': accounts[0],
},
monitor_actor=monitor_actor
)
gateway_actor = IbGateway.remote(
subscriptions=gateway['subscriptions'],
zmq_send_port=gateway['zmq_send_port'],
zmq_recv_ports=gateway['zmq_recv_ports'],
accounts=accounts,
products=products,
)
# 开始回填
backfill_start_date = (datetime.now() - timedelta(days=2)).strftime('%Y-%m-%d') # TEMP: 填充2天的分钟级数据
ray.get(strategy_actor.start_backfilling.remote(
backfilling_start_date=backfill_start_date,
))
gateway_actor.start.remote()
strategy_actor.start.remote()
# 被动运行循环(可以演变为监控系统)
market_close_time = datetime.now().replace(hour=16, minute=0, second=0, microsecond=0)
try:
while datetime.now() < market_close_time:
time.sleep(60)
# 可选地在此处轮询Actor健康状况
print("市场收盘时间已到。正在关闭策略和网关...")
ray.shutdown()
sys.exit(0)
except KeyboardInterrupt:
print("手动关闭请求。")
ray.shutdown()
sys.exit(0)
要创建一个策略,您可以编写一个类似风格的策略类,就像在backtrader中一样:
import typing
import ray
from alchemist.strategies.base_strategy import BaseStrategy
from alchemist.datas.bar_data import BarData
from alchemist.products.base_product import BaseProduct
from alchemist.indicators.sma_indicator import SmaIndicator
@ray.remote
class MovingAverageStrategy(BaseStrategy):
"""
一种使用简单移动平均生成买入/卖出信号的基准策略。
"""
STRAT = 'strat-baseline-1'
PARAMS = {
'period': 10
}
def __init__(self, name, zmq_send_port, zmq_recv_ports, products: typing.List[BaseProduct], data_cards, data_pipeline=None, data_pipelin_kwargs=None, max_len=1000, params=None, monitor_actor=None):
super().__init__(name, zmq_send_port, zmq_recv_ports, products, data_cards, data_pipeline, data_pipelin_kwargs, max_len, params, monitor_actor)
self.product = self.products[0]
data_card = self.data_cards[0]
data_card_index = self.create_data_index(data_card.product.exch, data_card.product.name, data_card.freq, aggregation=data_card.aggregation)
self.data = self.datas[data_card_index]
self.sma_indicator = SmaIndicator(
data=self.data,
min_period=self.params['period']
)
def next(self):
"""
覆盖on_bar方法以处理条形更新并生成信号。
"""
# if len(self.sma_indicator.sma) > 1:
# print('close[-1]={} sma[-1]={} close[-2]={} sma[-2]={}'.format(self.data[-1].close, self.sma_indicator.sma[-1], self.data[-2].close, self.sma_indicator.sma[-2]))
# else:
# print('len sma={}'.format(len(self.sma_indicator.sma)))
if self.data[-1].close > self.sma_indicator.sma[-1] and self.data[-2].close <= self.sma_indicator.sma[-2]:
self.buy(
gateway='mock_gateway',
product=self.products[0],
price=self.data[-1].close,
size=1,
order_type='MARKET',
time_in_force='GTC',
)
elif self.data[-1].close < self.sma_indicator.sma[-1] and self.data[-2].close >= self.sma_indicator.sma[-2]:
self.sell(
gateway='mock_gateway',
product=self.products[0],
price=self.data[-1].close,
size=1,
order_type='MARKET',
time_in_force='GTC',
)
def get_signals(self):
return {
'sma - close': self.sma_indicator.sma[-1] - self.data[-1].close,
}
有关更多详细信息,请查看此处的文档:这里。
3、构建自己的数据管道
研究的成功与否取决于数据质量。我构建了一个名为trading-data
的小型CLI驱动服务,用于收集和组织我可能交易的所有内容:来自Binance和Bybit的加密货币行情、CME期货的分钟线、股票和ETF的日OHLCV数据。
数据湖会将每个数据源写入~/.trading-data
下的文件夹树中,一条像这样的命令:
trading-data datalake update--name ib --start-date 2024-01-01
拉取新鲜的日内数据,更新目录,并记录谱系,以便几个月后我可以重现任何回测。
目前我已经为Bybit、Binance、Firstrate数据和Traders工作站构建了管道。你可以在我的存储库的README.md中查看详细信息。
4、使用Backtrader回测策略
尽管我发现Backtrader在实时交易中难以调试,但它仍然是广泛采用的回测引擎。我选择暂时不重新发明轮子。我主要用它来回测想法,然后将成功的策略转移到Alchemist中进行实盘执行。
我将数据管道集成到Backtrader的cerebro中以加载历史数据。你可以像这样使用数据管道加载历史数据:
import pandas as pd
from trading_data.datalake_client import DatalakeClient
from trading_data.common.date_ranges import get_dates
def load_data(pdt: str, start_date: str, end_date: str, data_source: str) -> pd.DataFrame:
dlc = DatalakeClient()
dates = get_dates(start_date, end_date)
dfs = []
for date in dates:
try:
df = dlc.get_table(data_source, pdt, ver_name='min_bar', set_index=True, date=date)
dfs.append(df)
except:
continue # TODO 可以记录消息以供INFO
df = pd.concat(dfs)
return df
然后可以像下面这样构建策略和主脚本:
import pandas as pd
import backtrader as bt
# `quant-factory` 是我的私人存储库,用于开发交易策略
# 所有策略都基于我在本文中提到的开源存储库构建
from quant_factory.strategies.common import load_data
from quant_factory.strategies.intraday_bb_reversal_strategy.create_backtest_stratetgy import create_strategy
from quant_factory.analyzers import MetricsAnalyzer, TradeAnalyzer
from quant_factory.commissions import IBStockCommission
DEFAULT_STRAT_PARAMS = {
'bollinger_period': 60,
'bollinger_dev': 2.5,
'bollinger_dev_exit': 2,
'fast_sma_period': 20,
'slow_sma_period': 60,
'atr_period': 15,
'min_trade_equity': 2000,
}
BAR_TYPE = 'min_bar'
def run_backtest(asset, strat_params, start_date, end_date, initial_cash=100000, data_source='yfinance', plot=False):
"""
对单个资产运行反转策略的回测。
返回最终的投资组合价值和各种性能指标。
"""
# 加载数据
df = load_data(asset, start_date, end_date, data_source)
# 转换为Backtrader数据源
data_feed = bt.feeds.PandasData(dataname=df)
# 创建Backtrader引擎
cerebro = bt.Cerebro()
cerebro.adddata(data_feed)
strat_params = {name: strat_params.get(name, DEFAULT_STRAT_PARAMS.get(name)) for name in DEFAULT_STRAT_PARAMS}
# 添加策略
StrategyClass = create_strategy(strat_params)
cerebro.addstrategy(StrategyClass)
# 设置初始现金
cerebro.broker.set_cash(initial_cash)
# 设置佣金
cerebro.broker.addcommissioninfo(IBStockCommission())
# 添加指标分析器
cerebro.addanalyzer(MetricsAnalyzer, _name='metrics')
cerebro.addanalyzer(TradeAnalyzer, _name='trades')
# 运行回测
results = cerebro.run()
strat = results[0]
if plot:
cerebro.plot()
# 获取最终投资组合价值
final_value = cerebro.broker.getvalue()
# 计算指标
metrics = strat.analyzers.metrics.get_metrics()
trades = strat.analyzers.trades.get_analysis()
return final_value, metrics, trades
5、我的策略开发哲学
当我设计策略时,我始终专注于从开发到部署的速度。我相信没有比实盘交易更重要的过程。回测仍然很重要,因为它能给你一些关于策略表现的见解。然而,回测只基于历史数据。你的策略与实时数据的交互数据比回测更有价值。
因此,我当前的开发策略很简单:
发射火箭,让它坠毁,然后学习。
这与SpaceX的Elon Musk的方法相呼应:快速发射,快速失败,快速学习。我尝试了一种更积极的研究方法来开发策略,但结果证明这花费了太多时间和资源,直到我得到一个运行中的策略。这种详尽的研究对于一个人的操作来说并不现实。在我发布第一个实盘策略之前,这种方法就会把我耗尽。
原文链接:How I Started My Algorithmic Trading Business
DefiPlot翻译整理,转载请标明好处
免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。