EventClient

The Python API 3.0.0.0 provides the EventClient class for subscribing to serialized event data from heterogeneous stream tables on the server.

Methods

Constructor

The constructor declaration for EventClient is as follows:

EventClient(eventSchema, eventTimeFields=None, commonFields=None)

Arguments

  • eventSchema: The list of event classes to subscribe to, e.g., [EventA, EventB].
    • The classes specified must inherit from the Event class.
    • The schema of each event defined in the API must be identical to its definition in DolphinDB.
  • eventTimeFields: str or list of str, defaults to None. The time field of each event. It must be specified when the heterogeneous stream table (tableName) contains a time column.
    • It is a string if the field name is the same for all events.
    • It is a list of strings if field names vary across events. The order of strings must match that of the events specified in eventSchema.
  • commonFields: list of str, defaults to None. The common fields of the events. It must be specified when the heterogeneous stream table (tableName) contains common columns.
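
The rule for eventTimeFields above (one shared name, or one name per event in eventSchema order) can be sketched in plain Python. This helper is illustrative only and not part of the dolphindb API:

```python
# Illustrative helper (not part of the dolphindb API): expand the
# eventTimeFields argument into one time-field name per event class,
# mirroring the rule described above.
def expand_event_time_fields(event_schema, event_time_fields):
    if event_time_fields is None:
        return [None] * len(event_schema)
    if isinstance(event_time_fields, str):
        # A single string applies to every event.
        return [event_time_fields] * len(event_schema)
    # A list must match eventSchema in both length and order.
    if len(event_time_fields) != len(event_schema):
        raise ValueError("eventTimeFields must match eventSchema in length")
    return list(event_time_fields)

# Stand-in classes for the sketch.
class EventA: pass
class EventB: pass

print(expand_event_time_fields([EventA, EventB], "eventTime"))
# -> ['eventTime', 'eventTime']
```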

subscribe

Use the subscribe method to subscribe to DolphinDB stream tables.

subscribe(host, port, handler, tableName,
          actionName=None, offset=-1, resub=False,
          userName=None, password=None)

Arguments for Connection

  • host: str, required. The IP address of the publisher node.
  • port: int, required. The port number of the publisher node.
  • userName: str, optional. The username used to connect to the server.
  • password: str, optional. The password used to connect to the server.

Argument for Data Handling

  • handler: required. A user-defined function to process the received data. In an event subscription, the msg passed to the handler is an instance of one of the event classes specified by eventSchema in EventClient. For example:
def handler(msg):
    print(msg)
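
Since msg is an instance of one of the classes in eventSchema, a handler can dispatch on the event type. The sketch below uses plain stand-in classes for illustration; in real use the event classes subclass dolphindb.cep.Event:

```python
# Stand-in event classes for illustration only; real event classes
# subclass dolphindb.cep.Event and mirror the server-side schema.
class EventA:
    def __init__(self, a):
        self.a = a

class EventB:
    def __init__(self, a, b):
        self.a = a
        self.b = b

def describe(msg):
    # Dispatch on the concrete type of the received event instance.
    if isinstance(msg, EventA):
        return f"EventA: a={msg.a}"
    if isinstance(msg, EventB):
        return f"EventB: a={msg.a}, b={msg.b}"
    return "unknown event"

def handler(msg):
    print(describe(msg))

handler(EventA(1))       # prints: EventA: a=1
handler(EventB(2, 3.3))  # prints: EventB: a=2, b=3.3
```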

Arguments for Subscription

  • tableName: required. The name of the subscribed stream table.
  • actionName: optional. The name of the subscription task.
    • Each subscription is uniquely identified by a subscription topic: the address of the node where the stream table is located (in the format <IPAddress>/<port>), the stream table name, and the subscription task name (if actionName is specified), joined by "/". The subscription fails if an identical topic already exists.
  • offset: int. The position of the first message where the subscription begins; a message is a row of the stream table. The offset is relative to the first row of the stream table when it was created. Rows that were cleared from memory due to the cache size limit are still counted when determining where the subscription starts.
    • If offset is unspecified or -1, the subscription starts with the next new message.
    • If offset=-2, the system will get the persisted offset on disk and start subscription from there.
  • resub: bool, default False. Whether to re-subscribe after a network disconnection.
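
The topic-uniqueness rule described under actionName can be sketched as follows. This is illustrative only; the client builds and checks topics internally:

```python
# Illustrative sketch (the client performs this check internally):
# a subscription topic is "<host>/<port>/<tableName>[/<actionName>]".
def make_topic(host, port, table_name, action_name=None):
    parts = [host, str(port), table_name]
    if action_name is not None:
        parts.append(action_name)
    return "/".join(parts)

registered = set()

def register(topic):
    # An identical existing topic means the new subscription fails.
    if topic in registered:
        raise ValueError(f"subscription topic already exists: {topic}")
    registered.add(topic)

topic = make_topic("192.168.0.1", 8848, "output", "action")
print(topic)  # -> 192.168.0.1/8848/output/action
register(topic)
```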

unsubscribe

The unsubscribe method cancels an existing subscription:

unsubscribe(host, port, tableName, actionName=None)
  • host: str, required. The IP address of the publisher node.
  • port: int, required. The port number of the publisher node.
  • tableName: required. The name of the subscribed stream table.
  • actionName: optional. The name of the subscription task.

getSubscriptionTopics

The getSubscriptionTopics method returns all subscription topics of the current EventClient. Each topic is unique and takes the form host/port/tableName/actionName.

client.getSubscriptionTopics()

A Complete Example

The following example demonstrates how to subscribe to event data from a DolphinDB stream table:

DolphinDB server:

  1. Define events and the monitor.
  2. Define an output table and serializer.
  3. Create a CEP engine.
  4. Send events to the output queue of the CEP engine, outputting to a heterogeneous stream table.

Python API:

  1. Define events.
  2. Subscribe to the heterogeneous stream table.

Defining Events and the Monitor

In the following example, we define two events (EventA and EventB) and a monitor (Monitor).

Events

Both EventA and EventB contain an attribute eventTime of TIMESTAMP type and an attribute a of INT type. EventB contains an additional attribute b of DOUBLE type.

class EventA {
    eventTime::TIMESTAMP
    a::INT
    def EventA(a_) {
        eventTime = now()
        a = a_
    }
}

class EventB {
    eventTime::TIMESTAMP
    a::INT
    b::DOUBLE
    def EventB(a_, b_) {
        eventTime = now()
        a = a_
        b = b_
    }
}

Monitor

For the Monitor class, define the member functions updateEventA, updateEventB, and onload. The onload method calls addEventListener to register two listeners, so that each newly captured EventA or EventB instance is sent on to the serializer (specified as the output table of the CEP engine).

class Monitor {
  def Monitor(){}
  def updateEventA(event) {
      emitEvent(event)
  }
  def updateEventB(event) {
      emitEvent(event)
  }
  def onload() {
      addEventListener(updateEventA,"EventA",,"all")
      addEventListener(updateEventB,"EventB",,"all")
  }
}

Defining Heterogeneous Stream Table and Serializer

A heterogeneous stream table should be created as the output table for the serializer to receive the serialized event data. The table schema contains the following columns:

(1) optional. A time column for event times. To filter the stream table using event times, or to use time fields for processing, you can add a time column as the first column.

(2) required. A STRING/SYMBOL column for the event type.

(3) required. A BLOB column for the serialized event data.

(4) optional. Columns for event fields that share the same attribute (common fields). To filter the stream table by common fields, you can add one or more such columns after the above columns.

Following the example above, since both EventA and EventB have the attributes eventTime and a, we specify "eventTime" as the time column and "a" as the common field.
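
As an illustration of how the common field was chosen (not part of the API), the common columns are the attributes shared by every event, which can be computed as an ordered intersection:

```python
# Illustrative only: the common fields are the attributes present in
# every event's schema, in the order they appear in the first event.
event_fields = {
    "EventA": ["eventTime", "a"],
    "EventB": ["eventTime", "a", "b"],
}

first = next(iter(event_fields.values()))
shared = [f for f in first if all(f in fields for fields in event_fields.values())]
print(shared)  # -> ['eventTime', 'a']: "eventTime" serves as the time
               # column, leaving "a" as the common column.
```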

The schema of the output table is therefore eventTime (TIMESTAMP), eventType (STRING), blob (BLOB), and a (INT). Use the following DolphinDB script to create the table and the serializer:

share streamTable(100:0, `eventTime`eventType`blob`a, [TIMESTAMP, STRING, BLOB, INT]) as output
serializer = streamEventSerializer(name='serOutput', eventSchema=[EventA, EventB], outputTable=output, eventTimeField="eventTime", commonField=["a"])

Creating a CEP Engine and Sending Events

Create a CEP engine with the serializer as its output table. First define a dummy table whose schema matches the heterogeneous stream table "output":

dummy = table(100:0, `eventTime`eventType`blob`a, [TIMESTAMP, STRING, BLOB, INT])
engine = createCEPEngine('cep', <Monitor()>, dummy, [EventA, EventB], 1, "eventTime", outputTable=serializer)

Use the appendEvent function to write events EventA and EventB to the "engine".

appendEvent(engine, EventA(1))
appendEvent(engine, EventB(2, 3.3))

Defining Events in the Python API

In the Python API, define the events EventA and EventB with the same schema as the events defined on the server.

from dolphindb.cep import Event
from dolphindb.typing import Scalar
import dolphindb as ddb
import dolphindb.settings as keys

class EventA(Event):
    eventTime: Scalar[keys.DT_TIMESTAMP]
    a: Scalar[keys.DT_INT]

class EventB(Event):
    eventTime: Scalar[keys.DT_TIMESTAMP]
    a: Scalar[keys.DT_INT]
    b: Scalar[keys.DT_DOUBLE]

Subscribing to Event Data from Stream Table

Define a function handler to print the received event data (msg), then use subscribe to subscribe to the "output" table.

from dolphindb.cep import EventClient

def handler(msg):
    print(msg)

client = EventClient([EventA, EventB], "eventTime", ["a"])
client.subscribe("localhost", 8848, handler, "output", "action", offset=0)

from threading import Event as tEvent
tEvent().wait()

The output is as follows:

EventA({'eventTime': numpy.datetime64('2024-03-31T11:23:13.730'), 'a': 1})
EventB({'eventTime': numpy.datetime64('2024-03-31T11:23:13.730'), 'a': 2, 'b': 3.3})