createEquiJoinEngine
Syntax
createEquiJoinEngine(name, leftTable, rightTable, outputTable, metrics,
matchingColumn, timeColumn, [garbageSize=5000], [maxDelayedTime])
Alias: createEqualJoinEngine
Details
Create an equi join streaming engine. Streams are ingested into the engine through left and right tables and joined on matchingColumn and timeColumn. Return a table object that is the equi join result of a left table and a right table. The result holds all records with matching values.
For more application scenarios, see Streaming Engines.
Calculation Rules
When data is ingested into one table, the equi join streaming engine searches for records with matching values in the other table. If matches are found, the engine outputs the combined records with additional columns holding the calculation results of metrics.
Arguments
Some parameters of the equi join engine are the same as those of the asof join engine, please refer to the function createAsofJoinEngine for detailed information. The different parameters are described as below:
name is a string indicating the name of the equi join streaming engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.
timeColumn is a STRING scalar or vector indicating the time columns in the left table and the right table. The time columns in the left and right tables must have the same data type. When the two time columns have the same column name, timeColumn is a string scalar; otherwise, timeColumn is vector of two strings.
garbageSize (optional) is a positive integer with the default value of 5,000 (in unit of rows). When the number of rows of historical data in memory exceeds the garbageSize, the system will remove the historical data that is not needed for the current calculation on the following conditions:
-
The historical data has already been joined and returned.
-
For historical data that has not been joined, if the timestamp difference between the historical data and the new arriving data in left/right table has exceeded the maxDelayedTime, it will also be discarded.
maxDelayedTime (optional) is a positive integer with the default value of 3 (seconds), indicating the maximum time to keep cached data in the engine. This parameter only takes effect when the conditions described in garbageSize are met. It is not recommended to set the maxDelayedTime too small in case data got removed before it is joined.
Examples
Example 1.
share streamTable(1:0, `time`sym`price, [SECOND, SYMBOL, DOUBLE]) as leftTable
share streamTable(1:0, `time`sym`val, [SECOND, SYMBOL, DOUBLE]) as rightTable
share table(100:0, `time`sym`price`val`total, [SECOND, SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
ejEngine=createEquiJoinEngine("test1", leftTable, rightTable, output, [<price>, <val>, <price*val>], `sym, `time)
subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{ejEngine, true}, msgAsTable=true)
subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{ejEngine, false}, msgAsTable=true)
tmp1=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(1..20) as price)
leftTable.append!(tmp1)
tmp2=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(50..31) as val)
rightTable.append!(tmp2)
select count(*) from output
// output: 20
Example 2. The type of the timeColumn is timestamp. The default value of maxDelayedTime is 3000ms (3s).
share streamTable(5000000:0, `timestamp`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
share streamTable(5000000:0, `timestamp`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable
share table(5000000:0, `timestamp`sym`price`val`total`diff`ratio, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as output
ejEngine=createEquiJoinEngine("test1", leftTable, rightTable, output, <[price, val, price+val, price-val, price/val]>, `sym, `timestamp, 5000)
topic1=subscribeTable(tableName="leftTable", actionName="writeLeft", offset=0, handler=appendForJoin{ejEngine, true}, batchSize=10000, throttle=1)
topic2=subscribeTable(tableName="rightTable", actionName="writeRight", offset=0, handler=appendForJoin{ejEngine, false}, batchSize=10000, throttle=1)
def writeLeftTable(mutable tb){
batch = 1000
for(i in 1..300){
tmp = table(batch:batch, `timestamp`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
tmp[`timestamp]=take(2012.01.01T00:00:00.000+i, batch)
tmp[`sym]=shuffle("A"+string(1..batch))
tmp[`price]=rand(100.0, batch)
tb.append!(tmp)
}
}
def writeRightTable(mutable tb){
batch = 500
for(i in 1..200){
tmp = table(batch:batch, `timestamp`sym`val, [TIMESTAMP, SYMBOL, DOUBLE])
tmp[`timestamp]=take(2012.01.01T00:00:00.000+i, batch)
tmp[`sym]=shuffle("A"+string(1..batch))
tmp[`val]=rand(100.0, batch)
tb.append!(tmp)
}
}
job1 = submitJob("writeLeft", "", writeLeftTable, leftTable)
job2 = submitJob("writeRight", "", writeRightTable, rightTable)
select count(*) from output order by sym, timestamp
// output: 100000