IPC In-memory Table Subscription
The C++ API provides the IPCInMemoryStreamClient
class to subscribe to
an IPC in-memory table. In scenarios with extremely high latency requirements, user
processes can directly access shared memory to obtain data from server through APIs,
which greatly reduces the latency of network transmissions. Because the processes access
the same shared memory, the publish side and the subscribe side must be located on the
same server.
This section describes how to subscribe to shared memory tables through the C++ API.
Creating Subscription
The function declaration is as follows:
using IPCInMemoryTableReadHandler = std::function<void(ConstantSP)>;
ThreadSP subscribe(const string& tableName, const IPCInMemoryTableReadHandler& handler, TableSP outputTable = nullptr, bool overwrite = true);
- tableName: The name of the subscribed stream table.
- handler: A user-defined function to process the received data.
- outputTable: optional. The output table with the same schema of the subscribed table. It is used to store the incoming data of the subsribed table.
- overwrite: optional. Whether to overwrite the previous record in the outputTable when receiving a new message.
Canceling Subscription
The function declaration is as follows:
void unsubscribe(const string& tableName);
- tableName: The name of the subscribed stream table.
Usage Example
The following example uses the DolphinDB server version 2.00.10.
DolphinDB script: Create a stream table and an IPC in-memory table. Subscribe
to the stream table with function subscribeTable
. During the
process of inserting data into the stream table, the data will be written to the IPC
in-memory table.
share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable;
share createIPCInMemoryTable(1000000, "pubTable", `timestamp`temperature, [TIMESTAMP, DOUBLE]) as shm_test;
def shm_append(msg) {
shm_test.append!(msg)
}
topic2 = subscribeTable(tableName="pubTable", actionName="act3", offset=0, handler=shm_append, msgAsTable=true)
C++ Code:
void print(TableSP table) {
std::cout << table->getString() << std::endl;
}
int main(int argc, const char **argv)
{
string tableName = "pubTable";
//construct an in-memory table
IPCInMemoryStreamClient memTable;
//create an output table whose schema is the same as IPC in-memory table
vector<string> colNames = {"timestamp", "temperature"};
vector<DATA_TYPE> colTypes = {DT_TIMESTAMP, DT_DOUBLE};
int rowNum = 0, indexCapacity=10000;
TableSP outputTable = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
//overwrite option is disabled
bool overwrite = false;
ThreadSP thread = memTable.subscribe(tableName, print, nullptr, overwrite);
Util::sleep(10000);
//cancel the subscription
memTable.unsubscribe(tableName);
}