CrossSectionalEngine
- class swordfish._swordfishcpp.CrossSectionalEngine
The cross-sectional streaming engine is used for real-time computing on cross-sectional data, which is a collection of observations (behaviors) for multiple subjects (entities such as different stocks) at a single point in time.
CrossSectionalEngine.create returns a Builder object; calling its submit method then creates a keyed table object with the key_col parameter as the key. The keyed table is updated every time a new record arrives. If the last_batch_only parameter is set to True, the table only maintains the latest record in each group. When new data is ingested into the engine:
- If metrics and output are specified, the engine first updates the keyed table, then performs calculations on the latest data and outputs the results to output.
- If metrics and output are not specified, the engine only updates the keyed table.
Calculation can be triggered by the number of records or by a time interval. See the create parameters triggering_pattern and triggering_interval. Note that if context_by_col is specified, the data will be grouped by the specified columns and calculated by group.
The snapshot mechanism is used to restore the streaming engine to the latest snapshot after a system interruption (see the create parameters snapshot_dir and snapshot_interval_in_msg_count).
- engine_type: EngineType
The type of the streaming engine.
- stat: CrossSectionalEngineStat
Descriptive statistics related to the streaming engine.
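The keyed-table behavior described in the class overview can be pictured with a small pure-Python model. This is only an illustrative sketch, not the Swordfish implementation: the helper names update_keyed_table and cross_section_avg, the sample records, and the sector column are all assumptions made for the example. It shows each key keeping only its latest record, and a metric computed across keys, optionally grouped in the way context_by_col is described.

```python
from collections import defaultdict
from statistics import mean


def update_keyed_table(keyed_table, records, key_col):
    """Hypothetical stand-in for the keyed table: one slot per key,
    overwritten whenever a newer record with the same key arrives."""
    for record in records:
        keyed_table[record[key_col]] = record
    return keyed_table


def cross_section_avg(keyed_table, value_col, context_by_col=None):
    """Hypothetical metric: average of value_col over the latest record
    of every key, grouped by context_by_col when it is given."""
    groups = defaultdict(list)
    for record in keyed_table.values():
        group = record[context_by_col] if context_by_col else None
        groups[group].append(record[value_col])
    return {group: mean(values) for group, values in groups.items()}


keyed_table = {}
batch_1 = [
    {"symbol": "A", "sector": "tech", "price": 10.0},
    {"symbol": "B", "sector": "tech", "price": 20.0},
    {"symbol": "C", "sector": "energy", "price": 30.0},
]
batch_2 = [{"symbol": "A", "sector": "tech", "price": 14.0}]

# The second batch replaces the stored record for symbol "A".
update_keyed_table(keyed_table, batch_1, key_col="symbol")
update_keyed_table(keyed_table, batch_2, key_col="symbol")

overall = cross_section_avg(keyed_table, "price")
by_sector = cross_section_avg(keyed_table, "price", context_by_col="sector")
```

After both batches the model holds three keys, and the cross-section uses the newer price for "A" rather than the first one.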
- classmethod create(name, table_schema, key_col, *, metrics=None, output=None, triggering_pattern='per_batch', triggering_interval=None, use_system_time=True, time_col=None, last_batch_only=False, context_by_col=None, snapshot_dir=None, snapshot_interval_in_msg_count=None, output_elapsed_microseconds=False, round_time=True, key_filter=None, updated_context_groups_only=False)
Creates a cross-sectional streaming engine with the specified parameters and configuration.
- Parameters:
name (str) – The name of the engine. It can contain letters, numbers and “_” and must start with a letter.
table_schema (Union[Table, TypeDict]) – Specifies the column names and corresponding types of the input stream. If a Table is provided, its schema must match the schema of the subscribed stream table. Whether the table contains data or not doesn’t matter.
key_col (Union[List[str], str]) – One or more columns in the stream table to use as the key columns. For each key entry, only the latest record is used in the calculation.
metrics (optional) – The formulas for calculation using MetaCode or an AnyVector. Defaults to None.
output (Table) – The output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling create. Make sure the column types match the calculation results of the corresponding metrics. The columns in the output table are in the following order:
- The first column is of TIMESTAMP type. If use_system_time = True, the column stores the time when each calculation starts. If use_system_time = False, it takes the values of time_col.
- The following column is the context_by_col (if specified).
- If output_elapsed_microseconds is set to True, specify two more columns: a LONG column and an INT column.
- The remaining columns store the calculation results of metrics.
triggering_pattern (Literal["per_batch", "per_row", "interval", "key_count", "data_interval"], optional) – Specifies how to trigger the calculations. Defaults to "per_batch".
triggering_interval (Any, optional) – The triggering interval for the system based on the triggering pattern. Defaults to None.
use_system_time (bool, optional) – Whether the calculations are performed based on the system time when data is ingested into the engine. Defaults to True.
time_col (Optional[str], optional) – The time column in the stream table to which the engine subscribes if use_system_time = False. Defaults to None.
last_batch_only (bool, optional) – Whether to keep only the records with the latest timestamp in the engine. Defaults to False.
context_by_col (Optional[Union[List[str], str]], optional) – The grouping column(s) by which calculations are performed within groups. Only takes effect if metrics and output are specified. Defaults to None.
snapshot_dir (Optional[Union[Path, str]], optional) – The directory where the streaming engine snapshot is saved. Defaults to None.
snapshot_interval_in_msg_count (Optional[int], optional) – The number of messages to receive before saving the next snapshot. Defaults to None.
output_elapsed_microseconds (bool, optional) – Whether to output the elapsed time (in microseconds). Defaults to False.
round_time (bool, optional) – Whether to align the window boundary based on the specified alignment rule. Defaults to True.
key_filter (Optional[MetaCode], optional) – The conditions for filtering keys in the keyed table returned by the engine. Defaults to None.
updated_context_groups_only (bool, optional) – Whether to compute only the groups updated with new data since the last output. Defaults to False.
- Returns:
An instance of CrossSectionalEngineBuilder that allows further configuration and execution of the cross-sectional engine. This object enables setting up the optional parameters.
- Return type:
Examples
>>> import swordfish as sf
>>> table_schema = {"timestamp": "DATETIME", "symbol": "STRING", "price":
...     "DOUBLE", "volume": "LONG"}
>>> output_table = sf.table(types={"symbol": "STRING", "avg_price": "DOUBLE",
...     "total_volume": "LONG"})
>>> my_engine = sf.engine.CrossSectionalEngine.create(
...     name="StockAnalysisEngine",
...     table_schema=table_schema,
...     key_col="symbol",
...     metrics=["avg(price)", "sum(volume)"],
...     output=output_table,
...     triggering_pattern="interval",
...     triggering_interval=10,
...     use_system_time=True,
...     time_col="timestamp",
...     last_batch_only=False,
...     snapshot_dir="/path/to/snapshot",
...     snapshot_interval_in_msg_count=1000,
...     round_time=True,
...     updated_context_groups_only=True
... ).submit()
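The column order required for the output table can be worked out before the table is created. The sketch below is a plain-Python helper, not part of Swordfish: the function name expected_output_columns and every column name in it are hypothetical, and only the ordering and the TIMESTAMP, LONG and INT types are taken from the description of the output parameter.

```python
def expected_output_columns(metric_columns, context_columns=(),
                            output_elapsed_microseconds=False):
    """Return (name, type) pairs in the order described for `output`.

    Column names are placeholders chosen for this example; the
    documentation only fixes the order and some of the types.
    """
    # The first column is always of TIMESTAMP type.
    columns = [("time", "TIMESTAMP")]
    # Next come the context_by_col columns, if any were specified.
    columns.extend(context_columns)
    # With output_elapsed_microseconds=True, a LONG and an INT column follow.
    if output_elapsed_microseconds:
        columns.append(("extra_long", "LONG"))
        columns.append(("extra_int", "INT"))
    # The remaining columns hold the metric results.
    columns.extend(metric_columns)
    return columns


layout = expected_output_columns(
    metric_columns=[("avg_price", "DOUBLE"), ("total_volume", "LONG")],
    context_columns=[("sector", "STRING")],
    output_elapsed_microseconds=True,
)
for name, col_type in layout:
    print(f"{name}: {col_type}")
```

A layout produced this way can be compared against the types passed when building the output table, so that a missing TIMESTAMP column or a misplaced grouping column is noticed before the engine is created.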