createPartitionedTable

Syntax

createPartitionedTable(dbHandle, table, tableName, [partitionColumns], [compressMethods], [sortColumns|primaryKey], [keepDuplicates=ALL], [sortKeyMappingFunction], [softDelete=false], [indexes])

Arguments

Note: The parameters sortColumns, keepDuplicates and sortKeyMappingFunction take effect only in a database that uses the TSDB storage engine (i.e., created with engine="TSDB").

dbHandle is a database handle returned by function database. The database can be in the distributed file system, in the local file system, or in memory.

table is a table or a tuple of tables whose schema is used to construct the new partitioned table.

tableName is a string indicating the name of the partitioned table.

partitionColumns (optional) is a STRING scalar/vector indicating the partitioning column(s). For non-sequential partitioning, this parameter is required. For a composite partition, this parameter is a STRING vector. A string can be a column name, or a function call applied to a column (e.g. partitionFunc(id)). If a function call is specified, it must have exactly one column argument, and the other arguments must be constant scalars. In this case, the table is partitioned based on the result of the function call. Note that optimization may be limited for SQL queries involving the partitioning column (e.g., the id column in partitionFunc(id)).

compressMethods (optional) is a dictionary specifying the compression method for each listed column. The keys are column names and the values are compression methods ("lz4", "delta", "zstd" or "chimp"). Columns that are not specified are compressed with LZ4 by default. Note:

  • The delta compression method applies the delta-of-delta algorithm, which is particularly suitable for SHORT, INT, LONG, and date/time data.
  • Save strings as SYMBOL type to enable compression of strings.
  • The chimp compression method is suitable for DOUBLE data whose fractional part has no more than three digits.
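To make the delta-of-delta idea concrete, here is a minimal plain-Python model. It is not DolphinDB code, and DolphinDB's actual on-disk format may differ. The model stores the first value, the first difference, and then only the change between consecutive differences. For evenly spaced timestamps most of these residuals are 0. A real codec would additionally bit-pack the small residuals, and this sketch omits that step.

```python
def delta_of_delta_encode(values):
    """Return [first value, first delta, delta-of-delta residuals...]."""
    if len(values) < 2:
        return list(values)
    first_delta = values[1] - values[0]
    encoded = [values[0], first_delta]
    prev_delta = first_delta
    for i in range(2, len(values)):
        delta = values[i] - values[i - 1]
        encoded.append(delta - prev_delta)  # 0 whenever the spacing is unchanged
        prev_delta = delta
    return encoded


def delta_of_delta_decode(encoded):
    """Invert delta_of_delta_encode."""
    if len(encoded) < 2:
        return list(encoded)
    values = [encoded[0], encoded[0] + encoded[1]]
    delta = encoded[1]
    for residual in encoded[2:]:
        delta += residual
        values.append(values[-1] + delta)
    return values


# Epoch seconds, one second apart, with one two-second gap at the end.
ts = [1577836800, 1577836801, 1577836802, 1577836803, 1577836805]
print(delta_of_delta_encode(ts))  # [1577836800, 1, 0, 0, 1]
```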

sortColumns (optional) is a STRING scalar/vector that specifies the column(s) used to sort the ingested data within each partition of a TSDB database. The sort columns must be of Integral, Temporal, STRING, SYMBOL, or DECIMAL type. Note that sortColumns does not have to be the same as the partitioning column(s).

  • If multiple columns are specified for sortColumns, the last column must be a time column. The preceding columns are used as the sort keys and they cannot be of TIME, TIMESTAMP, NANOTIME, or NANOTIMESTAMP type.
  • If only one column is specified for sortColumns, the column is used as the sort key, and it can be a time column or not. If the sort column is a time column and sortKeyMappingFunction is specified, the sort column specified in a SQL where condition can only be compared with temporal values of the same data type.
  • It is recommended to specify no more than 4 frequently queried columns for sortColumns, ordered from most to least frequently queried, so that frequently used data can be located quickly during query processing.
  • For optimal performance, the number of sort key entries (i.e., distinct combinations of sort key values) within each partition should not exceed 2,000. Staying below this threshold avoids excessive memory usage and keeps query processing efficient.
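The definition of a "sort key entry" can be checked against the 2,000 guideline with a short Python sketch. The sketch is a conceptual model, not a DolphinDB API. It applies the rules above: with several sort columns, every column except the last (time) column is a sort key, and a single sort column is itself the sort key. It then counts distinct key combinations per partition. The column names follow Example 1.2, and the rows are synthetic.

```python
from collections import defaultdict

RECOMMENDED_MAX_ENTRIES = 2000  # guideline from the list above


def sort_keys(sort_columns):
    """Several sort columns: all but the last (time) column are sort keys.
    A single sort column is itself the sort key."""
    return sort_columns[:-1] if len(sort_columns) > 1 else sort_columns


def sort_key_entries(rows, partition_column, sort_columns):
    """Count distinct sort key combinations per partition value."""
    keys = sort_keys(sort_columns)
    combos = defaultdict(set)
    for row in rows:
        combos[row[partition_column]].add(tuple(row[k] for k in keys))
    return {part: len(c) for part, c in combos.items()}


def partitions_over_limit(rows, partition_column, sort_columns,
                          limit=RECOMMENDED_MAX_ENTRIES):
    """Partitions whose sort key entry count exceeds the guideline."""
    counts = sort_key_entries(rows, partition_column, sort_columns)
    return sorted(part for part, n in counts.items() if n > limit)


# Partitioned by TradeDate; sortColumns = SecurityID, TradeDate,
# so the only sort key is SecurityID.
rows = [{"SecurityID": f"st{i:05d}", "TradeDate": "2022.01.01"} for i in range(2500)]
rows += [{"SecurityID": "st00001", "TradeDate": "2022.01.02"}]
print(sort_key_entries(rows, "TradeDate", ["SecurityID", "TradeDate"]))
# {'2022.01.01': 2500, '2022.01.02': 1}
```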

primaryKey (optional) is a STRING scalar/vector that specifies the primary key column(s), which uniquely identify each record in a DFS table of a PKEY database. Among records with the same primary key, only the latest one is retained. Note that:

  • primaryKey must include all partitioning columns.
  • The primary key columns must be of Logical, Integral (excluding COMPRESSED), Temporal, STRING, SYMBOL, or DECIMAL type.
  • With more than one primary key column, a composite primary key is maintained. The composite primary key uses a Bloomfilter index by default (see the indexes parameter for details).

keepDuplicates (optional) specifies how to handle records with duplicate sortColumns values. The default value is ALL. It can take the following values:

  • ALL: keep all records;
  • LAST: only keep the last record;
  • FIRST: only keep the first record.
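The three policies can be illustrated with a minimal Python model of which record survives. The model is an assumption-labeled sketch, not DolphinDB code. Records count as duplicates when they have identical values in every sort column, and "first" and "last" refer to write order. The sketch does not model when the TSDB engine actually performs deduplication, nor the sort order within a partition.

```python
def apply_keep_duplicates(rows, sort_columns, policy="ALL"):
    """Model which records survive under a keepDuplicates policy.

    rows are assumed to be in write order; records are duplicates when
    they have identical values in every column of sort_columns.
    """
    if policy == "ALL":
        return list(rows)
    if policy not in ("FIRST", "LAST"):
        raise ValueError(f"unknown policy: {policy}")
    kept = {}
    for row in rows:
        key = tuple(row[c] for c in sort_columns)
        if policy == "LAST" or key not in kept:
            kept[key] = row  # LAST overwrites; FIRST keeps the earliest
    return list(kept.values())


rows = [
    {"sym": "AAPL", "time": "09:30:00.001", "price": 56.5},
    {"sym": "AAPL", "time": "09:30:00.001", "price": 56.6},  # same sort key values
    {"sym": "DELL", "time": "09:30:00.001", "price": 15.5},
]
for policy in ("ALL", "FIRST", "LAST"):
    print(policy, [r["price"] for r in apply_keep_duplicates(rows, ["sym", "time"], policy)])
# ALL [56.5, 56.6, 15.5]
# FIRST [56.5, 15.5]
# LAST [56.6, 15.5]
```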

It is recommended to specify the sortKeyMappingFunction parameter when a partition of a TSDB database contains many sort key entries, each with only a few records. After dimensionality reduction, each block in a TSDB level file can hold more data. This reduces the number of block reads and the disk I/O during queries, and it also improves the compression ratio.

sortKeyMappingFunction (optional) is a vector of unary functions with the same length as the number of sort keys. Each mapping function is applied to the corresponding sort key (i.e., a sort column other than the temporal column) for dimensionality reduction. After reduction, records that share the same reduced sort key entry are sorted by the last column of sortColumns (the temporal column).
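The effect of a mapping function on the number of sort key entries can be modeled in Python as a conceptual sketch. The sketch assumes several sort columns (keys plus a trailing time column). Following the note on empty elements, None stands for "no reduction". The bucket function is a HYPOTHETICAL stand-in for hashBucket: it uses CRC-32, while DolphinDB's hashBucket uses its own hash, so the bucket numbers differ. The column TradeTime is also hypothetical.

```python
import zlib
from collections import defaultdict
from functools import partial


def bucket(value, buckets):
    """HYPOTHETICAL stand-in for hashBucket: CRC-32 of the value's text modulo
    buckets. DolphinDB's real hashBucket uses a different hash."""
    return zlib.crc32(str(value).encode()) % buckets


def reduce_and_sort(rows, sort_columns, mapping_functions):
    """Group rows by reduced sort key entry, then sort each group by time.

    All sort columns except the last are sort keys; the last is the temporal
    column. A None mapping function leaves that sort key unchanged.
    """
    keys, time_column = sort_columns[:-1], sort_columns[-1]
    if len(mapping_functions) != len(keys):
        raise ValueError("need one mapping function (or None) per sort key")
    groups = defaultdict(list)
    for row in rows:
        entry = tuple(row[k] if f is None else f(row[k])
                      for k, f in zip(keys, mapping_functions))
        groups[entry].append(row)
    return {e: sorted(g, key=lambda r: r[time_column]) for e, g in groups.items()}


rows = [{"SecurityID": f"st{i:04d}", "TradeTime": 100 - i} for i in range(20)]
cols = ["SecurityID", "TradeTime"]
print(len(reduce_and_sort(rows, cols, [None])))                        # 20 entries
print(len(reduce_and_sort(rows, cols, [partial(bucket, buckets=5)])))  # at most 5 entries
```

This mirrors the hashBucket{,5} mapping on SecurityID in Example 1.2: 20 distinct securities collapse into at most 5 entries, and each entry's rows are ordered only by time.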

Note:

  • Dimensionality reduction is performed when writing to disk, so specifying this parameter may affect write performance.
  • The functions in sortKeyMappingFunction correspond one-to-one to the sort keys. If a sort key does not require dimensionality reduction, leave the corresponding element of the vector empty.
  • If a mapping function is hashBucket and the sort key it applies to is also a HASH partitioning column, make sure that neither the number of hash partitions nor the number of buckets is divisible by the other (except when buckets=1). Otherwise, the column values from the same HASH partition would be mapped to the same hash bucket after dimensionality reduction.
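The reason behind the divisibility rule can be seen with a small arithmetic model in Python. The model rests on an ILLUSTRATIVE ASSUMPTION, not documented DolphinDB internals: both HASH partitioning and the bucket mapping take the same hash (here, simply the integer itself) modulo their own count. With 10 partitions and 5 buckets, the partition number already determines the bucket, so every value in a partition lands in a single bucket. With 3 buckets, each partition's values spread across all 3.

```python
from collections import defaultdict


def buckets_per_partition(values, partitions, buckets):
    """For each HASH partition, count the distinct buckets its values use.

    Illustrative assumption: partitioning and bucketing share one hash
    (the integer itself here), each taken modulo its own count.
    """
    used = defaultdict(set)
    for v in values:
        used[v % partitions].add(v % buckets)
    return {p: len(b) for p, b in sorted(used.items())}


ids = range(1000)
print(set(buckets_per_partition(ids, partitions=10, buckets=5).values()))  # {1}
print(set(buckets_per_partition(ids, partitions=10, buckets=3).values()))  # {3}
```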

softDelete (optional) determines whether to enable soft delete for TSDB databases. The default value is false. To use it, keepDuplicates must be set to 'LAST'. It is recommended to enable soft delete for databases where the row count is large and delete operations are infrequent.

indexes (optional) is a dictionary with columns as keys and index types as values. Both keys and values are of STRING type. indexes can only be set for tables of TSDB (keepDuplicates=ALL) or PKEY databases.

  • bloomfilter
    • Only available in PKEY databases.
    • Excels in point queries on high-cardinality columns (e.g., ID card numbers, order numbers, foreign keys referencing upstream tables).
    • Supports indexing on columns of the following data types: BOOL, CHAR, SHORT, INT, LONG, TEMPORAL, BLOB, STRING, BINARY, COMPLEX, DECIMAL32, DECIMAL64, DECIMAL128.
    • Composite primary keys are automatically indexed with Bloomfilter. Columns not specified in indexes default to ZoneMap indexing.
  • textindex
    • Only available in PKEY databases.
    • Enables efficient text searches through text indexing on non-primary key columns of STRING type.
    • Specified in the form of textindex(parser=english,lowercase=true,stem=true), where
      • parser specifies the tokenizer. It has no default value and must be set explicitly. Options are none (no tokenization) and english (tokenization on spaces and punctuation).
      • lowercase specifies whether the tokenizer is case-insensitive, which only takes effect when parser is set to english. The default value is true. Set it to false for case-sensitive scenarios.
      • stem specifies whether to match English words by their stem, which only takes effect when parser=english and lowercase=true. The default value is false, indicating exact searches. Set it to true to return related derivatives (e.g., a search for "dark" may also return "darkness").
  • vectorindex
    • Available in PKEY and TSDB databases.
    • Specified in the form of vectorindex(type={t}, dim={d}), where
      • type can take the following values: flat, pq, ivf, ivfpq, hnsw.
      • dim is an integer no less than 1, indicating the dimension of the vector. For index types "pq" or "ivfpq", dim must be divisible by 4. Note that the dimension of vectors inserted into the indexing column must match the specified dim.
    • Accelerates retrieval under the following conditions:
      • No table joins are used in queries.
      • The order by clause must be sorted in ascending order and can only use rowEuclidean to compute distances.
      • The first parameter passed to rowEuclidean must be the indexing column, i.e., rowEuclidean(<vectorCol>, queryVec).
      • A limit clause must be specified.
      • If a where clause is specified, it must not include any sort columns.
      • The query cannot include clauses such as group by and having.
    Note: When these conditions are met in a PKEY database, the query first selects rows from the indexing column based on the order by and limit clauses, and only then applies the where clause. As a result, the query may return fewer rows than specified by limit.
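The note about fewer results can be reproduced with a brute-force Python model of the two filter orders. This is a conceptual sketch, not DolphinDB code. It uses exact Euclidean distance, whereas index types such as pq, ivf and hnsw are approximate, and it models only the order in which limit and where are applied.

```python
import math


def row_euclidean(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def limit_then_where(rows, vec_column, query, limit, where):
    """Nearest `limit` rows first, then the where filter (PKEY order above)."""
    nearest = sorted(rows, key=lambda r: row_euclidean(r[vec_column], query))[:limit]
    return [r for r in nearest if where(r)]


def where_then_limit(rows, vec_column, query, limit, where):
    """Standard SQL order: filter first, then take the nearest `limit` rows."""
    matching = [r for r in rows if where(r)]
    return sorted(matching, key=lambda r: row_euclidean(r[vec_column], query))[:limit]


def is_odd(row):
    return row["id"] % 2 == 1


rows = [{"id": i, "vec": [float(i)] * 4} for i in range(10)]
query = [0.0] * 4
print([r["id"] for r in limit_then_where(rows, "vec", query, 3, is_odd)])  # [1]
print([r["id"] for r in where_then_limit(rows, "vec", query, 3, is_odd)])  # [1, 3, 5]
```

With limit 3, the three nearest rows are ids 0, 1 and 2, and only one of them passes the filter. Filtering first would have returned three rows.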

Details

Create an empty partitioned table with the same schema as the specified table.

  • To create a DFS table or a table on disk, parameter table must be a table. After the partitioned table is created, write data to it with append! or tableInsert. This function cannot be used to create a partitioned table with a sequential domain.
  • To create an in-memory partitioned table, parameter table can be a table or a tuple of tables. The number of tables given by the parameter table must be the same as the number of partitions in the database.

Note:

  • Only the schema of table is used. No rows of table are imported into the newly created partitioned table.
  • For a DFS database using the OLAP storage engine, the maximum number of handles (including temporary handles*) to partitioned tables is 8,192 per node. The TSDB storage engine has no such limit.

*temporary handles: If no handle is specified when you create a partitioned table in a DFS database with createPartitionedTable, each database creates a temporary handle to hold the return value. If you create multiple tables under the same database, the temporary handle for the database is overwritten each time.

Examples

Example 1. Create a DFS table

Example 1.1. Create a DFS table in OLAP database

n=10000
t=table(2020.01.01T00:00:00 + 0..(n-1) as timestamp, rand(`IBM`MS`APPL`AMZN,n) as symbol, rand(10.0, n) as value)
db = database("dfs://rangedb_tradedata", RANGE, `A`F`M`S`ZZZZ)
Trades = db.createPartitionedTable(table=t, tableName="Trades", partitionColumns="symbol", compressMethods={timestamp:"delta"});

At this point, the table Trades is empty. The schema of Trades is the same as the schema of table t. Then we append table t to table Trades.

Trades.append!(t);

The data in table Trades has now been written to disk. The table can be loaded with loadTable and then queried:

Trades=loadTable(db,`Trades);
select min(value) from Trades;
// output: the smallest value, close to 0 (the exact result varies because the data is random)

After appending data to a DFS table, you don't need to call loadTable before querying it, as the system automatically refreshes the table after append operations. After a system restart, however, a DFS table must be loaded with loadTable before it can be queried.

Example 1.2. Create a DFS table in TSDB database

n = 10000
SecurityID = rand(`st0001`st0002`st0003`st0004`st0005, n)
sym = rand(`A`B, n)
TradeDate = 2022.01.01 + rand(100,n)
TotalVolumeTrade = rand(1000..3000, n)
TotalValueTrade = rand(100.0, n)
schemaTable_snap = table(SecurityID, TradeDate, TotalVolumeTrade, TotalValueTrade).sortBy!(`SecurityID`TradeDate)

dbPath = "dfs://TSDB_STOCK"
if(existsDatabase(dbPath)){dropDatabase(dbPath)}
db_snap = database(dbPath, VALUE, 2022.01.01..2022.01.05, engine='TSDB')
snap=createPartitionedTable(dbHandle=db_snap, table=schemaTable_snap, tableName="snap", partitionColumns=`TradeDate, sortColumns=`SecurityID`TradeDate, keepDuplicates=ALL, sortKeyMappingFunction=[hashBucket{,5}])
snap.append!(schemaTable_snap)
flushTSDBCache()
snap = loadTable(dbPath, `snap)
select * from snap

Example 1.3. Create a DFS table in PKEY database

db = database(directory="dfs://PKDB", partitionType=VALUE, partitionScheme=1..10, engine="PKEY")
schematb = table(1:0,`id1`id2`val1`val2`date1`time1,[INT,INT,INT,DECIMAL32(2),DATE,TIME])
pkt = createPartitionedTable(dbHandle=db, table=schematb, tableName="pkt", partitionColumns="id1", primaryKey=`id1`id2, indexes={"val1": "bloomfilter", "val2": "bloomfilter"})

Example 2. Create in-memory partitioned tables

Example 2.1. Create a partitioned in-memory table

n = 20000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
t = table(n:0, colNames, colTypes)
db = database(, RANGE, `A`D`F)
pt = db.createPartitionedTable(t, `pt, `sym)

insert into pt values(09:30:00.001,`AAPL,100,56.5)
insert into pt values(09:30:01.001,`DELL,100,15.5)
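The RANGE scheme `A`D`F defines two left-closed, right-open partitions, [A, D) and [D, F). The routing of each symbol can be modeled with a short Python sketch, which is not DolphinDB code. AAPL lands in the first partition and DELL in the second. A symbol such as IBM lies outside both ranges, and this sketch does not model how DolphinDB handles such rows.

```python
import bisect


def range_partition(value, boundaries):
    """Index of the RANGE partition [boundaries[i], boundaries[i+1]) holding value,
    or None if value lies outside [boundaries[0], boundaries[-1])."""
    i = bisect.bisect_right(boundaries, value) - 1
    if i < 0 or i >= len(boundaries) - 1:
        return None
    return i


bounds = ["A", "D", "F"]  # the scheme from database(, RANGE, `A`D`F)
for sym in ["AAPL", "DELL", "IBM"]:
    print(sym, range_partition(sym, bounds))
# AAPL 0
# DELL 1
# IBM None
```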

Example 2.2. Create a partitioned keyed table

n = 20000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
t = keyedTable(`time`sym, n:0, colNames, colTypes)
db = database(, RANGE, `A`D`F)
pt = db.createPartitionedTable(t, `pt, `sym)

insert into pt values(09:30:00.001,`AAPL,100,56.5)
insert into pt values(09:30:01.001,`DELL,100,15.5)

Example 2.3. Create a partitioned stream table

When creating a partitioned stream table, the second parameter of createPartitionedTable must be a tuple of tables, and its length must be equal to the number of partitions. Each table in the tuple represents a partition. In the following example, trades_stream1 and trades_stream2 form a partitioned stream table trades. We cannot directly write data to trades. Instead, we need to write to trades_stream1 and trades_stream2.

n=200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
trades_stream1 = streamTable(n:0, colNames, colTypes)
trades_stream2 = streamTable(n:0, colNames, colTypes)
db=database(, RANGE, `A`D`F)
trades = createPartitionedTable(db,[trades_stream1, trades_stream2], "", `sym)

insert into trades_stream1 values(09:30:00.001,`AAPL,100,56.5)
insert into trades_stream2 values(09:30:01.001,`DELL,100,15.5)

select * from trades;
time sym qty price
09:30:00.001 AAPL 100 56.5
09:30:01.001 DELL 100 15.5

Example 2.4. Create a partitioned MVCC table

Similar to a partitioned stream table, to create a partitioned MVCC table, the second parameter of createPartitionedTable must be a tuple of tables, and its length must be equal to the number of partitions. Each table in the tuple represents a partition. In the following example, trades_mvcc1 and trades_mvcc2 form a partitioned MVCC table trades. We cannot directly write data to trades. Instead, we need to write to trades_mvcc1 and trades_mvcc2.

n=200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
trades_mvcc1 = mvccTable(n:0, colNames, colTypes)
trades_mvcc2 = mvccTable(n:0, colNames, colTypes)
db=database(, RANGE, `A`D`F)
trades = createPartitionedTable(db,[trades_mvcc1, trades_mvcc2], "", `sym)

insert into trades_mvcc1 values(09:30:00.001,`AAPL,100,56.5)
insert into trades_mvcc2 values(09:30:01.001,`DELL,100,15.5)

select * from trades;
time sym qty price
09:30:00.001 AAPL 100 56.5
09:30:01.001 DELL 100 15.5

Example 3. Create a distributed partitioned table with a vector index

Example 3.1. In a TSDB database

db = database(directory="dfs://indexesTest", partitionType=VALUE, partitionScheme=1..10, engine="TSDB")
schematb = table(1:0,`col0`col1`col2`col3,[INT,INT,TIMESTAMP,FLOAT[]])
pt = createPartitionedTable(dbHandle=db, table=schematb, tableName=`pt, partitionColumns=`col0, sortColumns=`col1`col2, indexes={"col3":"vectorindex(type=flat, dim=5)"})

tmp = cj(table(1..10 as col0),cj(table(1..10 as col1),table(now()+1..10 as col2))) join table(arrayVector(1..1000*5,1..5000) as col3)

pt.tableInsert(tmp)

select * from pt where col2<now() order by rowEuclidean(col3,[1339,252,105,105,829]) limit 10
col0 col1 col2 col3
2 1 2024.06.27 16:56:38.950 [526, 527, 528, 529, 530]
2 1 2024.06.27 16:56:38.949 [521, 522, 523, 524, 525]
2 1 2024.06.27 16:56:38.951 [531, 532, 533, 534, 535]
2 1 2024.06.27 16:56:38.948 [516, 517, 518, 519, 520]
2 1 2024.06.27 16:56:38.952 [536, 537, 538, 539, 540]
2 1 2024.06.27 16:56:38.947 [511, 512, 513, 514, 515]
2 1 2024.06.27 16:56:38.953 [541, 542, 543, 544, 545]
2 1 2024.06.27 16:56:38.946 [506, 507, 508, 509, 510]
2 1 2024.06.27 16:56:38.954 [546, 547, 548, 549, 550]
2 1 2024.06.27 16:56:38.945 [501, 502, 503, 504, 505]

Example 3.2. In a PKEY database

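// dfs://PKDB and table pkt may already exist from Example 1.3; drop the database first
if(existsDatabase("dfs://PKDB")){dropDatabase("dfs://PKDB")}
db = database(directory="dfs://PKDB", partitionType=VALUE, partitionScheme=1..10, engine="PKEY")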
schematb = table(1:0,`id1`id2`val1`val2`val3`date1`time1,[INT,INT,INT,DECIMAL32(2),DOUBLE[],DATE,TIME])
pkt = createPartitionedTable(dbHandle=db, table=schematb, tableName="pkt", 
partitionColumns="id1", primaryKey=`id1`id2, 
indexes={"val1": "bloomfilter", "val2": "bloomfilter", "val3":"vectorindex(type=pq, dim=4)"})

Example 4. For data with a column in the format id_date_id (e.g., ax1ve_20240101_e37f6), partition by date using a user-defined function:

// Define a function to extract the date information
def myPartitionFunc(str,a,b) {
    return temporalParse(substr(str, a, b),"yyyyMMdd")
}

// Create a database
data = ["ax1ve_20240101_e37f6", "91f86_20240103_b781d", "475b4_20240101_6d9b2", "239xj_20240102_x983n","2940x_20240102_d9237"]
tb = table(data as id_date, 1..5 as value, `a`b`c`d`e as sym)
db = database("dfs://testdb", VALUE, 2024.02.01..2024.02.02)

// Use myPartitionFunc to process the data column
pt = db.createPartitionedTable(table=tb, tableName=`pt, 
    partitionColumns=["myPartitionFunc(id_date, 6, 8)"])
pt.append!(tb)

select * from pt

The queried data are read and returned by partition. The query result shows that table pt is partitioned by the date information extracted from the id_date column.

id_date value sym
ax1ve_20240101_e37f6 1 a
475b4_20240101_6d9b2 3 c
239xj_20240102_x983n 4 d
2940x_20240102_d9237 5 e
91f86_20240103_b781d 2 b
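The grouping shown above can be reproduced with a Python model of myPartitionFunc. The model assumes substr takes a 0-based offset and a length, which the arguments (6, 8) imply for these IDs. It reproduces only the partition-by-date grouping, not DolphinDB's actual read path.

```python
from collections import defaultdict
from datetime import datetime


def my_partition_key(s, offset, length):
    """Mirror of myPartitionFunc: take `length` characters starting at the
    0-based `offset`, then parse them as yyyyMMdd."""
    return datetime.strptime(s[offset:offset + length], "%Y%m%d").date()


data = ["ax1ve_20240101_e37f6", "91f86_20240103_b781d", "475b4_20240101_6d9b2",
        "239xj_20240102_x983n", "2940x_20240102_d9237"]
rows = list(zip(data, range(1, 6), "abcde"))  # (id_date, value, sym)

# Route each row to the partition named by its extracted date.
partitions = defaultdict(list)
for row in rows:
    partitions[my_partition_key(row[0], 6, 8)].append(row)

# Read partition by partition, in date order.
for day in sorted(partitions):
    for id_date, value, sym in partitions[day]:
        print(id_date, value, sym)
```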