ReactiveStateEngine#
- class swordfish._swordfishcpp.ReactiveStateEngine#
The reactive state streaming engine maintains and updates states for stateful computations, ensuring efficient processing of continuous data streams. It triggers an output for each input record, supports only vectorized functions as operators, and optimizes stateful operations.
Note
Only the following optimized state functions can be used in the engine. Alternatively, you can implement a stateful indicator by defining a user-defined function and declaring it with the keyword @state before the definition. Aggregate functions should be avoided.
Cumulative functions:
cumavg, cumsum, cumprod, cumcount, cummin, cummax, cumvar, cumvarp, cumstd, cumstdp, cumcorr, cumcovar, cumbeta, cumwsum, cumwavg, cumfirstNot, cumlastNot, cummed, cumpercentile, cumnunique, cumPositiveStreak, cummdd
Moving functions:
ema, mavg, msum, mcount, mprod, mvar, mvarp, mstd, mstdp, mskew, mkurtosis, mmin, mmax, mimin, mimax, mmed, mpercentile, mrank, mcorr, mcovar, mbeta, mwsum, mwavg, mmad, mfirst, mlast, mslr, tmove, tmfirst, tmlast, tmsum, tmavg, tmcount, tmvar, tmvarp, tmstd, tmstdp, tmprod, tmskew, tmkurtosis, tmmin, tmmax, tmmed, tmpercentile, tmrank, tmcovar, tmbeta, tmcorr, tmwavg, tmwsum, tmoving, moving, sma, wma, dema, tema, trima, linearTimeTrend, talib, t3, ma, mmaxPositiveStreak
Note
If talib is used as a state function, its first parameter must be a state function.
Row-based functions:
rowMin, rowMax, rowAnd, rowOr, rowXor, rowProd, rowSum, rowSum2, rowSize, rowCount, rowAvg, rowKurtosis, rowSkew, rowVar, rowVarp, rowStd, rowStdp
Order-sensitive functions:
deltas, ratios, ffill, move, prev, iterate, ewmMean, ewmVar, ewmStd, ewmCov, ewmCorr, prevState, percentChange
TopN functions:
msumTopN, mavgTopN, mstdpTopN, mstdTopN, mvarpTopN, mvarTopN, mcorrTopN, mbetaTopN, mcovarTopN, mwsumTopN, cumwsumTopN, cumsumTopN, cumvarTopN, cumvarpTopN, cumstdTopN, cumstdpTopN, cumcorrTopN, cumbetaTopN, cumavgTopN, cumskewTopN, cumkurtosisTopN, mskewTopN, mkurtosisTopN, tmsumTopN, tmavgTopN, tmstdTopN, tmstdpTopN, tmvarTopN, tmvarpTopN, tmskewTopN, tmkurtosisTopN, tmbetaTopN, tmcorrTopN, tmcovarTopN, tmwsumTopN
Higher-order functions:
segmentby (whose first parameter can only take cumsum, cummax, cummin, cumcount, cumavg, cumstd, cumvar, cumstdp, cumvarp), moving, byColumn, accumulate, window
Others:
talibNull, topRange, lowRange, trueRange
Functions that can only be used in the reactive state engine:
stateIterate, conditionalIterate, genericStateIterate, genericTStateIterate
Calculation Rules#
The reactive state engine outputs a result for each input. If multiple records are ingested into the reactive state engine at the same time, the data is calculated in batches. The number of records in each batch is determined by the system.
To output only the results that meet the specified conditions, set the parameter filter.
To perform calculations by group, set the parameter key_col.
To preserve the insertion order of the records in the output table, set the parameter keep_order.
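The rules above can be illustrated with a small plain-Python model. It is only a sketch of the semantics and does not use the Swordfish API: the function `reactive_cumsum`, its arguments, and the record layout are hypothetical. It also assumes that the filter only hides rows from the output while the per-group state is still updated for every input.

```python
from collections import defaultdict


def reactive_cumsum(records, key_col=None, value_col="value", filter_fn=None):
    """Model of a reactive state computation: one running-sum result per input."""
    state = defaultdict(float)  # one running sum per group (hypothetical state store)
    out = []
    for rec in records:
        key = rec[key_col] if key_col is not None else None
        state[key] += rec[value_col]  # state is updated for every input record
        row = {"key": key, "cumsum": state[key]}
        # Assumption: the filter decides only whether the row is emitted.
        if filter_fn is None or filter_fn(row):
            out.append(row)
    return out


records = [
    {"sym": "A", "value": 1.0},
    {"sym": "B", "value": 10.0},
    {"sym": "A", "value": 2.0},
    {"sym": "B", "value": 20.0},
]

# Grouped calculation: one output row per input row, in insertion order.
results = reactive_cumsum(records, key_col="sym")

# Same calculation, but only rows whose running sum exceeds 5 are emitted.
filtered = reactive_cumsum(records, key_col="sym", filter_fn=lambda r: r["cumsum"] > 5)
```

In this model the unfiltered run yields four rows, one per input, while the filtered run keeps only the rows for group "B".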
Features#
State cleanup: States in the engine are maintained by group. A large number of groups may lead to high memory overhead; you can set a cleanup rule to clear data that is no longer needed. (See parameters key_purge_filter and key_purge_freq_in_second.)
Snapshot: The snapshot mechanism restores the streaming engine to the latest snapshot after a system interruption. (See parameters snapshot_dir and snapshot_interval_in_msg_count.)
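As a rough illustration of state cleanup, the sketch below keeps one running total per group and periodically drops the groups selected by a purge predicate. It is a plain-Python model, not the engine's implementation: the class `KeyedState`, its arguments, and the injectable clock are hypothetical, and the exact purge semantics of the real engine may differ.

```python
import time


class KeyedState:
    """Per-group running totals with a periodic, predicate-based purge."""

    def __init__(self, purge_filter, purge_freq_seconds, clock=time.monotonic):
        self._state = {}
        self._purge_filter = purge_filter      # callable(key, total) -> bool
        self._purge_freq = purge_freq_seconds  # minimum seconds between purges
        self._clock = clock
        self._last_purge = clock()

    def update(self, key, value):
        total = self._state.get(key, 0.0) + value
        self._state[key] = total
        now = self._clock()
        if now - self._last_purge >= self._purge_freq:
            self._purge()
            self._last_purge = now
        return total

    def _purge(self):
        # Collect keys first so the dict is not modified while iterating.
        stale = [k for k, v in self._state.items() if self._purge_filter(k, v)]
        for k in stale:
            del self._state[k]

    def keys(self):
        return sorted(self._state)


# A fake clock makes the example deterministic.
now = [0.0]
store = KeyedState(lambda key, total: key.startswith("old_"), 60, clock=lambda: now[0])
store.update("old_1", 1.0)
store.update("new_1", 2.0)
keys_before = store.keys()

now[0] = 61.0  # more than 60 seconds later, the next update triggers a purge
store.update("new_1", 3.0)
keys_after = store.keys()
```

Here the group "old_1" is held in memory until the purge interval elapses, after which only "new_1" remains.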
- engine_type: EngineType#
The type of the streaming engine.
- stat: ReactiveStateEngineStat#
Descriptive statistics related to the streaming engine.
- classmethod create(name, table_schema, output, metrics, *, key_col=None, filter=None, snapshot_dir=None, snapshot_interval_in_msg_count=None, keep_order=None, key_purge_filter=None, key_purge_freq_in_second=None, output_elapsed_microseconds=False, key_capacity=1024, parallelism=1, output_handler=None, msg_as_table=False)#
Creates a reactive state streaming engine with the specified parameters and configuration.
- Parameters:
name (str) – The name of the engine. It can contain letters, numbers and “_” and must start with a letter.
table_schema (Union[Table, TypeDict]) – Specifies the column names and corresponding types of the input stream. If a Table is provided, its schema must match the schema of the subscribed stream table. Whether the table contains data or not doesn’t matter.
output (Table) – The output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling create. The columns in the output table are in the following order: (1) If key_col is specified, the first few columns must match its order. (2) If output_elapsed_microseconds is set to True, specify two more columns: a LONG column for elapsed time of each batch and an INT column for total records in each batch. (3) The remaining columns store the calculation results of metrics. Make sure the column types match the calculation results of the corresponding metrics.
metrics – MetaCode specifying the formulas for calculation. The metacode can include one or more expressions, built-in or user-defined functions, or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.
key_col (Optional[Union[List[str], str]], optional) – The grouping column(s) for the calculation. Defaults to None.
filter (Optional[MetaCode], optional) – The filtering conditions for the output table. Defaults to None.
snapshot_dir (Optional[Union[Path, str]], optional) – The directory where the streaming engine snapshot is saved. Defaults to None.
snapshot_interval_in_msg_count (Optional[int], optional) – The number of messages to receive before saving the next snapshot. Defaults to None.
keep_order (Optional[bool], optional) – Whether to preserve the insertion order of records in the output table. Defaults to None.
key_purge_filter (Optional[MetaCode], optional) – The filtering conditions to identify the data to be purged from the cache. Defaults to None.
key_purge_freq_in_second (Optional[int], optional) – The time interval (in seconds) to trigger a purge. Defaults to None.
output_elapsed_microseconds (bool, optional) – Whether to output the elapsed time (in microseconds). Defaults to False.
key_capacity (int, optional) – A positive integer indicating the amount of memory allocated for buffering state of each group. Defaults to 1024.
parallelism (int, optional) – A positive integer no greater than 63, indicating the maximum number of workers that can run in parallel. Defaults to 1.
output_handler (Optional[FunctionDef], optional) – A unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the specified function. Defaults to None.
msg_as_table (bool, optional) – Whether the output data is passed into the function (specified by output_handler) as a table or as an AnyVector. Defaults to False.
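The column order required for the output table can be checked with a small helper before calling create. The sketch below is plain Python and independent of Swordfish: the function `expected_output_columns` and the column names `elapsed_us` and `batch_size` are hypothetical placeholders, since the documentation fixes only the order and types of those two columns.

```python
def expected_output_columns(metric_names, key_col=None, output_elapsed_microseconds=False):
    """Return (column, role_or_type) pairs in the order described for the output table."""
    if key_col is None:
        keys = []
    elif isinstance(key_col, str):
        keys = [key_col]
    else:
        keys = list(key_col)

    # (1) Key columns come first, in the order given by key_col.
    columns = [(k, "key") for k in keys]

    # (2) Two extra columns when elapsed-time output is enabled
    #     (names are placeholders; only the types come from the documentation).
    if output_elapsed_microseconds:
        columns.append(("elapsed_us", "LONG"))
        columns.append(("batch_size", "INT"))

    # (3) The remaining columns hold the metric results.
    columns.extend((m, "metric") for m in metric_names)
    return columns


layout = expected_output_columns(
    ["max_temperature", "last_status"],
    key_col="device_id",
    output_elapsed_microseconds=True,
)
```

Comparing such a layout against the schema of the table passed as output is one way to catch a misordered column before the engine is created.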
- Returns:
An instance of ReactiveStateEngineBuilder that allows further configuration and execution of the reactive state engine. This object enables setting up the optional parameters.
- Return type:
ReactiveStateEngineBuilder
Examples
>>> import swordfish as sf
>>> table_schema = {"timestamp": "DATETIME", "device_id": "STRING",
...                 "temperature": "DOUBLE", "status": "STRING"}
>>> output_table = sf.table(types={"device_id": "STRING",
...     "max_temperature": "DOUBLE", "last_status": "STRING"})
>>> my_engine = sf.engine.ReactiveStateEngine.create(
...     name="DeviceStateTracker",
...     table_schema=table_schema,
...     output=output_table,
...     metrics=["max(temperature)", "last(status)"],
...     key_col="device_id",
...     filter=None,
...     snapshot_dir="/path/to/snapshot",
...     snapshot_interval_in_msg_count=1000,
...     keep_order=True,
...     key_purge_filter=None,
...     key_purge_freq_in_second=60,
...     key_capacity=4096,
...     parallelism=2,
...     output_handler=None,
...     msg_as_table=True,
... ).submit()