Heterogeneous Stream Subscription
Starting from version 1.30.17/2.00.5, DolphinDB server supports replaying multiple stream tables with different schemata into a single stream table, known as a heterogeneous stream table. Starting from version 1.30.19, the Java API provides the StreamDeserializer class for constructing deserializers that parse heterogeneous stream tables.
Refer to DolphinDB - streamFilter for more detailed instructions on using StreamDeserializer.
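To make the idea of "replaying several tables into one" concrete, here is a plain-Java model of a heterogeneous stream. It assumes that rows are emitted in timestamp order across all inputs, which is what the dateColumn/timeColumn arguments of replay suggest. Each output row carries its time, the key of the source table it came from (for example "msg1"), and the row itself. In DolphinDB the row is serialized into a BLOB column. Here a string stands in for it. SourceRow, Row and merge are hypothetical names and not DolphinDB classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual model only: SourceRow and Row are hypothetical stand-ins, not DolphinDB classes.
class HeteroReplayModel {
    // One row of an input table: its time column plus the rest of the row.
    static final class SourceRow {
        final long timestamp;
        final String payload;

        SourceRow(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // One row of the merged stream: time, key of the source table, and the row itself
    // (DolphinDB stores the row serialized in a BLOB; a string stands in for it here).
    static final class Row {
        final long timestamp;
        final String sym;
        final String payload;

        Row(long timestamp, String sym, String payload) {
            this.timestamp = timestamp;
            this.sym = sym;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return timestamp + " " + sym + " " + payload;
        }
    }

    // Merges two time-ordered sources into one time-ordered stream,
    // tagging every row with the key of the table it came from.
    static List<Row> merge(String symA, List<SourceRow> a, String symB, List<SourceRow> b) {
        List<Row> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() || j < b.size()) {
            // take from a when b is exhausted or a's next row is not later than b's
            boolean takeA = j >= b.size()
                    || (i < a.size() && a.get(i).timestamp <= b.get(j).timestamp);
            SourceRow r = takeA ? a.get(i++) : b.get(j++);
            out.add(new Row(r.timestamp, takeA ? symA : symB, r.payload));
        }
        return out;
    }

    public static void main(String[] args) {
        List<SourceRow> table1 = Arrays.asList(new SourceRow(1, "AAPL,1.5,2.5"), new SourceRow(3, "AAPL,1.7,2.7"));
        List<SourceRow> table2 = Arrays.asList(new SourceRow(2, "IBM,9.9"));
        for (Row r : merge("msg1", table1, "msg2", table2)) {
            System.out.println(r);
        }
    }
}
```

Because rows of different layouts share one column, a consumer cannot decode a row without knowing which table it belongs to. That is the job of StreamDeserializer.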
Constructor
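A deserializer for a heterogeneous stream table is created with one of the StreamDeserializer constructors below. In each of them, the map key identifies a source table and must match the key used for that table in the replay input dictionary (e.g. "msg1" in the code example).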
(1) With specified table schemata:
- specify the schema: StreamDeserializer(Map<String, BasicDictionary> filters)
- specify the column types: StreamDeserializer(HashMap<String, List<Entity.DATA_TYPE>> filters)
(2) With specified tables:
StreamDeserializer(Map<String, Pair<String, String>> tableNames, DBConnection conn)
Each Pair holds a database path and a table name. The code example uses an empty path for table1 and table2.
Code example:
// Suppose the inputTables to be replayed are:
// d = dict(['msg1', 'msg2'], [table1, table2]);
// replay(inputTables = d, outputTables = `outTables, dateColumn = `timestampv, timeColumn = `timestampv)

// create a deserializer for the heterogeneous table
{
    // specify schema
    BasicDictionary table1_schema = (BasicDictionary) conn.run("table1.schema()");
    BasicDictionary table2_schema = (BasicDictionary) conn.run("table2.schema()");
    Map<String, BasicDictionary> tables = new HashMap<>();
    tables.put("msg1", table1_schema);
    tables.put("msg2", table2_schema);
    StreamDeserializer streamFilter = new StreamDeserializer(tables);
}
{
    // or specify column types
    // (DT_* constants need: import static com.xxdb.data.Entity.DATA_TYPE.*;)
    Entity.DATA_TYPE[] array1 = {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE, DT_DOUBLE};
    Entity.DATA_TYPE[] array2 = {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE};
    List<Entity.DATA_TYPE> filter1 = new ArrayList<>(Arrays.asList(array1));
    List<Entity.DATA_TYPE> filter2 = new ArrayList<>(Arrays.asList(array2));
    HashMap<String, List<Entity.DATA_TYPE>> filter = new HashMap<>();
    filter.put("msg1", filter1);
    filter.put("msg2", filter2);
    StreamDeserializer streamFilter = new StreamDeserializer(filter);
}
{
    // or specify tables as (database path, table name) pairs
    Map<String, Pair<String, String>> tables = new HashMap<>();
    tables.put("msg1", new Pair<>("", "table1"));
    tables.put("msg2", new Pair<>("", "table2"));
    // conn is an optional parameter
    StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);
}
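Whichever constructor is used, the deserializer needs one row layout per key. As we understand it, the table-based constructor differs only in obtaining that layout from the server. The toy sketch below shows why a per-key layout is necessary. The same kind of payload is decoded differently depending on its key, and an unknown key cannot be decoded at all. ColType, the comma-separated payload format and ToyDeserializer are hypothetical. They do not reproduce DolphinDB's BLOB encoding, Entity.DATA_TYPE, or the behavior of StreamDeserializer.parse.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: ColType and the comma-separated payload format are hypothetical,
// not DolphinDB's serialization format.
class ToyDeserializer {
    enum ColType { INT, DOUBLE, STRING }

    private final Map<String, List<ColType>> schemas;  // key (e.g. "msg1") -> column types

    ToyDeserializer(Map<String, List<ColType>> schemas) {
        this.schemas = new HashMap<>(schemas);
    }

    // Decodes one payload using the layout registered for its key.
    List<Object> parse(String sym, String payload) {
        List<ColType> types = schemas.get(sym);
        if (types == null) {
            throw new IllegalArgumentException("no schema registered for key " + sym);
        }
        String[] parts = payload.split(",", -1);
        if (parts.length != types.size()) {
            throw new IllegalArgumentException("expected " + types.size() + " columns for " + sym);
        }
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < parts.length; i++) {
            switch (types.get(i)) {
                case INT:
                    row.add(Integer.parseInt(parts[i]));
                    break;
                case DOUBLE:
                    row.add(Double.parseDouble(parts[i]));
                    break;
                default:
                    row.add(parts[i]);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, List<ColType>> schemas = new HashMap<>();
        schemas.put("msg1", Arrays.asList(ColType.STRING, ColType.DOUBLE, ColType.DOUBLE));
        schemas.put("msg2", Arrays.asList(ColType.STRING, ColType.DOUBLE));
        ToyDeserializer d = new ToyDeserializer(schemas);
        System.out.println(d.parse("msg1", "AAPL,1.5,2.5"));  // [AAPL, 1.5, 2.5]
        System.out.println(d.parse("msg2", "IBM,9.9"));       // [IBM, 9.9]
    }
}
```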
Subscription Options
(1) Subscribing to a heterogeneous table using ThreadedClient:
- Specify the deserializer parameter of subscribe so that the data is deserialized as it is ingested:
ThreadedClient client = new ThreadedClient(8676);
client.subscribe("192.168.0.21", 9002, "Trades1", "subTread1", handler, -1, true, filter, streamFilter);
- Or pass streamFilter to a user-defined handler, which deserializes each message itself:
public class CustomHandler implements MessageHandler {
    private StreamDeserializer deserializer_;
    private List<BasicMessage> msg1 = new ArrayList<>();
    private List<BasicMessage> msg2 = new ArrayList<>();

    public CustomHandler(StreamDeserializer deserializer) {
        deserializer_ = deserializer;
    }

    public void batchHandler(List<IMessage> msgs) {
    }

    public void doEvent(IMessage msg) {
        try {
            // deserialize the row, then sort it by the source table it came from
            BasicMessage message = deserializer_.parse(msg);
            if (message.getSym().equals("msg1")) {
                msg1.add(message);
            } else if (message.getSym().equals("msg2")) {
                msg2.add(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public List<BasicMessage> getMsg1() {
        return msg1;
    }

    public List<BasicMessage> getMsg2() {
        return msg2;
    }
}

// create a stream table
conn.run("st11 = streamTable(100:0, `timestampv`id`blob`price1,[TIMESTAMP,INT,BLOB,DOUBLE])\n" +
        "enableTableShareAndPersistence(table=st11, tableName=`outTables, asynWrite=true, compress=true, cacheSize=200000, retentionMinutes=180, preCache = 0)\n");

// map each key to a (database path, table name) pair, e.g. tables.put("msg1", new Pair<>("", "table1"));
Map<String, Pair<String, String>> tables = new HashMap<>();
StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);
CustomHandler handler = new CustomHandler(streamFilter);
ThreadedClient client = new ThreadedClient(8848);
client.subscribe("192.168.0.21", 9002, "outTables", "mutiSchema", handler, 0, true);
conn.run("t = table(timestamp(1 2) as a,1 2 as b,blob(`a`b) as c,1.1 2.1 as d)\n" + "outTables.append!(t)");
// messages arrive asynchronously; allow time for delivery before reading the lists
List<BasicMessage> msg1 = handler.getMsg1();
List<BasicMessage> msg2 = handler.getMsg2();
client.unsubscribe("192.168.0.21", 9002, "outTables", "mutiSchema");
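CustomHandler keeps one hard-coded list per source table. A possible variation, sketched below in plain Java, creates one bucket per key on first use, so adding a third table to the replay does not require a new field. Msg and SymRouter are hypothetical stand-ins. Msg plays the role of the BasicMessage returned by parse, and only its getSym() value and a payload are modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the dispatch in CustomHandler.doEvent.
// Msg is a hypothetical stand-in for BasicMessage.
class SymRouter {
    static final class Msg {
        final String sym;      // plays the role of BasicMessage.getSym()
        final Object payload;

        Msg(String sym, Object payload) {
            this.sym = sym;
            this.payload = payload;
        }
    }

    // One bucket per key, created on first use instead of one field per table.
    private final Map<String, List<Msg>> buckets = new HashMap<>();

    void doEvent(Msg msg) {
        buckets.computeIfAbsent(msg.sym, k -> new ArrayList<>()).add(msg);
    }

    // Returns the messages received for a key (an empty list if none arrived).
    List<Msg> get(String sym) {
        return buckets.getOrDefault(sym, new ArrayList<>());
    }

    public static void main(String[] args) {
        SymRouter router = new SymRouter();
        router.doEvent(new Msg("msg1", 1.1));
        router.doEvent(new Msg("msg2", "a"));
        router.doEvent(new Msg("msg1", 2.1));
        System.out.println(router.get("msg1").size() + " " + router.get("msg2").size());  // 2 1
    }
}
```

Like CustomHandler, this version is not thread-safe. That is fine for a single callback thread, but not for concurrent delivery.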
(2) Subscribing to a heterogeneous table using ThreadPooledClient works the same way:
- Specify the deserializer parameter of subscribe:
ThreadPooledClient client = new ThreadPooledClient(9050, 10);
client.subscribe("192.168.0.21", 9002, "Trades1", "subTread1", handler, -1, true, filter, streamFilter);
- Or pass streamFilter to a user-defined handler:
CustomHandler handler = new CustomHandler(streamFilter);
ThreadPooledClient client = new ThreadPooledClient(9050, 10);
client.subscribe("192.168.0.21", 9002, "outTables", "mutiSchema", handler, 0, true);
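ThreadPooledClient delivers messages through a pool of threads (10 in the example above). If that means the handler can be invoked concurrently, a handler that appends to plain ArrayLists, as CustomHandler does, may lose messages or corrupt its lists. The sketch below is a hypothetical thread-safe variant of the same bucketing, in plain Java. An ExecutorService stands in for the client's pool, and the payloads stand in for BasicMessage objects. ConcurrentSymRouter and simulate are illustrative names only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical thread-safe variant of the bucketing in CustomHandler.
class ConcurrentSymRouter {
    // computeIfAbsent on ConcurrentHashMap is atomic, so concurrent calls
    // never create two different buckets for the same key.
    private final Map<String, ConcurrentLinkedQueue<Object>> buckets = new ConcurrentHashMap<>();

    void onMessage(String sym, Object payload) {
        buckets.computeIfAbsent(sym, k -> new ConcurrentLinkedQueue<>()).add(payload);
    }

    int count(String sym) {
        ConcurrentLinkedQueue<Object> q = buckets.get(sym);
        return q == null ? 0 : q.size();
    }

    // Delivers messagesPerSym messages for "msg1" and "msg2" from a pool of worker threads.
    static ConcurrentSymRouter simulate(int threads, int messagesPerSym) {
        ConcurrentSymRouter router = new ConcurrentSymRouter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messagesPerSym; i++) {
            final int n = i;
            pool.submit(() -> router.onMessage("msg1", n));
            pool.submit(() -> router.onMessage("msg2", n));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            throw new IllegalStateException(e);
        }
        return router;
    }

    public static void main(String[] args) {
        ConcurrentSymRouter router = simulate(10, 1000);
        System.out.println(router.count("msg1") + " " + router.count("msg2"));  // 1000 1000
    }
}
```

Wrapping the existing lists with Collections.synchronizedList would be a smaller change to CustomHandler. The concurrent collections above avoid a single shared lock.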
(3) Since PollingClient does not support callbacks, the deserializer can only be passed through the deserializer parameter of subscribe:
PollingClient client = new PollingClient(9050);
TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", "subTread1", 0, true, null, streamFilter, "", "", true);
ArrayList<IMessage> msg1;
List<String> colNames = Arrays.asList("tag", "ts", "data");
List<Vector> colData = Arrays.asList(new BasicIntVector(0), new BasicTimestampVector(0), new BasicDoubleVector(0));
BasicTable bt = new BasicTable(colNames, colData);
conn.run("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
msg1 = poller1.poll(1000, 10000);
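With PollingClient the application pulls batches instead of receiving callbacks. The plain-Java model below is based on our reading of poll(1000, 10000): wait until 10000 messages are available or 1000 ms have passed, then return whatever has arrived. The model uses a BlockingQueue and is not TopicPoller's actual implementation, so please check the exact timeout and size semantics in the API reference. PollModel and its poll method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Model of a batch poll: collect up to `size` items, waiting no longer than `timeoutMs`.
// Hypothetical; not the implementation of TopicPoller.poll.
class PollModel {
    static <T> List<T> poll(BlockingQueue<T> queue, long timeoutMs, int size) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (batch.size() < size) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break;  // timeout reached: return what has been collected
            }
            T item;
            try {
                item = queue.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve the interrupt status
                break;
            }
            if (item == null) {
                break;  // nothing arrived before the deadline
            }
            batch.add(item);
            // take whatever else is already queued, without exceeding size
            queue.drainTo(batch, size - batch.size());
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5000; i++) {
            queue.add(i);
        }
        // fewer than 10000 items are queued, so this returns all 5000 once the 1000 ms timeout expires
        List<Integer> batch = poll(queue, 1000, 10000);
        System.out.println(batch.size());
    }
}
```

Under this model, the example above returns the 5000 appended rows after about one second. A smaller size would return sooner with a partial batch and leave the rest for the next call.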