CrossSectionalEngineBuilder#

class swordfish._engine.CrossSectionalEngineBuilder(name, table_schema, key_col, *, metrics=None, output=None, triggering_pattern='per_batch', triggering_interval=None, use_system_time=True, time_col=None, last_batch_only=False, context_by_col=None, snapshot_dir=None, snapshot_interval_in_msg_count=None, output_elapsed_microseconds=False, round_time=True, key_filter=None, updated_context_groups_only=False)#
Parameters:
  • name (str)

  • table_schema (Table | Dict[str, DataType | str])

  • key_col (List[str] | str)

  • output (Table)

  • triggering_pattern (Literal['per_batch', 'per_row', 'interval', 'key_count', 'data_interval'])

  • triggering_interval (Any)

  • use_system_time (bool)

  • time_col (str | None)

  • last_batch_only (bool)

  • context_by_col (List[str] | str | None)

  • snapshot_dir (Path | str | None)

  • snapshot_interval_in_msg_count (int | None)

  • output_elapsed_microseconds (bool)

  • round_time (bool)

  • key_filter (MetaCode | None)

  • updated_context_groups_only (bool)

metrics(val=None)#

Specifies the formulas for calculation using MetaCode or an AnyVector.

The value can be:
  • Built-in or user-defined aggregate functions, e.g., <myFunc(qty)>

>>> @F.swordfish_udf
... def myFunc(x):
...     return x + 1
...
>>> with sf.meta_code() as m:
...     metrics = myFunc(m.col("qty"))
  • Expressions on previous results, e.g., <avg(price1)>.

>>> with sf.meta_code() as m:
...     metrics = F.avg(m.col("price1"))
  • Calculations on multiple columns, e.g., <[std(price1-price2)]>.

>>> with sf.meta_code() as m:
...     metrics = F.std(m.col("price1") - m.col("price2"))
  • Functions with multiple returns, such as <func(price) as `col1`col2>.

>>> with sf.meta_code() as m:
...     metrics = m.col_alias(func(m.col("price")), ["col1", "col2"])

The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.

Parameters:

val (Union[MetaCode, List[MetaCode]], optional) – MetaCode or an AnyVector specifying the formulas. Defaults to None.

Returns:

The instance itself.

Return type:

Self

output(val=None)#

Specifies the output table for the results.

  • If context_by_col is not specified, the output columns are in the following order:

    • The first column is of TIMESTAMP type, storing the time when each calculation starts. If time_col is specified, it takes the values of time_col.

    • The column(s) storing calculation results. The data types of the column(s) must be the same as the results of metrics.

    • A column of LONG type storing the calculation time of each batch. Output only when output_elapsed_microseconds=True.

    • A column of INT type storing the number of records of each batch. Output only when output_elapsed_microseconds=True.

  • If context_by_col is specified, the output columns are in the following order:

    • The first column is of TIMESTAMP type, storing the time when each calculation starts. If time_col is specified, it takes the values of time_col.

    • The second column is the column specified by context_by_col.

    • The column(s) storing calculation results. The data types of the column(s) must be the same as the results of metrics.

    • A column of LONG type storing the calculation time of each batch. Output only when output_elapsed_microseconds=True.

    • A column of INT type storing the number of records of each batch. Output only when output_elapsed_microseconds=True.
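The column ordering rules above can be sketched in plain Python. This is an illustration only, not engine code; the column names ("time", "elapsed_us", "batch_rows") are placeholders, since the actual names come from the output table you supply.

```python
# Illustration of the output-column order described above.
# Names are placeholders; the engine uses the schema of the supplied table.
def output_column_order(metric_cols, context_by_col=None,
                        output_elapsed_microseconds=False):
    cols = ["time"]                  # TIMESTAMP: calculation start (or time_col values)
    if context_by_col is not None:
        cols.append(context_by_col)  # grouping column comes second
    cols += list(metric_cols)        # one column per metrics result
    if output_elapsed_microseconds:
        cols += ["elapsed_us",       # LONG: per-batch calculation time
                 "batch_rows"]       # INT: per-batch record count
    return cols

print(output_column_order(["avg_price"]))
print(output_column_order(["avg_price"], context_by_col="sym",
                          output_elapsed_microseconds=True))
```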

Parameters:

val (Table, optional) – an in-memory table or a DFS table, by default None

Returns:

The instance itself.

Return type:

Self

triggering_pattern(val='per_batch')#

Specifies how to trigger the calculations. The engine returns a result every time a calculation is triggered.

Parameters:
  • val (Literal["per_batch", "per_row", "interval", "key_count", "data_interval"], optional) –

    • ‘per_batch’ (default): Calculates when a batch of data arrives.

    • ‘per_row’: Calculates when a new record arrives.

    • ‘interval’: Calculates at intervals specified by triggering_interval, using system time.

    • ‘key_count’: When data with the same timestamp arrives in batches, the calculation is triggered when the number of keys with the latest timestamp reaches triggering_interval, or when data with a newer timestamp arrives.

    • ‘data_interval’: Calculates at intervals based on timestamps in the data. Requires time_col to be specified and use_system_time to be False.

Note: To use ‘key_count’ or ‘data_interval’, time_col must be specified and use_system_time must be set to False. Out-of-order data will be discarded in these cases.

Returns:

The instance itself.

Return type:

Self

triggering_interval(val=None)#

Sets the triggering interval for the system based on the triggering pattern.

The behavior of triggering_interval depends on the value of triggering_pattern:

  • If triggering_pattern = ‘interval’, triggering_interval is a positive integer indicating the interval in milliseconds between 2 adjacent calculations. Default is 1,000 milliseconds. A calculation is triggered every triggering_interval milliseconds if the data in the engine has not been calculated.

  • If triggering_pattern = ‘key_count’, triggering_interval can either be:

    • An integer specifying a threshold for the number of uncalculated records.

    • A tuple of 2 elements:

      • The first element is an integer specifying the threshold of records with the latest timestamp to trigger a calculation.

      • The second element can be either:

        • An int threshold

        • A Duration value. For example, when triggering_interval is set to (c1, c2):

          • If c2 is an integer and the number of keys with the latest timestamp t1 doesn’t reach c1, calculation will not be triggered and the system goes on to save data with greater timestamp t2 (t2>t1). Data with t1 will be calculated when either of the events happens: the number of keys with timestamp t2 reaches c2, or data with greater timestamp t3 (t3>t2) arrives. Note that c2 must be smaller than c1.

          • If c2 is a duration and the number of keys with the latest timestamp t1 doesn’t reach c1, calculation will not be triggered and the system goes on to save data with greater timestamp t2 (t2>t1). Once data with t2 starts to come in, data with t1 will not be calculated until any of the events happens: the number of keys with timestamp t1 reaches c1, or data with greater timestamp t3 (t3>t2) arrives, or the Duration c2 comes to an end.

  • If triggering_pattern = ‘data_interval’, triggering_interval is a positive integer in the same units as the timestamps in time_col. Default is 1,000 milliseconds. A calculation is triggered for all data in the current window when the first record of the next window arrives. A calculation is triggered only for windows containing data.
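The simplest ‘key_count’ case (a single integer threshold) can be sketched in plain Python. This is an illustrative simplification under stated assumptions, not the engine's implementation: it models only the basic rule that a calculation fires when the number of keys at the latest timestamp reaches the threshold, or when a newer timestamp arrives.

```python
# Sketch of the basic 'key_count' rule with an integer triggering_interval.
# Not engine code: it ignores the (c1, c2) tuple form and Duration handling.
def simulate_key_count(events, threshold):
    """events: iterable of (timestamp, key) pairs in arrival order.
    Returns the timestamps for which a calculation was triggered."""
    triggered = []
    current_ts, keys = None, set()
    for ts, key in events:
        if current_ts is not None and ts > current_ts and keys:
            triggered.append(current_ts)  # a newer timestamp flushes the pending one
            keys = set()
        current_ts = ts
        keys.add(key)
        if len(keys) >= threshold:
            triggered.append(current_ts)  # key-count threshold reached
            keys = set()
    return triggered

# Two keys at t=1 reach the threshold; the record at t=2 stays pending.
print(simulate_key_count([(1, "A"), (1, "B"), (2, "A")], threshold=2))  # [1]
```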

Parameters:

val (Any, optional) – The triggering interval or conditions. Defaults to None.

Returns:

The instance itself.

Return type:

Self

use_system_time(val=True)#

Sets whether calculations are performed based on the system time when data is ingested into the engine.

  • If use_system_time is True, the time column of the output table is the system time.

  • If use_system_time is False, the time_col parameter must be specified, and the time column of the output table uses the timestamp of each record.

Parameters:

val (bool, optional) – Indicates whether to use system time for calculations. Defaults to True.

Returns:

The instance itself.

Return type:

Self

time_col(val=None)#

Specifies the time column in the stream table to which the engine subscribes when use_system_time is False. The column must be of TIMESTAMP type.

Parameters:

val (Optional[str], optional) – The name of the time column. Defaults to None.

Returns:

The instance itself.

Return type:

Self

last_batch_only(val=False)#

Determines whether to keep only the records with the latest timestamp in the engine.

When last_batch_only is True, triggering_pattern must be set to ‘key_count’, and the cross-sectional engine will only maintain key values with the latest timestamp for calculation.

Otherwise, the engine updates and retains all values for calculation.

Parameters:

val (bool, optional) – Whether to keep only the latest timestamped records. Defaults to False.

Returns:

The instance itself.

Return type:

Self

context_by_col(val=None)#

Specifies the grouping column(s) by which calculations are performed within groups. This parameter only takes effect if metrics and output are specified.

If metrics contain only aggregate functions, the results will be the same as a SQL query using group by.

Otherwise, the results will be consistent with using context by.
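The group by / context by distinction can be illustrated with a plain-Python analogy (not the engine itself): an aggregate metric yields one row per group, while a non-aggregate metric broadcasts the per-group value back to every row of the group.

```python
# Plain-Python analogy for "group by" vs "context by" semantics.
from collections import defaultdict

rows = [("A", 10.0), ("A", 20.0), ("B", 30.0)]  # (sym, price)

groups = defaultdict(list)
for sym, price in rows:
    groups[sym].append(price)

# "group by": an aggregate (here, the mean) yields one result row per group.
group_by = {sym: sum(ps) / len(ps) for sym, ps in groups.items()}

# "context by": the per-group result is attached to every input row,
# so the row count is preserved.
context_by = [(sym, price, group_by[sym]) for sym, price in rows]

print(group_by)     # one row per group
print(context_by)   # one row per input record
```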

Parameters:

val (Optional[Union[List[str], str]], optional) – The grouping column(s) for the calculation. Defaults to None.

Returns:

The instance itself.

Return type:

Self

snapshot_dir(val=None)#

Sets the directory where the streaming engine snapshot is saved.

The directory must already exist, or an exception will be raised. If a snapshot directory is specified, the system checks for an existing snapshot in the directory when creating the streaming engine.

If found, the snapshot is loaded to restore the engine’s state. Multiple streaming engines can share a directory, with snapshot files named after the engine names.

Snapshot file extensions:
  • <engineName>.tmp: Temporary snapshot.

  • <engineName>.snapshot: A snapshot that is flushed to disk.

  • <engineName>.old: If a snapshot with the same name exists, the previous one is renamed to <engineName>.old.

Parameters:

val (Optional[Union[Path, str]], optional) – The directory path for saving the snapshot. Defaults to None.

Returns:

The instance itself.

Return type:

Self

snapshot_interval_in_msg_count(val=None)#

Sets the number of messages to receive before saving the next snapshot.

Parameters:

val (Optional[int], optional) – The number of messages before the next snapshot. Defaults to None.

Returns:

The instance itself.

Return type:

Self

output_elapsed_microseconds(val=False)#

Determines whether to output the elapsed time (in microseconds).

The elapsed time is measured from when the calculation is triggered to when the result is output for each window. When both output_elapsed_microseconds and use_system_time are set to True, aggregate functions cannot be used in metrics.

Parameters:

val (bool, optional) – Whether to output the elapsed time. Defaults to False.

Returns:

The instance itself.

Return type:

Self

round_time(val=True)#

Aligns the window boundary based on the specified alignment rule.

If the time precision is in milliseconds or seconds and the step is greater than one minute, this method determines whether to apply multi-minute or one-minute alignment.
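The two alignment rules can be sketched in plain Python, assuming millisecond timestamps and a step given in milliseconds (an illustration of the rule described above, not the engine's implementation): the multi-minute rule aligns the first window boundary down to a multiple of the step, while the one-minute rule aligns it down to a whole minute.

```python
# Sketch of window-boundary alignment under the two round_time rules.
# Assumes millisecond-precision timestamps; not engine code.
MINUTE_MS = 60_000

def align_window_start(first_ts_ms, step_ms, round_time=True):
    unit = step_ms if round_time else MINUTE_MS
    return first_ts_ms // unit * unit

ts = 7 * MINUTE_MS + 30_000               # 00:07:30 in milliseconds
print(align_window_start(ts, 5 * MINUTE_MS, round_time=True))   # aligns to 00:05:00
print(align_window_start(ts, 5 * MINUTE_MS, round_time=False))  # aligns to 00:07:00
```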

Parameters:

val (bool, optional) – If True, uses the multi-minute rule for alignment. If False, uses the one-minute rule. Defaults to True.

Returns:

The instance itself.

Return type:

Self

key_filter(val=None)#

Specifies the conditions for filtering keys in the keyed table returned by the engine.

Only data with keys satisfying the filtering conditions will be taken for calculation. The MetaCode represents an expression or function call that returns a bool vector.

Parameters:

val (Optional[MetaCode], optional) – MetaCode of the filtering conditions. Defaults to None.

Returns:

The instance itself.

Return type:

Self

updated_context_groups_only(val=False)#

Indicates whether to compute only the groups updated with new data since the last output.

Parameters:

val (bool, optional) – Whether to compute only updated groups. Defaults to False.

Returns:

The instance itself.

Return type:

Self

submit()#

Builds a StreamEngine from the parameters configured on this builder.

Returns:

An instance of a built StreamEngine.

Return type:

StreamEngine