Complex Event Processing

The DolphinDB 3.00.0 introduces a new feature - Complex Event Processing (CEP) - to find patterns in event data that enable detection of opportunities and threats. To better interact with server, Java API 3.00.0.0 provides EventSchema, EventSender and EventClient classes for constructing, writing, and subscribing to events.

This section describes event-related operations, including:

  • Defining events.
  • Writing events to the heterogeneous stream table as a data source for the CEP engine on the DolphinDB server.
  • Subscribing to events in the heterogeneous stream table output by the CEP engine on the DolphinDB server.

For more details, see the "Complex Event Processing" chapter of the DolphinDB server user manual.

Defining Events (EventSchema)

EventSchema can be used to define events on API client.

public class EventSchema {
  private String eventType;                         
  private List<String> fieldNames;                  
  private List<Entity.DATA_TYPE> fieldTypes;           
  private List<Entity.DATA_FORM> fieldForms;            
  private List<Integer> fieldExtraParams;              
}

Parameters

  • eventType: A String indicating the event.
  • fieldNames: A String indicating the field names.
  • fieldTypes: Data types of fields.
  • fieldForms: Data forms of fields.
  • fieldExtraParams: A non-negative int used to specify the scale when the field type is DECIMAL.

Sending Events (EventSender)

EventSender can be used to write serialized event data into heterogeneous stream tables, which are subscribed by CEP engines in DolphinDB to capture and process the events.

Constructor

public EventSender(DBConnection conn, String tableName, List<EventSchema> eventSchemas, List<String> eventTimeKeys, List<String> commonKeys)

Parameters

  • conn: A successfully connected DBConnection object, which should not be in asynchronous mode.

  • tableName: A String indicating the name of the heterogeneous stream table to write into.

  • eventSchemas: All event schemas the CEP engine can process, i.e., all events to be sent. Note that the event schemas must match those specified when creating the CEP engine.

  • eventTimeKeys (optional): The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.

    • It is a String if the field name is the same for all events.
    • It is a String list if field names vary across events. The order of strings must match that of the events specified in eventSchemas.
  • commonKeys(optional): A String list indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.

Sending Events

Send events to the DolphinDB server.

public void sendEvent(String eventType, List<Entity> attributes)

Parameters

  • eventType: The event type.
  • attributes: The values of each member in the event. Note that the order of the values must match the field order specified when defining the EventSchema, and their data types and forms must also match.

Subscribing to Events (EventClient)

The API provides the EventClient tool for subscribing to events in heterogeneous stream tables. These tables typically receive event data processed and serialized by the DolphinDB CEP engine.

Constructor

public EventClient(List<EventSchema> eventSchemas, List<String> eventTimeKeys, List<String> commonKeys)

subscribe

public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password)

Parameters

  • host: IP address of the publisher node.
  • port: Port number of the publisher node.
  • tableName: Name of the publisher stream table.
  • actionName (optional, default "javaStreamingApi"): The subscription task.
  • handler: A user-defined function to process the subscribed data.
  • offset (optional, default -1): An integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative or exceeding the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it is created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
  • reconnect (optional, default false): Whether to enable reconnection.
  • userName (optional, default ""): Username used to connect to the server
  • password (optional, default ""): Password.

Callback Function

For the callback function, the first parameter indicates the event name, and the second parameter represents the values of each member.

EventMessageHandler handler = new EventMessageHandler() {
    @Override
    public void doEvent(String eventType, List<Entity> attribute) {
        // Handling logic
    }
};

Canceling Subscription

public void unsubscribe(String host, int port, String tableName, String actionName)

Parameters

  • host: IP address of the publisher node.
  • port: Port number of the publisher node.
  • tableName: Name of the publisher stream table.
  • actionName: The subscription task.

Usage Example

This example introduces how to create events on the server, interfaces for serializing events, heterogeneous stream tables for writing and outputting events, and the CEP engine for processing events. Then use EventSender on the API to send events to the heterogeneous stream table. After the CEP engine processes and outputs messages to another heterogeneous stream table through subscription, use EventClient on the API side to subscribe to the output events.

The following script will create two heterogeneous stream tables input and output, two streamEventSerializer (serialization interfaces), a CEP engine, and a MarketData event on the DolphinDB server.

login("admin","123456");

class MarketData{
  market :: STRING
  code :: SYMBOL
  price :: DOUBLE
  qty :: INT
  eventTime :: TIMESTAMP
  def MarketData(m,c,p,q){
    market = m
    code = c
    price = p
    qty = q
    eventTime = now()   
  }
}   

class MainMonitor{
  
  def MainMonitor(){
  }
  def updateMarketData(event)

  def onload(){
    addEventListener(updateMarketData,'MarketData',,'all')
      }

  def updateMarketData(event){
    emitEvent(event)
  }
}

dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as output
schema = table(1:0, `eventType`eventKeys`eventValuesTypeString`eventValueTypeID`eventValuesFormID, [STRING, STRING, STRING, INT[], INT[]])
insert into schema values("MarketData", "market,code,price,qty,eventTime", "STRING,STRING,DOUBLE,INT,TIMESTAMP", [18 18 16 4 12], [0 0 0 0 0])
inputSerializer = streamEventSerializer(name=`serInput, eventSchemes=schema, outputTable=intput, eventTimeKey = "eventTime")
outputSerializer = streamEventSerializer(name=`serOutput, eventSchemes=schema, outputTable=output, eventTimeKey = "eventTime")

engine = createCEPEngine('cep1', <MainMonitor()>, dummy, [MarketData], 1, 'eventTime', 10000, outputSerializer)
subscribeTable(,`intput, `subopt, 0, getStreamEngine('cep1'),true)

marketData1 = MarketData('sz', 's001', 9.8, 100)
marketData2 = MarketData('sz', 's002', 6.9, 100)
marketData3 = MarketData('sz', 's003', 4.8, 100)
marketData4 = MarketData('sz', 's004', 9.8, 100)
marketData5 = MarketData('sz', 's005', 9.8, 100)
appendEvent(inputSerializer, [marketData1,marketData2,marketData3,marketData4,marketData5])

Then write data to input via EventSender on API.

// Define schema
EventSchema schema = new EventSchema();
schema.setEventType("MarketData");
schema.setFieldNames(Arrays.asList("market", "code", "price", "qty", "eventTime"));
schema.setFieldTypes(Arrays.asList(DT_STRING, DT_STRING, DT_DOUBLE, DT_INT, DT_TIMESTAMP));
schema.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR));
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
List<String> eventTimeKeys = Collections.singletonList("eventTime");
List<String> commonKeys = new ArrayList<>();

// Create EventSender
DBConnection conn = new DBConnection(true);
conn.connect(HOST, PORT, "admin", "123456");
EventSender sender = new EventSender(conn, "input", eventSchemas, eventTimeKeys, commonKeys);

// Prepare data
List<Entity> attributes = new ArrayList<>();
attributes.add(new BasicString("sz"));
attributes.add(new BasicString("s001"));
attributes.add(new BasicDouble(9.8));
attributes.add(new BasicInt(100));
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,13,15,11,10,630)));

sender.sendEvent("MarketData", attributes);

The stream table input is subscribed by a CEP engine, which generates events to the output table.

Then use EventClient to subscribe to the output table.

EventSchema scheme = new EventSchema();
scheme.setEventType("MarketData");
scheme.setFieldNames(Arrays.asList("market", "code", "price", "qty", "eventTime"));
scheme.setFieldTypes(Arrays.asList(DT_STRING, DT_STRING, DT_DOUBLE, DT_INT, DT_TIMESTAMP));
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR));
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
List<String> eventTimeKeys = Collections.singletonList("eventTime");
List<String> commonKeys = new ArrayList<>();

// user-defined callback
EventMessageHandler handler = new EventMessageHandler() {
    @Override
    public void doEvent(String eventType, List<Entity> attribute) {
        // user-defined processing
        System.out.println("eventType: " + eventType);
        for (Entity entity : attribute) {
            System.out.println(entity.getString());
        }
        System.out.println("\n");
    }
};

// create EventClient
EventClient client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
// create subscription
client.subscribe(HOST, PORT, "output", "ss1", handler, 0, true, "admin", "123456");
Thread.sleep(10000);

// cancel subscription
client.unsubscribe(HOST, PORT, "output", "ss1");