ETL Tuning: From 4.5 Hours to 3.5 Minutes

Extract, transform, and load (ETL) is a data integration process which first extracts the data from various sources, then transforms the data according to business rules, and loads the data into a destination data warehouse.

When utilizing traditional tech stacks like Python, MySQL, and Java as ETL tools, it's common for users to experience performance limitations as the volume of data grows, particularly when processing high-frequency trading data. In this tutorial, we will show you how to implement an ETL process and enhance its performance with SQL tuning techniques using DolphinDB. Our optimized script reduces the processing time from 4.5 hours to 3.5 minutes, with the performance improved by over 70 times.

1. Data Preparation

In this tutorial, we prepare 1 year's raw trade data (up to 1.7 TB before compression) for ETL processing. Table "trade" contains tick data of around 3000 stocks with 60 million records per day.

FieldData TypeDescription
securityIDSTRINGstock ID
tradingdateDATEtrading date
tradingtimeTIMESTAMPtrading time
tradetypeSYMBOLtrade type
recidINTrecord ID
tradepriceDOUBLEtrading price
tradevolumeINTtrading volume
buyorderidINTID of the buy order
sellorderidINTID of the sell order
unixTIMESTAMPUnix timestamp

We will write the processed data to the target table. Both the source and target tables use the OLAP storage engines and adopt composite partitions that combine date-based value partition and stock-based hash partition (with 20 hash partitions).

To meet the requirements for quantitative analysis listed below, the ETL process is supposed to convert the field types, add symbol suffixes, add calculated fields, filter failed trades, etc.

  • Convert the data type of column "tradingDate" from DATE to INT;
  • Convert the data type of column "tradingTime" from TIMESTAMP to LONG type;
  • Add a column of BSFlag (Buy/Sell flag);
  • Add a column of tradevalue (total value traded).

1.1. Environment

  • CPU: Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
  • Logical processors: 16
  • Memory: 256 GB
  • OS: 64-bit CentOS Linux 7 (Core)
  • DolphinDB Server version: 2.00.6
  • Deployment: high-availability cluster (with 3 controllers and 3 data nodes)

2. ETL Process

2.1. ETL Design

A typical ETL design operates on each shard of source dataset and consolidates the outcomes into the target database. Specifically in DolphinDB, the ETL process works as follows:

(1) Partition the original dataset by trading date and stock ID.

data = [cut1, cut2, ... , cutN]

(2) Clean and transform the data within each partition, and save the processed data to an in-memory object "tradingdf".

(3) Append the in-memory table "tradingdf" to a DFS table.

def genDataV1(date1, dateN){
    tradeSrc = loadTable("dfs://originData", "trade")
    tradeTgt = loadTable("dfs://formatData", "trade")
    for (aDate in date1..dateN){
        tradeSecurityID = (exec distinct(securityID) from tradeSrc where tradingdate = aDate).shuffle()
        for (m in tradeSecurityID){		
		    tradingdf = select  * from tradeSrc where securityID = m and tradingdate = aDate    
		    tradingdf["symbol"] = m + "SZ"        
		    //print("stock " + m + ",date is " + aDate + ",tradingdf size " + tradingdf.size())  
		    tradingdf["buysellflag"] =iif(tradingdf["sellorderid"] > tradingdf["buyorderid"],"S", "B")
		    tradingdf["tradeamount"] = tradingdf["tradevolume"] * tradingdf["tradeprice"]
		    tradingdf = tradingdf[(tradingdf["tradetype"] == "0") || (tradingdf["tradetype"] == "F")]
		    tradingdf = select symbol,tradingdate, tradingtime, recid, tradeprice, tradevolume, tradeamount, buyorderid, sellorderid, buysellflag, unix from tradingdf
		    tradingdf = select * from tradingdf order by symbol, tradingtime, recid     
		    tradingdf.replaceColumn!("tradingdate", toIntDate(::date(tradingdf.tradingDate)))            
		    tradingtime = string(exec tradingtime from tradingdf)
		    tradingdf.replaceColumn!(`tradingtime, tradingtime)
		    unix = long(exec unix from tradingdf)
		    tradingdf.replaceColumn!(`unix, unix)                                             
		    tradeTgt.append!(tradingdf)	      		
        }
	}
}

When using ETL tools lik Python, MySQL, Java, or middleware like Kettle, the processing is often limited due to its single-threaded nature. When the similar logic is applied to the ETL process in DolphinDB, it takes up to 4.5 hours to process trade data of 20 trading days. In the following section we will analyze the performance bottleneck and demonstrate how to optimize the script.

2.2. Bottleneck Analysis

The performance bottlenecks of the ETL process mainly lie in:

(1) Nested loops

The code is executed in nested loops based on the stock ID and trading date. The time complexity t can be calculated as follows:

t = O(N) * O(M) * t0 = O(MN) * t0
  • N: trading days
  • M: number of stocks
  • t0: execution time (in seconds) of the innermost loop

The execution time of the innermost loop is about 0.4 seconds, and the execution time for the full script is estimated to be t ~= 20 * 0.4 * 3000 = 6.7 hours.

(2) Repeated data access

As shown above, the transformation script repeatedly operates on the same dataset, causing higher round-trip time. However, some operations, such as data filtering and sorting, can be done within one query.

(3) Computing is conducted on only one data node

Starting from the assignment statement for "tradingdf":

tradingdf=select * from loadTable("dfs://test", 'szl2_stock_trade_daily') where symbol = m and tradingDate = date

The subsequent script falls short in leveraging DolphinDB's distributed and concurrent computing capabilities, as it only utilizes the resources of one data node. As a result, the performance deteriorates as the volume of data continues to increase.

3. Performance Tuning

The formula of execution time is referenced to optimize the performance of ETL process in DolphinDB:

t = S / V
  • t: execution time.
  • S: space complexity, i.e., the amount of data to be processed.
  • V: speed of data processing, i.e., how many records can be processed per second.

Therefore, the key to reducing execution time t for an ETL process lies in reducing space complexity and increasing processing speed.

3.1. Space Complexity

In DolphinDB, space complexity can be reduced by partition pruning, columnar storage, indexing, and so on.

  • Partition pruning

    For the time series data that is divided into partitions (based on the temporal column), if a filtering condition in the where clause specifies the partitioning column, then only the needed partitions are accessed.

  • Columnar storage

    For example, if a table contains hundreds of columns, but only a few of them are needed for an aggregate query. DolphinDB's OLAP storage engine adopts columnar storage, enabling users to access only the required columns, which greatly reduces the disk I/O.

  • Indexing

    If a DFS table uses TSDB engine and the query statement is filtered by the specified sort columns, the corresponding block ID can be quickly queried by scanning the sparse index. Only the required blocks are accessed for the query, thus avoiding full a table scan.

3.2. Processing Speed

To improve the efficiency of batch processing, you can set a proper batch size and apply multithreaded and distributed computing.

  • Proper batch size

    DolphinDB manages historical data for batching processing on a partition basis. It is recommended to set the partition size to 100 MB – 500 MB (before compression).

  • Multithreading

    DolphinDB makes extensive use of multithreading. Specifically, for a distributed SQL query, multiple threads are applied to concurrent processing of partitioned data.

  • Distributed computing

    DolphinDB leverages distributed computing in multi-machine clusters and coordinates distributed transactions. When a distributed SQL query is executed, it uses a Map-Reduce-Merge model to perform parallel computation. In the Map phase, the query is automatically split into tasks that are assigned to the nodes in the cluster to maximize the utilization of the cluster's hardware resources.

4. Optimized ETL Process

Based on the performance bottleneck analysis, we optimize the code in the following aspects:

  • Improve parallelism
  • Reduce query times
  • Adopt vectorized processing

4.1. Optimized Script

The optimized script processes daily market data of 3000 stocks (distributed across 20 hash partitions) efficiently through a distributed SQL query with 20 tasks. Each task is assigned to the corresponding node within the cluster, allowing for parallel execution and optimized processing.

The optimized code is as follows:

def transformData(tradeDate){
    tradeSrc = loadTable("dfs://originData", "trade")
    tradeTgt = loadTable("dfs://formatData", "trade")
    data = select
        securityID + "SZ" as securityID
        ,toIntDate(tradingdate) as  tradingdate
        ,tradingtime$STRING as tradingtime
        ,recid as recid
        ,tradeprice
        ,tradevolume
        ,tradevolume * tradeprice as tradeamount        
        ,buyorderid as buyrecid
        ,sellorderid as sellrecid
        ,iif(sellorderid>  buyorderid,"S", "B") as buysellflag      
        ,unix$LONG as unix
    from tradeSrc
    where tradingdate = tradeDate and tradetype in ["0", "F"]
    tradeTgt.append!(data)
    pnodeRun(flushOLAPCache)
}
​
allDays = 2022.05.01..2022.05.20
for(aDate in allDays){
    jobId = "transform_"+ strReplace(aDate$STRING, ".", "") 
    jobDesc = "transform data"
    submitJob(jobId, jobDesc, transformData, aDate)
}

The script takes about 40 seconds to process one day's market data of 3000 stocks. Data of 20 trading days can be processed in concurrent jobs submitted by the function submitJob. By configuring maxBatchJobWorker =16 (generally it is set to the number of CPU cores), the script takes only 210 seconds, which improves the performance by 74 times.

4.2. Performance Analysis

The performance improvement is mainly due to:

  • Distributed computing with high parallelism

    The select statement uses parallel execution, with the parallelism determined by the number of partitions of the source dataset and the number of configured threads. Based on the partitioning scheme of the source table "trade", the parallelism is up to 20, which implies that compared to single-threaded data processing the execution speed is improved by 20 times theoretically. In practical tests, this enhancement is observed to be 18 times when handling data for a single trading day. Furthermore, multiple tasks can be executed in parallel through submitJob.

  • Efficient queries

    All the processing logic, including data filtering, data type conversion, and adding derived fields, can be done within one query. The data does not need to be accessed repeatedly.

  • Vectorization

    The OLAP engine adopts columnar storage, and each column is loaded into memory in the form of a vector. Therefore, vectorization is applied to improve the efficiency of SQL queries.

5. Conclusion

In this tutorial we explore the optimization of ETL process in DolphinDB using a practical example of SQL query tuning. By avoiding nested loops and adopting distributed and vectorized computing techniques, the optimized SQL query demonstrates a remarkable 74-fold increase in efficiency. Specifically, the ETL processing of market data for 3000 stocks over 20 trading days, which previously took over 4.5 hours, can now be completed in just 210 seconds. By incorporating vectorization and leveraging the distributed, parallel processing capabilities of columnar databases, DolphinDB can serve as a highly efficient ETL tool for preprocessing large datasets.