卡尔曼滤波配对交易策略
一、交易策略解释
核心思想
配对交易策略基于两个相关资产价格之间存在长期均衡关系的假设。当这种关系发生暂时偏离时,通过同时买入低估资产和卖出高估资产,等待价格关系回归均衡时获利。
卡尔曼滤波配对交易策略(Kalman Filter Pairs Trading Strategy)的核心创新点在于不使用固定的对冲比率,而是利用卡尔曼滤波算法动态估计最优对冲比率,使策略能够适应市场变化。
理论基础
卡尔曼滤波配对交易策略建立在以下几个理论基础之上:
-
统计套利理论:源自金融统计学,假设市场中存在可利用的统计规律。Eugene Fama的有效市场假说认为,价格已包含所有信息,但现实市场中,价格偏离基本面的情况时有发生,为统计套利提供机会。
-
协整理论:由Engle和Granger于1987年提出,描述了非平稳时间序列之间可能存在的长期均衡关系。两个资产价格可能各自遵循随机游走过程,但它们的线性组合可能是平稳的,这为配对交易提供了理论依据。
-
卡尔曼滤波:由Rudolf Kalman在1960年代开发的递归状态估计算法,能够在噪声环境中估计动态系统的状态。在金融领域,它被用来估计时变参数,如配对交易中的动态对冲比率。相比传统OLS回归,卡尔曼滤波可以捕捉参数随时间的变化,提高策略适应性。
策略适用场景
-
相关性强的品种对:适用于具有产业链关联或替代关系的商品期货对,如不锈钢/镍、螺纹钢/铁矿石、焦煤/焦炭等。
-
区间震荡市场:在大趋势不明显的区间震荡市场中表现最佳,因为价格更可能围绕均衡水平波动。
-
市场非理性波动时期:当短期供需冲击导致价格关系暂时失衡时,策略能够捕捉回归机会。
-
低波动率环境:在市场波动较小时,价差的偏离和回归更为明显,信号质量更高。
二、天勤介绍
天勤平台概述
天勤(TqSdk)是一个由信易科技开发的开源量化交易系统,为期货、期权等衍生品交易提供专业的量化交易解决方案。平台具有以下特 点:
- 丰富的行情数据: 提供所有可交易合约的全部Tick和K线数据,基于内存数据库实现零延迟访问。
- 一站式的解决方案: 从历史数据分析到实盘交易的完整工具链,打通开发、回测、模拟到实盘的全流程。
- 专业的技术支持: 近百个技术指标源码,深度集成pandas和numpy,采用单线程异步模型保证性能。
策略开发流程
- 环境准备
- 安装Python环境(推荐Python 3.6或以上版本)
- 安装tqsdk包:pip install tqsdk
- 注册天勤账户获取访问密钥
- 数据准备
- 订阅近月与远月合约行情
- 获取历史K线或者Tick数据(用于分析与行情推进)
- 策略编写
- 设计信号生成逻辑(基于价差、均值和标准差)
- 编写交易执行模块(开仓、平仓逻辑)
- 实现风险控制措施(止损、资金管理)
- 回测验证
- 设置回测时间区间和初始资金
- 运行策略获取回测结果
- 分析绩效指标(胜率、收益率、夏普率等)
- 策略优化
- 调整参数(标准差倍数、窗口大小等)
- 添加过滤条件(成交量、波动率等)
- 完善风险控制机制
三、天勤实现策略
策略原理
卡尔曼滤波配对交易策略的实现基于以下核心计算方法:
卡尔曼滤波算法:该算法用于动态估计不锈钢与沪镍之间的对冲比率(β)。算法由预测和更新两步组成:
- 预测步骤:
预测状态: β̂t|t-1 = β̂t-1|t-1
预测协方差: Pt|t-1 = Pt-1|t-1 + Q
- 更新步骤:
卡尔曼增益: Kt = Pt|t-1 * Xt / (Pt|t-1 * Xt^2 + R)
更新状态: β̂t|t = β̂t|t-1 + Kt * (Yt - β̂t|t-1 * Xt)
更新协方差: Pt|t = (1 - Kt * Xt) * Pt|t-1
其中:
- β̂t|t-1:t时刻的预测状态(对冲比率)
- Pt|t-1:预测状态的协方差
- Q:过程噪声协方差(STATE_VAR)
- R:观测噪声协方差(OBS_VAR)
- Kt:卡尔曼增益
- Xt:沪镍价格
- Yt:不锈钢价格
价差计算: 使用当前估计的对冲比率计算价差
Spreadt = Yt - β̂t|t * Xt
Z分数标准化:将价差转换为Z分数,使其具有均值0、标准差1的分布特性
Zt = (Spreadt - μt) / σt
其中μt和σt是过去WIN期间价差的均值和标准差。
交易逻辑
策略的交易逻辑简洁明了,基于Z分数的偏离和回归:
开仓信号:
- 当Z分数 < OPEN_L(-2.0)时:价差被低估,做多不锈钢、做空沪镍
- 当Z分数 > OPEN_H(2.0)时:价差被高估,做空不锈钢、做多沪镍
平仓信号:
- 回归平仓:Z分数回到CLOSE_L(-0.5)与CLOSE_H(0.5)之间
- 止损平仓:Z分数向不利方向移动超过STOP_SPREAD(3.0)
- 超时平仓:持仓时间超过MAX_HOLD(10天)
仓位计算:
- 不锈钢仓位:计算风险资金对应的不锈钢手数
- 沪镍仓位:根据对冲比率和价格比例计算平衡的沪镍手数
回测
回测初始设置
- 测试周期: 2022 年 7 月 4 日 - 2022 年 8 月 31 日
- 交易品种: SHFE.ss2209/SHFE.ni2209
- 初始资金: 1000万元
回测结果
上述回测累计收益走势图
完整代码示例
import numpy as np
import pandas as pd
from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
from datetime import date
# === 全局参数 ===
SYMBOL_Y = "SHFE.ss2209"
SYMBOL_X = "SHFE.ni2209"
OBS_VAR = 0.01
STATE_VAR = 0.0001
INIT_MEAN = 1.0
INIT_VAR = 1.0
WIN = 60
OPEN_H = 2.0
OPEN_L = -2.0
CLOSE_H = 0.5
CLOSE_L = -0.5
STOP_SPREAD = 3.0
MAX_HOLD = 10
POS_PCT = 0.05
INIT_CAP = 10000000
# === 全局变量 ===
state_mean = INIT_MEAN
state_var = INIT_VAR
prices_y, prices_x, hedge_ratios, spreads, zscores = [], [], [], [], []
position = 0
entry_z = 0
entry_time = None
trade_count = 0
win_count = 0
total_profit = 0
total_loss = 0
hold_days = 0
last_day = None
# === API初始化 ===
api = TqApi(backtest=TqBacktest(start_dt=date(2022, 7, 4), end_dt=date(2022, 8, 31)),
auth=TqAuth("快期账号", "快期密码"))
quote_y = api.get_quote(SYMBOL_Y)
quote_x = api.get_quote(SYMBOL_X)
klines_y = api.get_kline_serial(SYMBOL_Y, 60*60)
klines_x = api.get_kline_serial(SYMBOL_X, 60*60)
target_y = TargetPosTask(api, SYMBOL_Y)
target_x = TargetPosTask(api, SYMBOL_X)
try:
while True:
api.wait_update()
if api.is_changing(klines_y.iloc[-1], "datetime") or api.is_changing(klines_x.iloc[-1], "datetime"):
price_y = klines_y.iloc[-1]["close"]
price_x = klines_x.iloc[-1]["close"]
now = pd.to_datetime(klines_y.iloc[-1]["datetime"], unit="ns")
today = now.date()
if last_day and today != last_day and position != 0:
hold_days += 1
last_day = today
prices_y.append(price_y)
prices_x.append(price_x)
if len(prices_y) > 10:
# 卡尔曼滤波
pred_mean = state_mean
pred_var = state_var + STATE_VAR
k_gain = pred_var / (pred_var * price_x**2 + OBS_VAR)
state_mean = pred_mean + k_gain * (price_y - pred_mean * price_x)
state_var = (1 - k_gain * price_x) * pred_var
hedge_ratios.append(state_mean)
spread = price_y - state_mean * price_x
spreads.append(spread)
if len(spreads) >= WIN:
recent = spreads[-WIN:]
mean = np.mean(recent)
std = np.std(recent)
z = (spread - mean) / std if std > 0 else 0
zscores.append(z)
print(f"时间:{now}, Y:{price_y}, X:{price_x}, 对冲比:{state_mean:.4f}, Z:{z:.4f}")
# 开仓
if position == 0:
if z < OPEN_L:
lots = int(INIT_CAP * POS_PCT / quote_y.margin)
lots_x = int(lots * state_mean * price_y * quote_y.volume_multiple / (price_x * quote_x.volume_multiple))
if lots > 0 and lots_x > 0:
target_y.set_target_volume(lots)
target_x.set_target_volume(-lots_x)
position = 1
entry_z = z
entry_time = now
print(f"开多Y空X, Y:{lots}, X:{lots_x}, 入场Z:{z:.4f}")
elif z > OPEN_H:
lots = int(INIT_CAP * POS_PCT / quote_y.margin)
lots_x = int(lots * state_mean * price_y * quote_y.volume_multiple / (price_x * quote_x.volume_multiple))
if lots > 0 and lots_x > 0:
target_y.set_target_volume(-lots)
target_x.set_target_volume(lots_x)
position = -1
entry_z = z
entry_time = now
print(f"开空Y多X, Y:{lots}, X:{lots_x}, 入场Z:{z:.4f}")
# 平仓
else:
profit_cond = CLOSE_L < z < CLOSE_H
stop_cond = (position == 1 and z < entry_z - STOP_SPREAD) or (position == -1 and z > entry_z + STOP_SPREAD)
max_hold = hold_days >= MAX_HOLD
if profit_cond or stop_cond or max_hold:
target_y.set_target_volume(0)
target_x.set_target_volume(0)
trade_count += 1
pnl = (z - entry_z) * position
if pnl > 0:
win_count += 1
total_profit += pnl
else:
total_loss -= pnl
reason = "回归" if profit_cond else "止损" if stop_cond else "超期"
print(f"平仓:{reason}, 出场Z:{z:.4f}, 收益:{pnl:.4f}, 持有天:{hold_days}")
position = 0
entry_z = 0
entry_time = None
hold_days = 0
except BacktestFinished as e:
print("回测结束")
api.close()