dynamicGroupCumcount
Syntax
dynamicGroupCumcount(membership, prevMembership, groupCount)
Arguments
membership is a vector of integers in the interval [0, groupCount), indicating the tag of each record at the current timestamp.
prevMembership is a vector of INT type, indicating the tag that the same event had at the previous timestamp. Its elements can be null values (for the first record of each group).
groupCount is an integer in the interval [2, 8], indicating the number of tags.
Details
In most cases, the attribute and category of an event are fixed. In some scenarios, however, the category of an event changes dynamically. For example, when processing real-time tick data, users may judge whether an order (attribute) is a large or a small one (category) based on its cumulative volume, in order to analyze capital flow. As real-time data continues to flow in, the cumulative volume keeps increasing, and thus an order first tagged as small may later become a large one.
Function dynamicGroupCumcount is used in such scenarios to cumulatively count the events in each category as their categories change. The rules are as follows:
- If membership = prevMembership, the counts remain unchanged.
- If membership ≠ prevMembership, the count of the group corresponding to membership increases by 1, and the count of the group corresponding to prevMembership decreases by 1.
- If prevMembership is a null value (the first record of each group), the count of the group corresponding to membership increases by 1.
It returns a tuple of length groupCount. Each element is a vector of the same length as membership, which sequentially records the cumulative count of each tag.
Note: The index of the tuple matches the tags, which means that the count of tag 0 is output to the vector at index 0 of the tuple.
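The counting rules above can be sketched as a small reference model. The following Python code is illustrative only and is not DolphinDB's implementation: the name dynamic_group_cumcount_model is hypothetical, null values are represented by None, and raising an error on out-of-range input is an assumption of this sketch rather than documented behavior.

```python
from typing import List, Optional, Sequence


def dynamic_group_cumcount_model(
    membership: Sequence[int],
    prev_membership: Sequence[Optional[int]],
    group_count: int,
) -> List[List[int]]:
    """Illustrative model of the counting rules (hypothetical helper)."""
    # Assumption: invalid arguments are rejected with an error.
    if not 2 <= group_count <= 8:
        raise ValueError("group_count must be in [2, 8]")

    running = [0] * group_count                 # current count per tag
    columns = [[] for _ in range(group_count)]  # one output vector per tag

    for cur, prev in zip(membership, prev_membership):
        if not 0 <= cur < group_count:
            raise ValueError("membership must be in [0, group_count)")
        if prev is None:
            # First record of the event: it joins the group of its tag.
            running[cur] += 1
        elif prev != cur:
            # The event moved from group `prev` to group `cur`.
            running[cur] += 1
            running[prev] -= 1
        # If prev == cur, nothing changes.
        for tag in range(group_count):
            columns[tag].append(running[tag])
    return columns


# Tags of sym st0 in the Examples section: groupId = 0 0 1, prevGroupId = NULL NULL 0
st0 = dynamic_group_cumcount_model([0, 0, 1], [None, None, 0], 3)
print(st0)  # [[1, 2, 1], [0, 0, 1], [0, 0, 0]]
```

Reading the result column by column gives the groupId0, groupId1 and groupId2 values of the st0 rows in the Examples section: the element at index 0 holds the counts of tag 0, and so on.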
Examples
Data preprocessing:
// Define a function to generate tags
def tag_func(v){
    // tag 0: v <= 5; tag 1: 5 < v <= 10; tag 2: v > 10
    return iif(v <= 5, 0, iif(v <= 10 and v > 5, 1, 2))
}
// original table
time = take(2022.01.01T09:00:00.000 + 1..3, 6)
sym=`st0`st0`st0`st1`st1`st1
orderNo = `10001`10002`10001`10002`10003`10002
volume = 2 4 6 3 2 9
t = table(sym, time, orderNo, volume)
// calculate cumulative sums and tag the results
t1 = select *, cumsum(volume) as sumVolume from t context by sym, orderNo
t2 = lj(t, t1,`sym`time`orderNo)
t3 = select sym, time, orderNo, volume, sumVolume, tag_func(sumVolume) as groupId from t2
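To make the intermediate values of the preprocessing step visible, the sketch below replays it in Python on the same six records. It is an illustration only, not DolphinDB code: the names rows, cum and last_tag are hypothetical, None stands for a null value, and the records are assumed to arrive in time order within each (sym, orderNo) pair. It also derives the previous tag of each order, which the DolphinDB examples obtain with prev(groupId).

```python
from collections import defaultdict


def tag_func(v):
    # Same thresholds as the DolphinDB tag_func above.
    if v <= 5:
        return 0
    if v <= 10:
        return 1
    return 2


# (sym, orderNo, volume) in the order of the original table
rows = [
    ("st0", "10001", 2), ("st0", "10002", 4), ("st0", "10001", 6),
    ("st1", "10002", 3), ("st1", "10003", 2), ("st1", "10002", 9),
]

cum = defaultdict(int)  # cumulative volume per (sym, orderNo)
last_tag = {}           # most recent tag per (sym, orderNo)
out = []
for sym, order_no, volume in rows:
    key = (sym, order_no)
    cum[key] += volume
    tag = tag_func(cum[key])
    # last_tag.get(key) is None for the first record of an order
    out.append((sym, order_no, cum[key], tag, last_tag.get(key)))
    last_tag[key] = tag

for rec in out:
    print(rec)  # (sym, orderNo, sumVolume, groupId, prevGroupId)
```

For these records the cumulative volumes are 2, 4, 8, 3, 2, 12, the tags are 0, 0, 1, 0, 0, 2, and only the third and sixth records have a non-null previous tag (0 in both cases), so those are the only records that move an order from one group to another.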
For historical data, you can use SQL statements to calculate the cumulative count for each group:
t4 = select sym, time, orderNo, prev(groupId) as prevGroupId from t3 context by sym,orderNo
t5 = lj(t3, t4,`sym`time`orderNo)
re = select sym, time, orderNo, dynamicGroupCumcount(groupId, prevGroupId, 3) as `groupId0`groupId1`groupId2 from t5 context by sym
re
sym | time | orderNo | groupId0 | groupId1 | groupId2 |
---|---|---|---|---|---|
st0 | 2022.01.01T09:00:00.001 | 10001 | 1 | 0 | 0 |
st0 | 2022.01.01T09:00:00.002 | 10002 | 2 | 0 | 0 |
st0 | 2022.01.01T09:00:00.003 | 10001 | 1 | 1 | 0 |
st1 | 2022.01.01T09:00:00.001 | 10002 | 1 | 0 | 0 |
st1 | 2022.01.01T09:00:00.002 | 10003 | 2 | 0 | 0 |
st1 | 2022.01.01T09:00:00.003 | 10002 | 1 | 0 | 1 |
For real-time data, you can use the reactive state engine to calculate the cumulative count for each group:
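// the time column holds TIMESTAMP values (e.g. 2022.01.01T09:00:00.001), so it is declared as TIMESTAMP
result = table(1000:0, `sym`time`orderNo`groupId0`groupId1`groupId2, [SYMBOL, TIMESTAMP, SYMBOL, INT, INT, INT])
// first engine: compute the previous tag of each order (key: sym, orderNo)
factor0 = [<time>, <prev(groupId) as prevGroupId>, <groupId>, <volume>]
// second engine: count orders per tag within each sym
factor1 = [<time>, <orderNo>, <dynamicGroupCumcount(groupId, prevGroupId, 3)>]
dm1 = table(1000:0, `sym`time`orderNo`volume`sumVolume`groupId, [SYMBOL, TIMESTAMP, SYMBOL, INT, INT, INT])
dm2 = table(1000:0, `sym`orderNo`time`prevGroupId`groupId`volume, [SYMBOL, SYMBOL, TIMESTAMP, INT, INT, INT])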
res1 = createReactiveStateEngine(name="reactive_ccnt", metrics =factor1, dummyTable=dm2, outputTable=result, keyColumn=`sym, keepOrder=true)
res0 = createReactiveStateEngine(name="reactive_prev", metrics =factor0, dummyTable=dm1, outputTable=res1, keyColumn=`sym`orderNo, keepOrder=true)
res0.append!(t3)
select * from result
sym | time | orderNo | groupId0 | groupId1 | groupId2 |
---|---|---|---|---|---|
st0 | 2022.01.01T09:00:00.001 | 10001 | 1 | 0 | 0 |
st0 | 2022.01.01T09:00:00.002 | 10002 | 2 | 0 | 0 |
st0 | 2022.01.01T09:00:00.003 | 10001 | 1 | 1 | 0 |
st1 | 2022.01.01T09:00:00.001 | 10002 | 1 | 0 | 0 |
st1 | 2022.01.01T09:00:00.002 | 10003 | 2 | 0 | 0 |
st1 | 2022.01.01T09:00:00.003 | 10002 | 1 | 0 | 1 |
dropStreamEngine("reactive_ccnt")
dropStreamEngine("reactive_prev")
Related function: dynamicGroupCumsum