From InfluxDB to DolphinDB

This tutorial introduces how to migrate data from InfluxDB to DolphinDB with an IoT example.

Migration Methods

There are multiple methods for accessing InfluxDB. For data export, however, users typically migrate data from InfluxDB to other platforms through its APIs, which incurs extra development costs.

Tools such as Addax have been introduced to implement efficient data synchronization between different databases. We recommend the Addax-DolphinDBWriter plugin, a writer plugin developed for Addax, for migrating data from InfluxDB to DolphinDB.

Migration Steps with an IoT Case

This tutorial takes an IoT monitoring scenario as an example to demonstrate how to migrate device metrics from InfluxDB to DolphinDB.

Import Sample Data to InfluxDB

The sample data used in the following case is obtained from the InfluxDB official documentation: Machine production sample data.

Note: Pay attention to the data types of the migrated data. For the specific data types supported by DolphinDB, you can refer to Data Types.

Create an InfluxDB bucket, and import sample data to the bucket (genMockData.flux).

import "influxdata/influxdb/sample"

sample.data(set: "machineProduction")
    |> to(bucket: "demo-bucket")

Create a DolphinDB Database and Table

Create a database and a table in DolphinDB to store the migrated data. You need to define a database with a reasonable partition scheme based on the data to be migrated. For more instructions, see DolphinDB Partitioned Database Tutorial.

The following script creates database "dfs://demo" and partitioned table "pt" (createTable.dos).

login("admin","123456")

dbName="dfs://demo"
tbName="pt"

colNames=`time`stateionID`grinding_time`oil_temp`pressure`pressure_target`rework_time`state
colTypes=[NANOTIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,SYMBOL]
schemaTb=table(1:0,colNames,colTypes)

db = database(dbName, VALUE,2021.08.01..2022.12.31)
pt = db.createPartitionedTable(schemaTb, "pt", `time)
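
To verify the result, you can inspect the schema of the newly created table (a minimal check):

// List the column names and types of the new partitioned table
schema(loadTable("dfs://demo", "pt")).colDefs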

Deployment

  • Deploy Addax

There are two methods for installing Addax:

(1) Download the compiled binary package;

(2) Download the source code and compile it yourself.

  • Deploy Addax-DolphinDBWriter Plugin

Download the compiled Addax-DolphinDBWriter plugin and copy it to the addax/plugin/writer directory. Alternatively, you can compile the plugin from source yourself.

Configure a Migration Job

Create a configuration file to define the migration job, following Addax's job configuration format. Pay attention to the conversion of date and time values (refer to Convert the Date and Time Values).

Create a Configuration File

The specific content of the configuration file is as follows (influx2ddb.json). For detailed information on parameters, refer to Appendices.

{
  "job": {
    "content": {
      "reader": {
        "name": "influxdb2reader",
        "parameter": {
          "column": ["*"],
          "connection": [
            {
              "endpoint": "http://183.136.170.168:8086",
              "bucket": "demo-bucket",
              "table": [
                  "machinery"
              ],
              "org": "zhiyu"
            }
          ],
          "token": "GLiPjQFQIxzVO0-atASJHH4b075sTlyEZGrqW20XURkelUT5pOlfhi_Yuo2fjcSKVZvyuO00kdXunWPrpJd_kg==",
          "range": [
            "2007-08-09"
          ]
        }
      },
      "writer": {
        "name": "dolphindbwriter",
        "parameter": {
          "userId": "admin",
          "pwd": "123456",
          "host": "115.239.209.122",
          "port": 3134,
          "dbPath": "dfs://demo",
          "tableName": "pt",
          "batchSize": 1000000,
          "saveFunctionName": "transData",
          "saveFunctionDef": "def parseRFC3339(timeStr) {if(strlen(timeStr) == 20) {return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ssZ'));} else if (strlen(timeStr) == 24) {return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ss.SSSZ'));} else {return timeStr;}};def transData(dbName, tbName, mutable data) {timeCol = exec time from data; timeCol=each(parseRFC3339, timeCol);  writeLog(timeCol);replaceColumn!(data,'time', timeCol); loadTable(dbName,tbName).append!(data); }",
          "table": [
              {
                "type": "DT_STRING",
                "name": "time"
              },
              {
                "type": "DT_SYMBOL",
                "name": "stationID"
              },
              {
                "type": "DT_DOUBLE",
                "name": "grinding_time"
              },
              {
                "type": "DT_DOUBLE",
                "name": "oil_temp"
              },
              {
                "type": "DT_DOUBLE",
                "name": "pressure"
              },
              {
                "type": "DT_DOUBLE",
                "name": "pressure_target"
              },
              {
                "type": "DT_DOUBLE",
                "name": "rework_time"
              },
              {
                "type": "DT_SYMBOL",
                "name": "state"
              }
          ]
        }
      }
    },
    "setting": {
      "speed": {
        "bytes": -1,
        "channel": 1
      }
    }
  }
}

Convert the Date and Time Values

InfluxDB 2.0 stores date and time values in RFC3339 format with time zone information, while DolphinDB stores only local time. A conversion error is raised when date and time values from InfluxDB are migrated to DolphinDB directly. We therefore define a function in the DolphinDBWriter plugin (with the saveFunctionName and saveFunctionDef parameters) to convert the data.

Below is the user-defined function specified for saveFunctionDef in the configuration file:

// Parse an RFC3339 string and convert it to local time.
// Strings matching neither expected length are returned unchanged.
def parseRFC3339(timeStr) {
	if(strlen(timeStr) == 20) {
		return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ssZ'));
	} else if (strlen(timeStr) == 24) {
		return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ss.SSSZ'));
	} else {
		return timeStr;
	}
}

// Convert the time column of each incoming batch, then append the batch to the DFS table
def transData(dbName, tbName, mutable data) {
	timeCol = exec time from data; writeLog(timeCol);
	timeCol = each(parseRFC3339, timeCol);
	replaceColumn!(data, 'time', timeCol);
	loadTable(dbName, tbName).append!(data);
}

In the example above, function parseRFC3339 is called within function transData. Therefore, parameter saveFunctionName is specified as "transData", as that is the function the plugin calls directly.

Function parseRFC3339 uses the temporalParse function to convert a string to a DolphinDB temporal data type, and the localtime function to convert the result to local time. With the converted time column, function transData then calls append! to write the data to the DolphinDB DFS table.
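
For example, assuming the DolphinDB server runs in the UTC+8 time zone (an illustrative assumption; the actual offset depends on your server), the conversion behaves as follows:

// Illustrative only: the expected output assumes the server's local time zone is UTC+8
parseRFC3339("2022-12-13T13:17:14Z")      // => 2022.12.13T21:17:14
parseRFC3339("2022-12-13T13:17:14.123Z")  // => 2022.12.13T21:17:14.123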

Note: If your business does not require time zone conversion, you can just use the result returned by temporalParse.

For time zone-related issues, refer to Time Zones in DolphinDB.

Synchronize Data

Run the migration job configured in the file above.

./bin/addax.sh ~/addax/job/influx2ddb.json

Note: In this case, the configuration file influx2ddb.json is placed in the addax/job directory. Adjust the path to the directory where your configuration file is located.

The log is displayed as follows:

2022-12-13 21:17:17.870 [job-0] INFO  JobContainer         -
Start Time                     : 2022-12-13 21:17:14
End Time                       : 2022-12-13 21:17:17
Elapsed Time                   :                  3s
Average Flow                   :          345.07KB/s
Write Speed                    :           6568rec/s
Total Read Records             :               19705
Total Failed Attempts          :                   0
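
To confirm the write on the DolphinDB side, a simple count (a minimal sketch) should match the 19,705 records read above:

// Row count of the target table after the migration job completes
select count(*) from loadTable("dfs://demo", "pt")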

You can specify a filtering condition with the range parameter in the "reader" part of influx2ddb.json to incrementally synchronize selected data, as shown in the sketch below.
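
For example, assuming the reader maps the first and second elements of range to the start and stop of the underlying Flux range query (the values below are illustrative), a bounded window looks like this:

"range": [
    "2022-12-13T00:00:00Z",
    "2022-12-14T00:00:00Z"
]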

The above case illustrates the process of migrating data from InfluxDB 2.0 to DolphinDB. For InfluxDB 1.0, replace the InfluxDB2 Reader plugin with the InfluxDB Reader plugin in the configuration file (influx2ddb.json).

Appendices

Configuration Parameters for InfluxDB2 Reader

| Parameter | Required | Data Type | Default | Description |
|-----------|----------|-----------|---------|-------------|
| endpoint | Yes | string | / | InfluxDB connection string |
| token | Yes | string | / | Token used to access the database |
| table | No | list | / | The name of the table to be synchronized |
| org | Yes | string | / | InfluxDB org name |
| bucket | Yes | string | / | InfluxDB bucket name |
| column | No | list | / | A JSON array indicating the column names of the table to be synchronized. An asterisk (*) indicates that all columns are synchronized, e.g., ["*"] |
| range | Yes | list | / | The time range of the data to be read |
| limit | No | int | / | The maximum number of records to read |

Configuration Parameters for DolphinDBWriter Plugin

| Parameter | Required | Data Type | Default | Description |
|-----------|----------|-----------|---------|-------------|
| host | Yes | string | / | Server host |
| port | Yes | int | / | Server port |
| userId | Yes | string | / | DolphinDB username. Note that when importing data to a DFS database, the user must be granted appropriate privileges. |
| pwd | Yes | string | / | Password of the DolphinDB user |
| dbPath | Yes | string | / | The target database, e.g., "dfs://MYDB" |
| tableName | Yes | string | / | The target table |
| batchSize | No | int | 10000000 | The number of records written per batch |
| table | No | list | / | The columns of the table to be written to. For more information, refer to the details below. |
| saveFunctionName | No | string | / | Name of a user-defined function that processes the data. If not specified, the data is inserted into DolphinDB with function tableInsert; otherwise, the data is processed with the specified function. |
| saveFunctionDef | No | string | / | Definition of a user-defined function that appends data to the database. It takes three arguments: dfsPath, tbName, and data (the table to be imported). |

Details of the table parameter:

table specifies the columns of the table to be written to.

 {"name": "columnName", "type": "DT_STRING", "isKeyField":true}

Note that the order of specified columns must be consistent with the table to be imported.
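
For example, a hypothetical column list that marks stationID as the key column could look like this:

"table": [
    {"name": "time", "type": "DT_STRING"},
    {"name": "stationID", "type": "DT_SYMBOL", "isKeyField": true},
    {"name": "oil_temp", "type": "DT_DOUBLE"}
]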

  • name: the column name;
  • isKeyField: whether the column is a key column, used to identify the primary key when updating data. The default value is true;
  • type: the mapping between DolphinDB data types and the corresponding values of type is listed below:

| DolphinDB Data Type | Value of Type |
|---------------------|---------------|
| DOUBLE | DT_DOUBLE |
| FLOAT | DT_FLOAT |
| BOOL | DT_BOOL |
| DATE | DT_DATE |
| MONTH | DT_MONTH |
| DATETIME | DT_DATETIME |
| TIME | DT_TIME |
| SECOND | DT_SECOND |
| TIMESTAMP | DT_TIMESTAMP |
| NANOTIME | DT_NANOTIME |
| NANOTIMESTAMP | DT_NANOTIMESTAMP |
| INT | DT_INT |
| LONG | DT_LONG |
| UUID | DT_UUID |
| SHORT | DT_SHORT |
| STRING | DT_STRING |
| SYMBOL | DT_SYMBOL |