Stateless Operator
Stateless operators are used in stateless computation. In stateless computation, the output depends only on the current input data, not on any historical context from previously processed data. Given the same input, a stateless operator always produces the same output.
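The difference between stateless and stateful computation can be illustrated with a minimal plain-Python sketch. This is not DolphinDB script, and stateless_scale and RunningSum are hypothetical names chosen for illustration. The stateless function returns the same output every time it sees the same batch. The stateful running sum, included only for contrast, carries a total across calls, so the same batch yields different output on the second call.

```python
from typing import List


def stateless_scale(batch: List[float], factor: float = 2.0) -> List[float]:
    """Stateless: each output element depends only on the current input element."""
    return [x * factor for x in batch]


class RunningSum:
    """Stateful, shown only for contrast: the output depends on earlier batches."""

    def __init__(self) -> None:
        self.total = 0.0  # history carried across calls

    def __call__(self, batch: List[float]) -> List[float]:
        out = []
        for x in batch:
            self.total += x
            out.append(self.total)
        return out


if __name__ == "__main__":
    batch = [1.0, 2.0, 3.0]
    print(stateless_scale(batch))  # [2.0, 4.0, 6.0]
    print(stateless_scale(batch))  # [2.0, 4.0, 6.0] again: same input, same output
    running = RunningSum()
    print(running(batch))  # [1.0, 3.0, 6.0]
    print(running(batch))  # [7.0, 9.0, 12.0]: same input, different output
```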

DolphinDB provides a rich set of built-in functions that let users perform complex data analysis and factor analysis in quantitative finance with minimal code. These built-in functions can be used directly as stateless operators in stateless computation.
Applications
The following example shows how to use a user-defined callback function to perform ETL (extract, transform, load) operations on streaming data. The callback serves as a user-defined stateless operator.
In this example, Level-2 snapshot data is used to show how to clean and transform raw market data.
(1) Create a shared stream table for publishing data.
colNames = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT]
share(table=streamTable(1:0, colNames, colTypes), sharedName="pubTable")
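The 50-column schema of pubTable follows a regular pattern: 10 general columns followed by 10 levels (0 to 9) each of BidPrice, BidOrderQty, OfferPrice and OfferOrderQty. As an illustration only, the plain-Python sketch below rebuilds both lists so the name-to-type alignment can be checked. snapshot_schema is a hypothetical helper, not a DolphinDB function, and the types are plain strings.

```python
from typing import List, Tuple


def snapshot_schema() -> Tuple[List[str], List[str]]:
    """Rebuild the pubTable column names and their type names as plain strings."""
    names = ["SecurityID", "DateTime", "PreClosePx", "OpenPx", "HighPx", "LowPx",
             "LastPx", "TotalVolumeTrade", "TotalValueTrade", "InstrumentStatus"]
    types = ["SYMBOL", "TIMESTAMP", "DOUBLE", "DOUBLE", "DOUBLE", "DOUBLE",
             "DOUBLE", "INT", "DOUBLE", "SYMBOL"]
    # Ten levels (0..9) for each bid/offer price and quantity field
    for field, col_type in [("BidPrice", "DOUBLE"), ("BidOrderQty", "INT"),
                            ("OfferPrice", "DOUBLE"), ("OfferOrderQty", "INT")]:
        names += [f"{field}{level}" for level in range(10)]
        types += [col_type] * 10
    return names, types


if __name__ == "__main__":
    names, types = snapshot_schema()
    print(len(names), len(types))  # 50 50
    print(dict(zip(names, types))["BidOrderQty0"])  # INT
```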
(2) Create a shared stream table for storing processed data.
colNames = ["SecurityID", "TradeDate", "TradeTime", "avgBidPrice", "sumBidQty", "minAskPrice", "maxAskQty"]
colTypes = [SYMBOL, DATE, TIME, DOUBLE, INT, DOUBLE, DOUBLE]
share(table=streamTable(1:0, colNames, colTypes), sharedName=`result)
(3) Define the function dataETL as a stateless operator to serve as the subscription callback.
def dataETL(mutable result, msg){
    // data ETL: keep rows with a positive last price at or after 09:30,
    // then compute bid/ask metrics from the per-row sums of the 10 levels
    tmp = select SecurityID,
        date(DateTime) as TradeDate,
        time(DateTime) as TradeTime,
        avg(BidPrice0+BidPrice1+BidPrice2+BidPrice3+BidPrice4+BidPrice5+BidPrice6+BidPrice7+BidPrice8+BidPrice9) as avgBidPrice,
        sum(BidOrderQty0+BidOrderQty1+BidOrderQty2+BidOrderQty3+BidOrderQty4+BidOrderQty5+BidOrderQty6+BidOrderQty7+BidOrderQty8+BidOrderQty9) as sumBidQty,
        min(OfferPrice0+OfferPrice1+OfferPrice2+OfferPrice3+OfferPrice4+OfferPrice5+OfferPrice6+OfferPrice7+OfferPrice8+OfferPrice9) as minAskPrice,
        max(OfferOrderQty0+OfferOrderQty1+OfferOrderQty2+OfferOrderQty3+OfferOrderQty4+OfferOrderQty5+OfferOrderQty6+OfferOrderQty7+OfferOrderQty8+OfferOrderQty9) as maxAskQty
        from msg
        where LastPx>0, time(DateTime)>=09:30:00.000
    // result storage: append the processed rows to the shared table result
    result.append!(tmp)
}
The parameter result of the user-defined function dataETL is a shared stream table used to store the processed data. It must be marked as a mutable parameter with the mutable keyword. Otherwise, it is treated as a read-only variable, and operations that modify it, such as appending data with append!, are not allowed.
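The filtering and aggregation inside dataETL can be traced outside DolphinDB with the plain-Python sketch below. It is not DolphinDB script, and etl_aggregates and level_sum are hypothetical names. It assumes each message row is a dict keyed by the pubTable column names and applies the same where-clause filter. Because the select has no group by, the aggregates are taken over all rows of one batch that pass the filter, so the sketch computes the values behind avgBidPrice, sumBidQty, minAskPrice and maxAskQty that way. It returns only those four values and does not model how DolphinDB lays them out next to SecurityID, TradeDate and TradeTime.

```python
from datetime import datetime, time
from typing import Dict, List, Optional

LEVELS = range(10)


def level_sum(row: Dict, prefix: str) -> float:
    """Sum one field across the 10 levels, e.g. BidPrice0 + ... + BidPrice9."""
    return sum(row[f"{prefix}{i}"] for i in LEVELS)


def etl_aggregates(msg: List[Dict]) -> Optional[Dict[str, float]]:
    """Hypothetical mirror of the filter and aggregates in dataETL for one batch."""
    # Same condition as: where LastPx>0, time(DateTime)>=09:30:00.000
    kept = [r for r in msg
            if r["LastPx"] > 0 and r["DateTime"].time() >= time(9, 30)]
    if not kept:
        return None  # nothing in this batch survives the filter
    bid_px = [level_sum(r, "BidPrice") for r in kept]
    bid_qty = [level_sum(r, "BidOrderQty") for r in kept]
    ask_px = [level_sum(r, "OfferPrice") for r in kept]
    ask_qty = [level_sum(r, "OfferOrderQty") for r in kept]
    return {
        "avgBidPrice": sum(bid_px) / len(bid_px),
        "sumBidQty": sum(bid_qty),
        "minAskPrice": min(ask_px),
        "maxAskQty": max(ask_qty),
    }


if __name__ == "__main__":
    levels = {f"{p}{i}": v for p, v in [("BidPrice", 10.0), ("BidOrderQty", 100),
                                        ("OfferPrice", 11.0), ("OfferOrderQty", 200)]
              for i in LEVELS}
    row = dict(levels, LastPx=10.5, DateTime=datetime(2023, 12, 15, 9, 30, 3))
    print(etl_aggregates([row]))
```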
(4) Submit the subscription.
subscribeTable(tableName="pubTable", actionName="dataETL", offset=-1, handler=dataETL{result}, msgAsTable=true, batchSize=2000, throttle=0.01, reconnect=true)
The handler parameter is set to the user-defined function dataETL, which takes two parameters: result and msg. result is a variable that has already been defined and initialized (the shared stream table created in step 2), and it is passed into dataETL through partial application with {}, as in dataETL{result}. msg is the incremental data received from the subscribed stream table (a table, because msgAsTable=true), and it is automatically supplied as the second argument each time the handler is called.
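Partial application works the same way in many languages. In the plain-Python sketch below, which is an analogy rather than DolphinDB code, functools.partial binds the first argument up front, much as dataETL{result} does, so the caller only supplies msg. data_etl is a hypothetical stand-in that filters on LastPx and extends the caller's list in place, which plays the role of result.append!(tmp) on the mutable parameter. The two handler calls simulate two incremental batches arriving from the subscription.

```python
from functools import partial
from typing import Dict, List


def data_etl(result: List[Dict], msg: List[Dict]) -> None:
    """Hypothetical stand-in for dataETL: filter msg and append to result in place."""
    # Mutating the caller's list corresponds to result.append!(tmp)
    result.extend(row for row in msg if row["LastPx"] > 0)


result: List[Dict] = []
# Bind the first argument now, as dataETL{result} does;
# the subscriber then only has to supply msg.
handler = partial(data_etl, result)

# Simulated delivery of two incremental batches
handler([{"LastPx": 10.5}, {"LastPx": 0.0}])
handler([{"LastPx": 10.6}])
print(len(result))  # 2
```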
(5) Write mock data to the table pubTable.
rowNums = 10
simulateData = table(
take(`000001SZ, rowNums) as SecurityID,
take(0..(rowNums-1), rowNums)*1000*3+2023.12.15T09:30:00.000 as DateTime,
take(10.6, rowNums) as PreClosePx,
take(10.8, rowNums) as OpenPx,
take(10.8, rowNums) as HighPx,
take(10.2, rowNums) as LowPx,
take(10.5, rowNums) as LastPx,
take(0..(rowNums-1), rowNums)*1000+100000 as TotalVolumeTrade,
take(0..(rowNums-1), rowNums)*1000*10.6+100000*10.6 as TotalValueTrade,
take(`s, rowNums) as InstrumentStatus,
take(10.4, rowNums) as BidPrice0,
take(10.3, rowNums) as BidPrice1,
take(10.2, rowNums) as BidPrice2,
take(10.1, rowNums) as BidPrice3,
take(10.0, rowNums) as BidPrice4,
take(9.9, rowNums) as BidPrice5,
take(9.8, rowNums) as BidPrice6,
take(9.7, rowNums) as BidPrice7,
take(9.6, rowNums) as BidPrice8,
take(9.5, rowNums) as BidPrice9,
take(10000, rowNums) as BidOrderQty0,
take(20000, rowNums) as BidOrderQty1,
take(30000, rowNums) as BidOrderQty2,
take(40000, rowNums) as BidOrderQty3,
take(50000, rowNums) as BidOrderQty4,
take(60000, rowNums) as BidOrderQty5,
take(50000, rowNums) as BidOrderQty6,
take(40000, rowNums) as BidOrderQty7,
take(30000, rowNums) as BidOrderQty8,
take(20000, rowNums) as BidOrderQty9,
take(10.6, rowNums) as OfferPrice0,
take(10.7, rowNums) as OfferPrice1,
take(10.8, rowNums) as OfferPrice2,
take(10.9, rowNums) as OfferPrice3,
take(11.0, rowNums) as OfferPrice4,
take(11.1, rowNums) as OfferPrice5,
take(11.2, rowNums) as OfferPrice6,
take(11.3, rowNums) as OfferPrice7,
take(11.4, rowNums) as OfferPrice8,
take(11.5, rowNums) as OfferPrice9,
take(10000, rowNums) as OfferOrderQty0,
take(20000, rowNums) as OfferOrderQty1,
take(30000, rowNums) as OfferOrderQty2,
take(40000, rowNums) as OfferOrderQty3,
take(50000, rowNums) as OfferOrderQty4,
take(60000, rowNums) as OfferOrderQty5,
take(50000, rowNums) as OfferOrderQty6,
take(40000, rowNums) as OfferOrderQty7,
take(30000, rowNums) as OfferOrderQty8,
take(20000, rowNums) as OfferOrderQty9)
tableInsert(pubTable, simulateData)
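The DateTime expression in the mock data adds 0, 3000, ..., 27000 to 2023.12.15T09:30:00.000; since TIMESTAMP values are in milliseconds, the 10 rows are spaced 3 seconds apart. The plain-Python sketch below (an illustration, not DolphinDB script) reproduces those timestamps. It shows that every row is at or after 09:30:00.000, so with LastPx fixed at 10.5 all rows pass the filter in dataETL, and that exactly one row falls at 09:30:09.000, the time queried in step 6.

```python
from datetime import datetime, timedelta

ROW_NUMS = 10
START = datetime(2023, 12, 15, 9, 30)

# Mirrors take(0..(rowNums-1), rowNums)*1000*3 + 2023.12.15T09:30:00.000:
# row i is offset from the start by i * 3000 milliseconds.
date_times = [START + timedelta(milliseconds=i * 1000 * 3) for i in range(ROW_NUMS)]

for dt in date_times:
    print(dt.time())  # 09:30:00, 09:30:03, ..., 09:30:27
```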
(6) Check the result table.
res = select * from result where TradeTime=09:30:09.000

The query result shows that the raw market data has been successfully cleaned and transformed.
(7) Unsubscribe from the stream table pubTable. Note that all subscriptions to a stream table must be canceled before the table is deleted.
unsubscribeTable(tableName="pubTable", actionName="dataETL")
(8) Delete the shared stream tables pubTable and result.
dropStreamTable(tableName="pubTable")
dropStreamTable(tableName="result")
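The teardown order in steps 7 and 8 (cancel every subscription, then drop the tables) can be expressed as a small invariant. The plain-Python toy model below is hypothetical and is not how DolphinDB implements it; StreamRegistry and its methods are illustrative names. It only encodes the rule stated in step 7: dropping a table that still has subscriptions is rejected.

```python
from typing import Dict, Set


class StreamRegistry:
    """Toy model of the step 7 rule: a table cannot be dropped while subscribed."""

    def __init__(self) -> None:
        self.subscriptions: Dict[str, Set[str]] = {}  # table name -> action names

    def create(self, table: str) -> None:
        self.subscriptions[table] = set()

    def subscribe(self, table: str, action: str) -> None:
        self.subscriptions[table].add(action)

    def unsubscribe(self, table: str, action: str) -> None:
        self.subscriptions[table].discard(action)

    def drop(self, table: str) -> None:
        if self.subscriptions[table]:
            raise RuntimeError(
                f"{table} still has subscriptions: {sorted(self.subscriptions[table])}")
        del self.subscriptions[table]


if __name__ == "__main__":
    reg = StreamRegistry()
    for name in ("pubTable", "result"):
        reg.create(name)
    reg.subscribe("pubTable", "dataETL")
    try:
        reg.drop("pubTable")  # rejected: dataETL is still subscribed
    except RuntimeError as err:
        print(err)
    reg.unsubscribe("pubTable", "dataETL")  # step 7
    reg.drop("pubTable")                     # step 8
    reg.drop("result")
    print(reg.subscriptions)  # {}
```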