createCryptoOrderBookEngine

语法

createCryptoOrderBookEngine(name, dummyTable, inputColMap, [outputTable], depth, [updateRule='direct'], [errorHandler=NULL], [cachingInterval=5000], [timeout=-1], [outputHandler], [msgAsTable=false], [cachedDepth], [snapshotDir], [snapshotIntervalInMsgCount])

注:

社区版 License 暂不支持该引擎,如需使用此功能,请联系技术支持。

详情

根据数字货币全量订单簿快照和增量订单簿更新数据,实时更新数字货币交易对的订单簿,并输出指定深度的订单簿信息。

该引擎通常与交易所行情接口配合使用:

  1. 通过 REST API 获取一次全量订单簿快照。

  2. 通过 WebSocket 订阅订单簿增量更新。

  3. 将全量数据和后续增量数据输入引擎,由引擎在本地维护实时订单簿。

  4. 若检测到增量数据丢失,可重新请求 REST 快照并重新同步订单簿。

当检测到增量数据丢失或乱序时,可以通过 errorHandler 执行自定义处理逻辑(例如重新请求全量快照)。

该引擎支持通用的订单簿更新规则,同时提供针对 Binance 现货(Binance-spot)和期货(Binance-futures)的专用规则。对于其他交易所,可根据数据格式选择 generaldirect 规则,并确保增量数据顺序或更新逻辑正确。

参数

name 字符串标量,表示 CryptoOrderBook 引擎的名称,可包含字母,数字和下划线,但必须以字母开头。

dummyTable 一个表对象,表示输入表,用于接收来自交易所或其他行情系统的订单簿数据。dummyTable 需要通过 inputColMap 映射为如下两类字段:基础字段(所有 updateRule 必需)和更新规则字段(根据 updateRule 不同而变化):

输入表必须字段:

列名 类型 含义
symbol SYMBOL 数字货币代码。
isIncremental BOOL 是否为增量数据。
  • true 表示订单簿增量更新数据,通常来自 WebSocket 推送。

  • false 表示全量订单簿数据,通常来自 REST 接口。

eventTime TIMESTAMP / NANOTIMESTAMP 事件时间。
askQty DECIMAL[] / DOUBLE[] 卖出数量的列表。
askPrice DECIMAL[] / DOUBLE[] 卖出价格的列表。
bidQty DECIMAL[] / DOUBLE[] 买入数量的列表。
bidPrice DECIMAL[] / DOUBLE[] 买入价格的列表。

另外,根据 updateRule 的不同,可能需要提供其它字段:

  • updateRule =“direct“ 时,不需要额外指定字段。

  • updateRule =“general“ 时,还需要指定以下字段:

列名 类型 含义
prevUpdateId LONG 上一条数据的 ID。
updateId LONG 当前收到数据的 ID。
  • updateRule ="Binance-spot" 时,指定以下字段:
列名 类型 含义
lastUpdateId LONG 从上次推送至今新增的最后一个 update Id。
firstUpdateId LONG 从上次推送至今新增的第一个 update Id。
  • updateRule ="Binance-futures" 时,指定以下字段:
列名 类型 含义
lastUpdateId LONG 从上次推送至今新增的最后一个 update Id。
firstUpdateId LONG 从上次推送至今新增的第一个 update Id。
prevLastUpdateId LONG 上次推送的最后一个 update Id (即上条消息的 lastUpdateId)。

除了上述列之外,输入表中的所有其他列都将被复制到输出表中。

inputColMap 字典,将 dummyTable 中列字段的名称映射为引擎计算所需要的列。

outputTable 可选参数,一个表对象,用于输出最新的订单簿。其结构必须与 dummyTable 相同。outputTableoutputHandler 只需指定其一。

depth 正整数或字典,用于指定订单簿的深度。
  • 正整数:所有数字货币的订单簿深度都为该设置值。

  • 字典:键是字符串标量或向量,表示数字货币代码;值是整型,表示订单簿深度。对于指定的数字货币,将按照设定的深度输出订单簿。未被指定的数字货币将不输出订单簿结果。

updateRule 可选参数,字符串标量,用于指定订单簿更新规则。可选值为:
  • "direct":默认值,表示直接更新,系统将不进行数据丢失判断,仅根据 isIncremental 字段判断是对订单簿执行增量更新(isIncremental=true)还是覆盖(isIncremental-false)。
  • "general":通用更新规则。要求按顺序插入增量数据,并确保数据中 updateId 单调递增。
  • "Binance-spot":基于 Binance 现货交易所订单簿更新机制定义的规则。
  • "Binance-futures":基于 Binance 期货交易所订单簿更新机制定义的规则。
注:
该参数并不限制交易所类型。只要输入数据满足相应规则,即可使用该引擎。
errorHandler 可选参数,自定义错误处理函数。当增量数据出现缺失时,可通过该函数进行错误处理。包含 2 个参数:
  • 第一个是字符串标量,表示数字货币代码。
  • 第二个是整型标量,表示错误码,包括以下情况:
    • 1:接收到历史数据。
    • 2:接收到应在未来时间点到达的乱序数据。
    • 3:超时错误,指定时间间隔内没有新的订单簿合成。
    • 4:买卖价格交叉错误。判断交叉的条件为最高买价大于等于最低卖价。

cachingInterval 可选参数,整型标量,表示缓存增量深度数据的时间间隔。默认值为 5000,单位为毫秒。对于每个数字货币,当缓存中的数据时间与当前最新一条数据时间的差值小于等于 cachingInterval 时,该数据才会被保留在缓存中。

timeout 可选参数,整型标量,表示订单簿更新超时时间。单位为毫秒,默认值为 -1,表示不设置超时时间。在 timeout 指定的时间内未合成新的订单簿时,将触发超时错误,此时会通过 errorHandler 进行错误处理。

outputHandler 可选参数,一元函数,用于处理引擎输出结果。设置此参数时,引擎计算结束后,不再将计算结果写到输出表,而是会调用此函数处理计算结果。

msgAsTable 可选参数,布尔标量,表示在设置了参数 outputHandler 时,将引擎的计算结果以表的结构调用函数。默认值为 false,此时将计算结果的每一列作为元素组成元组。

cachedDepth 可选参数,正整数或字典,用于指定缓存中价格和数量(包括 askQty, askPrice, bidQty, bidPrice)的深度。

  • 正整数:缓存中所有数字货币的价格和数量将按照该值的深度进行缓存。

  • 字典:键是字符串标量或向量,表示数字货币代码;值是整型,表示对应的深度。对于指定的数字货币,将按照设定的深度缓存价格和数量。未被指定的数字货币将保留实际收到的所有价格和数量信息。

若要开启快照机制 (snapshot),必须指定 snapshotDirsnapshotIntervalInMsgCount

snapshotDir 可选参数,字符串,表示保存引擎快照的文件目录。

  • 指定的目录必须存在,否则系统会提示异常。

  • 创建流数据引擎时,如果指定了 snapshotDir,会检查该目录下是否存在快照。如果存在,会加载该快照,恢复引擎的状态。

  • 多个引擎可以指定同一个目录存储快照,用引擎的名称来区分快照文件。

  • 一个引擎的快照可能会使用三个文件名:

    • 临时存储快照信息:文件名为 <engineName>.tmp

    • 快照生成并刷到磁盘:文件保存为 <engineName>.snapshot

    • 存在同名快照:旧快照自动重命名为 <engineName>.old

snapshotIntervalInMsgCount 可选参数,为整数类型,表示每隔多少条数据保存一次流数据引擎快照。

返回值

一个表。

例子

运行代码前,先下载 ../data/CryptoOrderBookInput.zip 文件。

例1. 创建一个加密货币订单簿引擎,使用 "Binance-futures" 规则处理来自 Binance 交易所 U 本位合约的深度数据。根据 Binance 交易所数据发送规则,示例数据中首先为增量深度数据,在 68 行插入 1000 档全量快照数据后,引擎开始输出订单簿。

// 定义输入输出表结构
colNames = `isIncremental`exchange`eventTime`transactionTime`symbol`firstUpdateId`lastUpdateId`prevLastUpdateId`bidPrice`bidQty`askPrice`askQty
colTypes = [BOOL, SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DECIMAL128(18)[], DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]

// 创建输入输入输出表
share table(1:0, colNames, colTypes) as outputTable
share table(1:0, colNames, colTypes) as inputTable

inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "lastUpdateId", "firstUpdateId", "prevLastUpdateId"]
inputSource = ["symbol", "eventTime", 'isIncremental', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'lastUpdateId', 'firstUpdateId', 'prevLastUpdateId']

inputColMap = dict(inputTarget, inputSource)


depth = dict(["BTCUSDT"], [1000])
cachedDepth = dict(["BTCUSDT"], [1500])

def errorHandler(instrument, code) {
    if (code == 1) {
        writeLog("handle hisotorical msg...")
    } else if (code == 2) {
        writeLog("handle unordered msg...")
    } else if (code == 3) {
        writeLog("handle timeout...")
    } else if (code == 4) {
        writeLog("handle corssed price...")
    } else {
        writeLog("unknown error!")
    }
}

// 创建引擎
engine = createCryptoOrderBookEngine(name="binanceFutures", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable, 
                                        depth=depth, updateRule="Binance-futures", errorHandler=errorHandler, cachingInterval=5000, 
                                        timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)


// 加载示例数据
fin=file("binanceFuturesTestData.bin")
binanceFuturesTestData = fin.readObject()
fin.close();
// 数据插入引擎进行合成
getStreamEngine("binanceFutures").append!(binanceFuturesTestData)

// 清除环境
undef("inputTable", SHARED)
undef("outputTable", SHARED)
dropStreamEngine("binanceFutures")

例2. 创建一个加密货币订单簿引擎,使用 "general" 规则处理来自 OKX 交易所永续合约的深度数据。根据 OKX 交易所数据发送规则,示例数据中首行为 400 档快照数据,之后为增量快照数据。


// 定义输入输出表结构
colNames=['isIncremental', 'symbol', 'askPrice', 'askVolume', 'askNum', 'bidPrice', 'bidVolume', 'bidNum', 'checksum', 'prevSeqId', 'seqId', 'updateTime'];
colTypes=[BOOL, SYMBOL, DECIMAL128(18)[], DECIMAL128(8)[], INT[], DECIMAL128(18)[], DECIMAL128(8)[], INT[], LONG, LONG, LONG, TIMESTAMP];

// 创建输入输入输出表
share table(1:0, colNames, colTypes) as inputTable
share table(1:0, colNames, colTypes) as outputTable

inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "prevUpdateId", "updateId"];
inputSource = ["symbol", "updateTime", 'isIncremental', 'bidPrice', 'bidVolume', 'askPrice', 'askVolume', 'prevSeqId', 'seqId'];

inputColMap = dict(inputTarget, inputSource)

depth = dict(["BTC-USD-SWAP"], [400])
cachedDepth = dict(["BTC-USD-SWAP"], [800])

def errorHandler(instrument, code) {
    if (code == 1) {
        writeLog("handle hisotorical msg...")
    } else if (code == 2) {
        writeLog("handle unordered msg...")
    } else if (code == 3) {
        writeLog("handle timeout...")
    } else if (code == 4) {
        writeLog("handle corssed price...")
    } else {
        writeLog("unknown error!")
    }
}

// 创建引擎
engine = createCryptoOrderBookEngine(name="okxSwap", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable, 
                                        depth=depth, updateRule="general", errorHandler=errorHandler, cachingInterval=5000, 
                                        timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)


// 加载示例数据
fin=file("okxTestData.bin")
okxTestData = fin.readObject()
fin.close();
// 数据插入引擎进行合成
getStreamEngine("okxSwap").append!(okxTestData)

// 清除环境
undef("inputTable", SHARED)
undef("outputTable", SHARED)
dropStreamEngine("okxSwap")