性能超Python30倍676个股票衍生特征计算的流批一体实现方案
随着算力的提升和机器学习与深度学习的普及,在进行数据建模时往往会采用批量生成衍生特征的方法来丰富数据集的特征,如:对原有的 10 维特征都采用 max, min, avg, std, sum 这五种方式进行聚合,数据的特征将变为 50 维。
类似的衍生特征工程极大地提升了模型的准确性。在 Python 中,调用 Pandas 的 agg 函数,传入一个字典(key:列名,value:衍生特征的函数列表)即可实现这样的衍生特征计算。本教程将通过DolphinDB的元编程 ,以极少的代码量实现与 pandas 中类似的衍生特征计算方法。
我们使用 2021 年 16 支股票的 Level 2 快照数据,构建频率为 10 分钟的特征,并利用元编程, 低代码量 实现了 676 列衍生特征的计算。与多个 Python Pandas 进程并行计算相比, DolphinDB 能带来约 30 倍的性能提升。 教程还展示了如何在生产环境中用流式计算进行输入特征的实时计算。 本教程示例代码必须在 2.00.6 及以上版本的 DolphinDB server上运行。
1. Snapshot 数据文件结构
本教程应用的数据源为上交所 level2 快照数据(Snapshot),每幅快照间隔时间为 3 秒或 5 秒,数据文件结构如下:
字段
含义
字段
含义
字段
含义
SecurityID
证券代码
LowPx
最低价
BidPrice[10]
申买十价
DateTime
日期时间
LastPx
最新价
BidOrderQty[10]
申买十量
PreClosePx
昨收价
TotalVolumeTrade
成交总量
OfferPrice[10]
申卖十价
OpenPx
开始价
TotalValueTrade
成交总金额
OfferOrderQty[10]
申卖十量
HighPx
最高价
InstrumentStatus
交易状态
……
……
2021 年上交所所有股票的快照数据已经提前导入至 DolphinDB 数据库中,一共约17.07亿条数据,导入方法见国内股票行情数据导入实例。 2. 教程开发背景
本教程受 Kaggle 的 Optiver Realized Volatility Prediction 竞赛项目启发,该项目排名第一的代码中使用了两档的买卖量价数据及 pandas 的元编程实现了批量衍生特征的计算,本教程在此基础上,使用真实十档买卖量价的快照数据实现了 676 列衍生特征的计算。 2.1 一级指标
一级指标全部由快照数据中的 10 档买卖单量价数据计算而来 Weighted Averaged Price(wap) :加权平均价格
Price Spread(priceSpread) :用于衡量买单价和卖单价的价差
Bid Price Spread(bidSpread) :用于衡量买 1 价和买 2 价的价差
Offer Price Spread(offerSpread) :用于衡量卖1价和卖2价的价差
Total Volume(totalVolume) :10 档买卖单的总量
Volume Imbalance(volumeImbalance) :买卖单总量不平衡
Log Return of Offer Price(logReturnOffer) :卖单价的对数收益
Log Return of Bid Price(logReturnBid) :买单价的对数收益
2.2 二级指标
二级指标全部由一级指标计算生成 WAP Balance(wapBalance) :加权平均价格平衡
Log Return of WAP(logReturnWap) :加权平均价格对数收益
2.3 衍生特征
衍生特征是对一级指标和二级指标做 10 分钟降采样的衍生,降采样的聚合方法通过元编程来批量实现,衍生方法如下:
指标名称
衍生方法
DateTime
count
Wap[10]
sum, mean, std
LogReturn[10]
sum, realizedVolatility, mean, std
LogReturnOffer[10]
sum, realizedVolatility, mean, std
LogReturnBid[10]
sum, realizedVolatility, mean, std
WapBalance
sum, mean, std
PriceSpread
sum, mean, std
BidSpread
sum, mean, std
OfferSpread
sum, mean, std
TotalVolume
sum, mean, std
VolumeImbalance
sum, mean, std
最终预测的计算指标为实际波动率: Realized Volatility(realizedVolatility) :实际波动率
同时考虑到,过去 10 分钟的特征中,特征的时效性是随时间增加的,越靠前的特征时效性越弱,越靠后的特征时效性越强。所以,本教程对 10 分钟的特征切分成了 0-600s(全部),150-600s,300-600s,450-600s 四段,分别进行上述衍生指标的计算。
10 分钟的快照数据最终形成 676 维的聚合特征,如下图所示。
3. DolphinDB 元编程计算代码开发
本教程中的重点和难点是批量生成大量特征列计算表达式,如果按照传统 SQL 来编程,实现 676 个计算指标需要编写大量代码,因此本教程使用元编程方法实现衍生特征的计算,关于元编程的详情请参考 元编程 — DolphinDB 2.0 文档 。本教程通过元编程函数 sql 生成元代码。为了对快照数据做 10min 的聚合计算,sql 函数的分组参数 groupby=[, ] 。在自定义聚合函数中,首先进行一二级指标计算,再进行衍生特征计算。通过 DolphinDB 元编程对 level2 快照数据完成 676 列衍生特征的完整计算代码如下: DolphinDB 批计算代码:十档量价数据用多列存储。 DolphinDB 批计算代码(数组向量版):十档量价数据用数组向量存储。 3.1 一二级指标计算
自定义聚合函数的入参是某支股票的 BidPrice, BidOrderQty, OfferPrice, OfferOrderQty 这四个十档量价数据的矩阵,在 DolphinDB 中对一二级指标的计算代码如下: wap = (BidPrice * OfferOrderQty + BidOrderQty * OfferPrice) (BidOrderQty + OfferOrderQty) wapBalance = abs(wap[0] - wap[1]) priceSpread = (OfferPrice[0] - BidPrice[0]) ((OfferPrice[0] + BidPrice[0]) 2) BidSpread = BidPrice[0] - BidPrice[1] OfferSpread = OfferPrice[0] - OfferPrice[1] totalVolume = OfferOrderQty.rowSum() + BidOrderQty.rowSum() volumeImbalance = abs(OfferOrderQty.rowSum() - BidOrderQty.rowSum()) LogReturnWap = logReturn(wap) LogReturnOffer = logReturn(OfferPrice) LogReturnBid = logReturn(BidPrice)3.2 衍生特征计算
利用 Python 的 pandas 库,通过向 groupby.agg 传入一个字典(字典的 key 为列名,value为聚合函数列表),即可实现对指定的列进行批量的聚合指标计算。
在 DolphinDB 中,亦可通过自定义函数实现类似的需求,即把字典转换成元编程代码,具体代码如下: def createAggMetaCode(aggDict){ metaCode = [] metaCodeColName = [] for(colName in aggDict.keys()){ for(funcName in aggDict[colName]) { metaCode.append!(sqlCol(colName, funcByName(funcName), colName + `_ + funcName$STRING)) metaCodeColName.append!(colName + `_ + funcName$STRING) } } return metaCode, metaCodeColName$STRING } features = { "DateTime":[`count] } for( i in 0..9) { features["Wap"+i] = [`sum, `mean, `std] features["LogReturn"+i] = [`sum, `realizedVolatility, `mean, `std] features["LogReturnOffer"+i] = [`sum, `realizedVolatility, `mean, `std] features["LogReturnBid"+i] = [`sum, `realizedVolatility, `mean, `std] } features["WapBalance"] = [`sum, `mean, `std] features["PriceSpread"] = [`sum, `mean, `std] features["BidSpread"] = [`sum, `mean, `std] features["OfferSpread"] = [`sum, `mean, `std] features["TotalVolume"] = [`sum, `mean, `std] features["VolumeImbalance"] = [`sum, `mean, `std] aggMetaCode, metaCodeColName = createAggMetaCode(features)
返回结果为元代码向量和对应的元代码列名,如下图所示:
在自定义函数中,为了方便后续使用元编程进行衍生特征计算,需要将计算的一二级指标拼接成一个 table,同时修改列名,具体代码如下: subTable = table(DateTime as `DateTime, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty, wap, wapBalance, priceSpread, BidSpread, OfferSpread, totalVolume, volumeImbalance, LogReturnWap, LogReturnOffer, LogReturnBid) colNum = 0..9$STRING colName = `DateTime <- (`BidPrice + colNum) <- (`BidOrderQty + colNum) <- (`OfferPrice + colNum) <- (`OfferOrderQty + colNum) <- (`Wap + colNum) <- `WapBalance`PriceSpread`BidSpread`OfferSpread`TotalVolume`VolumeImbalance <- (`LogReturn + colNum) <- (`LogReturnOffer + colNum) <- (`LogReturnBid + colNum) subTable.rename!(colName)
其中 "<-" 是 DolphinDB 函数 join 的简写符号,此处用于将各字段拼接成列向量。
最后将元代码作为参数传入自定义聚合函数,配合一二级指标拼接而成的 table 进行 676 列衍生指标的计算,并以 676 列的形式作为聚合结果返回,具体代码如下: subTable["BarDateTime"] = bar(subTable["DateTime"], 10m) result = sql(select = aggMetaCode, from = subTable).eval().matrix() result150 = sql(select = aggMetaCode, from = subTable, where =