appendEventWithResponse

语法

appendEventWithResponse(engine, event, responseType, [timeout=5000], [condition], [returnType="instance"])

详情

CEP 引擎的同步事件处理函数,用于发送事件并阻塞等待特定响应事件返回。

与 appendEvent 的异步模式不同,该函数采用同步等待响应模式:向 CEP 引擎发送事件后,当前线程会阻塞等待,直到收到匹配的响应事件或超时。

参数

engine CEP 引擎句柄或引擎名。

event 事件类型实例或者字典,指定要发送的事件。如果指定为字典,系统会根据键值对构造出事件实例,因此字典的键必须包含 "eventType"(事件类型)和事件类型中声明的字段名(eventField)。

responseType 字符串标量,指定期望接收的响应事件类型。

timeout 可选,整型标量,单位为毫秒,默认为 5000 毫秒。等待响应的超时时间。

condition 可选,元代码类型。指定过滤表达式响应事件进行筛选,如 <event.id=1>。若指定了 condition 参数,系统将在类型匹配的基础上增加条件筛选,只有同时满足类型要求和条件表达式的响应事件才会被接受。

returnType 可选,字符串标量,指定返回的响应事件类型的数据形式。可选值为:

  • "dict":以字典形式返回响应事件。

  • "instance":默认值,返回响应事件实例。

返回值

返回值形式由 returnType 参数控制:

  • returnType="dict" 时,返回一个字典。
  • returnType="instance" 时,返回一个事件实例。

例子

该示例展示了如何实现策略启动请求的处理与同步响应机制。

外部调用方发送 startRequest 事件向 CEP 引擎发起策略启动请求,CEP Monitor 接收并处理该事件,在完成相应的启动逻辑后,通过 StartResponse 事件返回启动结果。

调用方通过 appendEventWithResponse 函数同步等待指定策略实例(以 instanceId 匹配)的启动响应,从而实现基于事件的请求-响应交互模式。

try { 
    dropStreamEngine(`cep) 
} catch(ex) { print(ex) }

try {
    dropStreamEngine(`streamEventSerializer)
} catch(ex) { print(ex) }


class StartRequest{
    instanceId :: STRING
    strategyParams :: INT
    def StartRequest(instanceId_, strategyParams_){
        instanceId = instanceId_
        strategyParams = strategyParams_
    }
}

class StartResponse{
    instanceId :: STRING
    status :: STRING
    def StartResponse(instanceId_, status_){
        instanceId = instanceId_
        status = status_
    }
}

class Stratege:CEPMonitor {
	//构造函数
	def Stratege(){
	}
    def startStratege(StartRequestEvent){
        // 启动策略实例
    }
	def processStartRequest(StartRequestEvent){
        startStratege(StartRequestEvent)
        // 发送策略启动成功响应
        emitEvent(StartResponse(StartRequestEvent.instanceId, "success"))        
    }
	def onload() {
        // 注册事件监听,监听策略启动请求
		addEventListener(handler=processStartRequest, eventType="StartRequest", times="all")
	}
}

dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as outputTable
serializer = streamEventSerializer(name=`streamEventSerializer, eventSchema=[StartResponse], outputTable=outputTable)
// 创建 CEP 引擎
engine = createCEPEngine(name='cep1', monitors=<Stratege()>, dummyTable=dummy, eventSchema=[StartRequest], outputTable=serializer)

// 发送策略启动请求,同步等待策略实例 instance_001 启动成功的响应
request = StartRequest(`instance_001, 100)
response = appendEventWithResponse(engine=getStreamEngine(`cep1), event=request, responseType="StartResponse", timeout=3000, condition=<StartResponse.instanceId==request.instanceId>)

相关函数appendEvent