Complex Event Processing
The DolphinDB 3.00.0 introduces a new feature - Complex Event Processing (CEP) -
to find patterns in event data that enable detection of opportunities and threats. To
better interact with server, Java API 3.00.0.0 provides EventSchema
,
EventSender
and EventClient
classes for
constructing, writing, and subscribing to events.
This section describes event-related operations, including:
- Defining events.
- Writing events to the heterogeneous stream table as a data source for the CEP engine on the DolphinDB server.
- Subscribing to events in the heterogeneous stream table output by the CEP engine on the DolphinDB server.
For more details, see the "Complex Event Processing" chapter of the DolphinDB server user manual.
Defining Events (EventSchema
)
EventSchema
can be used to define events on API client.
public class EventSchema { private String eventType; private List<String> fieldNames; private List<Entity.DATA_TYPE> fieldTypes; private List<Entity.DATA_FORM> fieldForms; private List<Integer> fieldExtraParams; }
Parameters
- eventType: A String indicating the event.
- fieldNames: A String indicating the field names.
- fieldTypes: Data types of fields.
- fieldForms: Data forms of fields.
- fieldExtraParams: A non-negative int used to specify the scale when the field type is DECIMAL.
Sending Events (EventSender
)
EventSender
can be used to write serialized event data into
heterogeneous stream tables, which are subscribed by CEP engines in DolphinDB to
capture and process the events.
Constructor
public EventSender(DBConnection conn, String tableName, List<EventSchema> eventSchemas, List<String> eventTimeFields, List<String> commonFields)
Parameters
-
conn: A successfully connected
DBConnection
object, which should not be in asynchronous mode. -
tableName: A String indicating the name of the heterogeneous stream table to write into.
-
eventSchemas: All event schemas the CEP engine can process, i.e., all events to be sent. Note that the event schemas must match those specified when creating the CEP engine.
-
eventTimeFileds (optional): The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.
- It is a String if the field name is the same for all events.
- It is a String list if field names vary across events. The order of strings must match that of the events specified in eventSchemas.
-
commonFields(optional): A String list indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.
Sending Events
Send events to the DolphinDB server.
public void sendEvent(String eventType, List<Entity> attributes)
Parameters
- eventType: The event type.
- attributes: The values of each member in the event. Note that the
order of the values must match the field order specified when defining the
EventSchema
, and their data types and forms must also match.
Subscribing to Events (EventClient
)
The API provides the EventClient
tool for subscribing to events in
heterogeneous stream tables. These tables typically receive event data processed and
serialized by the DolphinDB CEP engine.
Constructor
public EventClient(List<EventSchema> eventSchemas, List<String> eventTimeFileds, List<String> commonFields)
subscribe
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password)
Parameters
- host: IP address of the publisher node.
- port: Port number of the publisher node.
- tableName: Name of the publisher stream table.
- actionName (optional, default "javaStreamingApi"): The subscription task.
- handler: A user-defined function to process the subscribed data.
- offset (optional, default -1): An integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative or exceeding the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it is created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
- reconnect (optional, default false): Whether to enable reconnection.
- userName (optional, default ""): Username used to connect to the server
- password (optional, default ""): Password.
Callback Function
For the callback function, the first parameter indicates the event name, and the second parameter represents the values of each member.
EventMessageHandler handler = new EventMessageHandler() {
@Override
public void doEvent(String eventType, List<Entity> attribute) {
// Handling logic
}
};
Canceling Subscription
public void unsubscribe(String host, int port, String tableName, String actionName)
Parameters
- host: IP address of the publisher node.
- port: Port number of the publisher node.
- tableName: Name of the publisher stream table.
- actionName: The subscription task.
Usage Example
This example introduces how to create events on the server, interfaces for
serializing events, heterogeneous stream tables for writing and outputting events,
and the CEP engine for processing events. Then use EventSender
on
the API to send events to the heterogeneous stream table. After the CEP engine
processes and outputs messages to another heterogeneous stream table through
subscription, use EventClient
on the API side to subscribe to the
output events.
The following script will create two heterogeneous stream tables
input
and output
, two
streamEventSerializer
(serialization interfaces), a CEP engine,
and a MarketData
event on the DolphinDB server.
login("admin","123456"); class MarketData{ market :: STRING code :: STRING price :: DOUBLE qty :: INT eventTime :: TIMESTAMP def MarketData(m,c,p,q){ market = m code = c price = p qty = q eventTime = now() } } class MainMonitor{ def MainMonitor(){ } def updateMarketData(event) def onload(){ addEventListener(updateMarketData,'MarketData',,'all') } def updateMarketData(event){ emitEvent(event) } } dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as output schema = table(1:0, `eventType`eventFields`eventValuesTypeString`eventValueTypeID`eventValuesFormID, [STRING, STRING, STRING, INT[], INT[]]) insert into schema values("MarketData", "market,code,price,qty,eventTime", "STRING,STRING,DOUBLE,INT,TIMESTAMP", [18 18 16 4 12], [0 0 0 0 0]) inputSerializer = streamEventSerializer(name=`serInput, eventSchemes=schema, outputTable=intput, eventTimeField = "eventTime") outputSerializer = streamEventSerializer(name=`serOutput, eventSchemes=schema, outputTable=output, eventTimeField = "eventTime") engine = createCEPEngine('cep1', <MainMonitor()>, dummy, [MarketData], 1, 'eventTime', 10000, outputSerializer) subscribeTable(,`intput, `subopt, 0, getStreamEngine('cep1'),true) marketData1 = MarketData('sz', 's001', 9.8, 100) marketData2 = MarketData('sz', 's002', 6.9, 100) marketData3 = MarketData('sz', 's003', 4.8, 100) marketData4 = MarketData('sz', 's004', 9.8, 100) marketData5 = MarketData('sz', 's005', 9.8, 100) appendEvent(inputSerializer, [marketData1,marketData2,marketData3,marketData4,marketData5])
Then write data to input
via EventSender
on
API.
// Define schema EventSchema schema = new EventSchema(); schema.setEventType("MarketData"); schema.setFieldNames(Arrays.asList("market", "code", "price", "qty", "eventTime")); schema.setFieldTypes(Arrays.asList(DT_STRING, DT_STRING, DT_DOUBLE, DT_INT, DT_TIMESTAMP)); schema.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR)); List<EventSchema> eventSchemas = Collections.singletonList(scheme); List<String> eventTimeFields = Collections.singletonList("eventTime"); List<String> commonFields = new ArrayList<>(); // Create EventSender DBConnection conn = new DBConnection(); conn.connect(HOST, PORT, "admin", "123456"); EventSender sender = new EventSender(conn, "input", eventSchemas, eventTimeFields, commonFields); // Prepare data List<Entity> attributes = new ArrayList<>(); attributes.add(new BasicString("sz")); attributes.add(new BasicString("s001")); attributes.add(new BasicDouble(9.8)); attributes.add(new BasicInt(100)); attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,13,15,11,10,630))); sender.sendEvent("MarketData", attributes);
The stream table input is subscribed by a CEP engine, which generates events to the output table.
Then use EventClient
to subscribe to the output table.
EventSchema scheme = new EventSchema(); scheme.setEventType("MarketData"); scheme.setFieldNames(Arrays.asList("market", "code", "price", "qty", "eventTime")); scheme.setFieldTypes(Arrays.asList(DT_STRING, DT_STRING, DT_DOUBLE, DT_INT, DT_TIMESTAMP)); scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR)); List<EventSchema> eventSchemas = Collections.singletonList(scheme); List<String> eventTimeFields = Collections.singletonList("eventTime"); List<String> commonFields = new ArrayList<>(); // user-defined callback EventMessageHandler handler = new EventMessageHandler() { @Override public void doEvent(String eventType, List<Entity> attribute) { // user-defined processing System.out.println("eventType: " + eventType); for (Entity entity : attribute) { System.out.println(entity.getString()); } System.out.println("\n"); } }; // create EventClient EventClient client = new EventClient(eventSchemas, eventTimeFields, commonFields); // create subscription client.subscribe(HOST, PORT, "output", "ss1", handler, 0, true, "admin", "123456"); Thread.sleep(10000); // cancel subscription client.unsubscribe(HOST, PORT, "output", "ss1");