appendEventWithResponse

Syntax

appendEventWithResponse(engine, event, responseType, [timeout=5000], [condition], [returnType="instance"])

Details

A synchronous event-processing function for CEP engines. It sends an event to the engine and blocks the current thread until a matching response event is received or the wait times out.

Unlike appendEvent that operates asynchronously, this function adopts a synchronous request-response pattern.

Parameters

engine is a CEP engine handle or name.

event is a class object of the event instance or a dictionary. If it is a dictionary, the event instances will be automatically constructed with the key-value pairs provided. The keys of the dictionary must include the "eventType" and all fields declared by the event type.

responseType is a STRTING scalar specifying the expected type of the response event.

timeout (optional) is an INT scalar specifying the maximum wait time for the response in milliseconds. The default is 5000 ms.

condition (optional) is metacode of Boolean expression to ifilter response events, e.g. <event.id = 1>. If condition is specified, both the event type and the filter condition must be satisfied for an event to be accepted.

returnType is a STRING scalar specifying the format of the returned response event. The options include:

  • "dict": Returns the response event as a dictionary.

  • "instance" (default): Returns the response event as an event instance.

Returns

The return value depends on the returnType parameter:

  • If returnType="dict", the function returns a dictionary.
  • If returnType="instance", the function returns an event instance.

Examples

This example demonstrates how to implement strategy start requests with synchronous responses.

An external caller sends a StartRequest event to the CEP engine to initiate a strategy start request. The CEP monitor receives and processes the event, executes the corresponding startup logic, and returns the result via a StartResponse event.

The caller uses the appendEventWithResponse function to synchronously wait for the startup response of a specific strategy instance, matched by instanceId, thereby implementing an event-based request–response interaction pattern.

try { 
    dropStreamEngine(`cep) 
} catch(ex) { print(ex) }

try {
    dropStreamEngine(`streamEventSerializer)
} catch(ex) { print(ex) }


class StartRequest{
    instanceId :: STRING
    strategyParams :: INT
    def StartRequest(instanceId_, strategyParams_){
        instanceId = instanceId_
        strategyParams = strategyParams_
    }
}

class StartResponse{
    instanceId :: STRING
    status :: STRING
    def StartResponse(instanceId_, status_){
        instanceId = instanceId_
        status = status_
    }
}

class Stratege:CEPMonitor {
	// Custructor
	def Stratege(){
	}
    def startStratege(StartRequestEvent){
        // Start a strategy instance
    }
	def processStartRequest(StartRequestEvent){
        startStratege(StartRequestEvent)
        // Send a strategy start success response
        emitEvent(StartResponse(StartRequestEvent.instanceId, "success"))        
    }
	def onload() {
        // Register an event listener for strategy start requests
		addEventListener(handler=processStartRequest, eventType="StartRequest", times="all")
	}
}

dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as outputTable
serializer = streamEventSerializer(name=`streamEventSerializer, eventSchema=[StartResponse], outputTable=outputTable)
// Create a CEP engine
engine = createCEPEngine(name='cep1', monitors=<Stratege()>, dummyTable=dummy, eventSchema=[StartRequest], outputTable=serializer)

// Send a strategy start request and synchronously wait for the successful startup response of strategy instance instance_001
request = StartRequest(`instance_001, 100)
response = appendEventWithResponse(engine=getStreamEngine(`cep1), event=request, responseType="StartResponse", timeout=3000, condition=<StartResponse.instanceId==request.instanceId>)

Related function: appendEvent