streamFilter

Syntax

streamFilter(name, dummyTable, filter, [msgSchema], [timeColumn], [conditionColumn])

Details

Create an engine that splits the ingested stream for different handlers. Return a table object.

The engine works as follows:

  1. Deserialize the ingested data.

    Note that this step only takes place when the ingested data is from a heterogeneous stream table, i.e., the output of heterogeneous replay.

  2. Split the ingested stream based on the conditions as specified by filter.

  3. Ingest the split streams to the handlers as specified by filter in the order of their timestamps.

Note:

Starting from version 1.30.18/2.00.6, in addition to heterogeneous stream tables, streamFilter also supports processing data from standard stream tables.

Arguments

name is a STRING scalar indicating the name of the stream filter engine. It must begin with a letter and may contain letters, numbers and underscores.

dummyTable is a table. It has the same schema as the stream table that the stream filter subscribes to. The table can be empty.

filter is a dictionary or a tuple of dictionaries. It defines how to process the ingested data. Each dictionary can have the following key-value pairs:
  • 'timeRange' (optional) is a pair or a tuple of pairs. Apply it to timeColumn to filter for records in the specified time range. Note: When processing a standard stream table, timeRange must have the same data type as timeColumn.

  • 'condition':

    • When processing a heterogeneous stream table: It is a STRING referring to a dictionary key from the inputTables of replay. The engine will filter records by the specified key.

    • When processing a standard stream table: It is a STRING scalar/vector indicating the value(s) from the conditionColumn, or metacode of one or more Boolean expressions (can contain built-in functions; cannot contain partial applications). The engine will filter records by the specified condition.

  • 'handler' is a unary function or a table (can be the table object returned by a streaming engine).

    • If it's a function, the filter result (a table) is passed as the function's sole argument.

    • If it's a table object, the filtered data are inserted into the table directly.
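
As an illustration of the key-value pairs described above, here is a minimal sketch of a filter dictionary whose handler is a function rather than a table. The names sketchSink, keepLargeVol and sketchFilter are hypothetical, and the sketch assumes that the ingested table has a "vol" column and that a partial application is accepted wherever a unary function is expected.

```
// hypothetical sink table, used only in this sketch
share streamTable(100:0, `time`sym`vol, [TIME,SYMBOL,INT]) as sketchSink

// msg is the filter result (a table); keep only the rows with vol > 105
def keepLargeVol(mutable sink, msg){
    sink.append!(select * from msg where vol > 105)
}

sketchFilter = dict(STRING, ANY)
// a tuple of pairs: two separate time ranges
sketchFilter['timeRange'] = (09:30:00.000:09:30:59.999, 13:00:00.000:13:00:59.999)
// a STRING vector: values to match in conditionColumn
sketchFilter['condition'] = `A`B
// fixing the first argument leaves a unary function of msg
sketchFilter['handler'] = keepLargeVol{sketchSink}
```

A function handler is useful when the filtered records need extra processing before they are stored. When no extra processing is needed, passing the table itself as the handler is simpler.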

msgSchema (optional) is a dictionary.
  • When processing a heterogeneous stream table: The dictionary indicates the input tables of replay. The keys are the table identifiers as specified in the inputTables parameter of replay and the values indicate the schema of each table. The ingested data will be parsed based on msgSchema.

  • When processing a standard stream table: Do not specify the parameter.
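
For the heterogeneous case, a msgSchema dictionary might be built as in the sketch below. The identifiers "quotes" and "ticks" and all column names are hypothetical. The sketch assumes that an empty in-memory table is sufficient to describe each schema, which mirrors how tables are used as schema values in the Examples section.

```
// hypothetical schemas; only the column names and types matter here
quotesSchema = table(1:0, `sym`timestamp`bid`ask, [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE])
ticksSchema = table(1:0, `sym`timestamp`price, [SYMBOL, TIMESTAMP, DOUBLE])

// the keys must match the keys of the inputTables dictionary passed to replay
msgSchemaSketch = dict(["quotes", "ticks"], [quotesSchema, ticksSchema])
```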

The following parameters apply only when processing data from a standard stream table:

timeColumn (optional) is a STRING indicating the name of the temporal column in dummyTable. If unspecified, it takes the name of the first column in the dummyTable.

conditionColumn (optional) is a STRING indicating a column (must be STRING or SYMBOL type) in dummyTable. If this parameter is unspecified, the "condition" key of filter has no effect.
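
The two parameters above can be seen together in a minimal sketch for a standard stream table. The names demoSrc, demoDst, demoRule, demoFilter and demoSub are hypothetical. The time range is of type TIME to match the "time" column, as required for standard stream tables.

```
// hypothetical source and destination tables with the same schema
share streamTable(100:0, `time`sym`vol, [TIME,SYMBOL,INT]) as demoSrc
share streamTable(100:0, `time`sym`vol, [TIME,SYMBOL,INT]) as demoDst

demoRule = dict(STRING, ANY)
demoRule['condition'] = `A                           // matched against conditionColumn (sym)
demoRule['timeRange'] = 09:30:00.000:09:31:00.000    // applied to timeColumn (time)
demoRule['handler'] = demoDst                        // filtered rows are inserted into demoDst

// filter is a single dictionary here; a tuple of dictionaries is also allowed
demoEngine = streamFilter(name="demoFilter", dummyTable=demoSrc, filter=demoRule, timeColumn=`time, conditionColumn=`sym)
subscribeTable(tableName="demoSrc", actionName="demoSub", offset=0, handler=demoEngine, msgAsTable=true)
```

With this setup, records appended to demoSrc are expected to reach demoDst only when their sym is A and their time falls within the specified range.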

Examples

(1) Processing a heterogeneous stream table:

Replay the DFS tables "orders" and "trades" to simulate an asof join of the two streams.

If we simply perform an N-to-N replay on the two tables, it is not guaranteed that the records will be ingested to the left and right tables of the asof join engine in chronological order: It may happen that a record with a larger timestamp arrives in the left table before a record with a smaller timestamp arrives in the right table. For more information, see replay.

Therefore, we replay the two tables into one heterogeneous stream table to make sure all records are ordered by timestamp. The stream filter subscribes to the heterogeneous stream table, splits the ingested data into two streams and distributes them to the left and right tables of the asof join engine (createAsofJoinEngine). In this way, we can make sure that the data is ingested to the left and right tables of the asof join engine in the order of the timestamps.

//create the "orders" table
n=1000
sym = take(take("IBM",n).join(take("GS",n)), n*2*3)
date=take(2022.01.04..2022.01.06, n*2*3).sort!()
timestamp1=take(2022.01.04 09:30:00.000+rand(1000,n),n) join take(2022.01.04 09:31:00.000+rand(1000,n),n)
timestamp2=take(2022.01.05 09:30:00.000+rand(1000,n),n) join take(2022.01.05 09:31:00.000+rand(1000,n),n)
timestamp3=take(2022.01.06 09:30:00.000+rand(1000,n),n) join take(2022.01.06 09:31:00.000+rand(1000,n),n)
timestamp=timestamp1 join timestamp2 join timestamp3
volume = rand(100, n*2*3)
t=table(sym,date,timestamp,volume)

if(existsDatabase("dfs://test_order")){
dropDatabase("dfs://test_order")
}
db1_or=database("",RANGE, 2022.01.04..2022.01.07)
db2_or=database("",VALUE,`IBM`GS)
db_or=database("dfs://test_order",COMPO,[db1_or, db2_or])
orders=db_or.createPartitionedTable(t,`orders,`date`sym)
orders.append!(t);
select count(*) from orders
// output: 6000

//create the "trades" table
n=2000
sym = take(take("IBM",n).join(take("GS",n)), n*2*3)
date=take(2022.01.04..2022.01.06, n*2*3).sort!()
timestamp1=take(2022.01.04 09:30:00.000+rand(1000,n),n) join take(2022.01.04 09:31:00.000+rand(1000,n),n)
timestamp2=take(2022.01.05 09:30:00.000+rand(1000,n),n) join take(2022.01.05 09:31:00.000+rand(1000,n),n)
timestamp3=take(2022.01.06 09:30:00.000+rand(1000,n),n) join take(2022.01.06 09:31:00.000+rand(1000,n),n)
timestamp=timestamp1 join timestamp2 join timestamp3
volume = rand(100, n*2*3)
price = rand(50.0, n*3) join  rand(20.0, n*3)

t=table(sym,date,timestamp,volume,price)

if(existsDatabase("dfs://test_trades")){
dropDatabase("dfs://test_trades")
}
db1=database("",RANGE, 2022.01.04..2022.01.07)
db2=database("",VALUE,`IBM`GS)
db=database("dfs://test_trades",COMPO,[db1, db2])
trades=db.createPartitionedTable(t,`trades,`date`sym)
trades.append!(t);
select count(*) from trades
// output: 12000

//generate the heterogeneous data sources and create a table as the outputTable of replay()
ds_or = replayDS(sqlObj=<select * from loadTable(db_or, `orders)>, dateColumn=`date, timeColumn=`timestamp)
ds = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`timestamp)
input_dict=dict(["orders","trades"], [ds_or, ds])
share streamTable(100:0,`timestamp`sym`blob`volume, [TIMESTAMP,SYMBOL, BLOB, INT]) as opt


//subscribe to the output table of replay to ingest the data to the stream filter
share streamTable(100:0,`timestamp`sym`blob`volume, [TIMESTAMP,SYMBOL, BLOB, INT]) as streamFilterOpt
share streamTable(100:0, `sym`date`timestamp`volume, [SYMBOL, DATE, TIMESTAMP, INT] ) as streamOrders
share streamTable(100:0, `sym`date`timestamp`volume`price, [SYMBOL, DATE, TIMESTAMP, INT, DOUBLE] ) as streamTrades
streamOpt=table(100:0, `timestamp`sym`volume`price`result, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE])

filter1=dict(STRING,ANY)
filter1['condition']=`orders
filter1['timeRange']=09:30:00.000:09:30:00.005

filter2=dict(STRING,ANY)
filter2['condition']=`trades
filter2['timeRange']=09:30:00.000:09:30:00.005

ajEngine=createAsofJoinEngine(name="ajEngine", leftTable=streamOrders, rightTable=streamTrades, outputTable=streamOpt, metrics=<[volume,price,price*volume]>, matchingColumn=`sym, useSystemTime=true)
filter1['handler']=getLeftStream(ajEngine)
filter2['handler']=getRightStream(ajEngine)
schema=dict(["orders","trades"], [streamOrders, streamTrades])

engine=streamFilter(name=`streamFilter,dummyTable=streamFilterOpt, filter=[filter1,filter2],msgSchema=schema)
subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=engine, msgAsTable=true)

//replay the heterogeneous data sources and output to the table "opt"
replay(inputTables=input_dict,outputTables=opt, timeColumn=`timestamp)

select count(*) from streamOpt
// output: 20

//drop the subscription and the streaming engines
unsubscribeTable(tableName="opt", actionName="sub1")
dropStreamEngine(`streamFilter)
dropStreamEngine(`ajEngine)

(2) Processing a standard stream table:

In this example, data from the standard stream table "trades" is ingested to the stream filter, where the records are filtered and assigned to handlers for further processing.

n=20
sym = symbol(take(`A`B`C,n))
name = string(rand(1..10,n))
date = temporalAdd(2012.12.06,0..(n-1),'d')
time = temporalAdd(09:30:00.000,0..(n-1),'ms')
vol = 100+take(1..8,20)
t = table(date,time,sym,name,vol)

share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st1
share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st2
share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st3


share streamTable(100:0,`time`sym`sum_vol,[TIME,SYMBOL,INT]) as output1
share streamTable(100:0,`time`avg_vol,[TIME,INT]) as output2

//create 2 streaming engines as the handlers of the stream filter
engine1=createTimeSeriesEngine(name="timeEngine", windowSize=3, step=3, metrics=<[sum(vol)]>, dummyTable=st3, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50)
engine2=createReactiveStateEngine(name="reactiveEngine", metrics=<[mavg(vol, 3)]>, dummyTable=st1, outputTable=output2, keyColumn=`sym)

//share "trades" as the stream table to be subscribed by the stream filter
share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as trades

//set the first filter and ingest the result to engine2
filter1 = dict(STRING,ANY)
filter1['condition']=`A
filter1['handler']=engine2
filter1['timeRange']=(09:30:00.001:09:30:00.010,09:29:00.000:09:30:00.000)

//set the second filter and ingest the result to st2
filter2 = dict(STRING,ANY)
filter2['handler']=st2
filter2['timeRange']=09:30:00.002:09:30:00.010

//set the third filter and ingest the result to engine1
filter3 = dict(STRING,ANY)
filter3['condition']=`C`A
filter3['handler']=engine1

//The stream filter subscribes to the stream table "trades" and distributes the ingested data based on the specified conditions
streamFilter2=streamFilter(name="streamFilterDemo",dummyTable=trades,filter=[filter1,filter2,filter3], timeColumn=`time, conditionColumn=`sym)
subscribeTable(tableName="trades", actionName="sub1", offset=0, handler=streamFilter2, msgAsTable=true)
trades.append!(t)
select * from output1
time sym sum_vol
09:30:00.003 A 101
09:30:00.003 C 103
09:30:00.006 A 104
09:30:00.006 C 106
09:30:00.009 A 107
09:30:00.009 C 101
09:30:00.012 A 102
09:30:00.012 C 104
09:30:00.015 A 105
09:30:00.015 C 107
09:30:00.018 A 108
select * from output2
time avg_vol
00:00:00.001
00:00:00.001
00:00:00.001 104
00:00:00.001 104
select * from st2
date time sym name vol
2012.12.08 09:30:00.002 C 6 103
2012.12.09 09:30:00.003 A 8 104
2012.12.10 09:30:00.004 B 10 105
2012.12.11 09:30:00.005 C 10 106
2012.12.12 09:30:00.006 A 10 107
2012.12.13 09:30:00.007 B 1 108
2012.12.14 09:30:00.008 C 3 101
2012.12.15 09:30:00.009 A 4 102
2012.12.16 09:30:00.010 B 9 103
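
Before this example is rerun with different filters, the subscription and the engines created above presumably need to be released, as was done at the end of example (1). A sketch using only the names defined in this example:

```
//drop the subscription of the stream filter to "trades"
unsubscribeTable(tableName="trades", actionName="sub1")
//drop the stream filter and the two handler engines
dropStreamEngine("streamFilterDemo")
dropStreamEngine("timeEngine")
dropStreamEngine("reactiveEngine")
```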

"condition" can also be specified as Boolean expressions to support more complex filter logic. In the example above, replace the value of "condition" in filter2 with a Boolean expression to filter the data based on the columns "vol" and "date".

filter2 = dict(STRING,ANY)
filter2['condition'] = <sym==`A and 101<vol<105 and date<2012.12.15>
filter2['handler'] = st2
filter2['timeRange'] = 09:30:00.002:09:30:00.010

select * from st2
date time sym name vol
2012.12.09 09:30:00.003 A 7 104
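
As a cross-check, the same conditions can be applied to the in-memory table t with an ordinary query. This is only a sketch for verifying the expected result, not part of the engine setup. The chained comparison is split into two conditions and the time range is written as explicit bounds.

```
//t is the in-memory table built at the start of example (2)
select * from t where sym==`A, vol>101, vol<105, date<2012.12.15, time>=09:30:00.002, time<=09:30:00.010
```

With the data generated above, only the record at 09:30:00.003 (sym A, vol 104, date 2012.12.09) satisfies all conditions, which agrees with the single row in st2. The value of "name" varies between runs because that column is generated randomly.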