SubscriptionHelper
- class swordfish._swordfishcpp.SubscriptionHelper
A helper class for managing a stream subscription, allowing further configuration and submission of the subscription.
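Every setter below returns the instance itself, so settings can be chained in a single expression. The sketch below illustrates that fluent pattern with a hypothetical pure-Python stand-in, `SubscriptionSketch`, which only records the values passed to it. It borrows a few setter names and defaults from this page but is NOT the swordfish class. This section does not describe how a real SubscriptionHelper is obtained or submitted, so the sketch covers neither.

```python
from typing import Any, Dict


class SubscriptionSketch:
    """Hypothetical stand-in that records settings; NOT swordfish's SubscriptionHelper."""

    def __init__(self) -> None:
        # Defaults copied from the signatures documented in this section.
        self.settings: Dict[str, Any] = {
            "offset": -1,
            "msg_as_table": False,
            "batch_size": 0,
            "throttle": 1,
            "reconnect": False,
        }

    def _set(self, key: str, val: Any) -> "SubscriptionSketch":
        self.settings[key] = val
        return self  # returning self is what makes chaining possible

    def offset(self, val: int = -1) -> "SubscriptionSketch":
        return self._set("offset", val)

    def msg_as_table(self, val: bool = False) -> "SubscriptionSketch":
        return self._set("msg_as_table", val)

    def batch_size(self, val: int = 0) -> "SubscriptionSketch":
        return self._set("batch_size", val)

    def throttle(self, val: float = 1) -> "SubscriptionSketch":
        return self._set("throttle", val)

    def reconnect(self, val: bool = False) -> "SubscriptionSketch":
        return self._set("reconnect", val)


# Each call returns the same object, so the settings accumulate along one chain.
sub = SubscriptionSketch().offset(0).msg_as_table(True).batch_size(1000).throttle(0.5).reconnect(True)
print(sub.settings)
```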
- offset(val=-1)
Sets the position of the first message where the subscription begins.
- Parameters:
val (int, optional) – The offset position. Defaults to -1.
- Returns:
The instance itself.
- Return type:
Self
Note
A message is a row of the stream table. The offset is counted from the first row the stream table held when it was created. If val is unspecified or -1, the subscription starts with the next new message. If val is -2, the system retrieves the offset persisted on disk and starts the subscription from there. Rows that have been cleared from memory due to the cache size limit still count when determining where the subscription starts.
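The offset rules above can be summarized as a small pure function. This is a hypothetical model, not part of swordfish: `resolve_start_offset`, `rows_ever_written` and `persisted_offset` are illustrative names. The sketch assumes offsets are absolute row counts since table creation, which is why evicted rows still count.

```python
from typing import Optional


def resolve_start_offset(val: int, rows_ever_written: int,
                         persisted_offset: Optional[int] = None) -> int:
    """Return the absolute row index where a subscription would begin.

    Hypothetical model: offsets count every row appended since the stream
    table was created, including rows later evicted from memory.
    """
    if val == -1:
        # Start with the next new message, i.e. the row after the last one written.
        return rows_ever_written
    if val == -2:
        if persisted_offset is None:
            raise ValueError("no persisted offset available")
        # Resume from the offset read back from disk.
        return persisted_offset
    # Any other value is taken as an absolute row position.
    return val


# 10,000 rows written so far; some of them may already have been evicted from memory.
print(resolve_start_offset(-1, 10_000))         # 10000
print(resolve_start_offset(2_500, 10_000))      # 2500
print(resolve_start_offset(-2, 10_000, 7_321))  # 7321
```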
- msg_as_table(val=False)
Sets whether the subscribed data is ingested into the handler as a table or as an AnyVector.
- Parameters:
val (bool, optional) – Whether to ingest the subscribed data into the handler as a table. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
Note
If val is True, the subscribed data is ingested into the handler as a table, allowing it to be processed with SQL statements. The default value is False, meaning the subscribed data is ingested as an AnyVector of columns.
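The difference between the two shapes can be illustrated in plain Python. In this hypothetical model (`shape_batch` is not a swordfish function), a table is represented as a dict of named columns, and an AnyVector of columns as a list of unnamed column lists. The real handler receives swordfish objects, not these Python stand-ins.

```python
from typing import Any, Dict, List


def shape_batch(rows: List[Dict[str, Any]], msg_as_table: bool) -> Any:
    """Hypothetical model of the two delivery shapes.

    msg_as_table=True  -> a table, modelled here as {column_name: column_values}
    msg_as_table=False -> an AnyVector of columns, modelled here as a list of lists
    """
    columns = list(rows[0].keys()) if rows else []
    by_column = {name: [row[name] for row in rows] for name in columns}
    if msg_as_table:
        return by_column
    # Column order is preserved, but column names are not carried along.
    return [by_column[name] for name in columns]


batch = [{"sym": "A", "price": 10.5}, {"sym": "B", "price": 20.1}]
print(shape_batch(batch, msg_as_table=True))   # {'sym': ['A', 'B'], 'price': [10.5, 20.1]}
print(shape_batch(batch, msg_as_table=False))  # [['A', 'B'], [10.5, 20.1]]
```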
- batch_size(val=0)
Sets the number of unprocessed messages required to trigger the handler.
- Parameters:
val (int, optional) – The batch size threshold. Defaults to 0.
- Returns:
The instance itself.
- Return type:
Self
Note
If val is positive, the handler does not process messages until the number of unprocessed messages reaches val. If val is unspecified or non-positive, the handler processes incoming messages as soon as they arrive.
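Ignoring throttle for the moment, count-based triggering can be modelled as chunking the incoming stream. This is a hypothetical sketch (`group_by_batch_size` is an illustrative name). It simplifies the non-positive case to one handler call per message, whereas in practice messages that arrive together may be handled together.

```python
from typing import List, Sequence


def group_by_batch_size(messages: Sequence[int], batch_size: int) -> List[List[int]]:
    """Hypothetical model of count-based triggering only (throttle is ignored here).

    With a positive batch_size, the handler runs each time batch_size unprocessed
    messages have accumulated; leftovers stay pending. With a non-positive
    batch_size, each message is handled on arrival (one call per message is a
    simplification made by this sketch).
    """
    if batch_size <= 0:
        return [[m] for m in messages]
    full = len(messages) - len(messages) % batch_size
    return [list(messages[i:i + batch_size]) for i in range(0, full, batch_size)]


print(group_by_batch_size(list(range(7)), 3))  # [[0, 1, 2], [3, 4, 5]]  (message 6 still pending)
print(group_by_batch_size([1, 2], 0))          # [[1], [2]]
```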
- throttle(val=1)
Sets the maximum waiting time before the handler processes incoming messages if the batch_size condition has not been met.
- Parameters:
val (float, optional) – The maximum waiting time in seconds. Defaults to 1.
- Returns:
The instance itself.
- Return type:
Self
Note
This value is in seconds. This parameter has no effect if batch_size is not specified. To set val to less than 1 second, the subThrottle configuration parameter must be modified.
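Taken together, batch_size and throttle act as an "either condition fires" rule. The function below is a hypothetical model of that rule, not swordfish code: `should_trigger` and `seconds_since_last_run` are illustrative names. It also assumes that nothing fires while no messages are pending, because time_trigger is not enabled.

```python
def should_trigger(pending: int, batch_size: int, seconds_since_last_run: float,
                   throttle: float = 1.0) -> bool:
    """Hypothetical model of when the handler fires (time_trigger disabled).

    Assumptions: throttle only matters when batch_size is positive; the handler
    fires once the batch is full OR, if any messages are pending, once the
    throttle interval has elapsed.
    """
    if pending == 0:
        return False
    if batch_size <= 0:
        # batch_size not specified: process on arrival, throttle has no effect.
        return True
    return pending >= batch_size or seconds_since_last_run >= throttle


print(should_trigger(400, 1000, 0.3))   # False: batch not full, throttle not yet elapsed
print(should_trigger(400, 1000, 1.2))   # True: throttle elapsed
print(should_trigger(1000, 1000, 0.1))  # True: batch full
```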
- hash(val=-1)
Sets the hash value determining the subscription executor.
- Parameters:
val (int, optional) – The hash value for assigning an executor. Defaults to -1.
- Returns:
The instance itself.
- Return type:
Self
Note
This non-negative integer specifies which subscription executor will process the incoming messages. If val is unspecified, the system automatically assigns an executor. To synchronize messages from multiple subscriptions, set the same hash value for all of them so that they are processed by the same executor.
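The effect of sharing a hash value can be pictured with a toy assigner. Both rules in this sketch are assumptions made for illustration: an explicit hash maps to `hash % num_executors`, and auto-assignment is round-robin. The section only guarantees that equal hash values go to the same executor. `make_assigner` and the subscription names are hypothetical.

```python
from itertools import count
from typing import Dict, Iterator


def make_assigner(num_executors: int):
    """Hypothetical model of executor assignment; the modulo rule is an assumption."""
    auto: Iterator[int] = count()  # round-robin counter used when no hash is given

    def assign(hash_val: int = -1) -> int:
        if hash_val < 0:
            # Unspecified: the system picks an executor (round-robin in this sketch).
            return next(auto) % num_executors
        # Explicit hash: the same value always lands on the same executor.
        return hash_val % num_executors

    return assign


assign = make_assigner(num_executors=4)
plan: Dict[str, int] = {
    "trades": assign(7),
    "quotes": assign(7),  # same hash as "trades", so the same executor handles both
    "news": assign(),     # auto-assigned
}
print(plan)  # {'trades': 3, 'quotes': 3, 'news': 0}
```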
- reconnect(val=False)
Sets whether the subscription can be automatically resumed if interrupted.
- Parameters:
val (bool, optional) – Whether to enable automatic resubscription. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
Note
If val is True, the subscription attempts to resume and retrieve all streaming data since the interruption. Behavior depends on the interruption type:
- If the network is disconnected but both nodes remain running, reconnection occurs automatically when the network is restored.
- If the publisher node crashes, the subscriber repeatedly attempts to resubscribe after the publisher restarts:
  - If the publisher adopts data persistence mode, resubscription succeeds only after the persisted data has been loaded and the publisher reaches the row of data where the subscription was interrupted.
  - If the publisher does not adopt data persistence, resubscription fails.
- If the subscriber node crashes, automatic resubscription is not possible and the subscription must be submitted again.
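The case analysis above reads naturally as a lookup. The function below restates it with reconnect=True for quick reference. `Interruption` and `reconnect_outcome` are illustrative names only; swordfish exposes no such API.

```python
from enum import Enum


class Interruption(Enum):
    NETWORK = "network disconnected, both nodes running"
    PUBLISHER_CRASH = "publisher node crashed"
    SUBSCRIBER_CRASH = "subscriber node crashed"


def reconnect_outcome(kind: Interruption, publisher_persists: bool = False) -> str:
    """Restate the documented reconnect=True behavior as a lookup (illustrative only)."""
    if kind is Interruption.NETWORK:
        return "resumes automatically when the network is restored"
    if kind is Interruption.PUBLISHER_CRASH:
        if publisher_persists:
            return ("resubscribes after the publisher restarts, once persisted data is loaded "
                    "and the interrupted row is reached")
        return "resubscription fails"
    return "no automatic resubscription; submit the subscription again"


for kind in Interruption:
    print(f"{kind.value}: {reconnect_outcome(kind)}")
```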
- filter(val=None)
Sets the filter condition(s) for the subscription.
- Parameters:
val (optional) – The filter condition(s) for the subscription. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
Note
Must be used with the set_filter_column function. The filter can be specified in the following ways:
- Value filtering: a Vector specifying the allowed values.
- Range filtering: a Pair defining an inclusive lower bound and an exclusive upper bound.
- Hash filtering: an AnyVector where:
  - The first element is the number of buckets.
  - The second element is either a scalar specifying the bucket index (starting from 0) or a Pair specifying an index range (inclusive lower bound, exclusive upper bound).
- Custom function filtering: a FunctionDef or a str (a function name or lambda expression). The subscribed data is passed into the function as a table, and the function result is sent to the subscriber.
filter does not support Boolean types.
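The four filter forms can be mimicked on a plain list of filter-column values, which makes the bound conventions concrete. This is a hypothetical model. None of these functions exist in swordfish. Hash filtering in particular uses `value % buckets` as a stand-in for the server's real bucketing function, which this section does not specify.

```python
from typing import Any, Callable, List, Sequence, Tuple, Union

Pair = Tuple[int, int]  # (inclusive lower bound, exclusive upper bound)


def value_filter(values: Sequence[Any], allowed: Sequence[Any]) -> List[Any]:
    # Value filtering: keep rows whose filter-column value is in the allowed vector.
    allowed_set = set(allowed)
    return [v for v in values if v in allowed_set]


def range_filter(values: Sequence[Any], bounds: Pair) -> List[Any]:
    # Range filtering: lower bound inclusive, upper bound exclusive.
    lo, hi = bounds
    return [v for v in values if lo <= v < hi]


def hash_filter(values: Sequence[int], buckets: int, index: Union[int, Pair]) -> List[int]:
    # Hash filtering. ASSUMPTION: bucket = value % buckets stands in for the
    # server's real bucketing function. A scalar index selects one bucket;
    # a Pair selects buckets in [lower, upper).
    lo, hi = (index, index + 1) if isinstance(index, int) else index
    return [v for v in values if lo <= v % buckets < hi]


def custom_filter(rows: List[dict], fn: Callable[[List[dict]], List[dict]]) -> List[dict]:
    # Custom function filtering: the whole batch goes in, the function's result goes out.
    return fn(rows)


ids = [1, 2, 5, 8, 12]
print(value_filter(ids, [2, 8]))    # [2, 8]
print(range_filter(ids, (2, 8)))    # [2, 5]
print(hash_filter(ids, 4, 0))       # [8, 12]
print(hash_filter(ids, 4, (1, 3)))  # [1, 2, 5]

rows = [{"sym": "A", "qty": 5}, {"sym": "B", "qty": 50}]
print(custom_filter(rows, lambda t: [r for r in t if r["qty"] > 10]))  # [{'sym': 'B', 'qty': 50}]
```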
- persist_offset(val=False)
Sets whether to persist the offset of the last processed message.
- Parameters:
val (bool, optional) – Whether to persist the last processed message offset. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
Note
This is useful for resubscription. The persisted offset can be retrieved using Topic.processed_offset. To resubscribe from the persisted offset, set persist_offset to True and set remove_offset in unsubscribe to False.
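The interplay between persist_offset and remove_offset can be traced with an in-memory stand-in for the on-disk offset. `OffsetStore`, its methods, and the topic name "trades" are all hypothetical. The method `processed_offset` only echoes the idea of Topic.processed_offset and is not that API. The sketch deliberately gives `remove_offset` no default, because this section does not state the real default.

```python
from typing import Dict, Optional


class OffsetStore:
    """Hypothetical in-memory stand-in for offsets persisted on disk."""

    def __init__(self) -> None:
        self._saved: Dict[str, int] = {}

    def record(self, topic: str, offset: int, persist_offset: bool) -> None:
        # With persist_offset=True, the offset of the last processed message is kept.
        if persist_offset:
            self._saved[topic] = offset

    def unsubscribe(self, topic: str, remove_offset: bool) -> None:
        # remove_offset=False keeps the saved offset for a later resubscription.
        if remove_offset:
            self._saved.pop(topic, None)

    def processed_offset(self, topic: str) -> Optional[int]:
        return self._saved.get(topic)


store = OffsetStore()
store.record("trades", 41_999, persist_offset=True)
store.unsubscribe("trades", remove_offset=False)
print(store.processed_offset("trades"))  # 41999, so a new subscription could resume here

store.unsubscribe("trades", remove_offset=True)
print(store.processed_offset("trades"))  # None, so there is nothing to resume from
```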
- time_trigger(val=False)
Sets whether the handler is triggered at intervals even if no new messages arrive.
- Parameters:
val (bool, optional) – Whether to trigger the handler at fixed intervals. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
Note
If val is True, the handler is triggered at the intervals specified by throttle, even when no new messages are received.
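The effect of time_trigger can be shown by listing which throttle ticks would invoke the handler. In this hypothetical model (`handler_calls` is illustrative), each list element stands for one throttle interval. The model also assumes batch_size is never reached, so throttle ticks are the only trigger.

```python
from typing import List


def handler_calls(pending_per_tick: List[int], time_trigger: bool) -> List[int]:
    """Hypothetical model: which throttle ticks invoke the handler.

    pending_per_tick[i] is the number of unprocessed messages at tick i
    (one tick per throttle interval, batch_size assumed not yet reached).
    Returns the tick indices at which the handler runs.
    """
    return [i for i, pending in enumerate(pending_per_tick) if pending > 0 or time_trigger]


print(handler_calls([3, 0, 0, 2], time_trigger=False))  # [0, 3]
print(handler_calls([3, 0, 0, 2], time_trigger=True))   # [0, 1, 2, 3]
```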
- handler_need_msg_id(val=False)
Sets whether the handler requires message IDs.
- Parameters:
val (bool, optional) – Whether the handler requires message IDs. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
Note
If val is True, the handler must accept two parameters:
- msg_body: the messages ingested into the streaming engine.
- msg_id: the ID of the last ingested message.
If val is False, the handler must accept only one parameter: msg_body.
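The two handler shapes can be written as ordinary Python callables. The `dispatch` function below is a hypothetical stand-in for the caller, showing how the flag changes the call signature. It is not how swordfish invokes handlers internally. The handler names and the `received` log are also illustrative.

```python
from typing import Any, Callable, List

received: List[str] = []


def on_batch(msg_body: List[Any]) -> None:
    # Shape required when handler_need_msg_id is False: one parameter.
    received.append(f"{len(msg_body)} messages")


def on_batch_with_id(msg_body: List[Any], msg_id: int) -> None:
    # Shape required when handler_need_msg_id is True: msg_id is the ID of the last ingested message.
    received.append(f"{len(msg_body)} messages, last id {msg_id}")


def dispatch(handler: Callable[..., None], msg_body: List[Any], last_id: int,
             need_msg_id: bool) -> None:
    """Hypothetical dispatcher showing how the flag changes the call shape."""
    if need_msg_id:
        handler(msg_body, last_id)
    else:
        handler(msg_body)


dispatch(on_batch, ["m1", "m2"], 101, need_msg_id=False)
dispatch(on_batch_with_id, ["m1", "m2"], 101, need_msg_id=True)
print(received)  # ['2 messages', '2 messages, last id 101']
```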