dynamicGroupCumcount

Syntax

dynamicGroupCumcount(membership, prevMembership, groupCount)

Arguments

membership is a vector, of which elements must be integers in the interval [0, groupCount), indicating tags for the record at the current timestamp.

prevMembership is a vector of INT type, of which elements can be NULL values (the first record of each group), indicating tags for the record at the previous timestamp of membership.

groupCount is an integer in the interval [2, 8], indicating the number of tags.

Details

The attribute and category of an event are fixed in most cases. In some scenarios, the category of an event, however, will change dynamically. For example, when processing real-time tick data, users may judge whether an order (attribute) is a large or a small one (category) based on the cumulative volume to analyze capital flow. As real-time data continues to flow in, trading volume keeps increasing, and thus a small order may change to a large one.

Function dynamicGroupCumcount is used in such scenarios to count the number of dynamically cumulative events of different categories.

Details are as follows:

  • If membership = prevMembership, count remains unchanged.

  • If membership ≠ prevMembership, the count of corresponding group of membership increases by 1, and the count of corresponding group of prevMembership decreases by 1.

  • If prevMembership is a NULL value (the first record of each group), the count of corresponding group of membership increases by 1.

It returns a tuple of length groupCount. Each element is a vector of the same length as membership, which sequentially records the cumulative count of each tag.

Note: The index of the tuple matches the tags, which means that the count of tag 0 is output to the vector at index 0 of the tuple.

Examples

Data preprocessing:

// Define a function to generate tags
def tag_func(v){

  return iif(v <= 5, 0, iif(v <= 10 and v > 5, 1, 2))
# output
}
// original table
time = take(2022.01.01T09:00:00.000 + 1..3, 6)
sym=`st0`st0`st0`st1`st1`st1
orderNo = `10001`10002`10001`10002`10003`10002
volume = 2 4 6 3 2 9
t = table(sym, time, orderNo, volume)

// calculate cumulative sums and tag the results
t1 = select *, cumsum(volume) as sumVolume from t context by sym, orderNo
t2 = lj(t, t1,`sym`time`orderNo)
t3 = select sym, time, orderNo, volume, sumVolume, tag_func(sumVolume) as groupId from t2

For historical data, you can use SQL statements to calculate the cumulative count for each group:

t4 = select sym, time, orderNo, prev(groupId) as prevGroupId from t3 context by sym,orderNo
t5 = lj(t3, t4,`sym`time`orderNo)
re = select sym, time, orderNo, dynamicGroupCumcount(groupId, prevGroupId, 3) as `groupId0`groupId1`groupId2 from t5 context by sym
re
sym time orderNo groupId0 groupId1 groupId2
st0 2022.01.01T09:00:00.001 10001 1 0 0
st0 2022.01.01T09:00:00.002 10002 2 0 0
st0 2022.01.01T09:00:00.003 10001 1 1 0
st1 2022.01.01T09:00:00.001 10002 1 0 0
st1 2022.01.01T09:00:00.002 10003 2 0 0
st1 2022.01.01T09:00:00.003 10002 1 0 1

For real-time data, you can use reactive state engine to calculate the cumulative count for each group:

result = table(1000:0, `sym`time`orderNo`groupId0`groupId1`groupId2, [SYMBOL, TIME, SYMBOL,INT,INT,INT])
factor0 = [<time>,  <prev(groupId) as prevGroupId>, <groupId>, <volume>]
factor1 = [<time>, <orderNo>, <dynamicGroupCumcount(groupId, prevGroupId, 3)>]
dm1 = table(1000:0, `sym`time`orderNo`volume`sumVolume`groupId, [SYMBOL, TIME, SYMBOL,INT, INT,INT])
dm2 = table(1000:0, `sym`orderNo`time`prevGroupId`groupId`volume, [SYMBOL, SYMBOL, TIME, INT,INT,INT])
res1 = createReactiveStateEngine(name="reactive_ccnt", metrics =factor1, dummyTable=dm2, outputTable=result, keyColumn=`sym, keepOrder=true)
res0 = createReactiveStateEngine(name="reactive_prev", metrics =factor0, dummyTable=dm1, outputTable=res1, keyColumn=`sym`orderNo, keepOrder=true)
res0.append!(t3)

select * from result
sym time orderNo groupId0 groupId1 groupId2
st0 2022.01.01T09:00:00.001 10001 1 0 0
st0 2022.01.01T09:00:00.002 10002 2 0 0
st0 2022.01.01T09:00:00.003 10001 1 1 0
st1 2022.01.01T09:00:00.001 10002 1 0 0
st1 2022.01.01T09:00:00.002 10003 2 0 0
st1 2022.01.01T09:00:00.003 10002 1 0 1
dropStreamEngine("reactive_ccnt")
dropStreamEngine("reactive_prev")

Related function: dynamicGroupCumsum