文档简介:
本页介绍了时序数据库Influx版产品Java SDK的使用及其具备的功能。
详细介绍如何使用Java语言对时序数据库Influx版实例进行连接指引。
构建要求
Java SDK工具包需在Java 1.8+环境下运行。
获取并装载SDK
时序数据库Influx版实例用户可以在【实例信息-文档下载】页下载SDK包。并将下载的.jar包依赖引入到项目工程中。
使用SDK创建连接
InfluxDBClient是操作InfluxDB的客户端操作类,使用InfluxDB的SDK前首先需要创建InfluxDBClient的实例对象,可以通过InfluxDBClientFactory工厂类里面提供的8种方法来创建InfluxDBClient实例对象。其中,8种创建InfluxDBClient实例对象的方法可以概括为三类:第一类是方法中通过传递不同的配置参数来创建InfluxDBClient实例对象;第二类是通过InfluxDBClientOptions配置类来创建InfluxDBClient实例对象;第三类是方法中不传递任何参数而是通过读取根目录下的influx2.properties配置文件中的配置项来创建InfluxDBClient实例对象。
注意:InfluxDBClient实例对象使用完成之后记得调用influxDBClient.close()方法进行及时关闭。
-
第一类示例代码
方法1:create(String url, char[] token, String org, String bucket)
InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086",
"my-token".toCharArray(), "myOrg", "myBucket");复制
方法2:create(String url, char[] token, String org)
InfluxDBClient client = InfluxDBClientFactory.create"http://localhost:8086", "myOrg");复制
方法3:create(String url, char[] token)
InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086", "my-token".toCharArray());复制
方法4:create(String url, String username, char[] password)
InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086",
"my-username", "my-password".toCharArray());复制
方法5:create(String connectionString)
//可以使用连接字符串构建客户端,该连接字符串包含URL地址及相关参数。InfluxDBClient client
= InfluxDBClientFactory.create("http://localhost:8086?readTimeout=1000&writeTimeout=
3000&connectTimeout=2000&logLevel=HEADERS&token=my-token&bucket=my-bucket&org=my-org");复制
其中,URL中参数支持以下选项:
属性名 |
默认值 |
描述说明 |
org |
- |
写入和查询的默认目标组织 |
bucket |
- |
写入的默认目标存储桶 |
token |
- |
用于授权的令牌 |
logLevel |
NONE |
日志级别 |
readTimeout |
10000ms |
读取超时时间 |
writeTimeout |
10000ms |
写超时时间 |
connectTimeout |
10000ms |
套接字超时时间 |
-
第二类示例代码
首先创建InfluxDBClientOptions对象,然后放入InfluxDBClientFactory类的create方法中即可创建InfluxDBClient实例对象。
//创建配置对象InfluxDBClientOptions options = InfluxDBClientOptions.builder() .url(url)
.org("-") .authenticateToken("my-token".toCharArray()) .bucket("my-bucket")
.build();//创建InfluxDBClient的实例对象InfluxDBClient client = InfluxDBClientFactory.create(options);复制
-
第三类示例代码
不传递任何参数而是通过读取根目录下的influx2.properties配置文件中的配置项来创建InfluxDBClient实例对象。
InfluxDBClient client = InfluxDBClientFactory.create();复制
异步非阻塞写入
异步非阻塞写入数据方式需要创建WriteApi对象,支持使用行协议Line Protocol、数据点Data Point和对象POJO写入数据,支持批量写入数据。
-
对象写入
可以通过POJO对象的形式异步非阻塞写入数据到特定的Bucket。示例代码:
//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {
@Column(tag = true) String location; @Column Double value;
@Column(timestamp = true) Instant time;} //配置客户端influxDBClient//写入数据try
(WriteApi writeApi = influxDBClient.getWriteApi()) { // 构建POJO对象 Temperature
temperature = new Temperature(); temperature.location = "south"; temperature.value =
62D; temperature.time = Instant.now(); // 写入对象数据 writeApi.writeMeasurement(Wri
tePrecision.NS, temperature);}//关闭客户端influxDBClient.close();复制
-
数据点写入
可以通过Data Point数据点的形式异步非阻塞写入数据到特定的Bucket。示例代码:
//配置客户端influxDBClient//写入数据try (WriteApi writeApi = influxDBClient.getWriteApi()) {
// 构建数据点Point Point point = Point.measurement("temperature")
.addTag("location", "west") .addField("value", 55D) .time(Instant.now().toEpochMilli(),
WritePrecision.MS); // 写入Point数据 writeApi.writePoint(point);}
//关闭客户端influxDBClient.close();复制
-
行协议写入
可以通过Line Protocol行协议的形式异步非阻塞写入数据到特定的Bucket。示例代码:
//配置客户端influxDBClient//写入数据try (WriteApi writeApi = influxDBClient.getWriteApi())
{ // 定义record String record = "temperature,location=north value=60.0";
// 写入行数据 writeApi.writeRecord(WritePrecision.NS,
record);}//关闭客户端influxDBClient.close();复制
-
批量写入
通过构建WriteOptions配置对象并将其作为参数传入客户端创建WriteApi对象的方法中,从而实现对数据批量写入的支持。示例代码:
//配置客户端influxDBClient//构建WriteOptions配置对象,参数根据实际需要灵活调整。WriteOptions
writeOptions = WriteOptions.builder() .batchSize(10_000) .bufferLimit(500)
.flushInterval(500) .jitterInterval(1_000) .retryInterval(2_000)
.maxRetries(5) .maxRetryDelay(250_123) .exponentialBase(2)
.writeScheduler(Schedulers.computation()) .backpressureStrategy(BackpressureOverflowStrategy.ERROR)
.build();//创建WriteApi 对象WriteApi api = influxDBClient.getWriteApi(writeOptions )
;//写入数据……//关闭客户端influxDBClient.close();复制
同步阻塞写入
同步阻塞写入数据方式需要创建WriteApiBlocking对象,支持使用行协议Line Protocol、数据点Data Point和对象POJO写入数据。
-
对象写入
可以通过POJO对象的形式同步阻塞写入数据到特定的Bucket。示例代码:
//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {
@Column(tag = true) String location; @Column Double value;
@Column(timestamp = true) Instant time;}//配置客户端influxDBClient//创建WriteApiBlocking
对象WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();//写入数据try {
// 构建POJO对象 Temperature temperature = new Temperature(); temperature.location
= "south"; temperature.value = 62D; temperature.time = Instant.now();
// 写入对象数据 writeApi.writeMeasurement(WritePrecision.NS, temperature);} catch
(InfluxException ie) { System.out.println("InfluxException: " + ie);}
//关闭客户端influxDBClient.close();复制
-
数据点写入
可以通过Data Point数据点的形式同步阻塞写入数据到特定的Bucket。示例代码:
//配置客户端influxDBClient//创建WriteApiBlocking对象WriteApiBlocking writeApi
= influxDBClient.getWriteApiBlocking();//写入数据try { // 构建数据点Point
Point point = Point.measurement("temperature") .addTag("location",
"west") .addField("value", 55D) .time(Instant.now().toEpochMilli(),
WritePrecision.MS); // 写入Point数据 writeApi.writePoint(point);}
//关闭客户端influxDBClient.close();复制
-
行协议写入
可以通过Line Protocol行协议的形式同步阻塞写入数据到特定的Bucket。示例代码:
//配置客户端influxDBClient//创建WriteApiBlocking对象WriteApiBlocking writeApi
= influxDBClient.getWriteApiBlocking();//写入数据try { // 定义record
String record = "temperature,location=north value=60.0";// 写入行数据writeApi
.writeRecord(WritePrecision.NS, record);}//关闭客户端influxDBClient.close();复制
默认标签值使用
有时候在每一个测量值中都需保存部分相同的信息,如hostname、location等,这时可以通过静态值、系统变量或环境变量的属性配置来设置默认的标签值,其配置的表达式形式如下:静态值:China;系统变量:${version};环境变量:${env.hostname}。
针对示例行协议内容:mine-sensor,id=132-987-655,customer="China",hostname=example.com,sensor-version=v1.00 altitude=10,其有两种实现方式,分别如下。
-
通过配置文件实现的示例代码:
influx2.tags.id = 132-987-655influx2.tags.customer = Chinainflux2.tags.hostname
= ${env.hostname}influx2.tags.sensor-version = ${version}复制
-
通过API实现的示例代码:
InfluxDBClientOptions options = InfluxDBClientOptions.builder() .url(url)
.authenticateToken(token) .addDefaultTag("id", "132-987-655") .addDefaultTag("customer",
"China") .addDefaultTag("hostnamer", "${env.hostname}") .addDefaultTag(
"sensor-version", "${version}") .build();复制
GZip支持
InfluxDBClient默认不会为其底层调用的HTTP请求启用GZIP压缩,如果要启用GZIP以减少传输数据的大小,则可以通过如下示例代码进行配置。
示例代码:
influxDBClient.enableGzip();复制
同步查询
同步查询不适用于大型查询结果。示例代码:
//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux =
"from(bucket:\"my-bucket\") |> range(start: 0)";//创建QueryApi对象QueryApi queryApi
= influxDBClient.getQueryApi();//查询数据List<FluxTable> tables = queryApi.query(flux);for
(FluxTable fluxTable : tables) { List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord fluxRecord : records) { System.out.println(fluxRecord.
getTime() + ": " + fluxRecord.getValueByKey("_value")); }}//关闭客户端influxDBClient.close();复制
另外同步查询提供了FluxRecords到 POJO的可能性映射。
//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {
@Column(tag = true) String location; @Column Double value;
@Column(timestamp = true) Instant time;}//配置客户端influxDBClient//定义查询Flux,
内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\") |> range(start:
0) |> filter(fn: (r) => r._measurement == \"temperature\")";//创建QueryApi对象QueryApi
queryApi = influxDBClient.getQueryApi();//查询数据并映射成POJO对象List<Temperature> temperatures
= queryApi.query(flux, Temperature.class);for (Temperature temperature : temperatures) {
System.out.println(temperature.location + ": " + temperature.value + " at "
+ temperature.time);}//关闭客户端influxDBClient.close();复制
异步查询
异步查询提供了处理未绑定查询的可能性,并允许用户处理异常、停止接收更多结果以及成功查询的通知。示例代码:
//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\")
|> range(start: 0)";//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据:query(String
query, BiConsumer<Cancellable, FluxRecord> onNext, Consumer<? super Throwable> onError, Runnable onComplete);
//异步查询提供多种query方法,如上方法的后两个参数可以根据实际需要灵活选用queryApi.query(flux, (cancellable, fluxRecord)
-> { // 回调消费FluxRecord结果,具有中断流查询的能力
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));}, throwable ->
{ // 回调消费所有的错误通知 System.out.println("Error occurred: " + throwable.getMessage());},
() -> { // 回调消费查询成功的通知 System.out.println("Query completed");});
//关闭客户端influxDBClient.close();复制
另外异步查询提供了FluxRecords到 POJO的可能性映射。示例代码:
//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {
@Column(tag = true) String location; @Column Double value;
@Column(timestamp = true) Instant time;}//配置客户端influxDBClient//定义查询Flux,
内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\") |> range(start: 0)
|> filter(fn: (r) => r._measurement == \"temperature\")";//创建QueryApi对象QueryApi
queryApi = influxDBClient.getQueryApi();//查询数据并映射成POJO对象queryApi.query(flux, Temperature.class,
(cancellable, temperature) -> { // 回调消费FluxRecord结果并映射成POJO对象,具有中断异步查询的能力
System.out.println(temperature.location + ": " + temperature.value + " at " + temperature.time);
});//关闭客户端influxDBClient.close();复制
原生查询
原生查询允许直接处理成原始CSV响应。示例代码:
//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\")
|> range(start: 0)";//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据String
csv = queryApi.queryRaw(flux);System.out.println("CSV response: " + csv);//关闭客户端influxDBClient.close();复制
异步版本允许逐行处理。示例代码:
//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\")
|> range(start: 0)";//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据queryApi.
queryRaw(flux, (cancellable, line) -> { System.out.println("Response: " + line);});
//关闭客户端influxDBClient.close();复制
Flux-DSL构建查询
对于同步查询、异步查询或原生查询中的Flux查询语句,除了直接手动拼接成String外,还可以通过Flux-DSL来构建Flux查询。示例代码:
//配置客户端influxDBClient//构建Flux对象,内容可以根据查询需要自行定义。Flux flux = Flux.from("my-bucket")
.range(-30L, ChronoUnit.MINUTES) .filter(Restrictions.and(Restrictions.measurement().equal("cpu")))
.limit(10);//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据List<FluxTable>
tables = queryApi.query(flux.toString());for (FluxTable fluxTable : tables) { List<FluxRecord>
records = fluxTable.getRecords(); for (FluxRecord fluxRecord : records) {
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
}}//关闭客户端influxDBClient.close();复制
删除数据
删除数据需要创建DeleteApi对象,下面展示从InfluxDB删除数据的示例代码:
//配置客户端influxDBClient//创建DeleteApi 对象。DeleteApi deleteApi = influxDBClient.getDeleteApi()
;//删除数据。其中,DeleteApi 对象拥有包含不同参数的多个delete方法,起止时间以及bucket等信息参数可以根
据实际需要灵活调整。try { OffsetDateTime start = OffsetDateTime.now().minus(1, ChronoUnit.HOURS);
OffsetDateTime stop = OffsetDateTime.now(); deleteApi.delete(start, stop, "", "my-bucket",
"my-org");} catch (InfluxException ie) { System.out.println("InfluxException: " + ie);}
//关闭客户端influxDBClient.close();复制
日志等级配置
可以通过更改日志等级来记录请求和响应内容,其中,LogLevel值包括:NONE、BASIC、HEADER、BODY。
注意,当Logleve配置为BODY等级时,将在流式传输时禁用块并将整个响应加载到内存中。示例代码:
influxDBClient.setLogLevel(LogLevel.HEADERS)复制
管理API
允许客户通过SDK中的API进行管理操作,其中包含Buckets接口、org接口、sources接口。
-
Buckets接口
管理buckets相关接口需要首先创建BucketsApi对象,进而调用其新建、查询、删除等函数方法实现对buckets的数据管理。示例代码:
//配置客户端influxDBClient//创建BucketsApi对象BucketsApi bucketsApi = influxDBClient.getBucketsApi();
//说明:BucketsApi 对象中提供多种不同参数的新建、查询、删除等函数方法用于Bucket对象的管理,
下面只是择取部分方法对其进行代码示例,实际可根据需要灵活调整所调用方法。//创建Bucket(Bucket个数不超过
100)BucketRetentionRules retention = new BucketRetentionRules();retention.setEverySeconds(3600)
;Bucket createBucket= bucketsApi.createBucket("iot-bucket", retention, "xxxxxxxxx");//查询BucketBucket
foundBucket = bucketsApi.findBucketByID(createBucket.getId());//更新BucketcreateBucket.setName("Therm
sensor 2000");createBucket.getRetentionRules().add(new BucketRetentionRules().everySeconds(3600*2));
Bucket updatedBucket = bucketsApi.updateBucket(createBucket);//删除BucketbucketsApi.deleteBucket
(updatedBucket);//关闭客户端influxDBClient.close();复制
-
org接口
管理organizations相关接口需要首先创建OrganizationsApi对象,进而调用其查询函数方法实现对organizations数据的查询。示例代码:
//配置客户端influxDBClient//创建OrganizationsApi对象OrganizationsApi organizationsApi = influxDBClient
.getOrganizationsApi();//说明:OrganizationsApi 对象中提供多种不同参数的查询函数方法用于Organizations
对象的管理,下面只是择取部分方法对其进行代码示例,实际可根据需要灵活调整所调用方法。//查询Organizat
ionOrganization foundOrganization = organizationsApi.findOrganizationByID(createOrganization.getId
());//关闭客户端influxDBClient.close();复制
-
sources接口
管理sources相关接口需要首先创建SourcesApi对象,进而调用其新建、查询、删除等函数方法实现对sources的数据管理。示例代码:
//配置客户端influxDBClient//创建SourcesApi对象SourcesApi sourcesApi = influxDBClient.getSourcesApi();
//说明:SourcesApi 对象中提供多种不同参数的查询等函数方法用于Sources对象的管理,下面只是择取部分方法对
其进行代码示例,实际可根据需要灵活调整所调用方法。//创建SourceSource source = new Source();source.
setOrgID("02cebf26d7fc1000");source.setDefault(false);source.setName("my-source");source.setType(Sourc
e.TypeEnum.V1);source.setUrl("http://localhost:8086");source.setInsecureSkipVerify(true);source.setTel
egraf("telegraf");source.setToken(UUID.randomUUID().toString());source.setUsername("admin");source.se
tPassword("password");source.setSharedSecret(UUID.randomUUID().toString());source.setMetaUrl("/usr/lo
cal/var/influxdb/meta");source.setDefaultRP("autogen");Source createdSource = sourcesApi.createSou
rce(source); //查询SourceSource foundSource = sourcesApi.findSourceByID(createdSource.getId());/
/更新SourcecreatedSource.setInsecureSkipVerify(false);updateSource = sourcesApi.updateSource(create
dSource);//删除SourcesourcesApi.deleteSource(updateSource);//关闭客户端influxDBClient.close();复制
关闭客户端
InfluxDB Client提供了优雅的关闭功能。示例代码:
//关闭客户端influxDBClient.close();