Heterogeneous Stream Table Subscription
Starting from version 1.30.17/2.00.5, DolphinDB function replay
supports
replaying (serializing) tables with different schemata into a single "heterogeneous"
stream table. The streamDeserializer
class is provided to enable C++
API users to subscribe to a heterogeneous stream table and deserialize the content from
that table. This section describes how to subscribe to heterogeneous stream tables
through the C++ API.
Note:
-
The symbol, i.e., the table identifiers (dictionary keys) specified in the inputTables parameter of
replay
must be set to determine the source table from which the message originates. -
If the subscribed heterogeneous stream table contains an array vector column, the
Message
of function specified in handler must contain a corresponding array vector column. For a regular stream table, a vector column should be contained inMessage
.
Constructing a Deserializer
The subscribed heterogeneous stream table has to be deserialized with the
streamDeserializer
class in C++ API. Construct a deserializer
for heterogeneous stream table with the following methods:
(1) sym2tableName - Constructed by the mapping between symbol and table. The table is represented by a <string, string> pair, i.e., <database name, table name>. For an in-memory table, the database name is an empty string. The second parameter pconn can be disregarded for now.
StreamDeserializer(const unordered_map<string, pair<string, string>> &sym2tableName, DBConnection *pconn = nullptr);
(2) sym2schema - Constructed by the mapping between symbol and the table schema.
StreamDeserializer(const unordered_map<string, DictionarySP> &sym2schema);
(3) symbol2col - Constructed by the mapping between symbol and data types for each table columns.
StreamDeserializer(const unordered_map<string, vector<DATA_TYPE>> &symbol2col);
Usage Example
The following example uses the DolphinDB server version 2.00.10.
DolphinDB script: Create two tables with different schemata and replay them to a single heterogeneous stream table.
st2 = streamTable(100:0, `timestampv`sym`blob`price1,[TIMESTAMP,SYMBOL,BLOB,DOUBLE])
share st2 as SDoutTables
n = 5;
table1 = table(100:0, `datetimev`timestampv`sym`price1`price2, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]);
table2 = table(100:0, `datetimev`timestampv`sym`price1, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE]);
share table1 as table1_SDPT
share table2 as table2_SDPT
tableInsert(table1_SDPT, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n), rand(100,n)+rand(1.0, n));
tableInsert(table2_SDPT, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n));
d = dict(['msg1','msg2'], [table1_SDPT, table2_SDPT]);
replay(inputTables=d, outputTables=`SDoutTables, dateColumn=`timestampv, timeColumn=`timestampv)
Example 1
Construct deserializer with the mapping between symbol and table.
int main(int argc, const char **argv)
{
std::unordered_map<std::string, std::pair<std::string, std::string>> sym2table{{"msg1", {"", "table1_SDPT"}}, {"msg2", {"", "table2_SDPT"}}};
StreamDeserializerSP sdsp = new StreamDeserializer(sym2table);
auto onehandler = [&](Message msg)
{
std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
};
ThreadedClient threadedClient(0);
threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
Util::sleep(1000);
threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}
Example 2
Construct deserializer with the mapping between symbol and the table schema.
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
DictionarySP t1s = conn.run("schema(table1_SDPT)");
DictionarySP t2s = conn.run("schema(table2_SDPT)");
unordered_map<string, DictionarySP> sym2schema;
sym2schema["msg1"] = t1s;
sym2schema["msg2"] = t2s;
StreamDeserializerSP sdsp = new StreamDeserializer(sym2schema);
auto onehandler = [&](Message msg)
{
std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
};
ThreadedClient threadedClient(0);
threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
Util::sleep(1000);
threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}
Example 3
Construct deserializer with the mapping between symbol and data types for each table columns.
int main(int argc, const char **argv)
{
unordered_map<string, std::vector<DATA_TYPE>> sym2cols{{"msg1", {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE, DT_DOUBLE}}, {"msg2", {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE}}};
StreamDeserializerSP sdsp = new StreamDeserializer(sym2cols);
auto onehandler = [&](Message msg)
{
std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
};
ThreadedClient threadedClient(0);
threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
Util::sleep(1000);
threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}