createCryptoOrderBookEngine
Syntax
createCryptoOrderBookEngine(name, dummyTable, inputColMap, [outputTable],
depth, [updateRule='direct'], [errorHandler=NULL], [cachingInterval=5000],
[timeout=-1], [outputHandler=NULL], [msgAsTable=false],
[cachedDepth])
Details
This engine maintains a real-time cryptocurrency order book, updated based on full-depth snapshots and incremental depth information.
Return value: A table.
Arguments
name is a string indicating the engine name. It is the unique identifier of the crypto order book engine. It can contain letters, numbers and underscores, and must start with a letter.
dummyTable is a table object representing the input table. Its columns should be mapped to the engine's required columns through the parameter inputColMap.
The following columns are required for input tables:
| Name | Type | Description |
| --- | --- | --- |
| symbol | SYMBOL | Crypto symbol |
| isIncremental | BOOL | Whether the input data is incremental |
| eventTime | TIMESTAMP | Event time |
| askQty | DECIMAL[] / DOUBLE[] | Ask quantity |
| askPrice | DECIMAL[] / DOUBLE[] | Ask price |
| bidQty | DECIMAL[] / DOUBLE[] | Bid quantity |
| bidPrice | DECIMAL[] / DOUBLE[] | Bid price |
Additionally,
- If updateRule="direct", no extra columns are required.
- If updateRule="general", the following columns are required:

  | Name | Type | Description |
  | --- | --- | --- |
  | prevUpdateId | LONG | Previous record ID |
  | updateId | LONG | Current record ID |

- If updateRule="Binance-spot", the following columns are required:

  | Name | Type | Description |
  | --- | --- | --- |
  | lastUpdateId | LONG | Last record ID in event |
  | firstUpdateId | LONG | First record ID in event |

- If updateRule="Binance-futures", the following columns are required:

  | Name | Type | Description |
  | --- | --- | --- |
  | lastUpdateId | LONG | Last record ID in event |
  | firstUpdateId | LONG | First record ID in event |
  | prevLastUpdateId | LONG | Previous record ID based on the current record in the event, i.e., lastUpdateId of the last stream |
Apart from the columns listed above, all other columns from the input table will be copied to the output table.
inputColMap is a dictionary that maps the engine's required column names (keys) to the corresponding column names in dummyTable (values).
outputTable (optional) is a table object for storing the updated order book. This table should have the same schema as dummyTable.
depth is an integer or dictionary specifying the depth of the order book.
- Integer: Applies the same depth for all cryptocurrencies.
- Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified in the dictionary, no order book is output for it (see the sketch below).
updateRule (optional) is a string specifying the order book update rule.
- "direct" (default): Updates directly based on isIncremental field (true for update, false for overwrite).
- "general": General update rule, requiring streaming data with monotonically increasing update IDs.
- "Binance-spot": Update rule for Binance spot data.
- "Binance-futures": Update rule for Binance futures data.
errorHandler (optional) is a UDF to handle errors when incremental data is missing. It takes two arguments:
- The first argument is a string representing the cryptocurrency code.
- The second argument is an integer representing the error code, with possible values:
- 1: Received old data.
- 2: Received out-of-order data expected at a future time.
- 3: Timeout, indicating no new order book update within the specified time.
- 4: Crossed prices error, where the highest bid is greater than or equal to the lowest ask.
cachingInterval (optional) is an integer indicating the interval (in milliseconds) within which incremental data is cached. The default is 5000. For each crypto, data is retained in the cache as long as the time difference between the earliest and the latest cached records is no greater than cachingInterval.
timeout (optional) is an integer specifying the timeout period in milliseconds. The default is -1 (no timeout). If the order book is not updated within this period, errorHandler will be invoked with error code 3.
outputHandler (optional) is a unary function. If set, the engine will use this function to process the calculation results instead of writing to the output table.
msgAsTable (optional) is a Boolean value specifying whether the engine's results are passed to outputHandler as a table. The default value is false, indicating the output data is a tuple of columns. This parameter only takes effect when outputHandler is set.
cachedDepth (optional) is an integer or dictionary specifying the depth for cached order books.
- Integer: Applies the same depth for cached order books for all cryptocurrencies.
- Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified, full-depth order books are cached.
Examples
Example 1. Creating a Crypto Order Book Engine for Binance Futures
This example uses the "Binance-futures" rule to process data retrieved from Binance's futures market. The sample data begins with incremental depth data, followed by a full-depth 1000-level snapshot.
// Define input/output table schema
colNames = `isIncremental`exchange`eventTime`transactionTime`symbol`firstUpdateId`lastUpdateId`prevLastUpdateId`bidPrice`bidQty`askPrice`askQty
colTypes = [BOOL, SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DECIMAL128(18)[], DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]
// Create input/output tables
share table(1:0, colNames, colTypes) as outputTable
share table(1:0, colNames, colTypes) as inputTable
inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "lastUpdateId", "firstUpdateId", "prevLastUpdateId"]
inputSource = ["symbol", "eventTime", 'isIncremental', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'lastUpdateId', 'firstUpdateId', 'prevLastUpdateId']
// Map input columns
inputColMap = dict(inputTarget, inputSource)
// Set depth
depth = dict(["BTCUSDT"], [1000])
cachedDepth = dict(["BTCUSDT"], [1500])
// Define error handler
def errorHandler(instrument, code) {
if (code == 1) {
writeLog("handle hisotorical msg...")
} else if (code == 2) {
writeLog("handle unordered msg...")
} else if (code == 3) {
writeLog("handle timeout...")
} else if (code == 4) {
writeLog("handle corssed price...")
} else {
writeLog("unknown error!")
}
}
// Create engine
engine = createCryptoOrderBookEngine(name="binanceFutures", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable,
depth=depth, updateRule="Binance-futures", errorHandler=errorHandler, cachingInterval=5000,
timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)
// Load and append sample data
fin=file("binanceFuturesTestData.bin")
binanceFuturesTestData = fin.readObject()
fin.close();
getStreamEngine("binanceFutures").append!(binanceFuturesTestData)
// Clean environments
undef("inputTable", SHARED)
undef("outputTable", SHARED)
dropStreamEngine("binanceFutures")
Example 2. Creating a Crypto Order Book Engine for OKX Perpetual Swaps
This example uses the "general" rule to process data retrieved from OKX's perpetual swap market. The sample data begins with a 400-level snapshot, followed by incremental depth data.
// Define input/output table schema
colNames=['isIncremental', 'symbol', 'askPrice', 'askVolume', 'askNum', 'bidPrice', 'bidVolume', 'bidNum', 'checksum', 'prevSeqId', 'seqId', 'updateTime'];
colTypes=[BOOL, SYMBOL, DECIMAL128(18)[], DECIMAL128(8)[], INT[], DECIMAL128(18)[], DECIMAL128(8)[], INT[], LONG, LONG, LONG, TIMESTAMP];
// Create input/output tables
share table(1:0, colNames, colTypes) as inputTable
share table(1:0, colNames, colTypes) as outputTable
inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "prevUpdateId", "updateId"];
inputSource = ["symbol", "updateTime", 'isIncremental', 'bidPrice', 'bidVolume', 'askPrice', 'askVolume', 'prevSeqId', 'seqId'];
// Map input columns
inputColMap = dict(inputTarget, inputSource)
// Set depth
depth = dict(["BTC-USD-SWAP"], [400])
cachedDepth = dict(["BTC-USD-SWAP"], [800])
// Define error handler
def errorHandler(instrument, code) {
if (code == 1) {
writeLog("handle hisotorical msg...")
} else if (code == 2) {
writeLog("handle unordered msg...")
} else if (code == 3) {
writeLog("handle timeout...")
} else if (code == 4) {
writeLog("handle corssed price...")
} else {
writeLog("unknown error!")
}
}
// Create engine
engine = createCryptoOrderBookEngine(name="okxSwap", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable,
depth=depth, updateRule="general", errorHandler=errorHandler, cachingInterval=5000,
timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)
// Load and append sample data
fin=file("okxTestData.bin")
okxTestData = fin.readObject()
fin.close();
getStreamEngine("okxSwap").append!(okxTestData)
// Clean environments
undef("inputTable", SHARED)
undef("outputTable", SHARED)
dropStreamEngine("okxSwap")
Sample data: