EventSender
The Python API 3.0.0.0 provides the EventSender class for writing serialized event data to heterogeneous stream tables. The DolphinDB CEP engine subscribes to these stream tables to capture and process the corresponding events as they occur.
Methods
Constructor
The constructor declaration for EventSender is as follows:
EventSender(ddbSession, tableName, eventSchema, eventTimeFields=None, commonFields=None)
Arguments
- ddbSession: required. A session already connected to DolphinDB.
- tableName: str, required. The name of the heterogeneous stream table to be written.
- eventSchema: required. The list of events to be sent, e.g., [EventA, EventB].
  - The events specified must inherit from the Event class.
  - The schema of each event defined in the API and in DolphinDB must be the same.
- eventTimeFields: str or list of strings, optional, defaults to None. The time field of each event. It must be specified when the heterogeneous stream table "tableName" contains a time column.
  - It is a string if the field name is the same for all events.
  - It is a list of strings if field names vary across events. The order of the strings must match the order of the events specified in eventSchema.
- commonFields: list of strings, optional, defaults to None. The fields shared by all events. It must be specified when the heterogeneous stream table "tableName" contains common-field columns.
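The two accepted forms of eventTimeFields can be pictured with a small pure-Python helper. This is only an illustrative sketch: resolve_time_fields is a hypothetical name that is not part of the DolphinDB API, and it makes no claim about how EventSender handles the argument internally.

```python
from typing import Dict, List, Optional, Union


def resolve_time_fields(
    event_names: List[str],
    event_time_fields: Optional[Union[str, List[str]]],
) -> Dict[str, Optional[str]]:
    """Map each event name to its time field (hypothetical helper)."""
    if event_time_fields is None:
        # No time column in the stream table: no event has a time field.
        return {name: None for name in event_names}
    if isinstance(event_time_fields, str):
        # A single string: every event uses the same field name.
        return {name: event_time_fields for name in event_names}
    if len(event_time_fields) != len(event_names):
        raise ValueError("need exactly one time field per event")
    # A list: entries are matched to events by position.
    return dict(zip(event_names, event_time_fields))


print(resolve_time_fields(["EventA", "EventB"], "eventTime"))
print(resolve_time_fields(["EventA", "EventB"], ["timeA", "timeB"]))
```

The field names timeA and timeB are made up for the second call; they only show that a list is matched to the events by position.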
sendEvent
Use the sendEvent method to send events to the DolphinDB server.
sendEvent(event)
Arguments
- event: The event instance.
A Complete Example
The following example demonstrates the process of sending events from the Python API to the DolphinDB server:
DolphinDB server:
- Define events and the monitor.
- Create a CEP engine.
- Create and subscribe to a heterogeneous stream table.
Python API:
- Define events.
- Write events to the heterogeneous stream table.
Defining Events and the Monitor
In the following example, we define two events (EventA and EventB), a shared in-memory table ("result"), and a monitor (Monitor).
Events
Both EventA and EventB contain an attribute eventTime of TIMESTAMP type and an attribute a of INT type. EventB contains an additional attribute b of DOUBLE type.
class EventA {
    eventTime::TIMESTAMP
    a::INT
    def EventA(a_) {
        eventTime = now()
        a = a_
    }
}
class EventB {
    eventTime::TIMESTAMP
    a::INT
    b::DOUBLE
    def EventB(a_, b_) {
        eventTime = now()
        a = a_
        b = b_
    }
}
Shared In-Memory Table
Create table "result".
share table(100:0, `eventTime`eventType`a`b, [TIMESTAMP, SYMBOL, INT, DOUBLE]) as result
Monitor
For the Monitor class:
- Define two member functions, updateEventA and updateEventB, to write the data of EventA and EventB to "result".
- Define the member function onload. The onload method uses the addEventListener function to register two listeners that write event data to the "result" table whenever a new instance of EventA or EventB is captured.
class Monitor {
    def Monitor(){}
    def updateEventA(event) {
        insert into result values(event.eventTime, "A", event.a, NULL)
    }
    def updateEventB(event) {
        insert into result values(event.eventTime, "B", event.a, event.b)
    }
    def onload() {
        addEventListener(updateEventA,"EventA",,"all")
        addEventListener(updateEventB,"EventB",,"all")
    }
}
Creating a CEP Engine
When creating a CEP engine, you need to define a dummy table whose schema is the same as that of the heterogeneous stream table to which the engine subscribes. The table schema contains the following columns:
(1) optional. A time column for event times. To filter the stream table by event time, or to use time fields for processing, add a time column as the first column.
(2) required. A STRING/SYMBOL column for event types.
(3) required. A BLOB column for serialized event data.
(4) optional. Columns for fields that all events have in common. To filter the stream table by common fields, add one or more columns after the columns above.
Continuing the example above: because both EventA and EventB have the attributes eventTime and a, we specify a time column "eventTime" and a column "a" for the common field.
The stream table therefore has the columns eventTime (TIMESTAMP), eventType (STRING), blob (BLOB), and a (INT). Use the following DolphinDB script to create a CEP engine:
dummy = table(100:0, `eventTime`eventType`blob`a, [TIMESTAMP, STRING, BLOB, INT])
engine = createCEPEngine('cep', <Monitor()>, dummy, [EventA, EventB], 1, "eventTime")
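The column rules (1) to (4) in this section can be sketched as a small pure-Python function that returns the columns in the required order. The helper name stream_table_columns is hypothetical, and the column names eventType and blob are simply those used in this example; the function only illustrates the ordering and does not call DolphinDB.

```python
from typing import List, Optional, Sequence, Tuple

Column = Tuple[str, str]  # (column name, DolphinDB type name)


def stream_table_columns(
    time_column: Optional[Column] = None,
    common_columns: Sequence[Column] = (),
    event_type: str = "STRING",
) -> List[Column]:
    """Return the columns in the order described above (hypothetical helper)."""
    if event_type not in ("STRING", "SYMBOL"):
        raise ValueError("event type column must be STRING or SYMBOL")
    columns: List[Column] = []
    if time_column is not None:
        columns.append(time_column)            # (1) optional, always first
    columns.append(("eventType", event_type))  # (2) required
    columns.append(("blob", "BLOB"))           # (3) required
    columns.extend(common_columns)             # (4) optional, at the end
    return columns


print(stream_table_columns(("eventTime", "TIMESTAMP"), [("a", "INT")]))
# [('eventTime', 'TIMESTAMP'), ('eventType', 'STRING'), ('blob', 'BLOB'), ('a', 'INT')]
```

With no arguments, the function returns only the two required columns, which corresponds to a stream table with neither a time column nor common fields.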
Subscribing to Events
The CEP engine receives event data by subscribing to a heterogeneous stream table. Define a stream table "input" in DolphinDB and initiate the subscription:
share streamTable(100:0, `eventTime`eventType`blob`a, [TIMESTAMP, STRING, BLOB, INT]) as input
subscribeTable(,`input, `action, 0, engine, true)
Defining Events on API
In the Python API, define the events EventA and EventB with the same schema as the events defined on the server.
from dolphindb.cep import Event
from dolphindb.typing import Scalar
import dolphindb.settings as keys

class EventA(Event):
    eventTime: Scalar[keys.DT_TIMESTAMP]
    a: Scalar[keys.DT_INT]

class EventB(Event):
    eventTime: Scalar[keys.DT_TIMESTAMP]
    a: Scalar[keys.DT_INT]
    b: Scalar[keys.DT_DOUBLE]
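Because the field names and their order must match on both sides, a quick local check before sending can catch simple mismatches. The sketch below compares the annotated field names of a class with a hand-written list of the server-side fields. The helpers field_names and check_schema are hypothetical, a plain stand-in class is used so the code runs without DolphinDB, and it is an assumption that Event subclasses expose their fields through `__annotations__` in the same way. Only names and order are compared, not types.

```python
from typing import List, Type


def field_names(event_cls: Type) -> List[str]:
    """Return the annotated field names of a class in declaration order."""
    return list(getattr(event_cls, "__annotations__", {}))


def check_schema(event_cls: Type, server_fields: List[str]) -> None:
    """Raise ValueError if the class fields differ from the server-side list."""
    local = field_names(event_cls)
    if local != server_fields:
        raise ValueError(
            f"{event_cls.__name__}: API fields {local} != server fields {server_fields}"
        )


# Stand-in class so the sketch runs without a DolphinDB installation;
# in practice this would be an Event subclass such as EventB.
class FakeEventB:
    eventTime: object
    a: int
    b: float


check_schema(FakeEventB, ["eventTime", "a", "b"])  # passes silently
```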
Writing Event Data to Stream Table
Construct an EventSender. EventA and EventB (specified by eventSchema) will be sent to the stream table "input" (specified by tableName). Since the "input" table contains a time column and a common-field column, eventTimeFields and commonFields are specified as "eventTime" and ["a"], respectively.
import dolphindb as ddb
from dolphindb.cep import EventSender
s = ddb.Session()
s.connect("localhost", 8848, "admin", "123456")
sender = EventSender(s, "input", [EventA, EventB], "eventTime", ["a"])
Then, use the sendEvent method to send instances of EventA and EventB.
import numpy as np
sender.sendEvent(EventA(np.datetime64("2024-03-31T12:00:00.123", "ms"), 1))
sender.sendEvent(EventB(np.datetime64("2024-03-31T12:00:00.456", "ms"), 2, 3.3))
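When many events need to be sent, the calls above can be wrapped in a loop. The following sketch relies only on the sendEvent(event) method described earlier; send_all and RecordingSender are hypothetical names, and RecordingSender is a stand-in that records events locally so the example runs without a DolphinDB server.

```python
from typing import Any, Iterable, List


def send_all(sender: Any, events: Iterable[Any]) -> int:
    """Send each event with sender.sendEvent and return how many were sent."""
    count = 0
    for event in events:
        sender.sendEvent(event)  # the only sender method used here
        count += 1
    return count


class RecordingSender:
    """Stand-in for EventSender that records events instead of sending them."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    def sendEvent(self, event: Any) -> None:
        self.sent.append(event)


fake = RecordingSender()
print(send_all(fake, ["event-1", "event-2"]))  # 2
```

With a real connection, the sender built above would be passed in place of the stand-in, together with a list of EventA and EventB instances.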
Querying Results
Query the "result" table from the API to confirm that both events were captured and written by the monitor:
print(s.run("select * from result"))
# output
                eventTime eventType  a    b
0 2024-03-31 12:00:00.123         A  1  NaN
1 2024-03-31 12:00:00.456         B  2  3.3