Complex Event Processing
DolphinDB 3.00.0 introduces Complex Event Processing (CEP), a feature that finds patterns in event data to detect opportunities and threats. To interact with the server, C++ API 3.00.0.0 provides the cep module for defining, sending, and subscribing to events.
This section describes event-related operations, including:
- Defining events.
- Writing events to the heterogeneous stream table as a data source for the CEP engine on the DolphinDB server.
- Subscribing to events in the heterogeneous stream table output by the CEP engine on the DolphinDB server.
For more details, see the "Complex Event Processing" chapter of the DolphinDB server user manual.
Defining Events (EventSchema)
EventSchema can be used to define events on the API client.
struct EventSchema {
    std::string eventType_;                 // event type
    std::vector<std::string> fieldNames_;   // field names
    std::vector<DATA_TYPE> fieldTypes_;     // field types
    std::vector<DATA_FORM> fieldForms_;     // field forms
    std::vector<int> fieldExtraParams_;     // specify the scale when the field type is DECIMAL
};
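EventSchema describes its fields with parallel vectors, so the entry at a given index in each vector refers to the same field. The following is a minimal sketch of a consistency check, not part of the API. The enums and the struct are trimmed local stand-ins so that the snippet compiles without the DolphinDB headers, isConsistent is a hypothetical helper, and the sketch assumes one entry per field in every vector, as in the usage examples of this section.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Trimmed stand-ins so the sketch compiles on its own.
// In real code these types come from the DolphinDB C++ API headers.
enum DATA_TYPE { DT_INT, DT_TIMESTAMP };
enum DATA_FORM { DF_SCALAR, DF_VECTOR };

struct EventSchema {
    std::string eventType_;
    std::vector<std::string> fieldNames_;
    std::vector<DATA_TYPE> fieldTypes_;
    std::vector<DATA_FORM> fieldForms_;
    std::vector<int> fieldExtraParams_;
};

// Hypothetical helper (assumption, not an API function): returns true when
// the schema has a name, at least one field, and one entry per field in
// each of the parallel vectors.
bool isConsistent(const EventSchema& schema) {
    const std::size_t n = schema.fieldNames_.size();
    return !schema.eventType_.empty() && n > 0 &&
           schema.fieldTypes_.size() == n &&
           schema.fieldForms_.size() == n &&
           schema.fieldExtraParams_.size() == n;
}
```

Running such a check before handing schemas to EventSender or EventClient can surface a forgotten entry early, on the client side.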
Sending Events (EventSender)
EventSender can be used to write serialized event data into heterogeneous stream tables, which are subscribed to by CEP engines in DolphinDB to capture and process the events.
Note:
To use the event sending feature, include the following .h file.
#include <EventHandler.h>
Constructor
The constructor declaration for EventSender is as follows:
EventSender(DBConnectionSP conn, const std::string& tableName, const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);
Arguments
- conn: A successfully connected DBConnection object, which should not be in asynchronous mode.
- tableName: A string indicating the name of the heterogeneous stream table to write into.
- eventSchema: All events to be sent. Note that the event schemas must match those specified when creating the CEP engine.
- eventTimeFields: The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.
  - It is a string if the field name is the same for all events.
  - It is a string vector if field names vary across events. The order of strings must match that of the events specified in eventSchema.
- commonFields: A string vector indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.
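The rule for eventTimeFields can be sketched as a small lookup. This is an illustration only: resolveTimeFields is a hypothetical helper, not an API function, and it assumes that "a string" corresponds to a one-element vector in C++, since the constructor takes a std::vector<std::string>.

```cpp
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper (assumption, not an API function): maps each event
// type to the name of its time field.
//   eventTypes      - event type names, in the order used in eventSchema
//   eventTimeFields - empty, one shared name, or one name per event
std::map<std::string, std::string> resolveTimeFields(
        const std::vector<std::string>& eventTypes,
        const std::vector<std::string>& eventTimeFields) {
    std::map<std::string, std::string> result;
    if (eventTimeFields.empty()) {
        return result;  // no time field specified
    }
    const bool shared = eventTimeFields.size() == 1;
    if (!shared && eventTimeFields.size() != eventTypes.size()) {
        throw std::invalid_argument(
            "eventTimeFields must hold one name, or one name per event");
    }
    for (std::size_t i = 0; i < eventTypes.size(); ++i) {
        // A shared name applies to every event; otherwise match by position.
        result[eventTypes[i]] = shared ? eventTimeFields[0] : eventTimeFields[i];
    }
    return result;
}
```

With event types {"Order", "Trade"}, passing {"eventTime"} assigns eventTime to both, while {"orderTime", "tradeTime"} assigns the names by position.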
Sending Events
Send events to the DolphinDB server.
void sendEvent(const std::string& eventType, const std::vector<ConstantSP>& attributes);
Arguments
- eventType: The event type.
- attributes: The values of each member in the event. Note that the order of the values must match the field order specified when defining the EventSchema, and their data types and forms must also match.
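The matching requirement on attributes (same count, same order, same type and form as the schema fields) can be sketched as a pre-send check. Everything below is a stand-in so that the snippet compiles without the DolphinDB headers: FieldSpec, Value, and checkAttributes are hypothetical names, and Value takes the place of ConstantSP. In real code the comparison would presumably use the type and form reported by each ConstantSP.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Trimmed stand-ins for the API's enums.
enum DATA_TYPE { DT_INT, DT_TIMESTAMP };
enum DATA_FORM { DF_SCALAR, DF_VECTOR };

// One field of an event schema (hypothetical, for illustration).
struct FieldSpec { std::string name; DATA_TYPE type; DATA_FORM form; };
// Stand-in for a ConstantSP: only the type and form matter here.
struct Value { DATA_TYPE type; DATA_FORM form; };

// Hypothetical helper: returns an empty string when the attributes line up
// with the fields, otherwise a description of the first mismatch.
std::string checkAttributes(const std::vector<FieldSpec>& fields,
                            const std::vector<Value>& attributes) {
    if (attributes.size() != fields.size()) {
        return "expected " + std::to_string(fields.size()) +
               " attributes, got " + std::to_string(attributes.size());
    }
    for (std::size_t i = 0; i < fields.size(); ++i) {
        if (attributes[i].type != fields[i].type) {
            return "type mismatch for field '" + fields[i].name + "'";
        }
        if (attributes[i].form != fields[i].form) {
            return "form mismatch for field '" + fields[i].name + "'";
        }
    }
    return "";
}
```

Because values are matched by position, swapping two attributes is reported as a type or form mismatch on the first affected field.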
Usage Example
#include "EventHandler.h"
using namespace dolphindb;

int main() {
    // Define an event with an INT vector field and a TIMESTAMP scalar field.
    EventSchema scheme;
    scheme.eventType_ = "EventVectorInt";
    scheme.fieldNames_ = {"custom", "eventTime"};
    scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
    scheme.fieldForms_ = {DF_VECTOR, DF_SCALAR};  // "custom" is sent as a vector below
    scheme.fieldExtraParams_ = {0, 0};
    std::vector<EventSchema> eventSchemes{scheme};
    std::vector<std::string> eventTimeFields{"eventTime"};
    std::vector<std::string> commonFields{"custom"};

    // Connect first: the constructor expects a connected DBConnection.
    DBConnectionSP conn = new DBConnection;
    conn->connect("127.0.0.1", 8848, "admin", "123456");
    // "input" is the heterogeneous stream table to write into.
    EventSender sender(conn, "input", eventSchemes, eventTimeFields, commonFields);

    // Attribute values follow the field order of the schema.
    std::vector<ConstantSP> attributes;
    VectorSP v = Util::createVector(DT_INT, 0, 10);
    int ss = 1;
    v->appendInt(&ss, 1);
    v->appendInt(&ss, 1);
    attributes.push_back(v);
    attributes.push_back(Util::createTimestamp(112));
    sender.sendEvent("EventVectorInt", attributes);
    return 0;
}
Subscribing to Events (EventClient)
The API provides the EventClient tool for subscribing to events in heterogeneous stream tables. These tables receive event data processed and serialized by the DolphinDB CEP engine.
Note:
To use the event subscription feature, include the following .h file.
#include "Streaming.h"
Constructor
The constructor declaration for EventClient is as follows:
EventClient(const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);
Arguments
- eventSchema: All events to be received. Note that the event schemas must match those specified when creating the CEP engine.
- eventTimeFields: The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.
  - It is a string if the field name is the same for all events.
  - It is a string vector if field names vary across events. The order of strings must match that of the events specified in eventSchema.
- commonFields: A string vector indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.
subscribe
ThreadSP subscribe(const string& host, int port, const EventMessageHandler &handler, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME, int64_t offset = -1,
bool resub = true, const string& userName="", const string& password="");
Arguments
- host: IP address of the publisher node.
- port: Port number of the publisher node.
- handler: A user-defined function to process the subscribed data. The first parameter of the function is the event type, and the second parameter holds the event fields. The handler type is declared as follows:
  using EventMessageHandler = std::function<void(const std::string&, std::vector<ConstantSP>&)>;
- tableName: Name of the publisher stream table.
- actionName: The name of the subscription task.
- offset: An integer indicating the position of the first message where the subscription begins. Defaults to -1. A message is a row of the stream table. If offset is unspecified, negative or exceeding the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it is created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
- resub: Whether to re-subscribe after a network disconnection.
- userName: The username used to connect to the server.
- password: The password used to connect to the server.
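The offset rule described above can be sketched as a small function. This only illustrates the documented behavior and is not code from the API: resolveStartRow is a hypothetical helper, and totalRows is assumed to count every row ever appended to the stream table, including rows already cleared from memory.

```cpp
#include <cstdint>

// Hypothetical helper (assumption, not an API function).
//   offset    - requested start position, relative to the first row of the
//               stream table when it was created
//   totalRows - number of rows appended so far, including rows that were
//               cleared from memory
// Returns the index of the first row delivered. A result equal to totalRows
// means the subscription starts with the next new message.
std::int64_t resolveStartRow(std::int64_t offset, std::int64_t totalRows) {
    if (offset < 0 || offset > totalRows) {
        return totalRows;  // negative or past the end: wait for new messages
    }
    return offset;
}
```

For a table that has received 100 rows, an offset of -1 or 250 resolves to 100 (new messages only), while an offset of 40 resolves to row 40.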
unsubscribe
The unsubscribe method cancels an existing subscription:
void unsubscribe(const string& host, int port, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME);
Arguments
- host: The IP address of the publisher node.
- port: The port number of the publisher node.
- tableName: The name of the subscribed stream table.
- actionName: The name of the subscription task.
Usage Example
#include <iostream>
#include "Streaming.h"
using namespace dolphindb;

int main() {
    // Describe the event expected from the CEP engine's output table.
    EventSchema scheme;
    scheme.eventType_ = "EventVectorBool";
    scheme.fieldNames_ = {"custom", "eventTime"};
    scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
    scheme.fieldForms_ = {DF_VECTOR, DF_SCALAR};
    scheme.fieldExtraParams_ = {0, 0};
    std::vector<EventSchema> eventSchemes{scheme};
    std::vector<std::string> eventTimeFields{"eventTime"};
    std::vector<std::string> commonFields{"custom"};

    // The handler receives the event type and the event's field values.
    EventMessageHandler handler = [](const std::string& eventType, std::vector<ConstantSP>& datas) {
        std::cout << "in handler!\n";
        std::cout << "eventType " << eventType << std::endl;
        for (auto& data : datas) {
            std::cout << "data " << data->getString() << std::endl;
        }
    };

    EventClient client(eventSchemes, eventTimeFields, commonFields);
    // Subscribe to table "output" from the first row (offset 0) as action "ss1".
    client.subscribe("127.0.0.1", 8848, handler, "output", "ss1", 0);
    Util::sleep(10000);  // let the subscription run for 10 seconds
    client.unsubscribe("127.0.0.1", 8848, "output", "ss1");
    return 0;
}
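A single handler receives every event type delivered by the subscription, so a handler that serves several event types typically branches on its first parameter. The following sketch shows one way to route events by type. EventRouter is a hypothetical class, not part of the API, and FieldValue is a std::string stand-in for ConstantSP so that the snippet compiles without the DolphinDB headers.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the API's ConstantSP (assumption for this sketch).
using FieldValue = std::string;
// Same shape as EventMessageHandler, with the stand-in value type.
using Handler = std::function<void(const std::string&, std::vector<FieldValue>&)>;

// Hypothetical helper class: keeps one callback per event type.
class EventRouter {
public:
    // Registers (or replaces) the callback for an event type.
    void on(const std::string& eventType, Handler handler) {
        routes_[eventType] = std::move(handler);
    }

    // Calls the callback registered for eventType.
    // Returns false when no callback is registered for that type.
    bool dispatch(const std::string& eventType, std::vector<FieldValue>& fields) const {
        auto it = routes_.find(eventType);
        if (it == routes_.end()) {
            return false;
        }
        it->second(eventType, fields);
        return true;
    }

private:
    std::map<std::string, Handler> routes_;
};
```

With the real API, the same idea would use std::vector<ConstantSP>& as the field type, and the function passed to subscribe would forward its two arguments to dispatch.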