createLookupJoinEngine

Syntax

createLookupJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [rightTimeColumn], [checkTimes], [outputElapsedMicroseconds=false], [keepDuplicates=false])

Details

Create a lookup join streaming engine. The engine left joins two stream tables, or a stream table and a non-stream table (refreshed regularly), on matchingColumn. Use this engine when the right table has infrequent updates (e.g., a dimension table with intraday indicators).

Note:

A left join is triggered only when new data is ingested to the left table.

Data in the right table is grouped by matchingColumn. When keepDuplicates = false, only the latest record in each group is kept by the engine. When keepDuplicates = true, all records in each group are kept.
  • If the right table is a stream table, the data in each group will be updated as new data is ingested into the right table;

  • If the right table is an in-memory table, dimension table or SQL metacode, the system refreshes the right table at regular intervals as specified by checkTimes.

The lookup join engine and the asof join engine are different in the following aspects:

  • For the asof join engine, the first column of its output table is always the time column. There is no such restriction with the lookup join engine.

  • With the lookup join engine, a join is triggered as soon as a new record is ingested into the left table. With the asof join engine, when the timeColumn is specified, there can be a delay before a record in the left table is joined.

The lookup join engine also differs from the left semi join engine: the left semi join engine outputs no result for a left-table record until a match is found in the right table.

For more application scenarios, see Streaming Engines.

Arguments

name is a string indicating the name of the lookup join engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.

leftTable is a table object whose schema must be the same as the stream table to which the engine subscribes.

rightTable is a table object. It can be an in-memory table, stream table, dimension table, or SQL metacode that returns a table. If rightTable is updated regularly but is not subscribed to, checkTimes must be specified so that the engine refreshes its data on a schedule.

outputTable is a table object to hold the calculation results. Create an empty table and specify the column names and types before calling the function.

The columns of outputTable are in the following order:

(1) The first column(s) are the column(s) on which the tables are joined, arranged in the same order as specified in matchingColumn.

(2) Followed by the calculation results of metrics.

(3) If outputElapsedMicroseconds is set to true, specify two more columns at the end: a LONG column for the elapsed time and an INT column for the number of records in each batch.
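As an illustration of the column order described above, the sketch below builds an output table for an engine that joins on sym, computes two metrics, and sets outputElapsedMicroseconds=true. All table, engine and column names (tradesTiming, pricesTiming, outputTiming, timingDemo, elapsedMicros, batchSize) are hypothetical; only the column order and types follow the rules above.

```dolphindb
// Hypothetical input tables, modeled on the tables used in the Examples section
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesTiming
share streamTable(1000:0, `timestamps`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as pricesTiming

// Column order: (1) matching column, (2) one column per metric,
// (3) a LONG column and an INT column because outputElapsedMicroseconds=true
share table(100:0, `sym`price`val`elapsedMicros`batchSize, [SYMBOL, DOUBLE, DOUBLE, LONG, INT]) as outputTiming

timingEngine = createLookupJoinEngine(name="timingDemo", leftTable=tradesTiming, rightTable=pricesTiming, outputTable=outputTiming, metrics=<[price, val]>, matchingColumn=`sym, outputElapsedMicroseconds=true)
```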

metrics is metacode (can be a tuple) specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.

  • metrics can use one or more expressions, built-in or user-defined functions (but not aggregate functions), or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.

  • metrics can be functions that return multiple values, in which case the columns in the output table that hold the return values must be specified. For example, <func(price) as `col1`col2>.

To specify a column that exists in both the left and the right tables, use the format tableName.colName. By default, the column from the left table is used.

Note: The column names specified in metrics are case-insensitive, so their case may differ from that of the column names in the input tables.

matchingColumn is a STRING scalar/vector/tuple indicating the column(s) on which the tables are joined. It supports integral, temporal or literal (except UUID) types.

  • When there is only one column to match: if the column has the same name in both tables, matchingColumn is a STRING scalar; otherwise, it is a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].

  • When there are multiple columns to match: if all matching columns have the same names in both tables, matchingColumn is a STRING vector; otherwise, it is a tuple of two elements. For example, if the columns are named "timestamp" and "sym" in the left table and "timestamp" and "sym1" in the right table, then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].
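The single-column case with differing names can be sketched as follows. The table and engine names (tradesM, pricesM, outputM, matchDemo) are hypothetical and chosen only for this illustration; the left table calls the key "sym" and the right table calls it "sym1".

```dolphindb
// Left table uses "sym", right table uses "sym1" for the join key
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesM
share streamTable(1000:0, `timestamps`sym1`val, [TIMESTAMP, SYMBOL, DOUBLE]) as pricesM
share table(100:0, `sym`price`val, [SYMBOL, DOUBLE, DOUBLE]) as outputM

// Tuple of two elements: left-table column name(s) first, right-table column name(s) second
matchEngine = createLookupJoinEngine(name="matchDemo", leftTable=tradesM, rightTable=pricesM, outputTable=outputM, metrics=<[price, val]>, matchingColumn=[[`sym],[`sym1]])
```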

rightTimeColumn (optional) is a STRING scalar indicating the time column in the right table. If it is specified, the engine keeps the record with the latest timestamp in each group; if multiple records share that timestamp, only the most recently ingested one is retained. If it is not specified, the most recently ingested record (based on the system time) is kept.

checkTimes (optional) is a vector of temporal values or a DURATION scalar. If it is specified, the system regularly refreshes the right table (keeping only the latest data) and ingests the latest data into the lookup join engine. If the right table does not need to be updated regularly, you can leave checkTimes unspecified, but make sure to manually ingest the table data into the engine after it has been created.

  • If checkTimes is a vector of temporal values, it must be of SECOND, TIME or NANOTIME type. The lookup join engine updates the right table according to the time specified by each element in the vector on a daily basis.

  • If checkTimes is a DURATION scalar, it indicates the interval to update the right table.
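The three ways of handling checkTimes described above can be sketched as follows. This is a minimal illustration, not a recommended configuration: all table and engine names (tradesC, outputC, refData, dailyRefreshDemo, intervalRefreshDemo, manualIngestDemo) and the refresh times are hypothetical.

```dolphindb
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesC
share table(100:0, `sym`price`val, [SYMBOL, DOUBLE, DOUBLE]) as outputC
// In-memory right table that is not subscribed to
refData = table(1000:0, `timestamps`sym`val, [TIMESTAMP, SYMBOL, DOUBLE])
insert into refData values(2021.10.08T09:00:00.000, `A, 1.5)

// (a) Vector of SECOND values: refresh at these times every day
dailyEngine = createLookupJoinEngine(name="dailyRefreshDemo", leftTable=tradesC, rightTable=refData, outputTable=outputC, metrics=<[price, val]>, matchingColumn=`sym, checkTimes=[09:00:00, 13:00:00])

// (b) DURATION scalar: refresh every 30 seconds
intervalEngine = createLookupJoinEngine(name="intervalRefreshDemo", leftTable=tradesC, rightTable=refData, outputTable=outputC, metrics=<[price, val]>, matchingColumn=`sym, checkTimes=30s)

// (c) checkTimes not specified: ingest the right-table data manually
// after the engine has been created (isLeftTable=false)
manualEngine = createLookupJoinEngine(name="manualIngestDemo", leftTable=tradesC, rightTable=refData, outputTable=outputC, metrics=<[price, val]>, matchingColumn=`sym)
appendForJoin(manualEngine, false, refData)
```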

outputElapsedMicroseconds (optional) is a Boolean value. The default value is false. It determines whether to output:

  • the elapsed time (in microseconds) from the ingestion of data to the output of result in each batch.

  • the total number of records in each batch.

keepDuplicates (optional) is a Boolean value indicating whether to keep all records in each group of the right table. When set to false (default), the engine keeps the latest record in each group. When set to true, the engine keeps all records in each group.
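A minimal sketch of keepDuplicates=true follows, using hypothetical table, engine and action names (tradesD, pricesD, outputD, keepDupDemo, appendLeftD, appendRightD). It ingests two right-table records for the same sym before a single left-table record arrives. No expected output is shown here; running the same script with keepDuplicates=false and comparing the contents of outputD is one way to observe the difference.

```dolphindb
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesD
share streamTable(1000:0, `timestamps`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as pricesD
share table(100:0, `sym`price`val, [SYMBOL, DOUBLE, DOUBLE]) as outputD

// Keep every right-table record in each sym group instead of only the latest one
dupEngine = createLookupJoinEngine(name="keepDupDemo", leftTable=tradesD, rightTable=pricesD, outputTable=outputD, metrics=<[price, val]>, matchingColumn=`sym, keepDuplicates=true)
subscribeTable(tableName="tradesD", actionName="appendLeftD", offset=0, handler=appendForJoin{dupEngine, true}, msgAsTable=true)
subscribeTable(tableName="pricesD", actionName="appendRightD", offset=0, handler=appendForJoin{dupEngine, false}, msgAsTable=true)

// Two right-table records share the key A
insert into pricesD values(2021.10.08T09:00:00.001, `A, 1.0)
insert into pricesD values(2021.10.08T09:00:00.002, `A, 2.0)
sleep(500)
// One left-table record with key A triggers the join
insert into tradesD values(2021.10.08T09:00:01.000, `A, 10.5)
sleep(500)
select * from outputD
```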

Examples

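Example 1. The right table is a stream table and rightTimeColumn is not specified, so the engine keeps the most recently ingested record for each sym.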

login(`admin, `123456)
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output

LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n  = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym factor1 factor2 factor3
A 10.1 13 131.3
B 11.1 14 155.4
C 12.1 15 181.5
A 13.1 13 170.3
B 14.1 14 197.4
C 15.1 15 226.5
A 16.1 13 209.3
B 17.1 14 239.4
C 18.1 15 271.5
A 19.1 13 248.3
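The examples on this page reuse engine names, shared table names and subscription action names, so rerunning one in the same session generally requires releasing the earlier objects first. A minimal cleanup sketch for Example 1, using only the names defined in that example:

```dolphindb
// Cancel the two subscriptions created in Example 1
unsubscribeTable(tableName="trades", actionName="append_leftTable")
unsubscribeTable(tableName="prices", actionName="append_rightTable")
// Release the engine so that the name "test1" can be reused
dropStreamEngine("test1")
// Remove the shared tables
undef(`trades`prices`output, SHARED)
```

The same pattern applies to the other examples after substituting their engine, table and action names.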

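Example 2. rightTimeColumn is specified, so the engine keeps the record with the latest timestamp for each sym.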

share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n  = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym factor1 factor2 factor3
A 10.1 10 101
B 11.1 11 122.1
C 12.1 12 145.2
A 13.1 10 131
B 14.1 11 155.1
C 15.1 12 181.2
A 16.1 10 161
B 17.1 11 188.1
C 18.1 12 217.2
A 19.1 10 191

Example 3. The right table is an in-memory table, so checkTimes must be set.

share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
prices=table(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT])
LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps, checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)

n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n  = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym factor1 factor2 factor3
A 10.1 10 101
B 11.1 11 122.1
C 12.1 12 145.2
A 13.1 10 131
B 14.1 11 155.1
C 15.1 12 181.2
A 16.1 10 161
B 17.1 11 188.1
C 18.1 12 217.2
A 19.1 10 191

Example 4. Join the left table "trades" (a real-time stream table) with the right table "prices" (a dimension table with infrequent updates), matching on column "id" to look up the corresponding records in the right table.

share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
dbPath="dfs://testlj"
if(existsDatabase(dbPath)){
   dropDatabase(dbPath)
}
rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
db=database(dbPath, VALUE, `A`B`C)
prices=db.createDimensionTable(rt,`rightTable)
share table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE]) as outputTable

tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=prices, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)

def writeData(t,n){
    timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1..n, n)
    id = take(1 2 3, n)
    insert into t values(timev, volumev, id)
}

writeData(rt, 10)
prices.append!(rt)
sleep(2000)
writeData(trades, 6)
sleep(100)

select * from outputTable
id volume price prod
1 1 10 10
2 2 8 16
3 3 9 27
1 4 10 40
2 5 8 40
3 6 9 54

Example 5. Specify SQL metacode for rightTable to join columns queried from a DFS partitioned table.

share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
dbPath="dfs://lookupjoinDB"
if(existsDatabase(dbPath)){
   dropDatabase(dbPath)
}
rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
db=database(dbPath, HASH, [INT,5])
prices=db.createPartitionedTable(rt,`rightTable, `id)
share table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE]) as outputTable

tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=<select * from loadTable(dbPath, `rightTable)>, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)

def writeData(t,n){
    timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1..n, n)
    id = take(1 2 3, n)
    insert into t values(timev, volumev, id)
}

writeData(rt, 10)
prices.append!(rt)
sleep(2000)
writeData(trades, 6)
sleep(100)

select * from outputTable
id volume price prod
1 1 10 10
2 2 8 16
3 3 9 27
1 4 10 40
2 5 8 40
3 6 9 54