createNearestJoinEngine
Syntax
createNearestJoinEngine(name, leftTable, rightTable, outputTable, kNearest, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [garbageSize=5000], [maxDelayedTime], [nullFill], [cachedTableCapacity=1024], [snapshotDir], [snapshotIntervalInMsgCount])
Details
Creates a nearest-neighbor join streaming engine. The engine groups the left and right input tables by the columns specified in matchingColumn, and for each row in the left table, it retrieves the k nearest records from the right table (with timestamps not later than the current row) within each group, and computes the output based on these records in real time.
Return value: A table.
Arguments
name is a string indicating the name of the nearest-neighbor join streaming engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.
leftTable and rightTable are table objects whose schema must be the same as the stream table to which the engine subscribes.
outputTable is a table to which the engine inserts the calculation results. It can be an in-memory table or a DFS table. Before calling the function, create an empty table with the required column names and types.
The columns of outputTable are in the following order:
(1) The first column must be a temporal column.
- If useSystemTime = true, the data type must be TIMESTAMP;
- If useSystemTime = false, it has the same data type as timeColumn.
(2) Then followed by one or more columns on which the tables are joined, arranged in the same order as specified in matchingColumn.
(3) Further followed by one or more columns which are the calculation results of metrics.
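The column order above can be sketched as follows. This is a minimal illustration only: it assumes useSystemTime = false with a TIMESTAMP time column, a single matching column named sym, and metrics = <[price, toArray(val), sum(val)]>. The table name njOutput and the output column names are hypothetical.

```dolphindb
// Column 1: the time column (same type as timeColumn, because useSystemTime=false is assumed)
// Column 2: the matching column
// Columns 3-5: one column per metric; toArray(val) produces an array vector, hence DOUBLE[]
njOutput = table(100:0, `time`sym`price`vals`sumVal, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE[], DOUBLE])
```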
kNearest is a positive integer. For each row in the left table, the engine retrieves the k nearest rows from the right table whose timestamps are less than or equal to the current row's timestamp.
metrics is metacode (which can be a tuple) specifying the calculation formulas. For more information about metacode, please refer to Metaprogramming.
- metrics can use one or more expressions, built-in or user-defined functions.
- metrics can be functions that return multiple values, in which case the output columns that hold the return values must be specified. For example, <func(price) as `col1`col2>.
- The columns in the right table can be converted into array vectors using toArray, e.g., <toArray(price)>.
- The column names specified in metrics are not case-sensitive and need not match the case used in the input tables.
- If you want to specify a column that exists in both the left and the right tables, use the format tableName.colName. By default, the column from the left table is used.
The following functions are optimized in the engine when they are applied only to columns from the right table: sum, sum2, avg, std, var, corr, covar, wavg, wsum, beta, max, min, last, first, med, percentile.
matchingColumn is a STRING scalar, vector, or tuple specifying the column(s) on which the tables are joined.
- When there is only one column to match: if the column has the same name in both tables, matchingColumn is a STRING scalar; otherwise it is a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].
- When there are multiple columns to match: if all the columns have the same names in both tables, matchingColumn is a STRING vector; otherwise it is a tuple of two elements. For example, if the columns are named "timestamp" and "sym" in the left table, and "timestamp" and "sym1" in the right table, then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].
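As an illustration of the tuple form, the sketch below joins on a key that is named sym in the left table and sym1 in the right table. The table names (trades, quotes, njOut), the engine name, and the chosen metrics are hypothetical and serve only to show where matchingColumn fits in the call.

```dolphindb
// Hypothetical input schemas: the key is "sym" on the left and "sym1" on the right
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1:0, `time`sym1`val, [TIMESTAMP, SYMBOL, DOUBLE]) as quotes
// Output: time column, matching column, then one column per metric
share table(100:0, `time`sym`price`sumVal, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as njOut

// The first tuple element lists the left-table key, the second the right-table key
njDiffKeys = createNearestJoinEngine(name="njDiffKeys", leftTable=trades, rightTable=quotes, outputTable=njOut, kNearest=3, metrics=<[price, sum(val)]>, matchingColumn=[[`sym], [`sym1]], timeColumn=`time)
```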
timeColumn (optional) specifies the name(s) of the time column in the left table and the right table. It must be specified when useSystemTime = false. The time columns must have the same data type. If the time column has the same name in both tables, timeColumn is a string. Otherwise, it is a vector of 2 strings indicating the time column in each table.
useSystemTime (optional) is a Boolean value indicating whether records are joined based on system time.
- useSystemTime = true: join records based on the system time (timestamp with millisecond precision) when they are ingested into the engine.
- useSystemTime = false (default): join records based on the specified timeColumn from the left table and the right table.
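A minimal sketch of the useSystemTime = true case follows, assuming input tables without a time column. All table, column, and engine names are hypothetical. Because the engine timestamps records on ingestion, timeColumn is omitted and the first output column is TIMESTAMP.

```dolphindb
// Hypothetical inputs that carry no time column of their own
share streamTable(1:0, `sym`price, [SYMBOL, DOUBLE]) as leftNoTime
share streamTable(1:0, `sym`val, [SYMBOL, DOUBLE]) as rightNoTime
// First output column must be TIMESTAMP when useSystemTime=true
share table(100:0, `time`sym`price`avgVal, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as sysOut

njSysTime = createNearestJoinEngine(name="njSysTime", leftTable=leftNoTime, rightTable=rightNoTime, outputTable=sysOut, kNearest=5, metrics=<[price, avg(val)]>, matchingColumn=`sym, useSystemTime=true)
```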
garbageSize (optional) is a positive integer with a default value of 5,000 (rows). As subscribed data is ingested into the engine, it continues to occupy memory. Within the left and right tables, records are grouped by matchingColumn values. When the number of records in a group exceeds garbageSize, the system removes the records that have already been used in calculation from memory.
maxDelayedTime (optional) is a positive integer. It only takes effect when timeColumn is specified, and maxDelayedTime and timeColumn must have the same time precision. Use maxDelayedTime to trigger windows that remain uncalculated long past their end. The default maxDelayedTime is 3 seconds. For more information about this parameter, see "Window triggering rules" in the Details section.
nullFill (optional) is a tuple with one element per output column. It is used to fill in the null values in the output table. The data type of each element must match that of the corresponding output column.
cachedTableCapacity (optional) is a positive integer indicating the initial capacity (in terms of the number of rows) of the left and right cache tables for each group. The default value is 1024.
To enable snapshot in the streaming engines, specify parameters snapshotDir and snapshotIntervalInMsgCount.
snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved. The directory must already exist; otherwise an exception is thrown. If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot is loaded to restore the engine state. Multiple streaming engines can share a directory, in which the snapshot files are named after the engines. A snapshot file can have one of the following extensions:
- <engineName>.tmp: a temporary snapshot
- <engineName>.snapshot: a snapshot that is generated and flushed to disk
- <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.
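The two snapshot parameters might be combined as in the sketch below. The directory path, table names, engine name, and the interval of 10000 messages are all assumptions chosen for illustration. The directory is created up front because the engine throws an exception if it does not exist.

```dolphindb
snapshotPath = "/home/dolphindb/snapshots"   // hypothetical path; adjust for your deployment
if(!exists(snapshotPath)){
    mkdir(snapshotPath)                      // the directory must exist before the engine is created
}

share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as snapLeft
share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as snapRight
share table(100:0, `time`sym`price`sumVal, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as snapOut

// If a snapshot for "njSnap" already exists in snapshotPath, it is loaded on creation
njSnap = createNearestJoinEngine(name="njSnap", leftTable=snapLeft, rightTable=snapRight, outputTable=snapOut, kNearest=3, metrics=<[price, sum(val)]>, matchingColumn=`sym, timeColumn=`time, snapshotDir=snapshotPath, snapshotIntervalInMsgCount=10000)
```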
Examples
```dolphindb
// Input stream tables and the output table (time, matching column, one column per metric)
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable
share table(100:0, `time`sym`factor1`factor2`factor3, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE[], DOUBLE]) as output

// One fill value per output column
nullFill= [2012.01.01T00:00:00.000, `NONE, 0.0, [], 0.0]
njEngine=createNearestJoinEngine(name="test1", leftTable=leftTable, rightTable=rightTable, outputTable=output, kNearest=8, metrics=<[price,toArray(val),sum(val)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false,nullFill=nullFill)

// Route each stream table into the engine: true = left side, false = right side
subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{njEngine, true}, msgAsTable=true)
subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{njEngine, false}, msgAsTable=true)

n=10

// First batch of right-table records
tp2=table(take(2012.01.01T00:00:00.000+0..10, 2*n) as time, take(`A, n) join take(`B, n) as sym, take(double(1..n),2*n) as val)
tp2.sortBy!(`time)
rightTable.append!(tp2)

// Left-table records; the NULL price is replaced through nullFill in the output
tp1=table(take(2012.01.01T00:00:00.003+0..10, 2*n) as time, take(`A, n) join take(`B, n) as sym, take(NULL join rand(10.0, n-1),2*n) as price)
tp1.sortBy!(`time)
leftTable.append!(tp1)

// Second batch of right-table records with later timestamps
tp2=table(take(2012.01.01T00:00:00.010+0..10, 2*n) as time, take(`A, n) join take(`B, n) as sym, take(double(1..n),2*n) as val)
tp2.sortBy!(`time)
rightTable.append!(tp2)

select * from output
```

time | sym | factor1 | factor2 | factor3 |
---|---|---|---|---|
2012.01.01 00:00:00.003 | A | 0 | [1, 2, 3, 4] | 10 |
2012.01.01 00:00:00.004 | A | 8.049739237951693 | [1, 2, 3, 4, 5] | 15 |
2012.01.01 00:00:00.005 | A | 6.31845193685475 | [1, 2, 3, 4, 5, 6] | 21 |
2012.01.01 00:00:00.006 | A | 0.01247286192106635 | [1, 2, 3, 4, 5, 6, 7] | 28 |
2012.01.01 00:00:00.007 | A | 8.373015887228414 | [1, 2, 3, 4, 5, 6, 7, 8] | 36 |
2012.01.01 00:00:00.008 | A | 4.636610761119452 | [2, 3, 4, 5, 6, 7, 8, 9] | 44 |
2012.01.01 00:00:00.003 | B | 8.049739237951693 | [2, 3, 4, 5] | 14 |
2012.01.01 00:00:00.004 | B | 6.31845193685475 | [2, 3, 4, 5, 6] | 20 |
2012.01.01 00:00:00.005 | B | 0.01247286192106635 | [2, 3, 4, 5, 6, 7] | 27 |
2012.01.01 00:00:00.006 | B | 8.373015887228414 | [2, 3, 4, 5, 6, 7, 8] | 35 |
2012.01.01 00:00:00.007 | B | 4.636610761119452 | [2, 3, 4, 5, 6, 7, 8, 9] | 44 |
2012.01.01 00:00:00.008 | B | 7.700075873220435 | [3, 4, 5, 6, 7, 8, 9, 10] | 52 |
2012.01.01 00:00:00.009 | B | 0.5831421500989946 | [3, 4, 5, 6, 7, 8, 9, 10] | 52 |
2012.01.01 00:00:00.009 | A | 7.700075873220435 | [3, 4, 5, 6, 7, 8, 9, 10] | 52 |
2012.01.01 00:00:00.010 | A | 0.5831421500989946 | [4, 5, 6, 7, 8, 9, 10, 1] | 50 |
2012.01.01 00:00:00.011 | A | 5.117162734418752 | [5, 6, 7, 8, 9, 10, 1, 2] | 48 |
2012.01.01 00:00:00.012 | A | 8.823084861596655 | [6, 7, 8, 9, 10, 1, 2, 3] | 46 |
2012.01.01 00:00:00.010 | B | 5.117162734418752 | [5, 6, 7, 8, 9, 10, 1, 2] | 48 |
2012.01.01 00:00:00.011 | B | 8.823084861596655 | [6, 7, 8, 9, 10, 1, 2, 3] | 46 |
2012.01.01 00:00:00.013 | B | 0 | [8, 9, 10, 1, 2, 3, 4, 5] | 42 |
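
To rerun the example, the subscriptions, the engine, and the shared tables need to be released first. The following cleanup sketch assumes the names used in the example above (actions joinLeft and joinRight, engine test1) and that no other subscriptions are attached to these tables.

```dolphindb
// Cancel both subscriptions before dropping the engine they feed
unsubscribeTable(tableName="leftTable", actionName="joinLeft")
unsubscribeTable(tableName="rightTable", actionName="joinRight")
// Release the engine registered under the name "test1"
dropStreamEngine("test1")
// Remove the shared tables created by the example
undef(`leftTable`rightTable`output, SHARED)
```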