// 登录数据库
login(`admin, `123456)

/*
// 从插件市场下载安装 nsq 插件
listRemotePlugins("nsq")
installPlugin("nsq")
*/

// 加载插件
try{ loadPlugin("nsq") } catch(ex) { print(ex) }
go
// 调用模块
use DolphinDBModules::easyNSQ


// nsq 行情配置文件路径
configFilePath = "<your_path_to>/nsq_sdk_config.ini";
// nsq 账号(非必填)
nsq_username = "<your_nsq_username>";
nsq_password = "<your_nsq_password>";
// 数据接收选项(非必填)
nsq_data_option = dict(STRING, ANY)
nsq_data_option["receivedTime"]=true  // 在行情数据中增加一列接收时间
nsq_data_option["getAllFieldNames"]=true // 接受 nsq 原始行情中所有字段

/*
configFilePath = "/home/appadmin/mqzhu/ddb_20011/server/uploads/sdk_config.ini";
nsq_username = "";
nsq_password = "";
*/


/** 例1 仅从NSQ接收深圳市场snapshot类型的实时行情数据到流数据表,不存储到分区表(流表使用模块提供的默认名字) */

// 初始化流环境(清理所有相关流表及其订阅)
iniNsqEnv()

// 拉起订阅
streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", options=nsq_data_option)

// 检查订阅情况
nsq::getSubscriptionStatus()
// select count(*) from objByName(streamTableNames[0])
// select top 100 * from objByName(streamTableNames[0])

// 停止订阅
nsq::unsubscribe("snapshot", "sz")
nsq::getSubscriptionStatus()

/** 例2 从NSQ接收上海市场的实时行情数据到流数据表,并在分区表中持久化存储(流表和分区表使用模块提供的默认名字) */ 

// 初始化所有相关的流环境和分区表
iniNsqEnv()
iniNsqDfs() // 注意,该函数会删除数据库中的分区表,谨慎使用

// 订阅上海市场orders,trade,snapshot行情数据,并持久化存储
subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option)
subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true, options=nsq_data_option)
subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true, options=nsq_data_option)

// 检查订阅情况
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"
// select count(*) from objByName("nsqStockOrdersSHStream")
// select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH")
// select count(*) from objByName("nsqStockTradeSHStream")
// select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH")
// select count(*) from objByName("nsqStockSnapshotSHStream")
// select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH")

// 仅停止orders行情数据的订阅
nsq::unsubscribe("orders", "sh")
nsq::getSubscriptionStatus()

// 关闭与nsq的连接,并停止所有订阅
closeNsqConnection()

/** 例3.1 停止例2中的订阅后,重新接收上海市场orders数据,保留之前订阅持久化到分区表的数据 */
// 初始化流环境,仅清理 nsqStockOrdersSHStream 流表
iniNsqEnv("nsqStockOrdersSHStream")

// 订阅上海市场orders行情数据,并持久化存储
streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)

// 检查订阅情况
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"
// select count(*) from objByName(streamTableNames[0])
// select count(*) from loadTable(dbPath, tableNames[0])

// 停止订阅
nsq::unsubscribe("orders", "sh")
nsq::getSubscriptionStatus()

/** 例3.2 停止例2中的订阅后,重新接收上海市场orders数据,且不保留之前订阅持久化到分区表的数据 */

// 初始化流环境和分布式表
iniNsqEnv("nsqStockOrdersSHStream")
iniNsqDfs("dfs://nsqStockOrders", "ordersSH") // 注意,该函数会删除数据库中的分区表,谨慎使用

// 订阅上海市场orders行情数据,并持久化存储
subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)

// 检查订阅情况
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"
// select count(*) from objByName("nsqStockOrdersSHStream")
// select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH")

// 停止订阅
nsq::unsubscribe("orders", "sh")
nsq::getSubscriptionStatus()

/** 例4.1 从NSQ接收上海和深圳市场的实时行情数据到流数据表,并在分区表中持久化存储,上海市场和深圳市场的数据合并处理(流表和分区表使用模块提供的默认名字) */

// 初始化流环境和分布式表
iniNsqEnv()
iniNsqDfs() // 注意,该函数会删除数据库中的分区表,谨慎使用

// 订阅上海和深圳市场orders,trade,snapshot行情数据,并持久化存储
subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)

// 检查订阅情况
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"

// 停止订阅
closeNsqConnection()

// 清理流表和分区表
iniNsqEnv()
iniNsqDfs() // 注意,该函数会删除数据库中的分区表,谨慎使用

/** 例4.2 从NSQ接收上海和深圳市场的实时行情数据到流数据表,并在分区表中持久化存储,上海市场和深圳市场的数据分开处理(流表和分区表使用自定义名字) */

// 初始化流环境和分布式表
iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"])
each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ)  // 注意,该函数会删除数据库中的分区表,谨慎使用
each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ)  // 注意,该函数会删除数据库中的分区表,谨慎使用
each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ)  // 注意,该函数会删除数据库中的分区表,谨慎使用

// 订阅上海和深圳市场orders,trade,snapshot行情数据,并持久化存储
subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password)

// 检查订阅情况
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"

// 停止订阅
closeNsqConnection()

// 清理流表和分区表
iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"])
each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ)  // 注意,该函数会删除数据库中的分区表,谨慎使用
each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ)  // 注意,该函数会删除数据库中的分区表,谨慎使用
each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ)  // 注意,该函数会删除数据库中的分区表,谨慎使用