appendEventWithResponse
Syntax
appendEventWithResponse(engine, event, responseType, [timeout=5000],
[condition], [returnType="instance"])
Details
A synchronous event-processing function for CEP engines. It sends an event to the engine and blocks the current thread until a matching response event is received or the wait times out.
Unlike appendEvent that operates asynchronously, this function adopts a synchronous request-response pattern.
Parameters
engine is a CEP engine handle or name.
event is a class object of the event instance or a dictionary. If it is a dictionary, the event instances will be automatically constructed with the key-value pairs provided. The keys of the dictionary must include the "eventType" and all fields declared by the event type.
responseType is a STRTING scalar specifying the expected type of the response event.
timeout (optional) is an INT scalar specifying the maximum wait time for the response in milliseconds. The default is 5000 ms.
condition (optional) is metacode of Boolean expression to ifilter response
events, e.g. <event.id = 1>. If condition is specified,
both the event type and the filter condition must be satisfied for an event to be
accepted.
returnType is a STRING scalar specifying the format of the returned response event. The options include:
-
"dict": Returns the response event as a dictionary.
-
"instance" (default): Returns the response event as an event instance.
Returns
The return value depends on the returnType parameter:
- If returnType="dict", the function returns a dictionary.
- If returnType="instance", the function returns an event instance.
Examples
This example demonstrates how to implement strategy start requests with synchronous responses.
An external caller sends a StartRequest event to the CEP engine to initiate a strategy start request. The CEP monitor receives and processes the event, executes the corresponding startup logic, and returns the result via a StartResponse event.
The caller uses the appendEventWithResponse function to
synchronously wait for the startup response of a specific strategy instance, matched
by instanceId, thereby implementing an event-based request–response interaction
pattern.
try {
dropStreamEngine(`cep)
} catch(ex) { print(ex) }
try {
dropStreamEngine(`streamEventSerializer)
} catch(ex) { print(ex) }
class StartRequest{
instanceId :: STRING
strategyParams :: INT
def StartRequest(instanceId_, strategyParams_){
instanceId = instanceId_
strategyParams = strategyParams_
}
}
class StartResponse{
instanceId :: STRING
status :: STRING
def StartResponse(instanceId_, status_){
instanceId = instanceId_
status = status_
}
}
class Stratege:CEPMonitor {
// Custructor
def Stratege(){
}
def startStratege(StartRequestEvent){
// Start a strategy instance
}
def processStartRequest(StartRequestEvent){
startStratege(StartRequestEvent)
// Send a strategy start success response
emitEvent(StartResponse(StartRequestEvent.instanceId, "success"))
}
def onload() {
// Register an event listener for strategy start requests
addEventListener(handler=processStartRequest, eventType="StartRequest", times="all")
}
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as outputTable
serializer = streamEventSerializer(name=`streamEventSerializer, eventSchema=[StartResponse], outputTable=outputTable)
// Create a CEP engine
engine = createCEPEngine(name='cep1', monitors=<Stratege()>, dummyTable=dummy, eventSchema=[StartRequest], outputTable=serializer)
// Send a strategy start request and synchronously wait for the successful startup response of strategy instance instance_001
request = StartRequest(`instance_001, 100)
response = appendEventWithResponse(engine=getStreamEngine(`cep1), event=request, responseType="StartResponse", timeout=3000, condition=<StartResponse.instanceId==request.instanceId>)
Related function: appendEvent
