Statistical Arbitrage — Pairs Trading via Cointegration and Kalman Filter Hedge Ratios
A Python module for systematic pairs trading grounded in the statistical concept of cointegration. Two asset prices are cointegrated when their individual log-price series are each integrated of order one — random walks that individually have no tendency to revert — but a linear combination of the two is stationary and mean-reverting. This long-run equilibrium relationship, and the self-correcting dynamics that enforce it, is precisely what a mean-reversion trading strategy requires; correlation-based approaches, which operate on returns rather than price levels, lack this property. The module screens a universe of candidate pairs using the Engle–Granger two-step test and the Johansen trace test, ranks them by p-value and half-life of mean reversion, fits an Ornstein–Uhlenbeck process to each spread, and replaces the static OLS hedge ratio with a Kalman filter estimate that adapts as the cointegrating relationship drifts over time. Trading signals are generated from z-score extremes, and the strategy is backtested with realistic transaction costs, position sizing from spread volatility, and cointegration stability monitoring via rolling Engle–Granger p-values and half-life drift. Built on Python 3.11+ using pandas, numpy, scipy, statsmodels, plotly, streamlit, duckdb, and pydantic v2; packaged with hatchling and tested with pytest against deterministic seed-42 fixtures.
%==========%
I. Interactive Dashboard:
The dashboard below runs entirely in the browser via stlite (Streamlit on WebAssembly — no server required). Three synthetic cointegrated pairs are generated from known OU parameters with a slowly drifting true hedge ratio: an energy-major pair (XOM/CVX), a utility pair (NEE/SO), and a financial-ETF pair (XLF/KBE). Sidebar controls let you select the pair and adjust signal thresholds, transaction costs, and Kalman state-noise parameter. The four tabs show the spread and z-score with entry/exit bands, the fitted OU process against the actual spread, the Kalman filter dynamic hedge ratio versus the static OLS estimate, and the backtested equity curve with trade annotations. First load downloads Pyodide and may take 20–40 seconds; subsequent loads are cached.
%==========%
II. Project Layout:
pairs-trading/
├── pyproject.toml # Build config, deps, ruff + pytest settings
├── .env.example # DB_PATH
├── data/ # Populated by scripts/download_data.py (git-ignored)
│ └── pairs.duckdb # DuckDB: prices + pairs + trades tables
├── scripts/
│ └── download_data.py # yfinance → DuckDB
├── src/pairs_trading/
│ ├── data/
│ │ ├── schemas.py # Pydantic v2: PriceRecord, PairRecord, TradeRecord
│ │ ├── fetchers.py # yfinance daily close download
│ │ └── store.py # DuckDB init, upsert, read for prices/pairs/trades
│ ├── cointegration/
│ │ ├── engle_granger.py # Two-step EG test, ADF on residuals, half-life
│ │ ├── johansen.py # Johansen trace + max-eigenvalue test via statsmodels
│ │ └── screening.py # Universe screening, rolling EG p-value + half-life
│ ├── ou_process/
│ │ └── ou_fit.py # OUProcess, fit_ou, simulate_ou, equilibrium bands
│ ├── kalman/
│ │ └── hedge_ratio.py # KalmanHedgeRatio, kalman_spread
│ ├── signals/
│ │ └── generator.py # compute_spread, compute_zscore, generate_signals
│ ├── backtest/
│ │ └── engine.py # BacktestConfig, BacktestResult, run_backtest
│ ├── report/
│ │ └── plots.py # Plotly: spread/z-score, Kalman hedge, equity curve, stability
│ ├── cli.py # Typer CLI: fetch | screen | analyse | backtest | dashboard
│ └── app.py # Streamlit: 4 tabs (Spread, OU, Kalman, Equity)
└── tests/
├── conftest.py # Seed-42 cointegrated price pair + spread fixtures
├── test_cointegration.py # EG p-value, hedge ratio, half-life, Johansen invariants
├── test_ou_process.py # OU fit + simulation invariants
├── test_kalman.py # Kalman output length, finite values, β convergence
└── test_backtest.py # Equity curve, trade count, drawdown, win rate invariants
%==========%
III. Data Sources:
Daily adjusted closing prices are fetched via yfinance for sector universes where cointegration is economically motivated: energy majors (XOM, CVX, OXY, COP, EOG, MPC, PSX, VLO, XLE, XOP), regulated utilities (NEE, DUK, SO, D, AEP, EXC, SRE, PEG, XLU, ED), and financials (JPM, BAC, WFC, C, GS, MS, XLF, KBE, KRE, FITB). Sector concentration matters because cointegration arises from shared factor exposures: energy companies are jointly driven by crude oil and refining margins; regulated utilities face the same rate-setting regime; large-cap financial firms share credit cycle exposure. Random pairs from different sectors may exhibit spurious correlation over short windows but no long-run mean-reverting equilibrium. The data layer stores closing prices in a DuckDB prices table (ticker × date × close), screened pair results in a pairs table, and trade records in a trades table.
# fetchers.py
def fetch_prices(tickers: list[str], start: str, end: str | None = None) -> pd.DataFrame:
"""Download adjusted closes from yfinance; return wide DataFrame (date × ticker)."""
raw = yf.download(tickers, start=start, end=end, auto_adjust=True, progress=False)["Close"]
if isinstance(raw, pd.Series):
raw = raw.to_frame(tickers[0])
raw = raw[sorted(raw.columns)]
raw.index = pd.to_datetime(raw.index).normalize()
raw.index.name = "date"
return raw.dropna(how="all")
%==========%
IV. Cointegration vs Correlation:
Correlation measures the co-movement of returns: if \(\text{Corr}(\Delta \log A_t, \Delta \log B_t)\) is high, the two assets tend to move in the same direction on a daily basis. Correlation is symmetric under a change of frequency — weekly and monthly returns may be similarly correlated — but it is entirely silent about the joint level of prices. Two assets can be perfectly correlated on every return horizon and still diverge permanently in price.
Cointegration is a statement about price levels. The pair \((\log A_t, \log B_t)\) is cointegrated if each individual series is I(1) — a random walk with no tendency to revert — but there exists a vector \([1, -\beta]\) such that \(\log A_t - \beta \log B_t\) is I(0), i.e., stationary. The spread has a well-defined long-run mean, finite variance, and an error-correction mechanism: when the spread drifts above its mean, market forces (index rebalancing, arbitrage, fundamental linkages) push it back. This is not assumed; it is tested.
Why OLS correlation-based pairs fail: a pair selected for high historical return correlation will not necessarily produce a stationary spread. The spread may be I(1) even if returns are correlated, meaning it follows a random walk with no tendency to return to any fixed level. A strategy that enters on spread widening and exits at reversion will produce losses without bound if the spread is a random walk. Cointegration rules this out by construction — under the null of cointegration, the spread is stationary and the expected holding period is finite.
| Property | Correlation | Cointegration |
|---|---|---|
| Operates on | Returns \(\Delta \log P\) | Log-price levels \(\log P\) |
| Stationarity guarantee | None (prices may diverge) | Spread I(0) by definition |
| Error correction | No (no level anchor) | Yes (VECM error-correction term) |
| Theoretical basis | Co-movement of shocks | Shared long-run equilibrium |
| Mean-reversion horizon | Undefined | Measurable half-life via OU fit |
| Test | Pearson / Spearman | Engle-Granger, Johansen |
%==========%
V. Engle-Granger Two-Step Test (cointegration/engle_granger.py):
The Engle-Granger procedure tests for cointegration in two steps. Step 1: estimate the static hedge ratio by OLS regression of \(\log A_t\) on \(\log B_t\):
\[ \log A_t = \beta \log B_t + \alpha + u_t \]where \(\hat{\beta}\) is the cointegrating coefficient (hedge ratio) and the residual \(\hat{u}_t = \log A_t - \hat{\beta}\log B_t - \hat{\alpha}\) is the estimated spread. Step 2: apply the Augmented Dickey-Fuller (ADF) test to \(\hat{u}_t\) under the null hypothesis \(H_0\): the residuals have a unit root (i.e. the pair is not cointegrated). Rejection of \(H_0\) at the 5% level implies the spread is stationary. Because the critical values for this test depend on whether the residuals come from an OLS regression rather than a known process, standard ADF critical values are conservative; the MacKinnon (1994) critical values are used instead.
The half-life of mean reversion is estimated directly from the spread by OLS regression of \(\Delta \hat{u}_t\) on \(\hat{u}_{t-1}\):
\[ \Delta \hat{u}_t = a + b\,\hat{u}_{t-1} + \varepsilon_t \] \[ \hat{\kappa} = -\hat{b}, \qquad \text{half-life} = \frac{\ln 2}{\hat{\kappa}} \]
# engle_granger.py
def engle_granger_test(y: pd.Series, x: pd.Series, trend: str = "c") -> EGResult:
logy = np.log(y)
logx = np.log(x)
X = pd.concat([logx, pd.Series(np.ones(len(logx)), index=logx.index, name="const")], axis=1)
res = OLS(logy, X).fit()
hedge_ratio = float(res.params.iloc[0])
intercept = float(res.params.iloc[1])
residuals = pd.Series(res.resid, index=y.index, name="spread")
adf = adfuller(residuals.dropna(), regression=trend, autolag="AIC")
half_life = _half_life(residuals)
return EGResult(hedge_ratio=hedge_ratio, intercept=intercept, residuals=residuals,
adf_stat=float(adf[0]), pvalue=float(adf[1]), half_life=half_life)
%==========%
VI. Johansen Trace Test (cointegration/johansen.py):
The Johansen procedure is a multivariate maximum-likelihood test that simultaneously tests for the number of cointegrating relationships (\(r\)) in a system. For a bivariate pair it tests \(H_0: r = 0\) (no cointegration) against \(H_1: r \geq 1\). The trace statistic is:
\[ \Lambda_\text{trace}(r) = -T \sum_{i=r+1}^{n} \ln(1 - \hat{\lambda}_i) \]where \(\hat{\lambda}_i\) are the ordered eigenvalues of the VECM coefficient matrix and \(T\) is the sample size. The max-eigenvalue statistic \(\Lambda_\text{max}(r, r+1) = -T\ln(1-\hat{\lambda}_{r+1})\) tests \(H_0: r = r_0\) against \(H_1: r = r_0 + 1\). Johansen is preferred over Engle-Granger when the direction of the cointegrating regression is ambiguous, as it does not require choosing a dependent variable; for bivariate systems both tests should agree.
| Test | \(H_0\) | \(H_1\) | Statistic |
|---|---|---|---|
| Trace | \(r = 0\) | \(r \geq 1\) | \(-T\sum\ln(1-\hat\lambda_i)\) |
| Max-eigenvalue | \(r = r_0\) | \(r = r_0 + 1\) | \(-T\ln(1-\hat\lambda_{r_0+1})\) |
%==========%
VII. Ornstein-Uhlenbeck Process (ou_process/ou_fit.py):
The continuous-time Ornstein-Uhlenbeck process models the mean-reverting dynamics of a stationary spread:
\[ dS_t = \kappa(\mu - S_t)\,dt + \sigma\,dW_t \]The parameter \(\kappa > 0\) is the mean-reversion speed; \(\mu\) is the long-run mean to which the spread reverts; \(\sigma\) is the instantaneous volatility of the spread. The expected half-life — the time for the spread to close half the gap to its mean from any starting point — is:
\[ t_{1/2} = \frac{\ln 2}{\kappa} \]The equilibrium distribution of \(S_t\) is \(\mathcal{N}(\mu,\, \sigma^2 / 2\kappa)\), so the equilibrium standard deviation of the spread (and thus the natural z-score denominator) is \(\sigma_\infty = \sigma / \sqrt{2\kappa}\). Parameters are estimated in closed form by OLS on the Euler discretisation:
\[ \Delta S_t = a + b\,S_{t-1} + \varepsilon_t \quad\Longrightarrow\quad \hat\kappa = -\hat{b},\quad \hat\mu = -\hat{a}/\hat{b},\quad \hat\sigma = \text{std}(\hat\varepsilon) \]
# ou_fit.py
def fit_ou(spread: pd.Series) -> OUProcess:
s = spread.dropna()
ds = s.diff().dropna()
s_lag = s.shift(1).loc[ds.index]
X = pd.concat([s_lag, pd.Series(np.ones(len(s_lag)), index=s_lag.index, name="const")], axis=1)
res = OLS(ds, X).fit()
b = float(res.params.iloc[0])
a = float(res.params.iloc[1])
kappa = max(-b, 1e-8)
mu = -a / b if abs(b) > 1e-10 else float(s.mean())
sigma = float(res.resid.std())
half_life = float(np.log(2) / kappa)
return OUProcess(kappa=kappa, mu=mu, sigma=sigma, half_life=half_life, r_squared=float(res.rsquared))
%==========%
VIII. Dynamic Hedge Ratio via Kalman Filter (kalman/hedge_ratio.py):
The Kalman filter replaces the static OLS hedge ratio with a time-varying estimate that updates each day as new price observations arrive. The state vector is \(\theta_t = [\beta_t,\, \alpha_t]^\top\) (hedge ratio and intercept), which evolves as a random walk:
\[ \theta_t = \theta_{t-1} + w_t, \quad w_t \sim \mathcal{N}(0,\, W) \]The observation at each date is:
\[ \log A_t = [\log B_t,\; 1]\cdot\theta_t + v_t, \quad v_t \sim \mathcal{N}(0,\, V) \]The state noise covariance is \(W = \frac{\delta}{1-\delta}\,I\), where \(\delta\) controls how quickly \(\beta\) is allowed to drift. The Kalman gain \(K_t\) determines how much weight to assign to the new observation versus the prior state estimate:
\[ K_t = P_{t|t-1}\,H_t^\top\,\bigl(H_t P_{t|t-1} H_t^\top + V\bigr)^{-1} \] \[ \hat\theta_t = \hat\theta_{t-1} + K_t\,(y_t - H_t\hat\theta_{t-1}) \] \[ P_{t|t} = (I - K_t H_t)\,P_{t|t-1} \]The Kalman spread is then \(S_t = \log A_t - \hat\beta_t\,\log B_t - \hat\alpha_t\), which is constructed to remain stationary even as the cointegrating relationship drifts. The key advantage over static OLS: a drift in \(\beta_t\) does not accumulate as systematic bias in the spread, so z-score signals remain calibrated throughout the trading period.
# hedge_ratio.py
class KalmanHedgeRatio:
def __init__(self, delta: float = 1e-4, vt: float = 1e-3):
self.delta = delta
self.vt = vt
self._W = delta / (1 - delta) * np.eye(2)
self.theta = np.zeros(2) # [beta, alpha]
self.P = np.eye(2)
def update(self, log_x: float, log_y: float) -> tuple[float, float]:
H = np.array([[log_x, 1.0]])
P_pred = self.P + self._W
innov = log_y - float(H @ self.theta)
S = float(H @ P_pred @ H.T) + self.vt
K = (P_pred @ H.T) / S
self.theta = self.theta + K.flatten() * innov
self.P = (np.eye(2) - K @ H) @ P_pred
return float(self.theta[0]), float(self.theta[1])
| Parameter | Role | Typical range |
|---|---|---|
| \(\delta\) | State-noise magnitude; controls how fast \(\beta\) can drift | \(10^{-5}\) (slow) to \(10^{-2}\) (fast) |
| \(V\) (\(\mathtt{vt}\)) | Observation noise variance | \(10^{-3}\) for log-price series |
| Initial \(P\) | Prior covariance; \(I\) implies uninformative prior on \(\theta\) | \(I_2\) |
%==========%
IX. Signal Generation (signals/generator.py):
Once the spread \(S_t\) has been computed (either from a static OLS hedge ratio or a dynamic Kalman ratio), a rolling z-score normalises it against a lookback window of length \(\ell\):
\[ z_t = \frac{S_t - \mu_t^{(\ell)}}{\sigma_t^{(\ell)}} \]where \(\mu_t^{(\ell)}\) and \(\sigma_t^{(\ell)}\) are the rolling mean and standard deviation over the preceding \(\ell\) trading days. The signal rules are:
| Condition | Signal | Trade |
|---|---|---|
| \(z_t < -z_\text{entry}\) | Long spread | Buy A, sell \(\beta\) units of B |
| \(z_t > +z_\text{entry}\) | Short spread | Sell A, buy \(\beta\) units of B |
| \(|z_t| < z_\text{exit}\) | Exit | Close both legs at mean reversion |
| \(|z_t| > z_\text{stop}\) | Stop-loss | Exit if spread continues to widen |
Typical parameter values from the literature are \(z_\text{entry} \approx 2.0\), \(z_\text{exit} \approx 0.5\), and \(z_\text{stop} \approx 3.5\). The lookback window \(\ell\) should be chosen relative to the spread half-life: too short and the z-score is noisy; too long and it is slow to adapt to structural shifts. A window of 60–90 days is common for equity pairs with half-lives of 10–30 days.
%==========%
X. Backtesting Methodology (backtest/engine.py):
The backtest runs in vectorised form. Daily P&L is computed as the change in spread value times the current position, less transaction costs:
\[ \text{PnL}_t = \text{pos}_{t-1} \cdot \Delta S_t - c \cdot \mathbf{1}_{\text{entry}} \cdot |S_t| \]where \(c\) is the one-way cost in basis points and \(\mathbf{1}_{\text{entry}}\) is an indicator for days a new trade is opened (the round-trip cost \(2c\) is charged on entry). P&L is normalised by the mean absolute spread level to convert it into NAV units. Transaction costs are a significant drag at high turnover: at 5 bps per leg, a strategy with 20 round-trips per year incurs approximately 200 bps in annual costs.
Position sizing is spread-volatility-normalised: target a fixed spread-dollar position rather than a fixed share count. When the spread is unusually wide (spread volatility elevated), the dollar amount in the trade is the same as when the spread is near its mean, giving consistent risk exposure across market conditions. The equity curve, Sharpe ratio, maximum drawdown, trade count, and win rate are all reported as BacktestResult attributes.
# engine.py (excerpt)
pos_series = pd.Series(position, index=z.index)
spread_ret = s.diff().fillna(0.0)
gross_pnl_daily = pos_series.shift(1).fillna(0) * spread_ret * cfg.position_size
tc_bps = cfg.transaction_cost_bps / 10_000
trades_entered = (pos_series.diff().fillna(0) != 0) & (pos_series != 0)
cost_series = trades_entered.astype(float) * abs(s) * tc_bps * 2
net_pnl_daily = gross_pnl_daily - cost_series
equity = (1 + net_pnl_daily.cumsum() / max(abs(s.mean()), 1e-8)).clip(lower=0)
%==========%
XI. Cointegration Stability Analysis (cointegration/screening.py):
A pair that was cointegrated during the formation period may cease to be cointegrated during trading. This is the most common failure mode for pairs strategies: regulatory changes (utility deregulation), corporate events (M&A, spin-offs), commodity price regime breaks, or index reconstitution can all break the economic linkage that produced the cointegrating relationship. Monitoring cointegration stability is therefore a risk management tool as much as a screening criterion.
Two rolling diagnostics are computed using a sliding window of 252 trading days. The rolling Engle-Granger p-value tracks whether the pair remains cointegrated throughout the trading period; a sustained rise above 5% signals a regime break. The rolling half-life tracks whether the speed of mean reversion is stable; large increases in half-life (slowing reversion) indicate that the equilibrium mechanism is weakening even before the pair formally fails the cointegration test.
# screening.py
def rolling_eg_pvalue(y: pd.Series, x: pd.Series, window: int = 252) -> pd.Series:
pvalues: dict = {}
idx = y.dropna().index.intersection(x.dropna().index)
for i in range(window, len(idx) + 1):
sub_idx = idx[i - window : i]
try:
eg = engle_granger_test(y.loc[sub_idx], x.loc[sub_idx])
pvalues[idx[i - 1]] = eg.pvalue
except Exception:
pvalues[idx[i - 1]] = np.nan
return pd.Series(pvalues, name="eg_pvalue")
%==========%
XII. CLI — cli.py:
Five subcommands cover the full pipeline from data ingestion through backtesting. All commands read from and write to the same DuckDB file.
# Install
pip install -e ".[dev]"
# Download daily prices for the energy sector universe (~seconds via yfinance)
pairs fetch --tickers "XOM,CVX,OXY,COP,EOG,MPC,PSX,VLO,XLE,XOP" --start 2015-01-01
# Screen all pairs — Engle-Granger p < 0.05, half-life 5–252 days
pairs screen --n-top 10
# Fit OU parameters + Kalman filter for a specific pair
pairs analyse XOM CVX
# Backtest with Kalman hedge, z=±2 entry, z=±0.5 exit, 5 bps transaction cost
pairs backtest XOM CVX --entry 2.0 --exit- 0.5 --stop 3.5 --cost-bps 5.0
# Launch Streamlit server-side dashboard
pairs dashboard
| Command | Key options | Output |
|---|---|---|
pairs fetch | --tickers, --start, --db | Downloads prices and upserts to DuckDB prices table |
pairs screen | --eg-threshold, --n-top, --db | Rich table: pair, EG p-value, Johansen p-value (approx), half-life, OLS β |
pairs analyse | TICKER_A, TICKER_B, --kalman-delta | OU parameters (κ, μ, σ, half-life) and Kalman vs OLS β comparison |
pairs backtest | --entry, --exit-, --stop, --cost-bps, --use-kalman | Sharpe, max drawdown, trade count, win rate |
pairs dashboard | — | Launches streamlit run src/pairs_trading/app.py |
%==========%
XIII. Test Suite:
All tests are fully offline. The shared conftest.py generates a seed-42 synthetic pair from a known OU process: price B is a geometric random walk, the spread is simulated from OU(\(\kappa=0.05\), \(\mu=0\), \(\sigma=0.015\)), and price A is constructed as \(A_t = \exp(1.2\log B_t + S_t + 0.5)\) so the true hedge ratio is exactly 1.2. This gives full control over the cointegration relationship and allows precise invariant testing. Cointegration tests verify that the EG p-value is below 0.10 for this known pair, the hedge ratio is positive, and the half-life is finite and positive. OU fit tests verify \(\kappa > 0\), half-life = \(\ln 2 / \kappa\) to machine precision, and that long-horizon simulation of a tight-κ process is stationary (|mean| < 0.1). Kalman tests verify output length matches input, all values are finite, and the mean Kalman β (after a warm-up of 100 steps) is within 20% of the OLS β. Backtest tests verify the equity curve starts at 1.0, remains non-negative, has at least one trade, max drawdown is non-positive, win rate is in [0, 1], and that a higher entry threshold produces no more trades than a lower entry threshold.
# test_backtest.py — selected invariants
def test_equity_curve_starts_at_one(self, spread_and_zscore):
spread, zscore = spread_and_zscore
result = run_backtest(spread, zscore)
assert abs(result.equity_curve.iloc[0] - 1.0) < 1e-6
def test_higher_entry_threshold_fewer_trades(self, spread_and_zscore):
spread, zscore = spread_and_zscore
r1 = run_backtest(spread, zscore, BacktestConfig(entry_threshold=1.5))
r2 = run_backtest(spread, zscore, BacktestConfig(entry_threshold=2.5))
assert r1.num_trades >= r2.num_trades
def test_zero_cost_sharpe_at_least_as_good(self, spread_and_zscore):
spread, zscore = spread_and_zscore
r_cost = run_backtest(spread, zscore, BacktestConfig(transaction_cost_bps=10.0))
r_free = run_backtest(spread, zscore, BacktestConfig(transaction_cost_bps=0.0))
assert r_free.total_return >= r_cost.total_return