Stateless Operator

A stateless operator performs stateless computation: its output is purely a function of the current input and does not depend on previously processed data. For the same input, the output is always identical.
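As a minimal sketch of this property, the function below (midPrice is a hypothetical name chosen for illustration) computes its result from its arguments alone, so repeated calls with the same input agree. A cumulative sum over a stream, by contrast, would need to remember earlier records.

```
// Hypothetical helper: mid price from a bid and an ask.
// The result depends only on the arguments, never on earlier calls.
def midPrice(bid, ask){
	// "\" is the ratio operator, which always performs floating-point division
	return (bid + ask) \ 2
}

a = midPrice(10.4, 10.6)
b = midPrice(10.4, 10.6)
a == b   // both calls received the same input, so the results agree
```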

DolphinDB provides a rich set of built-in functions that let users perform complex data analysis and factor analysis in quantitative finance with minimal code. These built-in functions can be used directly as stateless operators in stateless computation.
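For instance, built-in functions such as round and log can be applied column-wise in a query, and each output value depends only on the values in the same row. The table t and its contents below are made up for this illustration.

```
// Hypothetical three-row table used only for this illustration
t = table(`A`B`C as sym, 10.4 10.6 10.5 as bid, 10.6 10.8 10.7 as ask)

// round and log are built-in functions; every output row is derived
// from the corresponding input row alone
select sym, round(ask - bid, 2) as spread, log(ask) - log(bid) as logSpread from t
```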

Applications

The following example shows how to use user-defined callback functions for ETL (extract, transform, load) operations on data. The process requires a user-defined stateless operator.

In this example, we use snapshots of Level-2 data to show how to perform data cleaning and transformation on raw market data.

(1) Create a shared stream table for publishing data.

colNames = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT]
share(table=streamTable(1:0, colNames, colTypes), sharedName="pubTable")

(2) Create a shared stream table for storing processed data.

colNames = ["SecurityID", "TradeDate", "TradeTime", "avgBidPrice", "sumBidQty", "minAskPrice", "maxAskQty"]
colTypes = [SYMBOL, DATE, TIME, DOUBLE, INT, DOUBLE, DOUBLE]
share(table=streamTable(1:0, colNames, colTypes), sharedName=`result)

(3) Define the function dataETL as a stateless operator to serve as the subscription callback.

def dataETL(mutable result, msg){
	// data ETL
	tmp = 	select 	SecurityID,
			date(DateTime) as TradeDate,
			time(DateTime) as TradeTime,
			avg(BidPrice0+BidPrice1+BidPrice2+BidPrice3+BidPrice4+BidPrice5+BidPrice6+BidPrice7+BidPrice8+BidPrice9) as avgBidPrice,
			sum(BidOrderQty0+BidOrderQty1+BidOrderQty2+BidOrderQty3+BidOrderQty4+BidOrderQty5+BidOrderQty6+BidOrderQty7+BidOrderQty8+BidOrderQty9) as sumBidQty,
			min(OfferPrice0+OfferPrice1+OfferPrice2+OfferPrice3+OfferPrice4+OfferPrice5+OfferPrice6+OfferPrice7+OfferPrice8+OfferPrice9) as minAskPrice,
			max(OfferOrderQty0+OfferOrderQty1+OfferOrderQty2+OfferOrderQty3+OfferOrderQty4+OfferOrderQty5+OfferOrderQty6+OfferOrderQty7+OfferOrderQty8+OfferOrderQty9) as maxAskQty
		from msg
		where LastPx>0, time(DateTime)>=09:30:00.000
	// result storage
	result.append!(tmp)
}

Note:

The parameter result of dataETL is the shared stream table that stores the processed data. It must be declared with the mutable keyword. Otherwise, it is treated as read-only inside the function, and modifying operations such as append! are not allowed.
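In dataETL, avg, sum, min, and max are aggregate functions evaluated over the rows of the incoming batch, not across the ten levels of a single snapshot. If a per-snapshot statistic across the ten levels is wanted instead, the row-wise built-ins rowAvg, rowSum, rowMin, and rowMax could be used. The following is a sketch under that assumption, not part of the original example; dataETLByRow is a hypothetical name, and the int conversion is added only to match the INT type declared for sumBidQty.

```
// Hypothetical variant of dataETL: one output row per input snapshot,
// with each statistic taken across the ten levels of that snapshot.
def dataETLByRow(mutable result, msg){
	tmp = select SecurityID,
			date(DateTime) as TradeDate,
			time(DateTime) as TradeTime,
			rowAvg(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as avgBidPrice,
			int(rowSum(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9)) as sumBidQty,
			rowMin(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as minAskPrice,
			rowMax(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as maxAskQty
		from msg
		where LastPx>0, time(DateTime)>=09:30:00.000
	// result storage
	result.append!(tmp)
}
```

Like dataETL, this variant remains stateless: every output row is derived from the corresponding input row alone.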

(4) Submit subscription.

subscribeTable(tableName="pubTable", actionName="dataETL", offset=-1, handler=dataETL{result}, msgAsTable=true, batchSize=2000, throttle=0.01, reconnect=true)

Note:

The handler parameter is set to the user-defined function dataETL, which takes two parameters: result and msg. result is a previously defined variable (the shared stream table created in step 2) that is bound to dataETL through partial application (the {} syntax). msg is the incremental data received from the subscribed stream table, and the subscription passes it automatically as the second argument.
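Partial application can be tried in isolation. In the sketch below, scaleBy and triple are hypothetical names used only for illustration. Fixing the first argument with {} yields a function of the remaining argument, which is how dataETL{result} becomes a one-argument handler that receives msg.

```
// Hypothetical two-argument function
def scaleBy(factor, x){
	return factor * x
}

// Fix the first argument; the result is a function of x only
triple = scaleBy{3}
triple(1 2 3)   // equivalent to scaleBy(3, 1 2 3)
```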

(5) Write mock data to the table pubTable.

rowNums = 10
simulateData = table(
	take(`000001SZ, rowNums) as SecurityID,
	take(0..(rowNums-1), rowNums)*1000*3+2023.12.15T09:30:00.000 as DateTime,
	take(10.6, rowNums) as PreClosePx,
	take(10.8, rowNums) as OpenPx,
	take(10.8, rowNums) as HighPx,
	take(10.2, rowNums) as LowPx,
	take(10.5, rowNums) as LastPx,
	take(0..(rowNums-1), rowNums)*1000+100000 as TotalVolumeTrade,
	take(0..(rowNums-1), rowNums)*1000*10.6+100000*10.6 as TotalValueTrade,
	take(`s, rowNums) as InstrumentStatus,
	take(10.4, rowNums) as BidPrice0,
	take(10.3, rowNums) as BidPrice1,
	take(10.2, rowNums) as BidPrice2,
	take(10.1, rowNums) as BidPrice3,
	take(10.0, rowNums) as BidPrice4,
	take(9.9, rowNums) as BidPrice5,
	take(9.8, rowNums) as BidPrice6,
	take(9.7, rowNums) as BidPrice7,
	take(9.6, rowNums) as BidPrice8,
	take(9.5, rowNums) as BidPrice9,
	take(10000, rowNums) as BidOrderQty0,
	take(20000, rowNums) as BidOrderQty1,
	take(30000, rowNums) as BidOrderQty2,
	take(40000, rowNums) as BidOrderQty3,
	take(50000, rowNums) as BidOrderQty4,
	take(60000, rowNums) as BidOrderQty5,
	take(50000, rowNums) as BidOrderQty6,
	take(40000, rowNums) as BidOrderQty7,
	take(30000, rowNums) as BidOrderQty8,
	take(20000, rowNums) as BidOrderQty9,
	take(10.6, rowNums) as OfferPrice0,
	take(10.7, rowNums) as OfferPrice1,
	take(10.8, rowNums) as OfferPrice2,
	take(10.9, rowNums) as OfferPrice3,
	take(11.0, rowNums) as OfferPrice4,
	take(11.1, rowNums) as OfferPrice5,
	take(11.2, rowNums) as OfferPrice6,
	take(11.3, rowNums) as OfferPrice7,
	take(11.4, rowNums) as OfferPrice8,
	take(11.5, rowNums) as OfferPrice9,
	take(10000, rowNums) as OfferOrderQty0,
	take(20000, rowNums) as OfferOrderQty1,
	take(30000, rowNums) as OfferOrderQty2,
	take(40000, rowNums) as OfferOrderQty3,
	take(50000, rowNums) as OfferOrderQty4,
	take(60000, rowNums) as OfferOrderQty5,
	take(50000, rowNums) as OfferOrderQty6,
	take(40000, rowNums) as OfferOrderQty7,
	take(30000, rowNums) as OfferOrderQty8,
	take(20000, rowNums) as OfferOrderQty9)
tableInsert(pubTable, simulateData)
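
The subscription handler runs asynchronously in the background, so a query issued immediately after tableInsert may not yet see the processed rows. A minimal sketch for pausing before querying follows. The 1000 ms wait is an assumption; a suitable value depends on the throttle setting and the server configuration.

```
// sleep takes a duration in milliseconds; 1000 is an assumed value
sleep(1000)
// Count the rows that the handler has written so far
select count(*) as processedRows from result
```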

(6) Check the result table.

res = select * from result where TradeTime=09:30:09.000

The query result shows that the raw market data has been successfully cleaned and transformed.

(7) Unsubscribe from the stream table pubTable. Note that all subscriptions to a stream table must be canceled before the table can be deleted.

unsubscribeTable(tableName="pubTable", actionName="dataETL")

(8) Delete the shared stream tables pubTable and result.

dropStreamTable(tableName="pubTable")
dropStreamTable(tableName="result")