IPC In-memory Table Subscription

The C++ API provides the IPCInMemoryStreamClient class for subscribing to an IPC in-memory table. In scenarios with extremely strict latency requirements, user processes can read data from the server directly through shared memory, which avoids the latency of network transmission. Because the publisher and the subscriber access the same shared memory, they must run on the same machine.

This section describes how to subscribe to IPC in-memory tables through the C++ API.
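
To make the same-machine requirement concrete, the following is a minimal sketch of two processes exchanging a value through shared memory on Linux. It uses plain POSIX calls (mmap, fork, waitpid) and is not how DolphinDB implements IPC in-memory tables; the function name shareThroughMemory is hypothetical and exists only for this illustration.

```cpp
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

// Illustration only (hypothetical helper, not part of the DolphinDB API).
// A child process writes a value into a shared mapping and the parent reads it
// back without any network transfer. Returns the value seen by the parent,
// or -1.0 if the mapping or the fork fails.
double shareThroughMemory(double value) {
    // Anonymous shared mapping: visible to this process and to its children.
    void* mem = mmap(nullptr, sizeof(double), PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED) return -1.0;

    double* slot = static_cast<double*>(mem);
    *slot = 0.0;

    pid_t pid = fork();
    if (pid < 0) {
        munmap(mem, sizeof(double));
        return -1.0;
    }
    if (pid == 0) {
        // Child plays the publisher: write directly into shared memory.
        *slot = value;
        _exit(0);
    }

    // Parent plays the subscriber: wait for the write, then read it.
    int status = 0;
    waitpid(pid, &status, 0);
    double seen = *slot;
    munmap(mem, sizeof(double));
    return seen;
}
```

Both processes see the same physical memory, which is only possible when they run on the same machine. That is the reason the publisher and the subscriber of an IPC in-memory table cannot be placed on different hosts.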

Creating Subscription

The function declaration is as follows:

using IPCInMemoryTableReadHandler = std::function<void(ConstantSP)>;
ThreadSP subscribe(const string& tableName, const IPCInMemoryTableReadHandler& handler, TableSP outputTable = nullptr, bool overwrite = true);
  • tableName: The name of the IPC in-memory table to subscribe to.
  • handler: A user-defined function to process the received data.
  • outputTable: optional. An output table with the same schema as the subscribed table. It is used to store the data received from the subscribed table. The default value is nullptr.
  • overwrite: optional. Whether a new message overwrites the data previously stored in outputTable. The default value is true.
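
The interaction between handler, outputTable and overwrite can be pictured with a small self-contained model. The sketch below does not use the DolphinDB API: Row, Batch, Handler and deliver are hypothetical stand-ins, and the behavior shown (replace the stored rows when overwrite is true, append otherwise) is an assumption based on the parameter descriptions above rather than a statement about the library's internals.

```cpp
#include <functional>
#include <vector>

// Hypothetical stand-ins for a table row, a received message and a handler.
// None of these types belong to the DolphinDB C++ API.
struct Row {
    long long timestamp;
    double temperature;
};
using Batch = std::vector<Row>;
using Handler = std::function<void(const Batch&)>;

// Assumed model of what happens when one message arrives:
// - if an output table is supplied, store the message in it, either
//   replacing the previous contents (overwrite) or appending to them;
// - then pass the message to the user-defined handler, if there is one.
void deliver(const Batch& msg, const Handler& handler,
             Batch* outputTable, bool overwrite) {
    if (outputTable != nullptr) {
        if (overwrite) {
            outputTable->clear();
        }
        outputTable->insert(outputTable->end(), msg.begin(), msg.end());
    }
    if (handler) {
        handler(msg);
    }
}
```

Under this model, overwrite = true keeps only the most recent message in the output table, which suits consumers that only need the latest snapshot, while overwrite = false accumulates every received row.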

Canceling Subscription

The function declaration is as follows:

void unsubscribe(const string& tableName);
  • tableName: The name of the IPC in-memory table whose subscription is to be canceled.

Usage Example

The following example uses DolphinDB server version 2.00.10.

DolphinDB script: Create a stream table and an IPC in-memory table. Subscribe to the stream table with the subscribeTable function so that data inserted into the stream table is also written to the IPC in-memory table.

share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable;
share createIPCInMemoryTable(1000000, "pubTable", `timestamp`temperature, [TIMESTAMP, DOUBLE]) as shm_test;

def shm_append(msg) {
    shm_test.append!(msg)
}
topic2 = subscribeTable(tableName="pubTable", actionName="act3", offset=0, handler=shm_append, msgAsTable=true)

C++ Code:

// Header names assume the standard DolphinDB C++ API layout.
#include <iostream>
#include "Streaming.h"
#include "Util.h"

using namespace dolphindb;
using namespace std;

// Handler: print each batch of received data
void print(TableSP table) {
    std::cout << table->getString() << std::endl;
}

int main(int argc, const char **argv)
{
    // name of the IPC in-memory table created on the server
    string tableName = "pubTable";
    // construct the subscription client
    IPCInMemoryStreamClient memTable;

    // create an output table whose schema is the same as the IPC in-memory table
    vector<string> colNames = {"timestamp", "temperature"};
    vector<DATA_TYPE> colTypes = {DT_TIMESTAMP, DT_DOUBLE};
    int rowNum = 0, indexCapacity = 10000;
    TableSP outputTable = Util::createTable(colNames, colTypes, rowNum, indexCapacity);

    // overwrite option is disabled
    bool overwrite = false;
    ThreadSP thread = memTable.subscribe(tableName, print, outputTable, overwrite);
    // keep the subscription alive for 10 seconds
    Util::sleep(10000);
    // cancel the subscription
    memTable.unsubscribe(tableName);
    return 0;
}