
今天聊聊技术栈。
很多朋友问:“Rust 做量化,都用哪些库?”
今天把这套 “黄金组合” 讲清楚——
Rust + Polars + Arrow = 量化数据的瑞士军刀
先说为什么把它们放在一起:
1
2
3
4
5
Arrow:数据格式(底层基础)
↑
Polars:DataFrame 操作(中层能力)
↑
Rust:业务逻辑(上层应用)
三者完美互补,缺一不可。
Arrow = 列式内存数据格式。
简单说就是一种标准的内存数据格式:
以前数据在不同系统之间流转是这样的:
1
2
3
4
Python → 序列化 → 网络传输 → 反序列化 → Rust
↓
JSON / Pickle
慢、占用高、格式不统一
现在是这样的:
1
2
3
4
Python (Arrow) → 共享内存 → Rust (Arrow)
↓
Arrow 格式
快、零拷贝、类型统一
这就是为什么 Polars Python 版和 Rust 版性能几乎一样的原因——底层数据格式完全一样,不需要转换。
1
2
3
4
5
6
// 用 Parquet 格式存储(基于 Arrow)
use polars::prelude::*;
let df = CsvReader::from_path("data.csv")?
.finish()?
.write_parquet("data.parquet")?;
Parquet 是列式存储格式,压缩率高、查询快。
1
2
3
4
5
6
# Python 端
import pyarrow as pa
table = pa.RecordBatchReader.from_batches(schema, batches)
# Rust 端(共享内存)
let table = ArrowReader::from_ipc("data.arrow")?;
Python 和 Rust 之间,数据直接共享,不需要复制。
上篇已经详细讲过 Polars,这里简单回顾核心能力:
能力 | 说明 |
|---|---|
读取 | CSV、Parquet、JSON、Arrow、数据库 |
变换 | filter、select、with_columns、join |
聚合 | group_by、agg、pivot、rolling |
窗口 | over、rolling、expanding |
懒加载 | LazyFrame、谓词下推、查询优化 |
重点:Polars 是 Rust 量化开发的核心,所有数据处理都离不开它。
1
2
3
4
5
6
7
8
9
use polars::prelude::*;
let df = CsvReader::from_path("tick.csv")?
.finish()?
.drop_nulls(Some(col("price").is_not_null()))?
.filter(col("volume").gt(0))?
.with_columns([
col("time").str.to_datetime(None),
])?;
1
2
3
4
5
6
7
8
9
10
11
let factors = df.lazy()
.with_columns([
// 移动平均
col("close").rolling_mean(5).alias("ma5"),
col("close").rolling_mean(10).alias("ma10"),
col("close").rolling_mean(20).alias("ma20"),
// 指数移动平均
col("close").ewm_mean(0.03).alias("ema"),
// RSI 等复杂指标
])
.collect()?;
1
2
3
4
5
6
7
8
9
let result = df.lazy()
.group_by(["date"])
.agg([
// 截面市值加权
(col("return") * col("weight")).sum().alias("portfolio_return"),
// 截面因子 IC
col("factor").correlation(col("return")).alias("ic"),
])
.collect()?;
价值:Rust 是性能和安全性的终极保障。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use std::sync::Arc;
use tokio::sync::broadcast;
async fn order_gateway() {
let (tx, _rx) = broadcast::channel(1000);
// 接收行情
let mut rx = market_data.subscribe();
// 实时处理
while let Ok(tick) = rx.recv().await {
let signal = compute_signal(&tick);
if signal.should_trade() {
let _ = tx.send(signal.to_order());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
use rayon::prelude::*;
fn backtest(params: &BacktestParams) -> BacktestResult {
// 回测逻辑
// ...
}
fn parallel_backtest(all_params: Vec<BacktestParams>) -> Vec<BacktestResult> {
all_params
.par_iter() // 自动并行,利用所有 CPU 核心
.map(|p| backtest(p))
.collect()
}
价值:100 组参数优化,从 100 小时变成 10 小时。
1
2
3
4
5
6
7
use ndarray::Array2;
fn compute_factor(prices: &Array2<f64>) -> Array2<f64> {
// SIMD 加速的矩阵运算
// 底层调用 BLAS/LAPACK
// ...
}
来看一个完整的例子:
1
数据源 → Rust 读取 → Polars 处理 → Arrow 存储 → Python 读取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// data_processor.rs
use polars::prelude::*;
pub fn process_tick_data(path: &str) -> Result<DataFrame, Box<dyn std::error::Error>> {
let df = CsvReader::from_path(path)?
.has_header(true)
.finish()?;
// 数据清洗
let cleaned = df
.drop_nulls(Some(col("price").is_not_null()))?
.filter(col("volume").gt(0))?
.with_columns([
col("timestamp")
.str.to_datetime(None)
.alias("time"),
col("price").alias("close"),
])?
.select(["time", "symbol", "price", "volume"])?;
// 计算因子
let with_factors = cleaned
.lazy()
.sort("time", Default::default())
.with_columns([
col("price").pct_change().alias("return"),
col("price").rolling_mean(60).alias("ma60"),
col("price").ewm_mean(0.03).alias("ema"),
])
.collect()?;
// 保存为 Parquet
with_factors.write_parquet("processed.parquet")?;
Ok(with_factors)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# analyzer.py
import polars as pl
import pyarrow as pa
# 读取 Arrow 格式(零拷贝)
df = pl.read_ipc("processed.arrow")
# 分析
result = df.group_by("symbol").agg([
pl.col("return").mean().alias("avg_return"),
pl.col("return").std().alias("volatility"),
pl.col("ma60").last().alias("last_ma60"),
])
# 可视化
import matplotlib.pyplot as plt
plt.plot(result["return"])
plt.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌─────────────────────────────────────────────────────────┐
│ Python 层 │
│ 策略研究 | 可视化 | 机器学习 | 报告生成 │
├─────────────────────────────────────────────────────────┤
│ PyO3 / Polars │
│ Python ↔ Rust 桥接 | Polars Python │
├─────────────────────────────────────────────────────────┤
│ Rust 层 │
│ 数据处理 | 因子计算 | 回测引擎 | 交易系统 │
├─────────────────────────────────────────────────────────┤
│ Polars / DataFusion │
│ DataFrame | SQL 引擎 │
├─────────────────────────────────────────────────────────┤
│ Arrow2 │
│ 列式存储 | 零拷贝 | 内存映射 │
├─────────────────────────────────────────────────────────┤
│ Parquet / IPC │
│ 文件格式 | 进程间通信 │
└─────────────────────────────────────────────────────────┘
价值点 | 说明 |
|---|---|
性能极致 | Rust + Polars + Arrow = 零拷贝 + 并行 + SIMD |
内存友好 | Arrow 列式存储,内存占用降低 50%+ |
跨语言 | Python / Rust / Go / Java 都能用 Arrow |
生态丰富 | Polars / DataFusion / DuckDB 都在用 Arrow |
生产就绪 | 编译成二进制,部署简单,无 Python 依赖 |
Rust + Polars + Arrow 这套组合,解决的核心问题是:
不是每个人都必须用这套组合。
但如果你被这些问题困扰过:
不妨试试。
效果会让你惊喜。
这就是你提升生产力的最好选择。
你们用的什么技术栈?有什么经验?评论区聊聊~
觉得有帮助,点个赞👍、在看👀,转给需要的朋友!