ReactiveStateEngineBuilder

class swordfish._engine.ReactiveStateEngineBuilder(name, table_schema, output, metrics, *, key_col=None, filter=None, snapshot_dir=None, snapshot_interval_in_msg_count=None, keep_order=None, key_purge_filter=None, key_purge_freq_in_second=None, output_elapsed_microseconds=False, key_capacity=1024, parallelism=1, output_handler=None, msg_as_table=False)
Parameters:
  • name (str)

  • table_schema (Table | Dict[str, DataType | str])

  • output (Table)

  • key_col (List[str] | str | None)

  • filter (MetaCode | None)

  • snapshot_dir (Path | str | None)

  • snapshot_interval_in_msg_count (int | None)

  • keep_order (bool | None)

  • key_purge_filter (MetaCode | None)

  • key_purge_freq_in_second (int | None)

  • output_elapsed_microseconds (bool)

  • key_capacity (int)

  • parallelism (int)

  • output_handler (FunctionDef | None)

  • msg_as_table (bool)
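
Each keyword argument above has a same-named setter, documented below, that returns the builder itself, so options can be chained before calling submit(). The following is a minimal sketch of that fluent pattern. SketchBuilder is a hypothetical stand-in, not a swordfish class, and it only records the settings it is given; how a real builder is obtained and what submit() constructs are outside the scope of this sketch.

```python
from typing import Any, Dict, List, Optional, Union


class SketchBuilder:
    """Hypothetical stand-in that mimics the fluent setters (NOT the swordfish class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.options: Dict[str, Any] = {}

    def key_col(self, val: Optional[Union[List[str], str]] = None) -> "SketchBuilder":
        self.options["key_col"] = val
        return self  # returning self is what makes chaining possible

    def keep_order(self, val: Optional[bool] = None) -> "SketchBuilder":
        self.options["keep_order"] = val
        return self

    def key_capacity(self, val: int = 1024) -> "SketchBuilder":
        self.options["key_capacity"] = val
        return self

    def submit(self) -> Dict[str, Any]:
        # The documented submit() returns a StreamEngine; this stand-in
        # simply returns the settings collected so far.
        return {"name": self.name, **self.options}


config = (
    SketchBuilder("demo_engine")
    .key_col("sym")
    .keep_order(True)
    .key_capacity(4096)
    .submit()
)
print(config)
```

Setters that are never called leave the corresponding option at its default, which is why the stand-in only stores values that were set explicitly.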

key_col(val=None)

Specifies the grouping column(s) for the calculation.

The calculation is conducted within each group defined by the specified column(s).

Parameters:

val (Optional[Union[List[str], str]], optional) – The column(s) to group by. Defaults to None.

Returns:

The instance itself.

Return type:

Self

filter(val=None)

Specifies the filtering conditions for the output table.

The MetaCode must be an expression and can reference only columns of dummy_table, the input table described by table_schema. Combine multiple conditions with the logical operators and / or. Only results that satisfy the filter are written to the output table.

Parameters:

val (Optional[MetaCode], optional) – The MetaCode representing the filtering conditions. Defaults to None.

Returns:

The instance itself.

Return type:

Self

snapshot_dir(val=None)

Sets the directory where the streaming engine snapshot is saved.

The directory must already exist, or an exception will be raised. If a snapshot directory is specified, the system checks for an existing snapshot in the directory when creating the streaming engine.

If found, the snapshot is loaded to restore the engine’s state. Multiple streaming engines can share a directory, with snapshot files named after the engine names.

Snapshot file extensions:
  • <engineName>.tmp: Temporary snapshot.

  • <engineName>.snapshot: A snapshot that is flushed to disk.

  • <engineName>.old: If a snapshot with the same name exists, the previous one is renamed to <engineName>.old.
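
The naming scheme above can be sketched with pathlib. This is an illustration only: snapshot_paths and has_snapshot are hypothetical helpers, not part of swordfish, and treating the .snapshot file as the one that gets loaded on engine creation is an assumption made for the example.

```python
from pathlib import Path
from typing import Dict, Union


def snapshot_paths(snapshot_dir: Union[Path, str], engine_name: str) -> Dict[str, Path]:
    """Return the three file names described above for one engine."""
    base = Path(snapshot_dir)
    return {
        "temporary": base / f"{engine_name}.tmp",
        "current": base / f"{engine_name}.snapshot",
        "previous": base / f"{engine_name}.old",
    }


def has_snapshot(snapshot_dir: Union[Path, str], engine_name: str) -> bool:
    """Check for a flushed snapshot; the directory itself must already exist."""
    base = Path(snapshot_dir)
    if not base.is_dir():
        # Mirrors the documented rule that a missing directory is an error.
        raise FileNotFoundError(f"snapshot directory does not exist: {base}")
    # Assumption: the flushed '.snapshot' file is the one restored on creation.
    return snapshot_paths(base, engine_name)["current"].is_file()


paths = snapshot_paths("snapshots", "demo_engine")
print(sorted(p.name for p in paths.values()))
```

Because file names are derived from the engine name, two engines with different names can share one directory without their files colliding.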

Parameters:

val (Optional[Union[Path, str]], optional) – The directory path for saving the snapshot. Defaults to None.

Returns:

The instance itself.

Return type:

Self

snapshot_interval_in_msg_count(val=None)

Sets the number of messages to receive before saving the next snapshot.

Parameters:

val (Optional[int], optional) – The number of messages before the next snapshot. Defaults to None.

Returns:

The instance itself.

Return type:

Self

keep_order(val=None)

Specifies whether to preserve the insertion order of records in the output table.

If key_col contains a time column, the default value is True; otherwise, it is False.

Parameters:

val (Optional[bool], optional) – Whether to preserve the insertion order. Defaults to None.

Returns:

The instance itself.

Return type:

Self

key_purge_filter(val=None)

Sets the filtering conditions to identify the data to be purged from the cache.

To clean up unnecessary data after calculations, specify both key_purge_filter and key_purge_freq_in_second.

The value is MetaCode composed of conditional expressions that must refer to columns in the output table. It takes effect only when key_col is specified.

Parameters:

val (Optional[MetaCode], optional) – The MetaCode filter conditions. Defaults to None.

Returns:

The instance itself.

Return type:

Self

key_purge_freq_in_second(val=None)

Sets the time interval (in seconds) to trigger a purge. The purge is triggered when the time since the last data ingestion meets or exceeds this interval.

This setting takes effect only when key_col is specified.

For each data ingestion, a purge is performed only if all of the following hold:

  1. The time elapsed since the last data ingestion is equal to or greater than key_purge_freq_in_second (for the first check, the time elapsed between engine creation and the data ingestion is used).

  2. Applying key_purge_filter, which happens only once the first condition is met, identifies data to be purged.

  3. The number of groups containing data to be purged is equal to or greater than 10% of the total number of groups in the engine.

To check engine status before and after the purge, access the attribute ReactiveStateEngine.stat, where the numGroups field indicates the number of groups in the reactive state engine.
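
The purge decision described above can be sketched as a small pure function. This is a reading of the documented conditions, not the engine's implementation: should_purge and its parameter names are hypothetical, and the caller is assumed to have already counted the groups matched by key_purge_filter.

```python
def should_purge(
    elapsed_seconds: float,
    key_purge_freq_in_second: int,
    groups_matching_filter: int,
    total_groups: int,
) -> bool:
    """Return True when the documented purge conditions all hold.

    elapsed_seconds is the time since the last ingestion (or since engine
    creation, for the first check).
    """
    # Condition 1: enough time has passed since the last ingestion.
    if elapsed_seconds < key_purge_freq_in_second:
        return False
    # Nothing can be purged from an engine that holds no groups.
    if total_groups == 0:
        return False
    # Conditions 2 and 3: groups selected by key_purge_filter must make up
    # at least 10% of all groups (integer arithmetic avoids float rounding).
    return groups_matching_filter * 10 >= total_groups


print(should_purge(30, 60, 50, 100))   # too soon since the last ingestion
print(should_purge(60, 60, 10, 100))   # interval met, exactly 10% of groups
print(should_purge(120, 60, 9, 100))   # interval met, below the 10% threshold
```

Comparing numGroups before and after an ingestion, as described above, is one way to confirm whether a purge actually ran.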

Parameters:

val (Optional[int], optional) – The time interval (in seconds) to trigger the purge. Defaults to None.

Returns:

The instance itself.

Return type:

Self

output_elapsed_microseconds(val=False)

Determines whether to output the elapsed time (in microseconds).

The elapsed time is measured from when the calculation is triggered to when the result is output for each window. When both output_elapsed_microseconds and useSystemTime are set to true, aggregate functions cannot be used in metrics.

Parameters:

val (bool, optional) – Whether to output the elapsed time. Defaults to False.

Returns:

The instance itself.

Return type:

Self

key_capacity(val=1024)

A positive integer indicating the amount of memory allocated for buffering the state of each group.

The memory is allocated on a row basis. The default value is 1024. For data with a large number of groups, setting this parameter can reduce latency.

Parameters:

val (int, optional) – A positive integer. Defaults to 1024.

Returns:

The instance itself.

Return type:

Self

parallelism(val=1)

A positive integer no greater than 63, indicating the maximum number of workers that can run in parallel.

The default value is 1. For large computation workloads, adjusting this parameter can effectively utilize computing resources and reduce computation time.

Note: parallelism cannot exceed the lesser of the numbers of licensed cores and logical cores minus one.

Parameters:

val (int, optional) – A positive integer. Defaults to 1.

Returns:

The instance itself.

Return type:

Self

output_handler(val=None)

A unary function or a partial function with a single unfixed parameter.

If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the specified function.
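
The two accepted shapes, a unary function and a partial function with a single unfixed parameter, can be illustrated with plain Python callables. This sketch uses functools.partial only to show the concept; the names are hypothetical, and how a callable is turned into the FunctionDef expected by output_handler is not covered here.

```python
from functools import partial
from typing import Any, List, Tuple

collected: List[Any] = []
tagged: List[Tuple[str, Any]] = []


def append_result(result: Any) -> None:
    """Unary handler: the calculation result is its only argument."""
    collected.append(result)


def tagged_append(tag: str, sink: List[Tuple[str, Any]], result: Any) -> None:
    """Three-parameter function; two parameters must be fixed before use."""
    sink.append((tag, result))


# Fixing tag and sink leaves exactly one unfixed parameter: result.
handler = partial(tagged_append, "demo_engine", tagged)

# Stand-in for the engine handing a batch of results to each handler.
append_result([1, 2, 3])
handler([4, 5, 6])

print(collected)
print(tagged)
```

A partial function is useful when the handler needs extra context, such as a destination or a label, that the engine itself does not supply.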

Parameters:

val (Optional[FunctionDef], optional) – A unary function or a partial function with a single unfixed parameter. Defaults to None, which means the results are written to the output table.

Returns:

The instance itself.

Return type:

Self

msg_as_table(val=False)

Sets whether the output data is passed into the function (specified by output_handler) as a Table or as an AnyVector. If True, the data is passed as a Table; otherwise, it is passed as an AnyVector of columns.
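
The difference between the two shapes can be sketched with plain Python containers. These are stand-ins chosen for illustration, not swordfish types: a list of column lists plays the role of an AnyVector of columns, a dict of named columns plays the role of a Table, and the column names are hypothetical.

```python
from typing import Dict, List, Sequence

COLUMN_NAMES = ["sym", "price"]  # hypothetical output columns


def handle_columns(columns: Sequence[list]) -> int:
    """Handler for the msg_as_table=False shape: columns are addressed by position."""
    return len(columns[0]) if columns else 0


def handle_table(table: Dict[str, list]) -> int:
    """Handler for the msg_as_table=True shape: columns are addressed by name."""
    first_column = next(iter(table.values()), [])
    return len(first_column)


# The same two-row batch in both shapes.
as_columns: List[list] = [["A", "B"], [10.0, 11.5]]
as_table: Dict[str, list] = dict(zip(COLUMN_NAMES, as_columns))

print(handle_columns(as_columns))
print(handle_table(as_table))
```

A handler written for one shape will not work with the other, so msg_as_table should be chosen to match how the handler reads its argument.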

Parameters:

val (bool, optional) – Whether to pass data as a Table (True) or as an AnyVector (False). Defaults to False.

Returns:

The instance itself.

Return type:

Self

submit()

Abstract method to build a StreamEngine.

Returns:

An instance of a built StreamEngine.

Return type:

StreamEngine