卡尔曼滤波配对交易策略

一、交易策略解释

核心思想

配对交易策略基于两个相关资产价格之间存在长期均衡关系的假设。当这种关系发生暂时偏离时,通过同时买入低估资产和卖出高估资产,等待价格关系回归均衡时获利。

卡尔曼滤波配对交易策略(Kalman Filter Pairs Trading Strategy)的核心创新点在于不使用固定的对冲比率,而是利用卡尔曼滤波算法动态估计最优对冲比率,使策略能够适应市场变化。

理论基础

卡尔曼滤波配对交易策略建立在以下几个理论基础之上:

  • 统计套利理论:源自金融统计学,假设市场中存在可利用的统计规律。Eugene Fama的有效市场假说认为,价格已包含所有信息,但现实市场中,价格偏离基本面的情况时有发生,为统计套利提供机会。

  • 协整理论:由Engle和Granger于1987年提出,描述了非平稳时间序列之间可能存在的长期均衡关系。两个资产价格可能各自遵循随机游走过程,但它们的线性组合可能是平稳的,这为配对交易提供了理论依据。

  • 卡尔曼滤波:由Rudolf Kalman在1960年代开发的递归状态估计算法,能够在噪声环境中估计动态系统的状态。在金融领域,它被用来估计时变参数,如配对交易中的动态对冲比率。相比传统OLS回归,卡尔曼滤波可以捕捉参数随时间的变化,提高策略适应性。

策略适用场景

  • 相关性强的品种对:适用于具有产业链关联或替代关系的商品期货对,如不锈钢/镍、螺纹钢/铁矿石、焦煤/焦炭等。

  • 区间震荡市场:在大趋势不明显的区间震荡市场中表现最佳,因为价格更可能围绕均衡水平波动。

  • 市场非理性波动时期:当短期供需冲击导致价格关系暂时失衡时,策略能够捕捉回归机会。

  • 低波动率环境:在市场波动较小时,价差的偏离和回归更为明显,信号质量更高。

二、天勤介绍

天勤平台概述

天勤(TqSdk)是一个由信易科技开发的开源量化交易系统,为期货、期权等衍生品交易提供专业的量化交易解决方案。平台具有以下特 点:

  • 丰富的行情数据 提供所有可交易合约的全部Tick和K线数据,基于内存数据库实现零延迟访问。
  • 一站式的解决方案 从历史数据分析到实盘交易的完整工具链,打通开发、回测、模拟到实盘的全流程。
  • 专业的技术支持 近百个技术指标源码,深度集成pandas和numpy,采用单线程异步模型保证性能。

策略开发流程

  • 环境准备
    • 安装Python环境(推荐Python 3.6或以上版本)
    • 安装tqsdk包:pip install tqsdk
    • 注册天勤账户获取访问密钥
  • 数据准备
    • 订阅近月与远月合约行情
    • 获取历史K线或者Tick数据(用于分析与行情推进)
  • 策略编写
    • 设计信号生成逻辑(基于价差、均值和标准差)
    • 编写交易执行模块(开仓、平仓逻辑)
    • 实现风险控制措施(止损、资金管理)
  • 回测验证
    • 设置回测时间区间和初始资金
    • 运行策略获取回测结果
    • 分析绩效指标(胜率、收益率、夏普率等)
  • 策略优化
    • 调整参数(标准差倍数、窗口大小等)
    • 添加过滤条件(成交量、波动率等)
    • 完善风险控制机制

三、天勤实现策略

策略原理

卡尔曼滤波配对交易策略的实现基于以下核心计算方法:

卡尔曼滤波算法:该算法用于动态估计不锈钢与沪镍之间的对冲比率(β)。算法由预测和更新两步组成:

  1. 预测步骤:
预测状态: β̂t|t-1 = β̂t-1|t-1
预测协方差: Pt|t-1 = Pt-1|t-1 + Q
  1. 更新步骤:
卡尔曼增益: Kt = Pt|t-1 * Xt / (Pt|t-1 * Xt^2 + R)
更新状态: β̂t|t = β̂t|t-1 + Kt * (Yt - β̂t|t-1 * Xt)
更新协方差: Pt|t = (1 - Kt * Xt) * Pt|t-1

其中:

  • β̂t|t-1:t时刻的预测状态(对冲比率)
  • Pt|t-1:预测状态的协方差
  • Q:过程噪声协方差(STATE_VAR)
  • R:观测噪声协方差(OBS_VAR)
  • Kt:卡尔曼增益
  • Xt:沪镍价格
  • Yt:不锈钢价格

价差计算: 使用当前估计的对冲比率计算价差

Spreadt = Yt - β̂t|t * Xt

Z分数标准化:将价差转换为Z分数,使其具有均值0、标准差1的分布特性

Zt = (Spreadt - μt) / σt

其中μt和σt是过去WIN期间价差的均值和标准差。

交易逻辑

策略的交易逻辑简洁明了,基于Z分数的偏离和回归:

开仓信号:

  • 当Z分数 < OPEN_L(-2.0)时:价差被低估,做多不锈钢、做空沪镍
  • 当Z分数 > OPEN_H(2.0)时:价差被高估,做空不锈钢、做多沪镍

平仓信号:

  • 回归平仓:Z分数回到CLOSE_L(-0.5)与CLOSE_H(0.5)之间
  • 止损平仓:Z分数向不利方向移动超过STOP_SPREAD(3.0)
  • 超时平仓:持仓时间超过MAX_HOLD(10天)

仓位计算:

  • 不锈钢仓位:计算风险资金对应的不锈钢手数
  • 沪镍仓位:根据对冲比率和价格比例计算平衡的沪镍手数

回测

回测初始设置

  • 测试周期: 2022 年 7 月 4 日 - 2022 年 8 月 31 日
  • 交易品种: SHFE.ss2209/SHFE.ni2209
  • 初始资金: 1000万元

回测结果

上述回测累计收益走势图

完整代码示例

import numpy as np
import pandas as pd
from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
from datetime import date

# === 全局参数 ===
SYMBOL_Y = "SHFE.ss2209"
SYMBOL_X = "SHFE.ni2209"
OBS_VAR = 0.01
STATE_VAR = 0.0001
INIT_MEAN = 1.0
INIT_VAR = 1.0
WIN = 60
OPEN_H = 2.0
OPEN_L = -2.0
CLOSE_H = 0.5
CLOSE_L = -0.5
STOP_SPREAD = 3.0
MAX_HOLD = 10
POS_PCT = 0.05
INIT_CAP = 10000000

# === 全局变量 ===
state_mean = INIT_MEAN
state_var = INIT_VAR
prices_y, prices_x, hedge_ratios, spreads, zscores = [], [], [], [], []
position = 0
entry_z = 0
entry_time = None
trade_count = 0
win_count = 0
total_profit = 0
total_loss = 0
hold_days = 0
last_day = None

# === API初始化 ===
api = TqApi(backtest=TqBacktest(start_dt=date(2022, 7, 4), end_dt=date(2022, 8, 31)),
                auth=TqAuth("快期账号", "快期密码"))
quote_y = api.get_quote(SYMBOL_Y)
quote_x = api.get_quote(SYMBOL_X)
klines_y = api.get_kline_serial(SYMBOL_Y, 60*60)
klines_x = api.get_kline_serial(SYMBOL_X, 60*60)
target_y = TargetPosTask(api, SYMBOL_Y)
target_x = TargetPosTask(api, SYMBOL_X)

try:
    while True:
        api.wait_update()
        if api.is_changing(klines_y.iloc[-1], "datetime") or api.is_changing(klines_x.iloc[-1], "datetime"):
            price_y = klines_y.iloc[-1]["close"]
            price_x = klines_x.iloc[-1]["close"]
            now = pd.to_datetime(klines_y.iloc[-1]["datetime"], unit="ns")
            today = now.date()
            if last_day and today != last_day and position != 0:
                hold_days += 1
            last_day = today
            prices_y.append(price_y)
            prices_x.append(price_x)
            if len(prices_y) > 10:
                # 卡尔曼滤波
                pred_mean = state_mean
                pred_var = state_var + STATE_VAR
                k_gain = pred_var / (pred_var * price_x**2 + OBS_VAR)
                state_mean = pred_mean + k_gain * (price_y - pred_mean * price_x)
                state_var = (1 - k_gain * price_x) * pred_var
                hedge_ratios.append(state_mean)
                spread = price_y - state_mean * price_x
                spreads.append(spread)
                if len(spreads) >= WIN:
                    recent = spreads[-WIN:]
                    mean = np.mean(recent)
                    std = np.std(recent)
                    z = (spread - mean) / std if std > 0 else 0
                    zscores.append(z)
                    print(f"时间:{now}, Y:{price_y}, X:{price_x}, 对冲比:{state_mean:.4f}, Z:{z:.4f}")
                    # 开仓
                    if position == 0:
                        if z < OPEN_L:
                            lots = int(INIT_CAP * POS_PCT / quote_y.margin)
                            lots_x = int(lots * state_mean * price_y * quote_y.volume_multiple / (price_x * quote_x.volume_multiple))
                            if lots > 0 and lots_x > 0:
                                target_y.set_target_volume(lots)
                                target_x.set_target_volume(-lots_x)
                                position = 1
                                entry_z = z
                                entry_time = now
                                print(f"开多Y空X, Y:{lots}, X:{lots_x}, 入场Z:{z:.4f}")
                        elif z > OPEN_H:
                            lots = int(INIT_CAP * POS_PCT / quote_y.margin)
                            lots_x = int(lots * state_mean * price_y * quote_y.volume_multiple / (price_x * quote_x.volume_multiple))
                            if lots > 0 and lots_x > 0:
                                target_y.set_target_volume(-lots)
                                target_x.set_target_volume(lots_x)
                                position = -1
                                entry_z = z
                                entry_time = now
                                print(f"开空Y多X, Y:{lots}, X:{lots_x}, 入场Z:{z:.4f}")
                    # 平仓
                    else:
                        profit_cond = CLOSE_L < z < CLOSE_H
                        stop_cond = (position == 1 and z < entry_z - STOP_SPREAD) or (position == -1 and z > entry_z + STOP_SPREAD)
                        max_hold = hold_days >= MAX_HOLD
                        if profit_cond or stop_cond or max_hold:
                            target_y.set_target_volume(0)
                            target_x.set_target_volume(0)
                            trade_count += 1
                            pnl = (z - entry_z) * position
                            if pnl > 0:
                                win_count += 1
                                total_profit += pnl
                            else:
                                total_loss -= pnl
                            reason = "回归" if profit_cond else "止损" if stop_cond else "超期"
                            print(f"平仓:{reason}, 出场Z:{z:.4f}, 收益:{pnl:.4f}, 持有天:{hold_days}")
                            position = 0
                            entry_z = 0
                            entry_time = None
                            hold_days = 0

except BacktestFinished as e:
    print("回测结束")
    api.close()