// 登录数æ®åº“ login(`admin, `123456) /* // 从æ’件市场下载安装 nsq æ’件 listRemotePlugins("nsq") installPlugin("nsq") */ // åŠ è½½æ’件 try{ loadPlugin("nsq") } catch(ex) { print(ex) } go // è°ƒç”¨æ¨¡å— use DolphinDBModules::easyNSQ // nsq 行情é…置文件路径 configFilePath = "<your_path_to>/nsq_sdk_config.ini"; // nsq è´¦å·ï¼ˆéžå¿…填) nsq_username = "<your_nsq_username>"; nsq_password = "<your_nsq_password>"; // æ•°æ®æŽ¥æ”¶é€‰é¡¹ï¼ˆéžå¿…填) nsq_data_option = dict(STRING, ANY) nsq_data_option["receivedTime"]=true // 在行情数æ®ä¸å¢žåŠ 一列接收时间 nsq_data_option["getAllFieldNames"]=true // æŽ¥å— nsq 原始行情ä¸æ‰€æœ‰å—段 /* configFilePath = "/home/appadmin/mqzhu/ddb_20011/server/uploads/sdk_config.ini"; nsq_username = ""; nsq_password = ""; */ /** 例1 仅从NSQ接收深圳市场snapshot类型的实时行情数æ®åˆ°æµæ•°æ®è¡¨ï¼Œä¸å˜å‚¨åˆ°åˆ†åŒºè¡¨ï¼ˆæµè¡¨ä½¿ç”¨æ¨¡å—æ供的默认åå—) */ // åˆå§‹åŒ–æµçŽ¯å¢ƒï¼ˆæ¸…ç†æ‰€æœ‰ç›¸å…³æµè¡¨åŠå…¶è®¢é˜…) iniNsqEnv() // 拉起订阅 streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", options=nsq_data_option) // 检查订阅情况 nsq::getSubscriptionStatus() // select count(*) from objByName(streamTableNames[0]) // select top 100 * from objByName(streamTableNames[0]) // åœæ¢è®¢é˜… nsq::unsubscribe("snapshot", "sz") nsq::getSubscriptionStatus() /** 例2 从NSQ接收上海市场的实时行情数æ®åˆ°æµæ•°æ®è¡¨ï¼Œå¹¶åœ¨åˆ†åŒºè¡¨ä¸æŒä¹…化å˜å‚¨ï¼ˆæµè¡¨å’Œåˆ†åŒºè¡¨ä½¿ç”¨æ¨¡å—æ供的默认åå—) */ // åˆå§‹åŒ–所有相关的æµçŽ¯å¢ƒå’Œåˆ†åŒºè¡¨ iniNsqEnv() iniNsqDfs() // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 // 订阅上海市场orders,trade,snapshot行情数æ®ï¼Œå¹¶æŒä¹…化å˜å‚¨ subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option) subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true, options=nsq_data_option) subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true, options=nsq_data_option) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName("nsqStockOrdersSHStream") // select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // select count(*) from objByName("nsqStockTradeSHStream") // select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH") // select count(*) from objByName("nsqStockSnapshotSHStream") // select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH") // ä»…åœæ¢orders行情数æ®çš„订阅 nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() // å…³é—与nsq的连接,并åœæ¢æ‰€æœ‰è®¢é˜… closeNsqConnection() /** 例3.1 åœæ¢ä¾‹2ä¸çš„订阅åŽï¼Œé‡æ–°æŽ¥æ”¶ä¸Šæµ·å¸‚场ordersæ•°æ®ï¼Œä¿ç•™ä¹‹å‰è®¢é˜…æŒä¹…åŒ–åˆ°åˆ†åŒºè¡¨çš„æ•°æ® */ // åˆå§‹åŒ–æµçŽ¯å¢ƒï¼Œä»…æ¸…ç† nsqStockOrdersSHStream æµè¡¨ iniNsqEnv("nsqStockOrdersSHStream") // 订阅上海市场orders行情数æ®ï¼Œå¹¶æŒä¹…化å˜å‚¨ streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName(streamTableNames[0]) // select count(*) from loadTable(dbPath, tableNames[0]) // åœæ¢è®¢é˜… nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() /** 例3.2 åœæ¢ä¾‹2ä¸çš„订阅åŽï¼Œé‡æ–°æŽ¥æ”¶ä¸Šæµ·å¸‚场ordersæ•°æ®ï¼Œä¸”ä¸ä¿ç•™ä¹‹å‰è®¢é˜…æŒä¹…åŒ–åˆ°åˆ†åŒºè¡¨çš„æ•°æ® */ // åˆå§‹åŒ–æµçŽ¯å¢ƒå’Œåˆ†å¸ƒå¼è¡¨ iniNsqEnv("nsqStockOrdersSHStream") iniNsqDfs("dfs://nsqStockOrders", "ordersSH") // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 // 订阅上海市场orders行情数æ®ï¼Œå¹¶æŒä¹…化å˜å‚¨ subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName("nsqStockOrdersSHStream") // select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // åœæ¢è®¢é˜… nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() /** 例4.1 从NSQ接收上海和深圳市场的实时行情数æ®åˆ°æµæ•°æ®è¡¨ï¼Œå¹¶åœ¨åˆ†åŒºè¡¨ä¸æŒä¹…化å˜å‚¨ï¼Œä¸Šæµ·å¸‚场和深圳市场的数æ®åˆå¹¶å¤„ç†ï¼ˆæµè¡¨å’Œåˆ†åŒºè¡¨ä½¿ç”¨æ¨¡å—æ供的默认åå—) */ // åˆå§‹åŒ–æµçŽ¯å¢ƒå’Œåˆ†å¸ƒå¼è¡¨ iniNsqEnv() iniNsqDfs() // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 // 订阅上海和深圳市场orders,trade,snapshot行情数æ®,并æŒä¹…化å˜å‚¨ subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // åœæ¢è®¢é˜… closeNsqConnection() // 清ç†æµè¡¨å’Œåˆ†åŒºè¡¨ iniNsqEnv() iniNsqDfs() // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 /** 例4.2 从NSQ接收上海和深圳市场的实时行情数æ®åˆ°æµæ•°æ®è¡¨ï¼Œå¹¶åœ¨åˆ†åŒºè¡¨ä¸æŒä¹…化å˜å‚¨ï¼Œä¸Šæµ·å¸‚场和深圳市场的数æ®åˆ†å¼€å¤„ç†ï¼ˆæµè¡¨å’Œåˆ†åŒºè¡¨ä½¿ç”¨è‡ªå®šä¹‰åå—) */ // åˆå§‹åŒ–æµçŽ¯å¢ƒå’Œåˆ†å¸ƒå¼è¡¨ iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ) // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ) // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ) // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 // 订阅上海和深圳市场orders,trade,snapshot行情数æ®,并æŒä¹…化å˜å‚¨ subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // åœæ¢è®¢é˜… closeNsqConnection() // 清ç†æµè¡¨å’Œåˆ†åŒºè¡¨ iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ) // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ) // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ) // 注æ„ï¼Œè¯¥å‡½æ•°ä¼šåˆ é™¤æ•°æ®åº“ä¸çš„分区表,谨慎使用