State Functions

The swordfish.h header file declares a structure named ReactiveStateFactory. This structure encapsulates several static functions that implement diverse state operators for reactive state engine.

State functions' distinctive feature is that their output is determined by both the current input and the historical context. After processing each input, these operators update their internal state, which then influences future calculations.

Features of State Functions

State functions are a core component of Swordfish, offering several notable features:

  • Incremental processing: Support for performing computations based on both current inputs and previous states. This approach eliminates redundant calculations, significantly enhancing computational efficiency.
  • Group-based calculations: Allow for data to be segmented into groups, with computational rules applied separately to each group.
  • Diverse operator types: Support for a wide array of operators, including: TA-Lib, cumulative window, row-based, order-sensitive, topN, and higher-order functions.

Application Scenarios

State functions are well-suited for high-frequency computational scenarios that require immediate response to individual data points and real-time updates of calculation results. Two prime examples include:

  • Financial Analytics: In the finance sector, these functions excel at real-time computation of high-frequency factors. For instance, they can calculate real-time stock price movements based on snapshot data, providing up-to-the-second insights for traders and analysts.
  • Internet of Things (IoT) Monitoring: In IoT applications, state functions are helpful to detect subtle changes in sensor states. They enable real-time tracking of various data indicators such as temperature fluctuations, humidity levels, and pressure changes.

How to Use State Functions

This section explains how to use state functions. You can access the full range of Swordfish’s state functions by including Swordfish.h header file in their C++ code using #include directive.

Before using Swordfish, call DolphinDBLib::initializeRuntime to initialize the runtime. After all operations are performed, you can call DolphinDBLib::finalizeRuntime to finalize the runtime.

#include "Swordfish.h"
int main()
{
    DolphinDBLib::initializeRuntime();
    // function implementation
    DolphinDBLib::finalizeRuntime();
    return 0;
}

Let's begin with the declaration for the state function.

Function Declaration

Since all member functions share the same structure of ReactiveStateFactory, we will use createMsumReactiveState as an example. Its function declaration is as follows:
createMsumReactiveState(const vector<ObjectSP>& args, 
const vector<int>& inputColIndices, const vector<DATA_TYPE>& inputColTypes, 
const vector<int>& outputColIndices)
This function implements the DolphinDB msum function, calculating the moving sum in a sliding window. Its parameter list is as follows:
  • args: A list of input arguments, which should be provided according to the msum function's specifications.
  • inputColIndices: The index of the input column.
  • inputColTypes: The type of the input column.
  • outputColIndices: The index of the output column.

Examples

This example demonstrates how to use state functions by creating an operator for incremental calculation of a moving sum within a specified window. In such case, we can use the createMsumReactiveState state function to create a “msumReactiveState” operator. The process is as follows:

  1. Initialize a DolphinDB session for executing built-in functions.
  2. Create a state table to cache values (val) and its moving sum (msum).
  3. Form an args array with two elements: a reference to the val column and a window size of 3.
  4. Call ReactiveStateFactory::createMsumReactiveState to create the msumReactiveState operator.
  5. Insert an initial value (DBL_NMIN) into the state table to represent the starting state.
  6. Create an empty msumResult vector to store calculation results.
  7. Define the input data (valData) for the moving sum operation.
  8. Update the state table and invoke msumReactiveState->append() for incremental calculations. Extract the current moving sum from the state table and append it to msumResult.
  9. Output the final results.
#include "Swordfish.h"
int main() {
    DolphinDBLib::initializeRuntime();
    SessionSP session = DolphinDBLib::createSession();
    // Calculate msum(val, 3) context by sym
    vector<string> colNames = {"val", "msum"};
    vector<DATA_TYPE> colTypes = { DT_DOUBLE, DT_DOUBLE};
    TableSP stateTable = Util::createTable(colNames, colTypes, 0, 1);
    vector<ObjectSP> args(2);
    SQLContextSP context = new SQLContext();
    args[0] = new ColumnRef(context, "val");
    // Set window = 3
    args[1] = new Int(3); 
    // Set input column index in stateTable
    vector<int> inputColIndices = {0}; 
    // Set input column data type in stateTable
    vector<DATA_TYPE> inputColTypes = {DT_DOUBLE};
    // Set output column index in stateTable
    vector<int> outputColIndices = {1}; 
    // Create msum reactive state
    ReactiveStateSP msumReactiveState = ReactiveStateFactory::createMsumReactiveState(args, inputColIndices, inputColTypes, outputColIndices);
    msumReactiveState->setTable(stateTable);
    msumReactiveState->setSession(session);
    context->setTable(stateTable);
    INDEX rowInserted = 0;
    string errMsg;
    std::vector<ConstantSP> rowValues = {new Double(DBL_NMIN), new Double(DBL_NMIN)};
    if(!stateTable->append(rowValues, rowInserted, errMsg)){
        throw RuntimeException("Failed to append data to state table with error: " + errMsg);
    }
    msumReactiveState->addKeys(1);
    // Create result VectorSP
    VectorSP msumResult = Util::createVector(DT_DOUBLE, 0);
    // Create input data
    std::vector<double> valData = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};

    INDEX rows = valData.size();
    ConstantSP val = new Double();
    std::vector<ConstantSP> updateValue = {val};
    std::vector<string> updateColName = {"val"};
    VectorSP index = Util::createIndexVector(0,1);
    // Iterate through each element of valCol
    for(int i = 0; i < rows; i++){
        // Get the val element
        val->setDouble(valData[i]);
        if(!stateTable->update(updateValue, index, updateColName, errMsg)){
            throw RuntimeException("Failed to update state table with error: " + errMsg);
        }
        // Incrementally calculate msum
        msumReactiveState->append(session->getHeap().get(), index);
        // Get the result from stateTable for the current index
        ConstantSP res = stateTable->getColumn(1)->get(index);
        msumResult->append(res);
    }

    std::cout<<" msum result"<<std::endl;
    std::cout << msumResult->getString() << std::endl;
    DolphinDBLib::finalizeRuntime();
    return 0;
}