Heterogeneous Stream Table Subscription

Starting from versions 1.30.17 and 2.00.5, the DolphinDB replay function supports replaying (serializing) tables with different schemata into a single "heterogeneous" stream table. The C++ API provides the StreamDeserializer class so that users can subscribe to a heterogeneous stream table and deserialize its content. This section describes how to subscribe to heterogeneous stream tables through the C++ API.

Note:

  • The symbols, i.e., the table identifiers (dictionary keys) specified in the inputTables parameter of replay, must be set so that the source table of each message can be determined.

  • If the subscribed heterogeneous stream table contains an array vector column, the Message passed to the function specified in handler must contain a corresponding array vector column. For a regular stream table, Message should contain a vector column.
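
Because every message carries the symbol of its source table, a handler usually branches on that symbol. The following is a minimal, library-independent sketch of such a dispatch. DemoMessage and SymbolRouter are hypothetical names introduced only for illustration and are not part of the C++ API; in a real handler the symbol would come from msg.getSymbol().

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a deserialized message: the symbol names the
// source table, and fields holds the row values as text.
struct DemoMessage {
    std::string symbol;
    std::vector<std::string> fields;
};

// Hypothetical helper that routes a message to the callback registered
// for its symbol.
class SymbolRouter {
public:
    using Callback = std::function<void(const DemoMessage &)>;

    // Register (or replace) the callback for one symbol.
    void on(const std::string &symbol, Callback cb) {
        assert(cb && "callback must be callable");
        callbacks_[symbol] = std::move(cb);
    }

    // Returns true if a callback handled the message and false if the
    // symbol is unknown.
    bool dispatch(const DemoMessage &msg) const {
        auto it = callbacks_.find(msg.symbol);
        if (it == callbacks_.end()) {
            return false;
        }
        it->second(msg);
        return true;
    }

private:
    std::unordered_map<std::string, Callback> callbacks_;
};
```

Returning false for an unknown symbol, rather than throwing, lets the caller decide whether an unexpected source table should be logged or ignored.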

Constructing a Deserializer

A subscribed heterogeneous stream table has to be deserialized with the StreamDeserializer class of the C++ API. Construct a deserializer for a heterogeneous stream table with one of the following constructors:

(1) sym2tableName - Constructed by the mapping between symbol and table. The table is represented by a <string, string> pair, i.e., <database name, table name>. For an in-memory table, the database name is an empty string. The second parameter pconn can be disregarded for now.

StreamDeserializer(const unordered_map<string, pair<string, string>> &sym2tableName, DBConnection *pconn = nullptr);

(2) sym2schema - Constructed by the mapping between symbol and the table schema.

StreamDeserializer(const unordered_map<string, DictionarySP> &sym2schema);

(3) symbol2col - Constructed by the mapping between symbol and the data types of each table's columns.

StreamDeserializer(const unordered_map<string, vector<DATA_TYPE>> &symbol2col);
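
The second and third constructors describe the same tables at different levels of detail: a schema lists column names together with their types, while symbol2col keeps only the ordered types. The sketch below illustrates that reduction in plain C++. DemoType, DemoSchema, parseType and columnTypes are hypothetical stand-ins written for this illustration; they are not the API's DATA_TYPE or DictionarySP, and only the four types used in this section are covered.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the API's DATA_TYPE enum.
enum class DemoType { Datetime, Timestamp, Symbol, Double };

// One (column name, type name) entry per column, in table order.
using DemoSchema = std::vector<std::pair<std::string, std::string>>;

// Translate a type name such as "DOUBLE" into the stand-in enum.
DemoType parseType(const std::string &name) {
    if (name == "DATETIME") return DemoType::Datetime;
    if (name == "TIMESTAMP") return DemoType::Timestamp;
    if (name == "SYMBOL") return DemoType::Symbol;
    if (name == "DOUBLE") return DemoType::Double;
    throw std::invalid_argument("unsupported type name: " + name);
}

// Drop the column names and keep the types in their original order.
std::vector<DemoType> columnTypes(const DemoSchema &schema) {
    assert(!schema.empty() && "a table has at least one column");
    std::vector<DemoType> types;
    types.reserve(schema.size());
    for (const auto &column : schema) {
        types.push_back(parseType(column.second));
    }
    return types;
}
```

Column order matters in the type-only form, because without names the position is the only thing that ties a type to its column.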

Usage Example

The following example uses DolphinDB server version 2.00.10.

DolphinDB script: Create two tables with different schemata and replay them to a single heterogeneous stream table.

st2 = streamTable(100:0, `timestampv`sym`blob`price1,[TIMESTAMP,SYMBOL,BLOB,DOUBLE])
share st2 as SDoutTables

n = 5;
table1 = table(100:0, `datetimev`timestampv`sym`price1`price2, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]);
table2 = table(100:0, `datetimev`timestampv`sym`price1, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE]);
share table1 as table1_SDPT
share table2 as table2_SDPT
tableInsert(table1_SDPT, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n), rand(100,n)+rand(1.0, n));
tableInsert(table2_SDPT, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n));
d = dict(['msg1','msg2'], [table1_SDPT, table2_SDPT]);
replay(inputTables=d, outputTables=`SDoutTables, dateColumn=`timestampv, timeColumn=`timestampv)
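
Conceptually, the replayed table interleaves the rows of both input tables in time order and tags each row with the dictionary key of its source ('msg1' or 'msg2'), while the original row travels in the BLOB column. The sketch below mimics only that ordering and tagging in plain C++. It is an illustration under stated assumptions, not how DolphinDB implements replay: SourceRow, TaggedRow and interleave are hypothetical names, the payload string merely stands in for the serialized row, and the tie-breaking rule for equal timestamps is chosen for this sketch only.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical row of one input table: a timestamp plus an opaque payload.
struct SourceRow {
    long long timestamp;
    std::string payload;
};

// Hypothetical row of the merged output: the payload tagged with its source.
struct TaggedRow {
    long long timestamp;
    std::string symbol;
    std::string payload;
};

// Tag every row with the key of its source table, then order all rows by
// timestamp. Rows with equal timestamps keep the key order of the map
// (an assumption of this sketch).
std::vector<TaggedRow> interleave(
    const std::map<std::string, std::vector<SourceRow>> &inputs) {
    std::vector<TaggedRow> out;
    for (const auto &entry : inputs) {
        for (const auto &row : entry.second) {
            out.push_back({row.timestamp, entry.first, row.payload});
        }
    }
    auto byTime = [](const TaggedRow &l, const TaggedRow &r) {
        return l.timestamp < r.timestamp;
    };
    std::stable_sort(out.begin(), out.end(), byTime);
    assert(std::is_sorted(out.begin(), out.end(), byTime));
    return out;
}
```

A subscriber to the real table sees the same shape: one stream of rows, each identified by its symbol, which is why the deserializer needs a per-symbol description of the source tables.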

Example 1

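Construct a deserializer with the mapping between symbol and table.

#include <iostream>
#include <unordered_map>
#include "DolphinDB.h"
#include "Streaming.h"
#include "Util.h"
using namespace dolphindb;

int main(int argc, const char **argv)
{
    // Map each symbol (dictionary key passed to replay) to <database name, table name>.
    // The database name is empty because both tables are in-memory tables.
    std::unordered_map<std::string, std::pair<std::string, std::string>> sym2table{{"msg1", {"", "table1_SDPT"}}, {"msg2", {"", "table2_SDPT"}}};
    StreamDeserializerSP sdsp = new StreamDeserializer(sym2table);
    // Print the source symbol and the deserialized content of each message.
    auto onehandler = [&](Message msg)
    {
        std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
    };
    ThreadedClient threadedClient(0);
    // The deserializer is passed as the last argument of subscribe.
    threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
    Util::sleep(1000);
    threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}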

Example 2

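Construct a deserializer with the mapping between symbol and the table schema.

#include <iostream>
#include <string>
#include <unordered_map>
#include "DolphinDB.h"
#include "Streaming.h"
#include "Util.h"
using namespace dolphindb;

int main(int argc, const char **argv)
{
    // Fetch the schema of each source table from the server.
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    DictionarySP t1s = conn.run("schema(table1_SDPT)");
    DictionarySP t2s = conn.run("schema(table2_SDPT)");
    // Map each symbol (dictionary key passed to replay) to its table schema.
    std::unordered_map<std::string, DictionarySP> sym2schema;
    sym2schema["msg1"] = t1s;
    sym2schema["msg2"] = t2s;
    StreamDeserializerSP sdsp = new StreamDeserializer(sym2schema);
    // Print the source symbol and the deserialized content of each message.
    auto onehandler = [&](Message msg)
    {
        std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
    };
    ThreadedClient threadedClient(0);
    // The deserializer is passed as the last argument of subscribe.
    threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
    Util::sleep(1000);
    threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}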

Example 3

Construct a deserializer with the mapping between symbol and the data types of each table's columns.

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>
#include "DolphinDB.h"
#include "Streaming.h"
#include "Util.h"
using namespace dolphindb;

int main(int argc, const char **argv)
{
    // List the column types of each source table in column order.
    std::unordered_map<std::string, std::vector<DATA_TYPE>> sym2cols{{"msg1", {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE, DT_DOUBLE}}, {"msg2", {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE}}};
    StreamDeserializerSP sdsp = new StreamDeserializer(sym2cols);
    // Print the source symbol and the deserialized content of each message.
    auto onehandler = [&](Message msg)
    {
        std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
    };
    ThreadedClient threadedClient(0);
    // The deserializer is passed as the last argument of subscribe.
    threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
    Util::sleep(1000);
    threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}
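
The handlers in the examples above only print each message. ThreadedClient runs the handler on a thread of its own, so a handler that updates state also read by main should synchronize access to it. The sketch below shows one way to count messages per symbol behind a mutex, using only the standard library. SymbolTally is a hypothetical helper introduced for illustration and is not part of the C++ API.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>

// Hypothetical thread-safe counter of received messages per symbol.
class SymbolTally {
public:
    // Intended to be called from the handler for every message.
    void record(const std::string &symbol) {
        assert(!symbol.empty() && "each message is expected to carry a symbol");
        std::lock_guard<std::mutex> lock(mutex_);
        ++counts_[symbol];
        ++total_;
    }

    // Total number of messages recorded so far.
    std::size_t total() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return total_;
    }

    // Copy of the per-symbol counts, safe to inspect without holding the lock.
    std::map<std::string, std::size_t> snapshot() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return counts_;
    }

private:
    mutable std::mutex mutex_;
    std::map<std::string, std::size_t> counts_;
    std::size_t total_ = 0;
};
```

In a handler this would be called as tally.record(msg.getSymbol()), with tally captured by reference. With the script above, which replays n = 5 rows from each table, main could compare tally.total() against 10 before unsubscribing.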