TimeSeriesEngineBuilder

class swordfish._engine.TimeSeriesEngineBuilder(name, table_schema, outputs, window_size, step, metrics, *, time_col=None, use_system_time=False, key_col=None, garbage_size=50000, update_time=None, use_window_start_time=False, round_time=True, snapshot_dir=None, snapshot_interval_in_msg_count=None, fill='none', force_trigger_time=None, key_purge_freq_in_sec=None, closed='left', output_elapsed_microseconds=False, sub_window=None, parallelism=1, accepted_delay=0, output_handler=None, msg_as_table=False)
Parameters:
  • name (str)

  • table_schema (Table | Dict[str, DataType | str])

  • outputs (Table)

  • time_col (List[str] | str | None)

  • use_system_time (bool)

  • key_col (List[str] | str | None)

  • garbage_size (int)

  • update_time (int | None)

  • use_window_start_time (bool)

  • round_time (bool)

  • snapshot_dir (Path | str | None)

  • snapshot_interval_in_msg_count (int | None)

  • fill (Literal['none', 'null', 'ffill'] | Constant | List[Literal['null', 'ffill'] | Constant])

  • force_trigger_time (int | None)

  • key_purge_freq_in_sec (int | None)

  • closed (Literal['left', 'right'])

  • output_elapsed_microseconds (bool)

  • sub_window (int | Constant | None)

  • parallelism (int)

  • accepted_delay (int)

  • output_handler (FunctionDef | None)

  • msg_as_table (bool)

time_col(val=None)

Sets the time column(s) for the subscribed stream table.

Parameters:

val (Union[List[str], str], optional) – The time column(s). If a list is given, it must contain exactly two column names: a date column (DATE type) and a time column (TIME, SECOND, or NANOTIME type). In that case, the first column of the output table combines the two into a single timestamp whose data type matches the result of concatDateTime(date, time). Defaults to None.

Returns:

The instance itself.

Return type:

Self
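When time_col is a two-element list, the engine merges a date column and a time-of-day column into one timestamp, in the spirit of concatDateTime. The pure-Python sketch below only illustrates that idea with the standard datetime module; combine_date_time is a hypothetical helper, not part of swordfish, and the exact output type in the engine follows concatDateTime rather than Python's datetime.

```python
from datetime import date, datetime, time


def combine_date_time(d: date, t: time) -> datetime:
    """Hypothetical stand-in for concatDateTime: merge a date and a time of day."""
    return datetime.combine(d, t)


# A DATE value and a SECOND-precision time value taken from two separate columns.
ts = combine_date_time(date(2024, 1, 2), time(9, 30, 15))
print(ts)  # 2024-01-02 09:30:15
```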

use_system_time(val=False)

Sets whether windows are based on the system time at ingestion instead of on a time column.

Parameters:

val (bool, optional) – If True, the engine windows the streaming data at fixed intervals based on each record's ingestion time (the local system time, with millisecond precision, independent of any time column in the stream table). A calculation runs automatically at the end of every window that contains data, and the first column of the output table holds the timestamp at which the calculation occurred. If False, the engine windows data by the specified time column: a window's calculation is triggered by the first record that arrives after the window ends, and that triggering record is not included in the calculation. Defaults to False.

Returns:

The instance itself.

Return type:

Self
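The event-time rule described above (use_system_time=False) can be modeled in a few lines. This is a minimal sketch, not the engine's implementation: it assumes integer timestamps arriving in order, non-overlapping left-closed windows (step equal to window_size), the default fill of "none", and a single sum metric. tumbling_sum is a hypothetical name.

```python
from typing import List, Optional, Tuple


def tumbling_sum(records: List[Tuple[int, float]], window_size: int) -> List[Tuple[int, float]]:
    """Emit (window_end, sum) for a window once a later record closes it.

    Assumes records are ordered by timestamp and windows are [start, start + window_size).
    """
    outputs: List[Tuple[int, float]] = []
    current_start: Optional[int] = None
    acc = 0.0
    for ts, value in records:
        start = ts - ts % window_size  # align the record to the start of its window
        if current_start is None:
            current_start = start
        elif start != current_start:
            # The first record past the window end triggers output and is not included in it.
            outputs.append((current_start + window_size, acc))
            current_start, acc = start, 0.0
        acc += value
    return outputs  # the last window stays open until another record arrives


result = tumbling_sum([(1, 1.0), (4, 2.0), (10, 5.0), (12, 1.0), (25, 3.0)], window_size=10)
print(result)  # [(10, 3.0), (20, 6.0)]
```

The record at timestamp 25 closes the window [10, 20) but contributes only to the still-open window [20, 30), which is why its value does not appear in the output.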

key_col(val=None)

Sets the name of the grouping column(s).

Parameters:

val (Union[List[str], str], optional) – The name of the grouping column(s). Defaults to None.

Returns:

The instance itself.

Return type:

Self
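With key_col set, each group keeps its own independent windows. The batch-style sketch below, which is hypothetical and not how the engine stores state, shows the effect by counting records per (key, window) pair, assuming integer timestamps and left-closed windows.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple


def grouped_window_counts(records: List[Tuple[str, int]],
                          window_size: int) -> Dict[str, List[Tuple[int, int]]]:
    """Count records per key and per window start; keys never share window state."""
    counts: DefaultDict[str, DefaultDict[int, int]] = defaultdict(lambda: defaultdict(int))
    for key, ts in records:
        window_start = ts - ts % window_size
        counts[key][window_start] += 1
    return {key: sorted(per_window.items()) for key, per_window in counts.items()}


result = grouped_window_counts([("A", 1), ("B", 2), ("A", 7), ("A", 12), ("B", 15)], window_size=10)
print(result)  # {'A': [(0, 2), (10, 1)], 'B': [(0, 1), (10, 1)]}
```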

garbage_size(val=50000)

Sets the threshold for garbage collection of historical data.

Parameters:

val (int, optional) – The threshold for garbage collection in number of rows. Defaults to 50,000.

Returns:

The instance itself.

Return type:

Self

update_time(val=None)

Sets the interval to trigger window calculations before the window ends.

Parameters:

val (int, optional) – The interval to trigger window calculations. Defaults to None.

Returns:

The instance itself.

Return type:

Self

use_window_start_time(val=False)

Sets whether the time column of the output table holds the start time of each window.

Parameters:

val (bool, optional) – Whether to use the start time of each window. If False, the timestamps in the output table represent the end time of each window. If window_size is a list, use_window_start_time must be False. Defaults to False.

Returns:

The instance itself.

Return type:

Self
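The choice only affects which timestamp labels a window's result. A minimal sketch, assuming a window that starts at window_start and spans window_size time units; output_time is a hypothetical helper.

```python
def output_time(window_start: int, window_size: int,
                use_window_start_time: bool = False) -> int:
    """Timestamp written for the window beginning at window_start (hypothetical helper)."""
    return window_start if use_window_start_time else window_start + window_size


print(output_time(1000, 500))        # 1500: window end (the default)
print(output_time(1000, 500, True))  # 1000: window start
```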

round_time(val=True)

Sets the rule used to align window boundaries.

Parameters:

val (bool, optional) – If True, uses the multi-minute rule for alignment. If False, uses the one-minute rule. Defaults to True.

Returns:

The instance itself.

Return type:

Self

snapshot_dir(val=None)

Sets the directory where the streaming engine snapshot is saved.

Parameters:

val (Union[Path, str], optional) – The directory path for saving the snapshot. Defaults to None.

Returns:

The instance itself.

Return type:

Self

snapshot_interval_in_msg_count(val=None)

Sets the number of messages to receive before saving the next snapshot.

Parameters:

val (int, optional) – The number of messages before the next snapshot. Defaults to None.

Returns:

The instance itself.

Return type:

Self
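Conceptually, the engine counts incoming messages and saves a snapshot each time the configured count is reached. The sketch below models only that counting idea; SnapshotCounter is hypothetical, and the real engine may take snapshots at batch boundaries, so the exact message counts can differ in practice.

```python
from typing import List


class SnapshotCounter:
    """Hypothetical model: record a snapshot after every `interval` ingested messages."""

    def __init__(self, interval: int) -> None:
        if interval <= 0:
            raise ValueError("interval must be positive")
        self.interval = interval
        self.received = 0
        self.snapshots: List[int] = []  # message counts at which snapshots were taken

    def on_message(self) -> None:
        self.received += 1
        if self.received % self.interval == 0:
            self.snapshots.append(self.received)


counter = SnapshotCounter(interval=3)
for _ in range(7):
    counter.on_message()
print(counter.snapshots)  # [3, 6]
```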

fill(val='none')

Sets the filling method(s) for empty windows within a group.

Parameters:

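val (Union[Literal["none", "null", "ffill"], Constant, List[Union[Literal["null", "ffill"], Constant]]], optional) – The filling method, or a list of filling methods with one element per metric. "none" outputs nothing for an empty window, "null" outputs NULL, "ffill" outputs the result of the previous window, and a Constant outputs that value. A list may contain only "null", "ffill", or constants. Defaults to "none".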

Returns:

The instance itself.

Return type:

Self
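The fill options can be pictured as a post-processing step over one metric's per-window results. This is a minimal sketch under stated assumptions: None marks an empty window, a Python number stands in for a Constant, and apply_fill is a hypothetical name. It models the options described above, not the engine's code.

```python
from typing import List, Optional, Union


def apply_fill(window_results: List[Optional[float]],
               fill: Union[str, float] = "none") -> List[Optional[float]]:
    """Apply one fill option to a single metric's per-window results (None = empty window)."""
    out: List[Optional[float]] = []
    last: Optional[float] = None
    for value in window_results:
        if value is not None:
            out.append(value)
            last = value
        elif fill == "none":
            continue                 # emit nothing for the empty window
        elif fill == "null":
            out.append(None)         # emit a null result
        elif fill == "ffill":
            out.append(last)         # repeat the previous non-empty result (None if none yet)
        elif isinstance(fill, (int, float)):
            out.append(float(fill))  # emit the given constant
        else:
            raise ValueError(f"unsupported fill option: {fill!r}")
    return out


results = [1.0, None, 3.0, None]
print(apply_fill(results, "none"))   # [1.0, 3.0]
print(apply_fill(results, "ffill"))  # [1.0, 1.0, 3.0, 3.0]
print(apply_fill(results, 0))        # [1.0, 0.0, 3.0, 0.0]
```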

force_trigger_time(val=None)

Sets how long to wait before forcibly triggering the calculation of uncalculated windows in each group.

Parameters:

val (int, optional) – The waiting time in milliseconds to trigger window calculation. Defaults to None.

Returns:

The instance itself.

Return type:

Self

key_purge_freq_in_sec(val=None)

Sets the interval, in seconds, for purging groups that have had no incoming data for a long time.

Parameters:

val (int, optional) – The interval (in seconds) to purge inactive groups. Defaults to None.

Returns:

The instance itself.

Return type:

Self

closed(val='left')

Specifies whether the left or right boundary is included in the window.

Parameters:

val (Literal["left", "right"], optional) – Specifies which boundary is included. Defaults to "left".

Returns:

The instance itself.

Return type:

Self
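The closed setting matters only for records stamped exactly on a boundary. A minimal sketch of the two membership rules, using a hypothetical in_window helper and integer timestamps:

```python
def in_window(ts: int, start: int, end: int, closed: str = "left") -> bool:
    """Return True if ts falls in the window between start and end."""
    if closed == "left":
        return start <= ts < end   # [start, end): left boundary included
    if closed == "right":
        return start < ts <= end   # (start, end]: right boundary included
    raise ValueError("closed must be 'left' or 'right'")


# A record stamped exactly on a boundary (10) lands in different windows.
print(in_window(10, 0, 10, "left"), in_window(10, 10, 20, "left"))    # False True
print(in_window(10, 0, 10, "right"), in_window(10, 10, 20, "right"))  # True False
```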

output_elapsed_microseconds(val=False)

Sets whether to output the elapsed time (in microseconds).

Parameters:

val (bool, optional) – Whether to output the elapsed time. Defaults to False.

Returns:

The instance itself.

Return type:

Self

sub_window(val=None)

Specifies the range of the subwindow within the window defined by window_size.

Parameters:

val (Union[int, Constant], optional) – The range of the subwindow. Defaults to None.

Returns:

The instance itself.

Return type:

Self

parallelism(val=1)

Sets the number of worker threads for parallel computation.

Parameters:

val (int, optional) – The number of worker threads. Defaults to 1.

Returns:

The instance itself.

Return type:

Self

accepted_delay(val=0)

Sets the maximum delay for each window to accept data.

Parameters:

val (int, optional) – A non-negative integer specifying the maximum delay. Defaults to 0.

Returns:

The instance itself.

Return type:

Self

output_handler(val=None)

Sets a unary function, or a partially applied function that takes one argument, to handle the output. If specified, the engine passes calculation results to this function instead of writing them to the output table.

Parameters:

val (FunctionDef, optional) – The function to handle the output. Defaults to None.

Returns:

The instance itself.

Return type:

Self

msg_as_table(val=False)

Sets whether the output data is passed to the function specified by output_handler as a table or as an AnyVector.

Parameters:

val (bool, optional) – Whether to pass data as a table (True) or as an AnyVector (False). Defaults to False.

Returns:

The instance itself.

Return type:

Self
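The interplay of output_handler and msg_as_table can be sketched in plain Python. This is a hypothetical model, not swordfish code: a dict of column lists stands in for a table, a list of columns stands in for an AnyVector (assumed to hold one element per output column), and functools.partial turns a two-argument function into the unary handler the engine expects. deliver and collect are illustrative names.

```python
from functools import partial
from typing import Any, Callable, Dict, List


def deliver(columns: Dict[str, List[Any]], handler: Callable[[Any], None],
            msg_as_table: bool = False) -> None:
    """Hypothetical dispatch: hand one batch of results to the output handler."""
    if msg_as_table:
        handler(columns)                 # table-like: column name -> values
    else:
        handler(list(columns.values()))  # AnyVector-like: one entry per column


def collect(sink: List[Any], batch: Any) -> None:
    """Two-argument function; partial() binds `sink` so the handler is unary."""
    sink.append(batch)


received: List[Any] = []
handler = partial(collect, received)
batch = {"time": [10, 20], "sum_price": [3.0, 6.0]}
deliver(batch, handler, msg_as_table=True)
deliver(batch, handler, msg_as_table=False)
print(received)  # [{'time': [10, 20], 'sum_price': [3.0, 6.0]}, [[10, 20], [3.0, 6.0]]]
```

Binding state through a partial function keeps the handler signature unary while still letting it write to an external sink.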

submit()

Builds a StreamEngine from the configured parameters.

Returns:

The built StreamEngine instance.

Return type:

StreamEngine
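Because every setter returns the instance itself, configuration calls can be chained before a final submit(). The miniature builder below only illustrates that fluent pattern; MiniBuilder is hypothetical, and its submit() returns a plain dict where the real builder returns a StreamEngine.

```python
from typing import Any, Dict, Optional


class MiniBuilder:
    """Hypothetical builder mimicking the 'Returns: The instance itself' contract."""

    def __init__(self, name: str) -> None:
        self._config: Dict[str, Any] = {"name": name, "key_col": None, "closed": "left"}

    def key_col(self, val: Optional[str] = None) -> "MiniBuilder":
        self._config["key_col"] = val
        return self  # returning self enables method chaining

    def closed(self, val: str = "left") -> "MiniBuilder":
        if val not in ("left", "right"):
            raise ValueError("closed must be 'left' or 'right'")
        self._config["closed"] = val
        return self

    def submit(self) -> Dict[str, Any]:
        # The real submit() builds a StreamEngine; this sketch returns a copy of the config.
        return dict(self._config)


engine_config = MiniBuilder("demo").key_col("sym").closed("right").submit()
print(engine_config)  # {'name': 'demo', 'key_col': 'sym', 'closed': 'right'}
```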