Best Practices for Integrating Python Airflow With DolphinDB

As a high-performance time-series database, DolphinDB frequently handles data extraction, cleaning, transformation, and loading (ETL) in production environments. Airflow provides an effective framework for organizing and managing these ETL workflows. This tutorial presents a practical solution for production ETL scenarios by integrating Python Airflow with a high-availability DolphinDB cluster. By leveraging the capabilities provided by Airflow, we can achieve better management of DolphinDB data ETL tasks.

The overall architecture is as follows:

1. Introducing Airflow

1.1 Apache Airflow

Apache Airflow is a programmable, schedulable, and monitorable workflow platform based on Directed Acyclic Graphs (DAGs). It defines tasks with dependencies and executes them in order. Airflow includes command-line tools for system management and a web interface for task scheduling and real-time monitoring, simplifying operations and maintenance.

1.2 Core Features

  • Incremental Data Loading: For small datasets, full data loads are feasible. As data grows, incremental loading at fixed intervals becomes essential. Airflow makes it easy to schedule such incremental loads (e.g., hourly, daily, or weekly).
  • Historical Data Processing: To backfill data after deploying new workflows, set the start_date parameter in the DAG. Airflow will execute tasks retrospectively, supporting backfills spanning weeks, months, or years.
  • Partitioned Data Extraction: Partitioning data enables parallel DAG execution, reducing lock contention and optimizing performance. Archived or obsolete data can be removed to maintain efficiency.
  • Enforcing Idempotency: The results of DAG runs should always be idempotent. This means that repeated executions with identical parameters produce consistent results.
  • Conditional Execution: Airflow supports conditional task execution based on the success or failure of prior tasks (see the sketch after this list).
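To make these features concrete, the following minimal sketch (with hypothetical DAG and task names, using plain EmptyOperator placeholders rather than this tutorial's DolphinDB tasks) shows an hourly schedule with backfill enabled and a task that runs only when an upstream task fails:

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

# Illustrative only: hourly incremental schedule, backfill via catchup,
# and conditional execution through a trigger rule.
with DAG(
    dag_id="feature_demo",               # hypothetical DAG name
    start_date=datetime(2023, 1, 1),     # backfill starts from this date when catchup=True
    schedule_interval="@hourly",         # incremental loads every hour
    catchup=True,
) as dag:
    load = EmptyOperator(task_id="incremental_load")
    alert = EmptyOperator(
        task_id="alert_on_failure",
        trigger_rule=TriggerRule.ONE_FAILED,   # runs only if an upstream task failed
    )
    load >> alert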

1.3 DolphinDBOperator

DolphinDBOperator is a specialized Airflow operator for executing DolphinDB scripts, queries, and computations within workflows. Key parameters include:

  • dolphindb_conn_id: DolphinDB connection identifier.
  • sql: DolphinDB script to execute.
  • file_path: Path to a DolphinDB .dos script file.

Example:

  • Execute inline DolphinDB script by setting sql
    # create a shared table in DolphinDB
    create_parameter_table = DolphinDBOperator(
            task_id='create_parameter_table',
            dolphindb_conn_id='dolphindb_test',
            sql='''
                undef(`paramTable,SHARED)
                t = table(1:0, `param`value, [STRING, STRING])
                share t as paramTable
                '''
        )
  • Execute DolphinDB script from file by setting file_path
    # CalAlpha001.dos is a DolphinDB script file
    case1 = DolphinDBOperator(
        task_id='case1',
        dolphindb_conn_id='dolphindb_test',
        file_path=path + "/StreamCalculating/CalAlpha001.dos"
    )

1.4 Installation and Deployment

Hardware Environment

| Hardware | Information |
| --- | --- |
| Host | HostName |
| Extranet IP | xxx.xxx.xxx.122 |
| OS | Linux (Kernel 3.10 and above) |
| Memory | 64 GB |
| CPU | x86_64 with 12 CPU cores |

Software Environment

| Software | Version |
| --- | --- |
| DolphinDB | 2.00.9 |
| Airflow | 2.5.1 |
| Python | 3.7 and above |

Note:

Host Environment Configuration

Execute the following command to install the airflow-provider-dolphindb plugin:

pip install airflow-provider-dolphindb

After installing the airflow-provider-dolphindb plugin, start Airflow. For detailed deployment and installation steps, refer to the Airflow Quick Start guide.

# Initialize the database
airflow db init

# Create a user
airflow users create --username admin --firstname Peter --lastname Parker --role Admin --email spiderman@superhero.org --password admin

# Run the webserver in daemon mode
airflow webserver --port 8080 -D

# Run the scheduler in daemon mode
airflow scheduler -D

Verify Airflow startup:

ps -aux | grep airflow

Expected output is as shown below.

After successful startup, access the Airflow web interface in your browser at http://<IP>:8080 (the default port). Log in with the username and password created in the previous step.

In the Airflow UI, fill in the DolphinDB connection details to connect to the DolphinDB database. Once connected, specify dolphindb_conn_id='dolphindb_test' in the DolphinDBOperator to run DolphinDB scripts.
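If you prefer to script the connection setup instead of using the web form, something like the following sketch can register the connection through Airflow's metadata database. Treat it as an illustration: the conn_type string, port, and credentials are assumptions that need to be adapted to how airflow-provider-dolphindb and your cluster are configured.

# Hypothetical sketch: register the "dolphindb_test" connection programmatically.
# conn_type, host, port, and credentials are assumptions; adjust to your environment.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="dolphindb_test",   # referenced by dolphindb_conn_id in DolphinDBOperator
    conn_type="dolphindb",      # assumed connection type registered by the provider
    host="xxx.xxx.xxx.122",     # DolphinDB node address
    port=8848,                  # default DolphinDB port
    login="admin",
    password="123456",
)

session = settings.Session()
# Add the connection only if it does not already exist
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()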

2. Airflow Scheduling for Market Data ETL

2.1 Overall ETL Architecture

ETL Module Directory

  • add: Incremental data ETL
    • addLoadSnapshot: Daily import of new snapshot raw data.
    • addProcessSnapshot: Process incremental snapshot data into array vectors and clean the data.
    • addFactor: Generate daily and 1-minute OHLC data, and store it.
    • addETL.py: Construct the DAG for incremental data ETL.
  • full: Full data ETL
    • loadSnapshot: Create tables and import snapshot data.
    • processSnapshot: Create tables for cleaned snapshot data, process data into array vectors, and store it.
    • Factor: Create factor storage tables, generate daily and 1-minute OHLC data, and store it.
    • fullETL.py: Construct the DAG for full data ETL.

Data Flow Process

  • External data source > ODS (Operational Data Store): Import raw data from external sources into DolphinDB.
  • ODS > DWD (Data Warehouse Detail): Clean the raw data, convert multi-level fields into array vectors, and remove duplicates.
  • DWD > DWB/DWS (Data Warehouse Base/Summary): Generate OHLC data from the cleaned snapshot data.

Note: This tutorial uses DolphinDB modules and client tools for engineering management. For more details, see DolphinDB Tutorial: Modules.

2.2 Data Overview

This tutorial uses Level 2 snapshot data from 2021.01.04 to 2021.01.08. Below is the structure of the snapshot table in DolphinDB. Fields such as BidOrderQty and BidPrice contain multi-level data, which is stored as array vectors in DolphinDB:

| Field | Description | Data Type |
| --- | --- | --- |
| SecurityID | Stock code | SYMBOL |
| DateTime | Date and time | TIMESTAMP |
| PreClosePx | Previous close price | DOUBLE |
| OpenPx | Open price | DOUBLE |
| HighPx | High price | DOUBLE |
| LowPx | Low price | DOUBLE |
| LastPx | Latest price | DOUBLE |
| TotalVolumeTrade | Total trade volume | INT |
| TotalValueTrade | Total trade value | DOUBLE |
| InstrumentStatus | Trading status | SYMBOL |
| BidPrice | Bid prices (10 levels) | DOUBLE[] |
| BidOrderQty | Bid order quantities (10 levels) | INT[] |
| BidNumOrders | Bid order counts (10 levels) | INT[] |
| BidOrders | Bid orders (50 levels) | INT[] |
| OfferPrice | Offer prices (10 levels) | DOUBLE[] |
| OfferOrderQty | Offer order quantities (10 levels) | INT[] |
| OfferNumOrders | Offer order counts (10 levels) | INT[] |
| OfferOrders | Offer orders (50 levels) | INT[] |

2.3 DolphinDB Scripts for Data Cleaning

2.3.1 Creating Distributed Databases and Tables

Create raw snapshot data table

module loadSnapshot::createSnapshotTable

// Create a table for raw snapshot data
def createSnapshot(dbName, tbName){
	login("admin", "123456")
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db1 = database(, VALUE, 2020.01.01..2021.01.01)
	db2 = database(, HASH, [SYMBOL, 50])
	// Partitioned by day and stock code
	db = database(dbName,COMPO,[db1,db2],engine='TSDB')
	colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice0","BidPrice1","BidPrice2","BidPrice3","BidPrice4","BidPrice5","BidPrice6","BidPrice7","BidPrice8","BidPrice9","BidOrderQty0","BidOrderQty1","BidOrderQty2","BidOrderQty3","BidOrderQty4","BidOrderQty5","BidOrderQty6","BidOrderQty7","BidOrderQty8","BidOrderQty9","BidNumOrders0","BidNumOrders1","BidNumOrders2","BidNumOrders3","BidNumOrders4","BidNumOrders5","BidNumOrders6","BidNumOrders7","BidNumOrders8","BidNumOrders9","BidOrders0","BidOrders1","BidOrders2","BidOrders3","BidOrders4","BidOrders5","BidOrders6","BidOrders7","BidOrders8","BidOrders9","BidOrders10","BidOrders11","BidOrders12","BidOrders13","BidOrders14","BidOrders15","BidOrders16","BidOrders17","BidOrders18","BidOrders19","BidOrders20","BidOrders21","BidOrders22","BidOrders23","BidOrders24","BidOrders25","BidOrders26","BidOrders27","BidOrders28","BidOrders29","BidOrders30","BidOrders31","BidOrders32","BidOrders33","BidOrders34","BidOrders35","BidOrders36","BidOrders37","BidOrders38","BidOrders39","BidOrders40","BidOrders41","BidOrders42","BidOrders43","BidOrders44","BidOrders45","BidOrders46","BidOrders47","BidOrders48","BidOrders49","OfferPrice0","OfferPrice1","OfferPrice2","OfferPrice3","OfferPrice4","OfferPrice5","OfferPrice6","OfferPrice7","OfferPrice8","OfferPrice9","OfferOrderQty0","OfferOrderQty1","OfferOrderQty2","OfferOrderQty3","OfferOrderQty4","OfferOrderQty5","OfferOrderQty6","OfferOrderQty7","OfferOrderQty8","OfferOrderQty9","OfferNumOrders0","OfferNumOrders1","OfferNumOrders2","OfferNumOrders3","OfferNumOrders4","OfferNumOrders5","OfferNumOrders6","OfferNumOrders7","OfferNumOrders8","OfferNumOrders9","OfferOrders0","OfferOrders1","OfferOrders2","OfferOrders3","OfferOrders4","OfferOrders5","OfferOrders6","OfferOrders7","OfferOrders8","OfferOrders9","OfferOrders10","OfferOrders11","OfferOrders12","OfferOrders13","OfferOrders14","OfferOrders15","OfferOrders16","OfferOrders17","OfferOrders18","OfferOrders19","OfferOrders20","OfferOrders21","OfferOrders22","OfferOrders23","OfferOrders24","OfferOrders25","OfferOrders26","OfferOrders27","OfferOrders28","OfferOrders29","OfferOrders30","OfferOrders31","OfferOrders32","OfferOrders33","OfferOrders34","OfferOrders35","OfferOrders36","OfferOrders37","OfferOrders38","OfferOrders39","OfferOrders40","OfferOrders41","OfferOrders42","OfferOrders43","OfferOrders44","OfferOrders45","OfferOrders46","OfferOrders47","OfferOrders48","OfferOrders49","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
	colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
	schemaTable = table(1:0,colName, colType)
	
	db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}

For snapshot data, a composite partitioning scheme is used: the data is first partitioned by day (VALUE), then split into 50 HASH partitions based on stock code.

Create cleaned snapshot data table with array vectors

module processSnapshot::createSnapshot_array

// Create a table for cleaned snapshot data
def createProcessTable(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db1 = database(, VALUE, 2020.01.01..2021.01.01)
	db2 = database(, HASH, [SYMBOL, 50])
	// Partitioned by day and stock code
	db = database(dbName,COMPO,[db1,db2],engine='TSDB')
	colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice","BidOrderQty","BidNumOrders","BidOrders","OfferPrice","OfferOrderQty","OfferNumOrders","OfferOrders","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
	colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE[]","INT[]","INT[]","INT[]","DOUBLE[]","INT[]","INT[]","INT[]","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
	schemaTable = table(1:0, colName, colType)
	db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}

Create 1-minute OHLC data table

module Factor::createFactorOneMinute

// Create the table for 1-minute OHLC
def createFactorOneMinute(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	// Partitioned by day
	db = database(dbName, VALUE, 2021.01.01..2021.01.03,engine = `TSDB)
	colName = `TradeDate`TradeTime`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
	colType =[DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
	tbSchema = table(1:0, colName, colType)
  	db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeTime,keepDuplicates=ALL)
}

Create daily OHLC data table

module Factor::createFactorDaily

// Create the table for daily OHLC
def createFactorDaily(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	// Partitioned by year
	db = database(dbName, RANGE, datetimeAdd(2000.01M,(0..50)*12, "M"),engine = `TSDB)
	colName = `TradeDate`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
	colType =[DATE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
	tbSchema = table(1:0, colName, colType)
  	db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeDate,keepDuplicates=ALL)
}

2.3.2 Cleaning Data

The data cleaning script primarily involves two steps:

  1. Convert multi-level price and quantity columns into array vectors.
  2. Remove duplicate records by SecurityID and DateTime.

The detailed processing logic and core code are as follows:

module processSnapshot::processSnapshotData

// Combine data into array vectors and deduplicate
def mapProcess(mutable t, dbName, tbName){
	n1 = t.size()
	t = select SecurityID, DateTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, InstrumentStatus, fixedLengthArrayVector(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as BidPrice, fixedLengthArrayVector(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as BidOrderQty, fixedLengthArrayVector(BidNumOrders0, BidNumOrders1, BidNumOrders2, BidNumOrders3, BidNumOrders4, BidNumOrders5, BidNumOrders6, BidNumOrders7, BidNumOrders8, BidNumOrders9) as BidNumOrders, fixedLengthArrayVector(BidOrders0, BidOrders1, BidOrders2, BidOrders3, BidOrders4, BidOrders5, BidOrders6, BidOrders7, BidOrders8, BidOrders9, BidOrders10, BidOrders11, BidOrders12, BidOrders13, BidOrders14, BidOrders15, BidOrders16, BidOrders17, BidOrders18, BidOrders19, BidOrders20, BidOrders21, BidOrders22, BidOrders23, BidOrders24, BidOrders25, BidOrders26, BidOrders27, BidOrders28, BidOrders29, BidOrders30, BidOrders31, BidOrders32, BidOrders33, BidOrders34, BidOrders35, BidOrders36, BidOrders37, BidOrders38, BidOrders39, BidOrders40, BidOrders41, BidOrders42, BidOrders43, BidOrders44, BidOrders45, BidOrders46, BidOrders47, BidOrders48, BidOrders49) as BidOrders, fixedLengthArrayVector(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as OfferPrice, fixedLengthArrayVector(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as OfferQty, fixedLengthArrayVector(OfferNumOrders0, OfferNumOrders1, OfferNumOrders2, OfferNumOrders3, OfferNumOrders4, OfferNumOrders5, OfferNumOrders6, OfferNumOrders7, OfferNumOrders8, OfferNumOrders9) as OfferNumOrders, fixedLengthArrayVector(OfferOrders0, OfferOrders1, OfferOrders2, OfferOrders3, OfferOrders4, OfferOrders5, OfferOrders6, OfferOrders7, OfferOrders8, OfferOrders9, OfferOrders10, OfferOrders11, OfferOrders12, OfferOrders13, OfferOrders14, OfferOrders15, OfferOrders16, OfferOrders17, OfferOrders18, OfferOrders19, OfferOrders20, OfferOrders21, OfferOrders22, OfferOrders23, OfferOrders24, OfferOrders25, OfferOrders26, OfferOrders27, OfferOrders28, OfferOrders29, OfferOrders30, OfferOrders31, OfferOrders32, OfferOrders33, OfferOrders34, OfferOrders35, OfferOrders36, OfferOrders37, OfferOrders38, OfferOrders39, OfferOrders40, OfferOrders41, OfferOrders42, OfferOrders43, OfferOrders44, OfferOrders45, OfferOrders46, OfferOrders47, OfferOrders48, OfferOrders49) as OfferOrders, NumTrades, IOPV, TotalBidQty, TotalOfferQty, WeightedAvgBidPx, WeightedAvgOfferPx, TotalBidNumber, TotalOfferNumber, BidTradeMaxDuration, OfferTradeMaxDuration, NumBidOrders, NumOfferOrders, WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney, WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney, ETFBuyNumber, ETFBuyAmount, ETFBuyMoney, ETFSellNumber, ETFSellAmount, ETFSellMoney from t where isDuplicated([SecurityID, DateTime], FIRST) = false
	n2 = t.size()
	loadTable(dbName, tbName).append!(t)
	return n1,n2
}

def process(processDate, dbName_orig, tbName_orig, dbName_process, tbName_process){
	dataString = temporalFormat(processDate, "yyyyMMdd")
	// Check if data for the processing date already exists
	todayCount = exec count(*) from loadTable(dbName_process, tbName_process) where date(DateTime)=processDate
	// If data exists, delete it first
	if(todayCount != 0){
		writeLog("Start to delete the process snapshot data, the delete date is: " + dataString)
		dropPartition(database(dbName_process), processDate, tbName_process)
		writeLog("Successfully deleted the process snapshot data, the delete date is: " + dataString)
	}
	// Start processing snapshot data
	writeLog("Start process Snapshot Data, the datetime is "+ dataString)
	ds = sqlDS(sql(select=sqlCol("*"), from=loadTable(dbName_orig,tbName_orig),where=<date(DateTime)=processDate>))
	n1,n2=mr(ds, mapProcess{, dbName_process, tbName_process}, +, , false)
	if(n1 != n2){
		writeLog("ERROR: Duplicated data found in " + dataString + ", successfully dropped " + string(n1-n2) + " duplicate records")
	}
	writeLog("Successfully process the snapshot data, the processDate is: " + dataString)
}

2.3.3 Generating OHLC Data

Generate 1-minute OHLC data

module Factor::calFactorOneMinute

// Generate 1-minute OHLC
def calFactorOneMinute(dbName, tbName, mutable factorTable){
	pt = loadTable(dbName, tbName)
	// Process in groups of 10 days
	dayList = schema(pt).partitionSchema[0]
	if(dayList.size()>10) dayList = dayList.cut(10)
	for(days in dayList){
		// Calculate 1-minute OHLC
		res = select first(LastPx) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*TotalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate, minute(DateTime) as TradeTime, SecurityID
		writeLog("Start to append the minute factor result, the days are: [" + concat(days, ",") + "]")
		// Write OHLC data to database
		factorTable.append!(res)
		writeLog("Successfully appended the minute factor result to database, the days are: [" + concat(days, ",") + "]")
	}
}

Generate daily OHLC data

module Factor::calFactorDaily1

// Generate daily OHLC
def calFactorDaily(dbName, tbName, mutable factorTable){
	pt = loadTable(dbName, tbName)
	// Process in groups of 10 days
	dayList = schema(pt).partitionSchema[0]
	if(dayList.size()>10) dayList = dayList.cut(10)
	for(days in dayList){
		// Calculate daily OHLC
		res = select first(LastPx) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*TotalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate, SecurityID 
		writeLog("Start to append the daily factor result, the days are: [" + concat(days, ",") + "]")
		// Write OHLC data to database
		factorTable.append!(res)
		writeLog("Successfully appended the daily factor result to database, the days are: [" + concat(days, ",") + "]")
	}
}

2.4 Incremental Data Cleaning

The logic for incremental data import is similar to full data processing, with the following key differences:

  • Full data is imported in bulk, while incremental data is processed daily on a schedule.
  • Full data is imported asynchronously using submitJob, while incremental data is imported synchronously using append!.
  • Full data generates OHLC data in bulk, while incremental data only generates data for the current day.

See appendix for full scripts.

2.5 DAG Execution

2.5.1 Creating DAG Instances

Full ETL DAG

with DAG(dag_id="ETLTest", start_date=datetime(2023, 3, 10), schedule_interval=None) as dag:
  • dag_id defines the DAG name and must be unique.
  • start_date sets the start date of the task.
  • schedule_interval sets the interval between two DAG runs; None means the DAG is not scheduled automatically and must be triggered manually.
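For reference, a minimal skeleton of the full ETL DAG could be wired as shown below. The EmptyOperator placeholders merely stand in for the DolphinDBOperator tasks defined in sections 2.5.2 and 2.5.3; only the structure and the dependency chain are sketched here.

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Skeleton only: each EmptyOperator below is, in the real project, a DolphinDBOperator
# defined as shown in sections 2.5.2 and 2.5.3.
with DAG(dag_id="ETLTest", start_date=datetime(2023, 3, 10), schedule_interval=None) as dag:
    start_task = EmptyOperator(task_id="start_task")
    create_parameter_table = EmptyOperator(task_id="create_parameter_table")
    given_param = EmptyOperator(task_id="given_param")
    loadSnapshot = EmptyOperator(task_id="loadSnapshot")
    processSnapshot = EmptyOperator(task_id="processSnapshot")
    calMinuteFactor = EmptyOperator(task_id="calMinuteFactor")
    calDailyFactor = EmptyOperator(task_id="calDailyFactor")

    # Parameter setup first, then load, clean, and aggregate in sequence
    (start_task >> create_parameter_table >> given_param >> loadSnapshot
        >> processSnapshot >> calMinuteFactor >> calDailyFactor)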

Incremental ETL DAG

args = {
    "owner": "Airflow",
    "start_date": days_ago(1),
    "retries": 0
}
with DAG(dag_id="addETLTest", default_args=args, catchup=False, schedule_interval="0 12 * * *") as dag:
  • catchup determines whether backfill should be performed.
  • retries specifies the number of retries on failure.
  • schedule_interval = "0 12 * * *" means the task runs daily at 12:00 UTC. For detailed schedule settings, see Scheduling & Triggers.

2.5.2 Accessing Variables in Airflow

Variable values set in Airflow cannot be directly accessed in DolphinDB scripts. To use them in subsequent tasks, Airflow variables are written into a shared table, enabling access by DolphinDB tasks. The specific code example is as follows:

# Retrieve variable values from Airflow
variable = ['ETL_dbName_origin', "ETL_tbName_origin", "ETL_dbName_process",
            "ETL_tbName_process", 'ETL_dbName_factor','ETL_tbName_factor','ETL_dbName_factor_daily',
            'ETL_tbName_factor_daily',"ETL_filedir", "ETL_start_date","ETL_end_date"]
value = []
for var in variable:
    value.append(str(Variable.get(var)))
variables = ",".join(variable)
values = ",".join(value)

# Create a shared table via DolphinDBOperator and write variable values into it
    create_parameter_table = DolphinDBOperator(
        task_id='create_parameter_table',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            undef(`paramTable,SHARED)
            t = table(1:0, `param`value, [STRING, STRING])
            share t as paramTable
            '''
    )
    given_param = DolphinDBOperator(
        task_id='given_param',
        dolphindb_conn_id='dolphindb_test',
        sql="params = split('" + variables + "',',');" + \
            "values = split('" + values + "',',');" + \
            "for(i in 0..(params.size()-1)){" + \
            "insert into paramTable values(params[i], values[i]);}"
    )

After the task runs, the parameter table and its values will appear in the DolphinDB GUI under Shared Tables, as shown in the figure below.

Each DAG variable can be modified in Airflow. Click Admin > Variables in the menu as shown below:

Once the DAG is generated, variables used in the DAG can be dynamically modified on the following web interface, as shown below:

The following table lists the variables used in this project:

| Name | Description | Example |
| --- | --- | --- |
| ETL_dbName_factor | Database name for storing minute-level OHLC | dfs://oneMinuteFactor |
| ETL_tbName_factor | Table name for storing minute-level OHLC | oneMinuteFactor |
| ETL_dbName_factor_daily | Database name for storing daily OHLC | dfs://dailyFactor |
| ETL_tbName_factor_daily | Table name for storing daily OHLC | dailyFactor |
| ETL_dbName_origin | Database name for storing raw snapshot data | dfs://TSDB_snapshot_orig |
| ETL_tbName_origin | Table name for storing raw snapshot data | snapshot_orig |
| ETL_dbName_process | Database name for cleaned snapshot data | dfs://TSDB_snapshot_process |
| ETL_tbName_process | Table name for cleaned snapshot data | snapshot_process |
| ETL_filedir | File path to raw snapshot data | /home/appadmin/ (customized) |
| ETL_start_date | Start date of raw data for full ETL tasks | 2021.01.04 |
| ETL_end_date | End date of raw data for full ETL tasks | 2021.01.04 |

2.5.3 Executing Tasks with DolphinDBOperator

Full Data Processing

Data ingestion, cleaning, and computation can be set as tasks within the DAG using DolphinDBOperator. The core code for full data processing is as follows:

    loadSnapshot = DolphinDBOperator(
        task_id='loadSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load predefined table creation and data loading functions
            use loadSnapshot::createSnapshotTable
            use  loadSnapshot::loadSnapshotData
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_origin]
            tbName = params[`ETL_tbName_origin]
            startDate = date(params[`ETL_start_date])
            endDate = date(params[`ETL_end_date])
            fileDir = params[`ETL_filedir]
            // Create target table if it does not exist
            if(not existsDatabase(dbName)){
                loadSnapshot::createSnapshotTable::createSnapshot(dbName, tbName)
            }
            // Start asynchronous data ETL for each date
            start = now()
            for (loadDate in startDate..endDate){
                submitJob("loadSnapshot"+year(loadDate)+monthOfYear(loadDate)+dayOfMonth(loadDate), "loadSnapshot", loadSnapshot::loadSnapshotData::loadSnapshot{, dbName, tbName, fileDir}, loadDate)
            }
            // Check whether all jobs have completed
            do{
                cnt = exec count(*) from getRecentJobs() where jobDesc="loadSnapshot" and endTime is null
            }
            while(cnt != 0)
            // Check for any errors during import
            cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
            if (cnt != 0){
                error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
                throw error[0]
            }
            '''
    )
    processSnapshot = DolphinDBOperator(
        task_id='processSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load predefined table creation and data loading functions
            use processSnapshot::createSnapshot_array
            use processSnapshot::processSnapshotData
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName_orig = params[`ETL_dbName_origin]
            tbName_orig = params[`ETL_tbName_origin]
            dbName_process = params[`ETL_dbName_process]
            tbName_process = params[`ETL_tbName_process]
            startDate = date(params[`ETL_start_date])
            endDate = date(params[`ETL_end_date])
            // Create target table if it does not exist
            if(not existsDatabase(dbName_process)){
                processSnapshot::createSnapshot_array::createProcessTable(dbName_process, tbName_process)
            }
            // Start asynchronous data ETL for each date
            start = now()
            for (processDate in startDate..endDate){
                submitJob("processSnapshot"+year(processDate)+monthOfYear(processDate)+dayOfMonth(processDate), "processSnapshot", processSnapshot::processSnapshotData::process{, dbName_orig, tbName_orig, dbName_process, tbName_process}, processDate)
            }
            // Check whether all jobs have completed
            do{
                cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
            }
            while(cnt != 0)
            // Check for any errors during import
            cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
            if (cnt != 0){
                error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
                throw error[0]
            }
            '''
    )
    calMinuteFactor = DolphinDBOperator(
        task_id='calMinuteFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load predefined table creation and data loading functions
            use Factor::createFactorOneMinute
            use Factor::calFactorOneMinute
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor]
            tbName_factor = params[`ETL_tbName_factor]
            // Create target table if it does not exist
            if(not existsDatabase(dbName_factor)){
                createFactorOneMinute(dbName_factor, tbName_factor)
            }
            factorTable = loadTable(dbName_factor, tbName_factor)
            // Invoke the calculation function
            calFactorOneMinute(dbName, tbName,factorTable)
            '''
    )
    calDailyFactor = DolphinDBOperator(
        task_id='calDailyFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load predefined table creation and data loading functions
            use Factor::createFactorDaily
            use Factor::calFactorDaily1	
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor_daily]
            tbName_factor = params[`ETL_tbName_factor_daily]
            // Create target table if it does not exist
            if(not existsDatabase(dbName_factor)){
                createFactorDaily(dbName_factor, tbName_factor)
            }
            // Invoke the calculation function
            factorTable = loadTable(dbName_factor, tbName_factor)
            Factor::calFactorDaily1::calFactorDaily(dbName, tbName,factorTable)
            '''
    )

The DAG is constructed based on the dependencies between tasks, as shown below:

start_task >> create_parameter_table >> given_param >> loadSnapshot >> processSnapshot >> calMinuteFactor >> calDailyFactor

Incremental Data Import

The code for constructing tasks for incremental jobs is as follows:

    addLoadSnapshot = DolphinDBOperator(
        task_id='addLoadSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load predefined table creation and data loading functions
            use  addLoadSnapshot::loadSnapshotData
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_origin]
            tbName = params[`ETL_tbName_origin]
            fileDir = params[`ETL_filedir]
            // Get market calendars
            MarketDays = getMarketCalendar("CFFEX")
            // Import the data for trading days
            if(today() in MarketDays ){
                fileDir = params[`ETL_filedir]
                addLoadSnapshot::loadSnapshotData::loadSnapshot(today(), dbName, tbName, fileDir)
            }
            '''
    )
    addProcessSnapshot = DolphinDBOperator(
        task_id='addProcessSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load data cleaning functions
            use addProcessSnapshot::processSnapshotData
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName_orig = params[`ETL_dbName_origin]
            tbName_orig = params[`ETL_tbName_origin]
            dbName_process = params[`ETL_dbName_process]
            tbName_process = params[`ETL_tbName_process]
            // Get market calendars
            MarketDays = getMarketCalendar("CFFEX")
            // Process the data for trading days
            if(today() in MarketDays ){
                addProcessSnapshot::processSnapshotData::process(today(), dbName_orig, tbName_orig, dbName_process, tbName_process)
            }
            '''
    )
    addCalMinuteFactor= DolphinDBOperator(
        task_id='addCalMinuteFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load calculation functions
            use addFactor::calFactorOneMinute
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor]
            tbName_factor = params[`ETL_tbName_factor]
            factorTable = loadTable(dbName_factor, tbName_factor)
            // Get market calendars
            MarketDays = getMarketCalendar("CFFEX")
            // Generate OHLC data for trading days
            if(today() in MarketDays ){
                	addFactor::calFactorOneMinute::calFactorOneMinute(dbName, tbName,today(), factorTable)
            }
            '''
    )
    addCalDailyFactor= DolphinDBOperator(
        task_id='addCalDailyFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            // Use module to load calculation functions
            use addFactor::calFactorDaily1	
            // Retrieve parameters from the shared table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor_daily]
            tbName_factor = params[`ETL_tbName_factor_daily]
            factorTable = loadTable(dbName_factor, tbName_factor)
            // Get market calendars
            MarketDays = getMarketCalendar("CFFEX")
            // Generate OHLC data for trading days
            if(today() in MarketDays ){
                addFactor::calFactorDaily1::calFactorDaily(dbName, tbName,today(), factorTable)
            }
            '''
    )

The DAG is constructed based on the dependencies between tasks, as shown below:

start_task >> create_parameter_table >> given_param >> addLoadSnapshot >> addProcessSnapshot >> addCalMinuteFactor >> addCalDailyFactor

2.5.4 Generating DAGs

Deploy DolphinDB project

Import the addETL and fullETL projects into the DolphinDB GUI:

Upload the modules within addETL and fullETL projects to the DolphinDB server already connected in Airflow:

Deploy Python project

Place the Python scripts from the Python project into the <Airflow_install_Dir>/airflow/dags directory. Note that newly created DAGs do not appear in the web interface immediately; by default, the DAG folder is rescanned every 5 minutes. You can adjust this refresh interval by modifying the dag_dir_list_interval parameter in the airflow.cfg file.
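To check the current scan interval before changing it, the value can be read through Airflow's configuration API (the section and option names below follow the standard airflow.cfg layout):

# Optional check: print the scheduler's current DAG folder scan interval (in seconds)
from airflow.configuration import conf

print(conf.get("scheduler", "dag_dir_list_interval"))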

Import Airflow variables

Go to Admin > Variables in the Airflow web interface, upload the Variables.json file, import the variables into Airflow, and modify their values as appropriate.

Upload raw data files

Upload the data files to the server, and modify the ETL_filedir variable in Airflow based on the actual storage path of the data files. When running incremental ETL tasks, the date in the data file name must be updated to the current date (e.g., 20230330snapshot.csv) to avoid task failure due to missing data.
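As a small illustration of this naming convention (assuming the <yyyyMMdd>snapshot.csv pattern used in this tutorial), the expected file name for the current day can be derived as follows:

# Hypothetical helper: build today's expected snapshot file name,
# assuming the <yyyyMMdd>snapshot.csv naming convention used in this tutorial.
from datetime import date

print(date.today().strftime("%Y%m%d") + "snapshot.csv")   # e.g. 20230330snapshot.csv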

The final implemented DAGs are as shown below:

Figure 1. Full data processing
Figure 2. Incremental data processing

After running the tasks, a green task instance indicates successful execution; red indicates task failure; orange indicates that an upstream task failed, and this task was not executed.

3. FAQs

3.1 How to capture output printed by the print function in DolphinDB scripts

The output of the print function in DolphinDB scripts goes to standard output and can be found in the airflow-scheduler.out file, as shown in the figure below:

3.2 How to monitor the completion status of asynchronous jobs submitted using submitJob in DolphinDB scripts

You can use the DolphinDB getRecentJobs function to retrieve background job information, and add conditional logic in the DolphinDB DAG. An example code snippet is as follows:

DolphinDBOperator(
        task_id='processSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
          // Check whether all tasks are completed
            do{
                cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
            }
            while(cnt != 0)
            // Check whether background tasks succeeded; throw exception if failed
            cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
            if (cnt != 0){
                error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
                throw error[0]
            }
            '''
    )

3.3 How to handle frequent connection timeouts when running tasks in Airflow

Connection timeouts are usually caused by network instability or high latency. To mitigate them, configure the keepAliveTime and reconnect parameters in the DolphinDB connection settings when setting up the connection.
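For reference, the sketch below shows what these two options mean at the level of the DolphinDB Python API. How airflow-provider-dolphindb surfaces them in the connection settings (for example, through the connection's extra fields) may differ by version, so treat both the wiring and the values as assumptions.

# Hedged sketch of the keepAliveTime / reconnect options using the DolphinDB Python API.
# The values and the exact way the Airflow provider exposes these options are assumptions.
import dolphindb as ddb

s = ddb.session(keepAliveTime=120)   # assumed: keep idle connections alive for 120 seconds
s.connect(
    host="xxx.xxx.xxx.122",          # DolphinDB node address
    port=8848,                       # default DolphinDB port
    userid="admin",
    password="123456",
    reconnect=True,                  # automatically re-establish a dropped connection
)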

3.4 Why does the task not run on the current day if start_date is set to today with a daily schedule

In Airflow, a run of a scheduled DAG is only triggered once its data interval has ended, i.e. one full schedule_interval after start_date. For example, if start_date = 2023.03.16 and the DAG is scheduled daily, the earliest run is triggered on 2023.03.17; the DAG therefore does not run on the day it is set to start.
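A minimal illustration of this behavior (the DAG and task names are hypothetical):

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

# With a daily schedule, the run covering the 2023-03-16 data interval is only
# triggered after that interval ends, i.e. on 2023-03-17.
with DAG(
    dag_id="start_date_demo",            # hypothetical name
    start_date=datetime(2023, 3, 16),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    EmptyOperator(task_id="noop")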

3.5 How to troubleshoot DolphinDBOperator task failures

When a task fails, DolphinDBOperator prints detailed error messages in the task logs. By checking the logs, you can locate the failing code and fix it. The steps to view the logs are as follows:

Disclaimer

The Airflow-related installation packages and source code involved in this document are distributed as part of the DolphinDB Airflow plugin under the Apache License 2.0. However, the copyright of the Airflow installation packages and source code belongs to the Apache Software Foundation.