Heterogeneous Stream Subscription

Since DolphinDB server 1.30.17/2.00.5, multiple stream tables with different schemata can be replayed into a single stream table, which is called a heterogeneous stream table. Since Java API 1.30.19, the API provides the StreamDeserializer class for constructing deserializers for heterogeneous stream tables.

Refer to DolphinDB - streamFilter for more detailed instructions on using StreamDeserializer.

Constructor

You can construct a deserializer for a heterogeneous table with StreamDeserializer.

(1) With specified table schema:

  • specify schema
    StreamDeserializer(Map<String, BasicDictionary> filters)
  • specify column types
    StreamDeserializer(HashMap<String, List<Entity.DATA_TYPE>> filters)

(2) With specified table:

StreamDeserializer(Map<String, Pair<String, String>> tableNames, DBConnection conn)

Code example:

//Suppose the input tables to be replayed are defined on the server as:
//d = dict(['msg1', 'msg2'], [table1, table2])
//replay(inputTables = d, outputTables = `outTables, dateColumn = `timestampv, timeColumn = `timestampv)

//create a deserializer for the heterogeneous table

{//specify schema
    BasicDictionary table1_schema = (BasicDictionary)conn.run("table1.schema()");
    BasicDictionary table2_schema = (BasicDictionary)conn.run("table2.schema()");
    Map<String, BasicDictionary> tables = new HashMap<>();
    tables.put("msg1", table1_schema);
    tables.put("msg2", table2_schema);
    StreamDeserializer streamFilter = new StreamDeserializer(tables);
}
{//or specify column types (the DT_* constants assume: import static com.xxdb.data.Entity.DATA_TYPE.*;)
    Entity.DATA_TYPE[] array1 = {DT_DATETIME,DT_TIMESTAMP,DT_SYMBOL,DT_DOUBLE,DT_DOUBLE};
    Entity.DATA_TYPE[] array2 = {DT_DATETIME,DT_TIMESTAMP,DT_SYMBOL,DT_DOUBLE};
    List<Entity.DATA_TYPE> filter1 = new ArrayList<>(Arrays.asList(array1));
    List<Entity.DATA_TYPE> filter2 = new ArrayList<>(Arrays.asList(array2));
    HashMap<String, List<Entity.DATA_TYPE>> filter = new HashMap<>();
    filter.put("msg1",filter1);
    filter.put("msg2",filter2);
    StreamDeserializer streamFilter = new StreamDeserializer(filter);
}
{//specify tables
    Map<String, Pair<String, String>> tables = new HashMap<>();
    tables.put("msg1", new Pair<>("", "table1"));
    tables.put("msg2", new Pair<>("", "table2"));
    //conn is an optional parameter
    StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);
}
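
The filters map passed to the constructor associates each message key (for example msg1) with the schema used to decode that message's payload. The following is a minimal, library-free sketch of that idea and not the DolphinDB implementation: SchemaRouterSketch, ColType, and the comma-separated payload are hypothetical stand-ins chosen for illustration, whereas the real StreamDeserializer works on the serialized rows produced by replay.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: maps a message key to a list of column types
// and uses that list to turn a text payload into a typed row.
class SchemaRouterSketch {
    enum ColType { INT, DOUBLE, STRING }

    private final Map<String, List<ColType>> filters = new HashMap<>();

    // Register the column types expected for one message key.
    void register(String key, List<ColType> types) {
        filters.put(key, new ArrayList<>(types));
    }

    // Decode a comma-separated payload using the schema registered for the key.
    List<Object> parse(String key, String payload) {
        List<ColType> types = filters.get(key);
        if (types == null) {
            throw new IllegalArgumentException("Unknown message key: " + key);
        }
        String[] fields = payload.split(",", -1);
        if (fields.length != types.size()) {
            throw new IllegalArgumentException(
                "Expected " + types.size() + " fields but got " + fields.length);
        }
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < fields.length; i++) {
            switch (types.get(i)) {
                case INT:
                    row.add(Integer.valueOf(fields[i].trim()));
                    break;
                case DOUBLE:
                    row.add(Double.valueOf(fields[i].trim()));
                    break;
                case STRING:
                    row.add(fields[i]);
                    break;
            }
        }
        return row;
    }

    public static void main(String[] args) {
        SchemaRouterSketch router = new SchemaRouterSketch();
        router.register("msg1", Arrays.asList(ColType.STRING, ColType.DOUBLE, ColType.INT));
        router.register("msg2", Arrays.asList(ColType.STRING, ColType.DOUBLE));
        System.out.println(router.parse("msg1", "AAPL,10.5,3"));
        System.out.println(router.parse("msg2", "MSFT,20.25"));
    }
}
```

The point of the sketch is that the key decides which schema applies, so two messages arriving on the same stream can be decoded into rows of different shapes.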

Subscription Options

(1) Subscribe to a heterogeneous table using ThreadedClient:

  • pass the deserializer to the deserializer parameter of subscribe so that messages are deserialized as they are ingested:

    ThreadedClient client = new ThreadedClient(8676);
    client.subscribe("192.168.0.21", 9002,"Trades1","subTread1", handler, -1, true, filter, streamFilter);
  • or pass the streamFilter to a user-defined handler:

    public class CustomHandler implements MessageHandler {
        private StreamDeserializer deserializer_;
        private List<BasicMessage> msg1 = new ArrayList<>();
        private List<BasicMessage> msg2 = new ArrayList<>();
    
        public CustomHandler(StreamDeserializer deserializer) {
            deserializer_ = deserializer;
        }
        public void batchHandler(List<IMessage> msgs) {
            // intentionally left empty; messages are handled one at a time in doEvent
        }
    
        public void doEvent(IMessage msg) {
            try {
                // parse the raw message, then route it by its key
                BasicMessage message = deserializer_.parse(msg);
                if (message.getSym().equals("msg1")) {
                    msg1.add(message);
                } else if (message.getSym().equals("msg2")) {
                    msg2.add(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public List<BasicMessage> getMsg1() {
            return msg1;
        }
        public List<BasicMessage> getMsg2() {
            return msg2;
        }
    }
    
    // create a stream table
    conn.run("st11 = streamTable(100:0, `timestampv`id`blob`price1,[TIMESTAMP,INT,BLOB,DOUBLE])\n" +
                    "enableTableShareAndPersistence(table=st11, tableName=`outTables, asynWrite=true, compress=true, cacheSize=200000, retentionMinutes=180, preCache = 0)\t\n");
    
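    // map each message key to the table whose schema describes it (as in the constructor example)
    Map<String, Pair<String, String>> tables = new HashMap<>();
    tables.put("msg1", new Pair<>("", "table1"));
    tables.put("msg2", new Pair<>("", "table2"));
    StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);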
    CustomHandler handler = new CustomHandler(streamFilter);
    ThreadedClient client = new ThreadedClient(8848);
    client.subscribe("192.168.0.21", 9002, "outTables", "mutiSchema", handler, 0, true);
    conn.run("t = table(timestamp(1 2) as a,1 2 as b,blob(`a`b) as c,1.1 2.1 as d)\n" +
                    "outTables.append!(t)");
    List<BasicMessage> msg1 = handler.getMsg1();
    List<BasicMessage> msg2 = handler.getMsg2();
    client.unsubscribe("192.168.0.21", 9002, "outTables", "mutiSchema");
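
Because the handler is invoked on the client's subscription thread, lists read immediately after append! may still be incomplete, and a plain ArrayList is not safe to share between threads. The following is a minimal sketch of one way to collect messages per key and wait for them, assuming the number of expected messages is known in advance. GroupingCollectorSketch and its methods are hypothetical and use plain strings in place of BasicMessage; in a real handler, accept would be called from doEvent with message.getSym().

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the bookkeeping inside a custom handler.
class GroupingCollectorSketch {
    private final Map<String, List<String>> bySym = new ConcurrentHashMap<>();
    private final CountDownLatch remaining;

    GroupingCollectorSketch(int expectedMessages) {
        remaining = new CountDownLatch(expectedMessages);
    }

    // Called from the subscription thread for every parsed message.
    void accept(String sym, String body) {
        bySym.computeIfAbsent(sym, k -> new CopyOnWriteArrayList<>()).add(body);
        remaining.countDown();
    }

    // Blocks until all expected messages arrived or the timeout elapsed.
    boolean awaitAll(long timeoutMillis) {
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns the messages collected for one key (empty if none arrived).
    List<String> get(String sym) {
        List<String> list = bySym.get(sym);
        return list == null ? Collections.<String>emptyList() : list;
    }

    public static void main(String[] args) {
        GroupingCollectorSketch collector = new GroupingCollectorSketch(3);
        // Simulates the subscription thread delivering three messages.
        Thread producer = new Thread(() -> {
            collector.accept("msg1", "a");
            collector.accept("msg2", "b");
            collector.accept("msg1", "c");
        });
        producer.start();
        boolean complete = collector.awaitAll(1000);
        System.out.println(complete + " " + collector.get("msg1") + " " + collector.get("msg2"));
    }
}
```

Waiting on a latch before reading, and before calling unsubscribe, avoids racing with the delivery thread; a fixed sleep would also work but is less predictable.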

(2) Subscribing to a heterogeneous table using ThreadPooledClient is similar to the above:

  • pass the deserializer to the deserializer parameter of subscribe:

    // handler is a MessageHandler defined elsewhere; the deserializer is supplied through subscribe
    ThreadPooledClient client1 = new ThreadPooledClient(9050, 10);
    client1.subscribe("192.168.0.21", 9002, "Trades1", "subTread1", handler, -1, true, filter, streamFilter);
  • or pass the streamFilter to a user-defined handler:

    // the handler deserializes each message itself, so no deserializer is passed to subscribe
    CustomHandler handler = new CustomHandler(streamFilter);
    ThreadPooledClient client1 = new ThreadPooledClient(9050, 10);
    client1.subscribe("192.168.0.21", 9002, "outTables", "mutiSchema", handler, 0, true);

(3) Because PollingClient does not support callbacks, the deserializer can only be passed through the deserializer parameter of subscribe:

    PollingClient client = new PollingClient(9050);
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", "subTread1", 0, true, null, streamFilter, "", "", true);
    ArrayList<IMessage> msg1;
    List<String> colNames = Arrays.asList("tag", "ts", "data");
    List<Vector> colData = Arrays.asList(new BasicIntVector(0), new BasicTimestampVector(0), new BasicDoubleVector(0));
    BasicTable bt = new BasicTable(colNames, colData);
    conn.run("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" +
            "Trades.append!(t)");
    // wait up to 1000 ms for at most 10000 messages
    msg1 = poller1.poll(1000, 10000);
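
A single poll call may return fewer messages than were appended, so polling is usually repeated until enough messages have arrived or an overall deadline passes. The following is a minimal sketch of such a loop. PollLoopSketch, BatchSource, and drain are hypothetical names, and BatchSource merely stands in for a poller so that the sketch runs without a server; with the real API the lambda would wrap a call such as poller1.poll(timeout).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

class PollLoopSketch {
    // Hypothetical stand-in for a poller: returns the next batch, possibly empty.
    interface BatchSource<T> {
        List<T> poll(long timeoutMillis);
    }

    // Polls repeatedly until at least `expected` items were collected
    // or `deadlineMillis` has elapsed. May return more than `expected`
    // if the last batch overshoots.
    static <T> List<T> drain(BatchSource<T> source, int expected, long deadlineMillis) {
        List<T> out = new ArrayList<>();
        long end = System.currentTimeMillis() + deadlineMillis;
        while (out.size() < expected) {
            long left = end - System.currentTimeMillis();
            if (left <= 0) {
                break; // overall deadline reached
            }
            List<T> batch = source.poll(Math.min(left, 100));
            if (batch != null) {
                out.addAll(batch);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Fake source that hands out two pre-built batches, then nothing.
        Deque<List<String>> batches = new ArrayDeque<>();
        batches.add(Arrays.asList("a", "b"));
        batches.add(Arrays.asList("c"));
        BatchSource<String> source =
            timeout -> batches.isEmpty() ? Collections.<String>emptyList() : batches.poll();
        System.out.println(drain(source, 3, 1000));
    }
}
```

Capping each individual poll at a short timeout keeps the loop responsive to the overall deadline even when the stream is idle.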