State Functions
The swordfish.h header file declares a structure named
ReactiveStateFactory. This structure encapsulates several static
functions that implement diverse state operators for reactive state engine.
State functions' distinctive feature is that their output is determined by both the current input and the historical context. After processing each input, these operators update their internal state, which then influences future calculations.
Features of State Functions
State functions are a core component of Swordfish, offering several notable features:
- Incremental processing: Support for performing computations based on both current inputs and previous states. This approach eliminates redundant calculations, significantly enhancing computational efficiency.
- Group-based calculations: Allow for data to be segmented into groups, with computational rules applied separately to each group.
- Diverse operator types: Support for a wide array of operators, including: TA-Lib, cumulative window, row-based, order-sensitive, topN, and higher-order functions.
Application Scenarios
State functions are well-suited for high-frequency computational scenarios that require immediate response to individual data points and real-time updates of calculation results. Two prime examples include:
- Financial Analytics: In the finance sector, these functions excel at real-time computation of high-frequency factors. For instance, they can calculate real-time stock price movements based on snapshot data, providing up-to-the-second insights for traders and analysts.
- Internet of Things (IoT) Monitoring: In IoT applications, state functions are helpful to detect subtle changes in sensor states. They enable real-time tracking of various data indicators such as temperature fluctuations, humidity levels, and pressure changes.
How to Use State Functions
This section explains how to use state functions. You can access the full range of
Swordfish’s state functions by including Swordfish.h header file in their C++
code using #include directive.
Before using Swordfish, call DolphinDBLib::initializeRuntime to
initialize the runtime. After all operations are performed, you can call
DolphinDBLib::finalizeRuntime to finalize the runtime.
#include "Swordfish.h"
int main()
{
DolphinDBLib::initializeRuntime();
// function implementation
DolphinDBLib::finalizeRuntime();
return 0;
}
Let's begin with the declaration for the state function.
Function Declaration
ReactiveStateFactory, we will use
createMsumReactiveState as an example. Its function
declaration is as
follows:createMsumReactiveState(const vector<ObjectSP>& args,
const vector<int>& inputColIndices, const vector<DATA_TYPE>& inputColTypes,
const vector<int>& outputColIndices)- args: A list of input arguments, which should be provided
according to the
msumfunction's specifications. - inputColIndices: The index of the input column.
- inputColTypes: The type of the input column.
- outputColIndices: The index of the output column.
Examples
This example demonstrates how to use state functions by creating an operator for
incremental calculation of a moving sum within a specified window. In such case,
we can use the createMsumReactiveState state function to create
a “msumReactiveState” operator. The process is as follows:
- Initialize a DolphinDB session for executing built-in functions.
- Create a state table to cache values (val) and its moving sum (msum).
- Form an args array with two elements: a reference to the val column and a window size of 3.
- Call
ReactiveStateFactory::createMsumReactiveStateto create the msumReactiveState operator. - Insert an initial value (DBL_NMIN) into the state table to represent the starting state.
- Create an empty msumResult vector to store calculation results.
- Define the input data (valData) for the moving sum operation.
- Update the state table and invoke
msumReactiveState->append()for incremental calculations. Extract the current moving sum from the state table and append it to msumResult. - Output the final results.
#include "Swordfish.h"
int main() {
DolphinDBLib::initializeRuntime();
SessionSP session = DolphinDBLib::createSession();
// Calculate msum(val, 3) context by sym
vector<string> colNames = {"val", "msum"};
vector<DATA_TYPE> colTypes = { DT_DOUBLE, DT_DOUBLE};
TableSP stateTable = Util::createTable(colNames, colTypes, 0, 1);
vector<ObjectSP> args(2);
SQLContextSP context = new SQLContext();
args[0] = new ColumnRef(context, "val");
// Set window = 3
args[1] = new Int(3);
// Set input column index in stateTable
vector<int> inputColIndices = {0};
// Set input column data type in stateTable
vector<DATA_TYPE> inputColTypes = {DT_DOUBLE};
// Set output column index in stateTable
vector<int> outputColIndices = {1};
// Create msum reactive state
ReactiveStateSP msumReactiveState = ReactiveStateFactory::createMsumReactiveState(args, inputColIndices, inputColTypes, outputColIndices);
msumReactiveState->setTable(stateTable);
msumReactiveState->setSession(session);
context->setTable(stateTable);
INDEX rowInserted = 0;
string errMsg;
std::vector<ConstantSP> rowValues = {new Double(DBL_NMIN), new Double(DBL_NMIN)};
if(!stateTable->append(rowValues, rowInserted, errMsg)){
throw RuntimeException("Failed to append data to state table with error: " + errMsg);
}
msumReactiveState->addKeys(1);
// Create result VectorSP
VectorSP msumResult = Util::createVector(DT_DOUBLE, 0);
// Create input data
std::vector<double> valData = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};
INDEX rows = valData.size();
ConstantSP val = new Double();
std::vector<ConstantSP> updateValue = {val};
std::vector<string> updateColName = {"val"};
VectorSP index = Util::createIndexVector(0,1);
// Iterate through each element of valCol
for(int i = 0; i < rows; i++){
// Get the val element
val->setDouble(valData[i]);
if(!stateTable->update(updateValue, index, updateColName, errMsg)){
throw RuntimeException("Failed to update state table with error: " + errMsg);
}
// Incrementally calculate msum
msumReactiveState->append(session->getHeap().get(), index);
// Get the result from stateTable for the current index
ConstantSP res = stateTable->getColumn(1)->get(index);
msumResult->append(res);
}
std::cout<<" msum result"<<std::endl;
std::cout << msumResult->getString() << std::endl;
DolphinDBLib::finalizeRuntime();
return 0;
}
