Historical Data Replay

In DolphinDB, we can import historical data into a stream table in chronological order as "real-time data", so that the same script can be used for both backtesting and real-time trading. For details on streaming in DolphinDB, please refer to the DolphinDB Streaming Tutorial.

This article introduces functions replay and replayDS and then demonstrates the process of data replaying.

1. Functions

1.1. replay

replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])

Function replay injects data from specified tables or data sources into stream tables.

  • 'inputTables' is a table or a tuple. Each element of the tuple is an unpartitioned table or a data source generated by function replayDS.

  • 'outputTables' is a table or a tuple of tables, or a string or a string vector. The number of elements of outputTables must be the same as the number of elements of inputTables. If it is a string vector, it is a list of the names of the shared stream tables where the replayed data of the corresponding tables of inputTables are saved. If it is a tuple, each element is a shared stream table where the replayed data of the corresponding table in inputTables are saved. The schema of each table in outputTables must be identical to the schema of the corresponding table in inputTables.

  • 'dateColumn' and 'timeColumn' are strings indicating the date column and time column in inputTables. If neither is specified, the first column of the table is chosen as 'dateColumn'. If there is a 'dateColumn', it must be one of the partitioning columns. If only 'timeColumn' is specified, it must be one of the partitioning columns. If information about date and time comes from the same column (e.g., DATETIME, TIMESTAMP), use the same column for both 'dateColumn' and 'timeColumn'. Data are replayed in batches determined by the smallest unit of time in 'timeColumn', or in 'dateColumn' if 'timeColumn' is not specified. For example, if the smallest unit of time in 'timeColumn' is second, then all data in the same second are replayed in the same batch; if 'timeColumn' is not specified, then all data in the same day are replayed in the same batch.

  • 'replayRate' is an integer. Together with 'absoluteRate', it determines the replay speed as described below. If it is not specified, data are replayed at the maximum speed.

  • 'absoluteRate' is a Boolean value. The default value is true.

Regarding 'replayRate' and 'absoluteRate':

(1) If 'replayRate' is a positive integer and absoluteRate=true, replay at the speed of 'replayRate' rows per second.

(2) If 'replayRate' is a positive integer and absoluteRate=false, replay at 'replayRate' times the original speed of the data. For example, if the difference between the maximum and the minimum values of 'dateColumn' or 'timeColumn' is n seconds, then it takes n/replayRate seconds to finish the replay.

(3) If 'replayRate' is unspecified or negative, replay at the maximum speed.

  • 'parallelLevel' is a positive integer. When the size of individual partitions in the data sources is too large relative to memory size, we need to use function replayDS to further divide individual partitions into smaller data sources. 'parallelLevel' indicates the number of threads loading data into memory from these smaller data sources simultaneously. The default value is 1. If 'inputTables' is a table or a tuple of tables, the effective 'parallelLevel' is always 1.
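
The three cases for 'replayRate' and 'absoluteRate' listed above reduce to simple arithmetic. The following is a minimal Python sketch of that arithmetic only; it does not call DolphinDB. The function name and the sample figures (1,000,000 rows over a 6.5-hour trading day) are hypothetical, and the estimate ignores data loading overhead.

```python
def estimated_replay_seconds(total_rows, time_span_seconds,
                             replay_rate=None, absolute_rate=True):
    """Return the expected replay duration in seconds, or None for maximum speed."""
    # Case (3): an unspecified or negative rate means "as fast as possible",
    # so the duration depends on hardware and cannot be computed here.
    if replay_rate is None or replay_rate < 0:
        return None
    # The text does not define the behavior for 0, so this sketch rejects it.
    if replay_rate == 0:
        raise ValueError("replayRate = 0 is not covered by the rules above")
    # Case (1): replay_rate is a number of rows per second.
    if absolute_rate:
        return total_rows / replay_rate
    # Case (2): replay_rate is a multiple of the original speed of the data.
    return time_span_seconds / replay_rate


trading_day = 6.5 * 3600  # hypothetical time span of the data: 23,400 seconds

print(estimated_replay_seconds(1_000_000, trading_day, 1000, True))   # 1000.0
print(estimated_replay_seconds(1_000_000, trading_day, 10, False))    # 2340.0
print(estimated_replay_seconds(1_000_000, trading_day))               # None
```

With absoluteRate=true the duration depends only on the row count; with absoluteRate=false it depends only on the time span covered by the data.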

1.2. replayDS

replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])

Function replayDS generates a group of data sources to be used as the inputs of function replay. It splits a SQL query into multiple subqueries based on 'timeRepartitionSchema' with 'timeColumn' within each 'dateColumn' partition.

  • 'sqlObj' is a table or metacode with SQL statements (such as <select * from sourceTable>) indicating the data to be replayed. The table object of "select from" must use a DATE type column as one of the partitioning columns.

  • 'dateColumn' and 'timeColumn' are strings indicating the date column and time column. If neither is specified, the first column of the table is chosen as 'dateColumn'. If there is a 'dateColumn', it must be one of the partitioning columns. If only 'timeColumn' is specified, it must be one of the partitioning columns. If information about date and time comes from the same column (e.g., DATETIME, TIMESTAMP), use the same column for both 'dateColumn' and 'timeColumn'. Function replayDS and the corresponding function replay must use the same set of 'dateColumn' and 'timeColumn'.

  • 'timeRepartitionSchema' is a TIME or NANOTIME type vector. 'timeRepartitionSchema' delimits multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. For example, if timeRepartitionSchema=[t1, t2, t3], then there are 4 data sources within a day: [00:00:00.000,t1), [t1,t2), [t2,t3) and [t3,23:59:59.999).
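
The way cut points turn into data sources can be sketched in a few lines of Python. This only mirrors the interval rule stated above (k cut points give k+1 half-open intervals per date); the function name and the sample cut points are hypothetical, and no DolphinDB call is involved.

```python
def daily_data_sources(cut_points):
    """Map a sorted list of cut points to the half-open intervals they delimit."""
    # The day boundaries are the ones given in the text above.
    bounds = ["00:00:00.000"] + list(cut_points) + ["23:59:59.999"]
    # Pair each bound with its successor: [b0, b1), [b1, b2), ...
    return list(zip(bounds[:-1], bounds[1:]))


sources = daily_data_sources(["09:30:00.000", "12:00:00.000", "16:00:00.000"])
for start, end in sources:
    print(f"[{start}, {end})")
```

Three cut points yield four intervals, matching the [t1, t2, t3] example.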

1.3. Replay a single in-memory table

replay(inputTable, outputTable, `date, `time, 10)

1.4. Replay a single table using data sources

To replay a single table with a large number of rows, we can use function replayDS together with function replay. Function replayDS delimits multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. Parameter 'parallelLevel' of function replay specifies the number of threads loading data into memory from these smaller data sources simultaneously. In this example, 'parallelLevel' is set to 2.

inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay(inputDS, outputTable, `date, `time, 1000, true, 2)
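
To see which cut points the expression 08:00:00.000 + (1..10) * 3600000 produces, the Python sketch below reproduces the arithmetic. It assumes that adding an integer to a TIME value adds milliseconds, which is how the expression reads; the date used is arbitrary because only the time of day matters.

```python
from datetime import datetime, timedelta

MS_PER_HOUR = 3_600_000
base = datetime(2000, 1, 1, 8, 0, 0)  # arbitrary date; stands in for 08:00:00.000

# Equivalent of 08:00:00.000 + (1..10) * 3600000, assuming millisecond arithmetic.
cut_points = [
    (base + timedelta(milliseconds=k * MS_PER_HOUR)).strftime("%H:%M:%S.000")
    for k in range(1, 11)
]

print(cut_points[0], cut_points[-1])  # 09:00:00.000 18:00:00.000
print(len(cut_points) + 1)            # 11 data sources per date
```

Under that assumption there are ten hourly cut points from 09:00 to 18:00, so each date is divided into eleven data sources.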

1.5. Replay multiple tables simultaneously using data sources

To replay multiple tables simultaneously, assign a tuple of these tables (here, their data sources) to parameter 'inputTables' of function replay and specify the output tables. Each output table corresponds to an input table and must have the same schema as that input table. All input tables must have identical 'dateColumn' and 'timeColumn'.

ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, true, 2)

1.6. Cancel replay

If function replay was called with submitJob, we can use getRecentJobs to get jobId, then cancel the replay with command cancelJob.

getRecentJobs()
cancelJob(jobId)

If function replay was called directly, we can use getConsoleJobs in another GUI session to get jobId, then cancel the replay with command cancelConsoleJob.

getConsoleJobs()
cancelConsoleJob(jobId)

2. How to use replayed data

Replayed data are streaming data. We can subscribe to and process the replayed data in the following 3 ways:

  • Subscribe in DolphinDB. Write user-defined functions in DolphinDB to process streaming data.
  • Subscribe in DolphinDB. To conduct real-time calculations with streaming data, use DolphinDB's built-in streaming aggregators such as time-series aggregator, cross-sectional aggregator and anomaly detection engine. They are very easy to use and have excellent performance. In section 3.1, we use a cross-sectional aggregator to calculate the intrinsic value of an ETF.
  • Subscribe with a third-party client through DolphinDB's streaming API.

3. Examples

3.1. Replay level 1 stock quotes to calculate ETF intrinsic value

In this example, we replay the level 1 stock quotes in US stock markets on 2007/08/17, and calculate the intrinsic value of an ETF with the built-in cross-sectional aggregator in DolphinDB. The following are the schema of the input table 'quotes' and a preview of the data.

quotes = database("dfs://TAQ").loadTable("quotes");
quotes.schema().colDefs;

name      typeString  typeInt
time      SECOND      10
symbol    SYMBOL      17
ofrsiz    INT         4
ofr       DOUBLE      16
mode      INT         4
mmid      SYMBOL      17
ex        CHAR        2
date      DATE        6
bidsize   INT         4
bid       DOUBLE      16

select top 10 * from quotes where date=2007.08.17

symbol  date        time      bid    ofr    bidsiz  ofrsiz  mode  ex  mmid
A       2007.08.17  04:15:06  0.01   0      10      0       12    80
A       2007.08.17  06:21:16  1      0      1       0       12    80
A       2007.08.17  06:21:44  0.01   0      10      0       12    80
A       2007.08.17  06:49:02  32.03  0      1       0       12    80
A       2007.08.17  06:49:02  32.03  32.78  1       1       12    80
A       2007.08.17  07:02:01  18.5   0      1       0       12    84
A       2007.08.17  07:02:01  18.5   45.25  1       1       12    84
A       2007.08.17  07:54:55  31.9   45.25  3       1       12    84
A       2007.08.17  08:00:00  31.9   40     3       2       12    84
A       2007.08.17  08:00:00  31.9   35.5   3       2       12    84

(1) Loading a large amount of data into memory before replaying it may cause an out-of-memory problem. To avoid this, we can first use function replayDS and specify parameter 'timeRepartitionSchema' to divide the data into 60 parts based on the column 'time'.

trs = cutPoints(09:30:00.000..16:00:00.000, 60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time,  trs);

(2) Define the output stream table 'outQuotes'.

sch = select name,typeString as type from quotes.schema().colDefs
share streamTable(100:0, sch.name, sch.type) as outQuotes

(3) Define a dictionary for the ETF components weights and function etfVal to calculate ETF intrinsic value. For simplicity we use an ETF with only 6 component stocks.

defg etfVal(weights,sym, price) {
    return wsum(price, weights[sym])
}
weights = dict(STRING, DOUBLE)
weights[`AAPL] = 0.1
weights[`IBM] = 0.1
weights[`MSFT] = 0.1
weights[`NTES] = 0.1
weights[`AMZN] = 0.1
weights[`GOOG] = 0.5
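
The calculation performed by etfVal is a weighted sum of the latest prices, with each weight looked up by symbol. The Python sketch below mirrors that logic outside DolphinDB to make the arithmetic explicit. The prices are hypothetical placeholders, not values from the quotes table.

```python
# Same component weights as in the DolphinDB dictionary above.
weights = {
    "AAPL": 0.1,
    "IBM": 0.1,
    "MSFT": 0.1,
    "NTES": 0.1,
    "AMZN": 0.1,
    "GOOG": 0.5,
}


def etf_val(weights, symbols, prices):
    """Weighted sum of prices, looking up each weight by its symbol."""
    return sum(price * weights[sym] for sym, price in zip(symbols, prices))


symbols = list(weights)           # one latest quote per component
prices = [100.0] * len(symbols)   # hypothetical prices, all set to 100
value = etf_val(weights, symbols, prices)
print(round(value, 6))
```

Because the six weights sum to 1, identical component prices give an ETF value equal to that price.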

(4) Define a streaming aggregator to subscribe to the output stream table 'outQuotes'. We specify a filtering condition for the subscription that only data with stock symbols of AAPL, IBM, MSFT, NTES, AMZN or GOOG are published to the aggregator. This significantly reduces unnecessary network overhead and data transfer.

setStreamTableFilterColumn(outQuotes, `symbol)
outputTable = table(1:0, `time`etf, [TIMESTAMP,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch)
subscribeTable(tableName="outQuotes", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true, filter=`AAPL`IBM`MSFT`NTES`AMZN`GOOG)  

(5) Start to replay data at the specified speed of 100,000 rows per second. The streaming aggregator conducts real-time calculation with the replayed data.

submitJob("replay_quotes", "replay_quotes_stream",  replay,  [rds],  [`outQuotes], `date, `time, 100000, true, 4)

(6) Check the ETF intrinsic values.

select top 15 * from outputTable

time                     etf
2019.06.04T16:40:18.476  14.749
2019.06.04T16:40:19.476  14.749
2019.06.04T16:40:20.477  14.749
2019.06.04T16:40:21.477  22.059
2019.06.04T16:40:22.477  22.059
2019.06.04T16:40:23.477  34.049
2019.06.04T16:40:24.477  34.049
2019.06.04T16:40:25.477  284.214
2019.06.04T16:40:26.477  284.214
2019.06.04T16:40:27.477  285.68
2019.06.04T16:40:28.477  285.68
2019.06.04T16:40:29.478  285.51
2019.06.04T16:40:30.478  285.51
2019.06.04T16:40:31.478  285.51
2019.06.04T16:40:32.478  285.51

4. Performance testing

We tested data replaying in DolphinDB on a server with the following configuration:

  • Server: DELL PowerEdge R730xd
  • CPU: Intel Xeon(R) CPU E5-2650 v4 (24 cores, 48 threads, 2.20 GHz)
  • RAM: 512 GB (32 GB × 16, 2666 MHz)
  • Hard disk: 17 TB HDD (1.7 TB × 10, read speed 222 MB/s, write speed 210 MB/s)
  • Network: 10 Gigabit Ethernet

DolphinDB script:

sch = select name,typeString as type from  quotes.schema().colDefs
trs = cutPoints(09:30:00.000..16:00:00.001,60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time,  trs);
share streamTable(100:0, sch.name, sch.type) as outQuotes1
jobid = submitJob("replay_quotes","replay_quotes_stream",  replay,  [rds],  [`outQuotes1], `date, `time, , ,4)

When replaying at maximum speed (parameter 'replayRate' is not specified) with no subscription to the output table, it takes only about 100 seconds to replay 336,305,414 rows of data.
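
To put the result in perspective, the short Python calculation below derives the implied throughput and compares it with the fixed rate of 100,000 rows per second used in section 3.1. The figures are taken from the text; the comparison itself is a back-of-the-envelope estimate, not a measured result.

```python
rows = 336_305_414        # rows replayed in the test
elapsed_seconds = 100     # approximate elapsed time reported above

# Implied throughput at maximum speed.
throughput = rows / elapsed_seconds
print(f"{throughput:,.0f} rows per second")

# Estimated time for the same data at 100,000 rows per second (absoluteRate=true).
seconds_at_100k = rows / 100_000
print(f"{seconds_at_100k / 60:.1f} minutes")
```

This corresponds to roughly 3.4 million rows per second at maximum speed, compared with about 56 minutes for the same data at 100,000 rows per second.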