Complex Event Processing

DolphinDB 3.00.0 introduces Complex Event Processing (CEP), a feature that finds patterns in event data in order to detect opportunities and threats. To work with CEP on the server, C++ API 3.00.0.0 provides the cep module for defining, sending, and subscribing to events.

This section describes event-related operations, including:

  • Defining events.
  • Writing events to a heterogeneous stream table that serves as the data source for a CEP engine on the DolphinDB server.
  • Subscribing to events in a heterogeneous stream table that receives the output of a CEP engine on the DolphinDB server.

For more details, see the "Complex Event Processing" chapter of the DolphinDB server user manual.

Defining Events (EventSchema)

EventSchema defines an event on the API client:

struct EventSchema {
    std::string                 eventType_;             // event type
    std::vector<std::string>    fieldNames_;            // field names, one entry per field
    std::vector<DATA_TYPE>      fieldTypes_;            // data type of each field
    std::vector<DATA_FORM>      fieldForms_;            // data form of each field (e.g., DF_SCALAR, DF_VECTOR)
    std::vector<int>            fieldExtraParams_;      // per field; specifies the scale when the field type is DECIMAL
};
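Entry i of each field vector describes field i, so the four vectors must stay aligned. A length mismatch is an easy mistake to make. The sketch below is a hypothetical client-side check and not part of the API. It mirrors EventSchema with stand-in enums (SketchType, SketchForm) so that it compiles without the DolphinDB headers. It also treats duplicate field names as an error, which is an assumption.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-ins for the API's DATA_TYPE and DATA_FORM enums; in real
// code, use EventSchema and the enums from the DolphinDB headers instead.
enum SketchType { SK_INT, SK_TIMESTAMP, SK_DECIMAL64 };
enum SketchForm { SK_SCALAR, SK_VECTOR };

// Same layout as EventSchema: parallel vectors, one entry per field.
struct SketchSchema {
    std::string eventType_;
    std::vector<std::string> fieldNames_;
    std::vector<SketchType> fieldTypes_;
    std::vector<SketchForm> fieldForms_;
    std::vector<int> fieldExtraParams_;
};

// Returns true if the schema has at least one field, all per-field vectors
// have the same length, and no field name repeats (assumed rule).
bool isSchemaConsistent(const SketchSchema& s) {
    const size_t n = s.fieldNames_.size();
    if (n == 0 || s.fieldTypes_.size() != n || s.fieldForms_.size() != n ||
        s.fieldExtraParams_.size() != n) {
        return false;
    }
    std::set<std::string> unique(s.fieldNames_.begin(), s.fieldNames_.end());
    return unique.size() == n;
}
```

One way to use it is to run the check before passing a schema to EventSender or EventClient. This catches alignment mistakes on the client rather than during sending or subscribing.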

Sending Events (EventSender)

EventSender writes serialized event data into a heterogeneous stream table. A CEP engine in DolphinDB subscribes to that table to capture and process the events.

Note:

To use the event sending feature, include the following header file:

#include <EventHandler.h>

Constructor

The constructor declaration for EventSender is as follows:

EventSender(DBConnectionSP conn, const std::string& tableName, const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);

Arguments

  • conn: A connected DBConnection object. It must not be in asynchronous mode.
  • tableName: Name of the heterogeneous stream table to write to.
  • eventSchema: Schemas of all events to be sent. They must match the event definitions specified when creating the CEP engine.
  • eventTimeFields: The time field of each event. It is required if the first column of the heterogeneous stream table is of a temporal type.
    • If all events use the same time field name, pass a vector containing only that name.
    • If the names differ across events, pass one name per event, in the same order as the events in eventSchema.
  • commonFields: Names of the fields that all events share (the same field name in every event). These fields can be stored as separate columns in the stream table, so subscribers can filter on them.
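The rules for eventTimeFields and commonFields can be checked on the client before the sender is constructed. The following sketch is hypothetical: SketchEvent, timeFieldFor, and argumentsLookValid are not API names. It assumes, following the description above, that a one-element eventTimeFields vector means all events share that time field. The API itself may validate these arguments differently.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for an event definition: its type and field names.
struct SketchEvent {
    std::string eventType;
    std::vector<std::string> fieldNames;
};

// Time field for event i: the shared name if eventTimeFields has one entry,
// otherwise the i-th entry (one name per event, same order as the schemas).
std::string timeFieldFor(const std::vector<std::string>& eventTimeFields,
                         size_t i, size_t eventCount) {
    if (eventTimeFields.size() == 1) return eventTimeFields[0];
    if (eventTimeFields.size() == eventCount && i < eventCount) return eventTimeFields[i];
    throw std::invalid_argument("eventTimeFields needs 1 entry or one per event");
}

// True if each event contains its time field (when one is given) and
// every common field.
bool argumentsLookValid(const std::vector<SketchEvent>& events,
                        const std::vector<std::string>& eventTimeFields,
                        const std::vector<std::string>& commonFields) {
    if (!eventTimeFields.empty() && eventTimeFields.size() != 1 &&
        eventTimeFields.size() != events.size()) {
        return false;  // neither a shared name nor one name per event
    }
    for (size_t i = 0; i < events.size(); ++i) {
        const auto& names = events[i].fieldNames;
        auto has = [&names](const std::string& f) {
            return std::find(names.begin(), names.end(), f) != names.end();
        };
        if (!eventTimeFields.empty() && !has(timeFieldFor(eventTimeFields, i, events.size()))) {
            return false;
        }
        for (const auto& c : commonFields) {
            if (!has(c)) return false;
        }
    }
    return true;
}
```

For two events whose time fields are named "eventTime" and "ts", the per-event form {"eventTime", "ts"} passes. A single shared "eventTime" fails because the second event has no such field.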

sendEvent

The sendEvent method serializes an event and writes it to the heterogeneous stream table specified in the constructor.

void sendEvent(const std::string& eventType, const std::vector<ConstantSP>& attributes);

Arguments

  • eventType: The event type. It must be one of the event types defined in eventSchema.
  • attributes: The values of the event's fields, in the same order as the fields in its EventSchema. The data type and data form of each value must match the corresponding field definition.
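The matching rule for attributes is positional: value i must have the type and form of field i. The sketch below shows that check on its own. Ty, Form, AttrInfo, and firstMismatch are hypothetical stand-ins so the block compiles without the API. In real code the type and form of each attribute would come from the attribute objects themselves, for example through the getType() and getForm() methods of Constant.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for DATA_TYPE, DATA_FORM, and the (type, form) pair
// that a ConstantSP attribute would report.
enum class Ty { Int, Timestamp };
enum class Form { Scalar, Vector };
struct AttrInfo {
    Ty type;
    Form form;
};

// Returns -1 if the attributes match the schema position by position (same
// count, type, and form). Otherwise returns the first offending position; a
// count mismatch reports the first position without a counterpart.
// Assumes fieldTypes and fieldForms have the same length.
long firstMismatch(const std::vector<Ty>& fieldTypes,
                   const std::vector<Form>& fieldForms,
                   const std::vector<AttrInfo>& attributes) {
    const std::size_t n = std::min(attributes.size(), fieldTypes.size());
    for (std::size_t i = 0; i < n; ++i) {
        if (attributes[i].type != fieldTypes[i] || attributes[i].form != fieldForms[i]) {
            return static_cast<long>(i);
        }
    }
    if (attributes.size() != fieldTypes.size()) {
        return static_cast<long>(n);
    }
    return -1;
}
```

Two mistakes are caught at position 0: an INT vector sent for a field declared as an INT scalar, and values supplied in the wrong order.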

Usage Example

#include "DolphinDB.h"
#include "Util.h"
#include "EventHandler.h"
using namespace dolphindb;

int main() {
    // Define an event with an INT vector field and a TIMESTAMP scalar field
    EventSchema scheme;
    scheme.eventType_ = "EventVectorInt";
    scheme.fieldNames_ = {"custom", "eventTime"};
    scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
    scheme.fieldForms_ = {DF_VECTOR, DF_SCALAR};
    scheme.fieldExtraParams_ = {0, 0};
    std::vector<EventSchema> eventSchemes{scheme};
    std::vector<std::string> eventTimeKeys{"eventTime"};
    std::vector<std::string> commonKeys{"custom"};

    // Connect in synchronous mode, then create the sender for table "input"
    DBConnectionSP conn = new DBConnection;
    conn->connect("127.0.0.1", 8848, "admin", "123456");
    EventSender sender(conn, "input", eventSchemes, eventTimeKeys, commonKeys);

    // Field values, in the order and with the types/forms defined by the schema
    std::vector<ConstantSP> attributes;
    VectorSP v = Util::createVector(DT_INT, 0, 10);
    int ss = 1;
    v->appendInt(&ss, 1);
    v->appendInt(&ss, 1);
    attributes.push_back(v);
    attributes.push_back(Util::createTimestamp(112));
    sender.sendEvent("EventVectorInt", attributes);
    return 0;
}

Subscribing to Events (EventClient)

The API provides the EventClient class for subscribing to events in heterogeneous stream tables. These tables receive event data that has been processed and serialized by the DolphinDB CEP engine.

Note:

To use the event subscription feature, include the following header file:

#include "Streaming.h"

Constructor

The constructor declaration for EventClient is as follows:

EventClient(const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);

Arguments

  • eventSchema: Schemas of all events to be received. They must match the event definitions specified when creating the CEP engine.
  • eventTimeFields: The time field of each event. It is required if the first column of the heterogeneous stream table is of a temporal type.
    • If all events use the same time field name, pass a vector containing only that name.
    • If the names differ across events, pass one name per event, in the same order as the events in eventSchema.
  • commonFields: Names of the fields that all events share (the same field name in every event). These fields can be stored as separate columns in the stream table, so subscribers can filter on them.

subscribe

ThreadSP subscribe(const string& host, int port, const EventMessageHandler &handler, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME, int64_t offset = -1,
        bool resub = true, const string& userName="", const string& password="");

Arguments

  • host: IP address of the publisher node.
  • port: Port number of the publisher node.
  • handler: A user-defined callback that processes the subscribed data. Its first parameter is the event type, and its second parameter holds the values of the event's fields:
    using EventMessageHandler = std::function<void(const std::string&, std::vector<ConstantSP>&)>;
  • tableName: Name of the publisher stream table.
  • actionName: Name of the subscription task.
  • offset: An integer indicating the position of the first message from which the subscription begins. Defaults to -1. A message is a row of the stream table. If offset is unspecified, negative, or exceeds the number of rows in the stream table, the subscription starts with the next new message. offset is counted from the first row of the stream table when it was created. Rows that have been cleared from memory due to the cache size limit still count toward this position.
  • resub: Whether to resubscribe automatically after a network disconnection.
  • userName: Username used to connect to the server.
  • password: Password for userName.
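A single handler receives every event type, so a common pattern is to route each event to a callback registered for its type. The sketch below shows that routing with std::function and std::unordered_map. FieldValue is a hypothetical stand-in for ConstantSP so the block compiles on its own. makeDispatcher is an illustrative helper, not an API function.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for ConstantSP so the sketch compiles on its own.
using FieldValue = std::string;

// Same shape as EventMessageHandler, but over the stand-in value type.
using Handler = std::function<void(const std::string&, std::vector<FieldValue>&)>;

// Hypothetical helper: returns one handler that forwards each event to the
// callback registered for its type and counts events with no registered
// callback. unknownCount must outlive the returned handler.
Handler makeDispatcher(std::unordered_map<std::string, Handler> routes, int& unknownCount) {
    return [routes = std::move(routes), &unknownCount](const std::string& eventType,
                                                       std::vector<FieldValue>& fields) {
        auto it = routes.find(eventType);
        if (it == routes.end()) {
            ++unknownCount;  // event type with no registered callback
            return;
        }
        it->second(eventType, fields);
    };
}
```

In real code the callbacks would take std::vector<ConstantSP>&, and the returned function would be passed as the handler argument of subscribe. subscribe returns a ThreadSP, so the handler presumably runs on a background thread. Any state it updates, such as the counter here, may then need synchronization.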

unsubscribe

The unsubscribe method cancels an existing subscription:

void unsubscribe(const string& host, int port, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME);

Arguments

  • host: The IP address of the publisher node.
  • port: The port number of the publisher node.
  • tableName: The name of the subscribed stream table.
  • actionName: The name of the subscription task.
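Each successful subscribe should eventually be paired with an unsubscribe that uses the same host, port, tableName, and actionName. One way to guarantee this is an RAII guard that issues the unsubscribe call from its destructor. The sketch below is a hypothetical helper, not an API feature. It is templated on the client type so it compiles without the DolphinDB headers.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical RAII guard: stores the arguments of a subscription and calls
// client.unsubscribe(host, port, tableName, actionName) on destruction.
// Client is any type with an unsubscribe method taking those four arguments.
template <typename Client>
class SubscriptionGuard {
public:
    SubscriptionGuard(Client& client, std::string host, int port,
                      std::string tableName, std::string actionName)
        : client_(client), host_(std::move(host)), port_(port),
          tableName_(std::move(tableName)), actionName_(std::move(actionName)) {}

    ~SubscriptionGuard() {
        try {
            client_.unsubscribe(host_, port_, tableName_, actionName_);
        } catch (...) {
            // Destructors must not throw; a failed unsubscribe is ignored here.
        }
    }

    SubscriptionGuard(const SubscriptionGuard&) = delete;
    SubscriptionGuard& operator=(const SubscriptionGuard&) = delete;

private:
    Client& client_;
    std::string host_;
    int port_;
    std::string tableName_;
    std::string actionName_;
};
```

With the real API, one would create a SubscriptionGuard<EventClient> right after client.subscribe(...) returns. The guard must not outlive the client it refers to.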

Usage Example

#include <iostream>
#include "Streaming.h"
using namespace dolphindb;

int main() {
    // The schema must match the event definition used by the CEP engine
    EventSchema scheme;
    scheme.eventType_ = "EventVectorBool";
    scheme.fieldNames_ = {"custom", "eventTime"};
    scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
    scheme.fieldForms_ = {DF_VECTOR, DF_SCALAR};
    scheme.fieldExtraParams_ = {0, 0};
    std::vector<EventSchema> eventSchemes{scheme};
    std::vector<std::string> eventTimeKeys{"eventTime"};
    std::vector<std::string> commonKeys{"custom"};

    // Print the type and field values of each received event
    EventMessageHandler handler = [](const std::string& eventType, std::vector<ConstantSP>& datas){
        std::cout << "in handler!\n";
        std::cout << "eventType " << eventType << std::endl;
        for(auto& data : datas){
            std::cout << "data " << data->getString() << std::endl;
        }
    };

    // Subscribe to table "output" from offset 0 as task "ss1"
    EventClient client(eventSchemes, eventTimeKeys, commonKeys);
    client.subscribe("127.0.0.1", 8848, handler, "output", "ss1", 0);
    Util::sleep(10000);  // let the subscription run for a while before cancelling it
    client.unsubscribe("127.0.0.1", 8848, "output", "ss1");
    return 0;
}