createCryptoOrderBookEngine

Syntax

createCryptoOrderBookEngine(name, dummyTable, inputColMap, [outputTable], depth, [updateRule='direct'], [errorHandler=NULL], [cachingInterval=5000], [timeout=-1], [outputHandler=NULL], [msgAsTable=false], [cachedDepth])

Note: This function is not supported by the Community Edition. To try it, contact DolphinDB technical support and request a trial license.

Details

This engine maintains a real-time cryptocurrency order book, updated based on full-depth snapshots and incremental depth information.

Return value: A table.

Arguments

name is a string specifying the engine name, which uniquely identifies the crypto order book engine. It can contain letters, numbers and underscores, and must start with a letter.

dummyTable is a table object representing the input table. Its columns are mapped to the required columns through the inputColMap parameter.

The following columns are required for input tables:

Name           Type                  Description
symbol         SYMBOL                Crypto symbol
isIncremental  BOOL                  Whether the input data is incremental
eventTime      TIMESTAMP             Event time
askQty         DECIMAL[] / DOUBLE[]  Ask quantity
askPrice       DECIMAL[] / DOUBLE[]  Ask price
bidQty         DECIMAL[] / DOUBLE[]  Bid quantity
bidPrice       DECIMAL[] / DOUBLE[]  Bid price

Additionally,

  • If updateRule="direct", no extra columns are required.
  • If updateRule="general", the following columns are required:
    Name          Type  Description
    prevUpdateId  LONG  Previous record ID
    updateId      LONG  Current record ID
  • If updateRule="Binance-spot", the following columns are required:
    Name           Type  Description
    lastUpdateId   LONG  Last record ID in the event
    firstUpdateId  LONG  First record ID in the event
  • If updateRule="Binance-futures", the following columns are required:
    Name              Type  Description
    lastUpdateId      LONG  Last record ID in the event
    firstUpdateId     LONG  First record ID in the event
    prevLastUpdateId  LONG  Record ID preceding the current event, i.e., the lastUpdateId of the previous event in the stream

Apart from the columns listed above, all other columns in the input table are copied to the output table.

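inputColMap is a dictionary that maps the required columns to columns in dummyTable. Each key is a required column name (such as "eventTime"), and each value is the name of the corresponding column in dummyTable.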
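
The following is a minimal sketch of such a mapping for the "direct" rule, following the key/value orientation used in the examples on this page. The source column names (code, ts, isIncr, bidPx, bidSz, askPx, askSz) are hypothetical and stand in for whatever the input table actually uses.

```dolphindb
// Keys: the column names the engine requires
requiredCols = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty"]
// Values: hypothetical column names in dummyTable, listed in the same order as the keys
sourceCols = ["code", "ts", "isIncr", "bidPx", "bidSz", "askPx", "askSz"]

// Build the mapping passed as the inputColMap argument
inputColMap = dict(requiredCols, sourceCols)
```

When the input table already uses the required names, the same vector can presumably serve as both keys and values, as Example 1 does.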

outputTable (optional) is a table object for storing the updated order book. This table should have the same schema as dummyTable.

depth is an integer or dictionary specifying the depth of the order book.

  • Integer: Applies the same depth for all cryptocurrencies.
  • Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified, no order book is output for it.
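
As a short illustration of the two forms, the sketch below sets depth either as a single integer or as a per-symbol dictionary. The symbols and depth values are arbitrary examples, not recommendations.

```dolphindb
// Integer form: every cryptocurrency gets a 20-level order book
depth = 20

// Dictionary form: per-symbol depths (example symbols and values).
// Symbols missing from the dictionary produce no order book output.
depthBySymbol = dict(["BTCUSDT", "ETHUSDT"], [1000, 500])
```

Either value can then be passed as the depth argument when the engine is created.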

updateRule (optional) is a string specifying the order book update rule.

  • "direct" (default): Updates directly based on the isIncremental field (true: apply the record as an incremental update; false: overwrite the order book with the record).
  • "general": General update rule, requiring streaming data with monotonically increasing update IDs.
  • "Binance-spot": Update rule for Binance spot data.
  • "Binance-futures": Update rule for Binance futures data.

errorHandler (optional) is a user-defined function that is invoked when the engine detects an error, such as missing incremental data. It takes two arguments:

  • The first argument is a string representing the cryptocurrency code.
  • The second argument is an integer representing the error code, with possible values:
    • 1: Received old data.
    • 2: Received out-of-order data expected at a future time.
    • 3: Timeout, indicating no new order book update within the specified time.
    • 4: Crossed prices error, where the highest bid is greater than or equal to the lowest ask.

cachingInterval (optional) is an integer indicating the interval (in milliseconds) within which incremental data is cached. The default is 5000. For each cryptocurrency, data is retained in the cache as long as the time difference between the earliest record in the cache and the latest record does not exceed cachingInterval.

timeout (optional) is an integer specifying the timeout period in milliseconds. The default is -1 (no timeout). If the order book is not updated within this period, errorHandler is invoked with error code 3.

outputHandler (optional) is a unary function. If set, the engine will use this function to process the calculation results instead of writing to the output table.

msgAsTable (optional) is a Boolean value specifying whether the engine's results are passed to outputHandler as a table. The default value is false, meaning the results are passed as a tuple of columns. This parameter only takes effect when outputHandler is set.
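
The sketch below shows one way outputHandler and msgAsTable might be combined. It uses a partial application, append!{demoSink}, as the unary handler so that each batch of results is appended to a sink table. The table and engine names are hypothetical, and the sink is assumed to have a schema compatible with the engine's results; adjust it if your output differs.

```dolphindb
// Minimal schema that uses the required column names directly
colNames = `symbol`eventTime`isIncremental`bidPrice`bidQty`askPrice`askQty
colTypes = [SYMBOL, TIMESTAMP, BOOL, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[]]

// Hypothetical table names, used only for this illustration
share table(1:0, colNames, colTypes) as demoInput
share table(1:0, colNames, colTypes) as demoSink

// Identity mapping: the input columns already use the required names
inputColMap = dict(colNames, colNames)

// append!{demoSink} fixes the first argument of append!, leaving a unary function.
// With msgAsTable=true the handler receives the results as a table.
engine = createCryptoOrderBookEngine(name="demoHandlerEngine", dummyTable=demoInput, inputColMap=inputColMap, depth=20, updateRule="direct", outputHandler=append!{demoSink}, msgAsTable=true)

// Clean up: drop the engine first, then the shared tables
dropStreamEngine("demoHandlerEngine")
undef("demoInput", SHARED)
undef("demoSink", SHARED)
```

A user-defined unary function can be substituted for append!{demoSink} when the results need filtering or forwarding instead of plain storage.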

cachedDepth (optional) is an integer or dictionary specifying the depth for cached order books.

  • Integer: Applies the same depth for cached order books for all cryptocurrencies.
  • Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified, full-depth order books are cached.

Examples

Example 1. Creating a Crypto Order Book Engine for Binance Futures

This example uses the "Binance-futures" rule to process data retrieved from Binance's futures market. The sample data begins with incremental depth data, followed by a full-depth 1000-level snapshot.

// Define input/output table schema
colNames = `isIncremental`exchange`eventTime`transactionTime`symbol`firstUpdateId`lastUpdateId`prevLastUpdateId`bidPrice`bidQty`askPrice`askQty
colTypes = [BOOL, SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DECIMAL128(18)[], DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]

// Create input/output tables
share table(1:0, colNames, colTypes) as outputTable
share table(1:0, colNames, colTypes) as inputTable

inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "lastUpdateId", "firstUpdateId", "prevLastUpdateId"]
inputSource = ["symbol", "eventTime", 'isIncremental', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'lastUpdateId', 'firstUpdateId', 'prevLastUpdateId']

// Map input columns
inputColMap = dict(inputTarget, inputSource)

// Set depth
depth = dict(["BTCUSDT"], [1000])
cachedDepth = dict(["BTCUSDT"], [1500])

// Define error handler
def errorHandler(instrument, code) {
    if (code == 1) {
        writeLog("handle historical msg...")
    } else if (code == 2) {
        writeLog("handle unordered msg...")
    } else if (code == 3) {
        writeLog("handle timeout...")
    } else if (code == 4) {
        writeLog("handle crossed price...")
    } else {
        writeLog("unknown error!")
    }
}

// Create engine
engine = createCryptoOrderBookEngine(name="binanceFutures", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable, 
                                        depth=depth, updateRule="Binance-futures", errorHandler=errorHandler, cachingInterval=5000, 
                                        timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)


// Load and append sample data
fin=file("binanceFuturesTestData.bin")
binanceFuturesTestData = fin.readObject()
fin.close();

getStreamEngine("binanceFutures").append!(binanceFuturesTestData)

// Clean up the environment (drop the engine before the tables it references)
dropStreamEngine("binanceFutures")
undef("inputTable", SHARED)
undef("outputTable", SHARED)

Example 2. Creating a Crypto Order Book Engine for OKX Perpetual Swaps

This example uses the "general" rule to process OKX perpetual swap data. The sample data begins with a 400-level snapshot, followed by incremental data.

// Define input/output table schema
colNames=['isIncremental', 'symbol', 'askPrice', 'askVolume', 'askNum', 'bidPrice', 'bidVolume', 'bidNum', 'checksum', 'prevSeqId', 'seqId', 'updateTime'];
colTypes=[BOOL, SYMBOL, DECIMAL128(18)[], DECIMAL128(8)[], INT[], DECIMAL128(18)[], DECIMAL128(8)[], INT[], LONG, LONG, LONG, TIMESTAMP];

// Create input/output tables
share table(1:0, colNames, colTypes) as inputTable
share table(1:0, colNames, colTypes) as outputTable

inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "prevUpdateId", "updateId"];
inputSource = ["symbol", "updateTime", 'isIncremental', 'bidPrice', 'bidVolume', 'askPrice', 'askVolume', 'prevSeqId', 'seqId'];

// Map input columns
inputColMap = dict(inputTarget, inputSource)

// Set depth
depth = dict(["BTC-USD-SWAP"], [400])
cachedDepth = dict(["BTC-USD-SWAP"], [800])

// Define error handler
def errorHandler(instrument, code) {
    if (code == 1) {
        writeLog("handle historical msg...")
    } else if (code == 2) {
        writeLog("handle unordered msg...")
    } else if (code == 3) {
        writeLog("handle timeout...")
    } else if (code == 4) {
        writeLog("handle crossed price...")
    } else {
        writeLog("unknown error!")
    }
}

// Create engine
engine = createCryptoOrderBookEngine(name="okxSwap", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable, 
                                        depth=depth, updateRule="general", errorHandler=errorHandler, cachingInterval=5000, 
                                        timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)

// Load and append sample data
fin=file("okxTestData.bin")
okxTestData = fin.readObject()
fin.close();

getStreamEngine("okxSwap").append!(okxTestData)

// Clean up the environment (drop the engine before the tables it references)
dropStreamEngine("okxSwap")
undef("inputTable", SHARED)
undef("outputTable", SHARED)

Sample data: