上云无忧 > 文档中心 > 天翼云时序数据库Influx使用教程 - Java SDK接入
时序数据库Influx版
天翼云时序数据库Influx使用教程 - Java SDK接入

文档简介:
本页介绍了时序数据库Influx版产品Java SDK的使用及其具备的功能。 详细介绍如何使用Java语言对时序数据库Influx版实例进行连接指引。 构建要求: Java SDK工具包需在Java 1.8+环境下运行。
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

本页介绍了时序数据库Influx版产品Java SDK的使用及其具备的功能。

详细介绍如何使用Java语言对时序数据库Influx版实例进行连接指引。

构建要求

Java SDK工具包需在Java 1.8+环境下运行。

获取并装载SDK

时序数据库Influx版实例用户可以在【实例信息-文档下载】页下载SDK包。并将下载的.jar包依赖引入到项目工程中。

使用SDK创建连接

InfluxDBClient是操作InfluxDB的客户端操作类,使用InfluxDB的SDK前首先需要创建InfluxDBClient的实例对象,可以通过InfluxDBClientFactory工厂类里面提供的8种方法来创建InfluxDBClient实例对象。其中,8种创建InfluxDBClient实例对象的方法可以概括为三类:第一类是方法中通过传递不同的配置参数来创建InfluxDBClient实例对象;第二类是通过InfluxDBClientOptions配置类来创建InfluxDBClient实例对象;第三类是方法中不传递任何参数而是通过读取根目录下的influx2.properties配置文件中的配置项来创建InfluxDBClient实例对象。

注意:InfluxDBClient实例对象使用完成之后记得调用influxDBClient.close()方法进行及时关闭。

  • 第一类示例代码

方法1create(String url, char[] token, String org, String bucket)

InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086",
 "my-token".toCharArray(), "myOrg", "myBucket");复制

方法2create(String url, char[] token, String org)

InfluxDBClient client = InfluxDBClientFactory.create"http://localhost:8086", "myOrg");复制

方法3create(String url, char[] token)

InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086", "my-token".toCharArray());复制

方法4:create(String url, String username, char[] password)

InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086",
 "my-username", "my-password".toCharArray());复制

方法5:create(String connectionString)

//可以使用连接字符串构建客户端,该连接字符串包含URL地址及相关参数。InfluxDBClient client
 = InfluxDBClientFactory.create("http://localhost:8086?readTimeout=1000&writeTimeout=
3000&connectTimeout=2000&logLevel=HEADERS&token=my-token&bucket=my-bucket&org=my-org");复制

其中,URL中参数支持以下选项:

属性名

默认值

描述说明

org

-

写入和查询的默认目标组织

bucket

-

写入的默认目标存储桶

token

-

用于授权的令牌

logLevel

NONE

日志级别

readTimeout

10000ms

读取超时时间

writeTimeout

10000ms

写超时时间

connectTimeout

10000ms

套接字超时时间

  •  第二类示例代码

首先创建InfluxDBClientOptions对象,然后放入InfluxDBClientFactory类的create方法中即可创建InfluxDBClient实例对象。

//创建配置对象InfluxDBClientOptions options = InfluxDBClientOptions.builder()  .url(url) 
 .org("-")  .authenticateToken("my-token".toCharArray())  .bucket("my-bucket")  
.build();//创建InfluxDBClient的实例对象InfluxDBClient client = InfluxDBClientFactory.create(options);复制
  • 第三类示例代码

不传递任何参数而是通过读取根目录下的influx2.properties配置文件中的配置项来创建InfluxDBClient实例对象。

InfluxDBClient client = InfluxDBClientFactory.create();复制

异步非阻塞写入

异步非阻塞写入数据方式需要创建WriteApi对象,支持使用行协议Line Protocol、数据点Data Point和对象POJO写入数据,支持批量写入数据。

  • 对象写入

可以通过POJO对象的形式异步非阻塞写入数据到特定的Bucket。示例代码:

//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {   
 @Column(tag = true)    String location;     @Column    Double value;    
 @Column(timestamp = true)    Instant time;} //配置客户端influxDBClient//写入数据try 
(WriteApi writeApi = influxDBClient.getWriteApi()) {    // 构建POJO对象  Temperature 
temperature = new Temperature();  temperature.location = "south";  temperature.value = 
62D;  temperature.time = Instant.now();  // 写入对象数据  writeApi.writeMeasurement(Wri
tePrecision.NS, temperature);}//关闭客户端influxDBClient.close();复制
  • 数据点写入

可以通过Data Point数据点的形式异步非阻塞写入数据到特定的Bucket。示例代码:

//配置客户端influxDBClient//写入数据try (WriteApi writeApi = influxDBClient.getWriteApi()) {  
  // 构建数据点Point    Point point = Point.measurement("temperature")	
.addTag("location", "west")	.addField("value", 55D)    .time(Instant.now().toEpochMilli(),
 WritePrecision.MS);    // 写入Point数据    writeApi.writePoint(point);}
//关闭客户端influxDBClient.close();复制
  • 行协议写入

可以通过Line Protocol行协议的形式异步非阻塞写入数据到特定的Bucket。示例代码:

//配置客户端influxDBClient//写入数据try (WriteApi writeApi =   influxDBClient.getWriteApi())
 {    // 定义record    String   record = "temperature,location=north value=60.0";
    // 写入行数据    writeApi.writeRecord(WritePrecision.NS,  
 record);}//关闭客户端influxDBClient.close();复制
  •  批量写入

通过构建WriteOptions配置对象并将其作为参数传入客户端创建WriteApi对象的方法中,从而实现对数据批量写入的支持。示例代码:

//配置客户端influxDBClient//构建WriteOptions配置对象,参数根据实际需要灵活调整。WriteOptions
 writeOptions = WriteOptions.builder()    .batchSize(10_000)    .bufferLimit(500)  
  .flushInterval(500)    .jitterInterval(1_000)    .retryInterval(2_000)   
 .maxRetries(5)    .maxRetryDelay(250_123)    .exponentialBase(2)    
.writeScheduler(Schedulers.computation())    .backpressureStrategy(BackpressureOverflowStrategy.ERROR) 
   .build();//创建WriteApi 对象WriteApi api =   influxDBClient.getWriteApi(writeOptions )
;//写入数据……//关闭客户端influxDBClient.close();复制

同步阻塞写入

同步阻塞写入数据方式需要创建WriteApiBlocking对象,支持使用行协议Line Protocol、数据点Data Point和对象POJO写入数据。

  • 对象写入

可以通过POJO对象的形式同步阻塞写入数据到特定的Bucket。示例代码:

//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {  
  @Column(tag = true)    String location;    @Column    Double value;   
 @Column(timestamp = true)    Instant time;}//配置客户端influxDBClient//创建WriteApiBlocking
对象WriteApiBlocking writeApi =   influxDBClient.getWriteApiBlocking();//写入数据try {  
  // 构建POJO对象    Temperature temperature = new Temperature();    temperature.location
 = "south";    temperature.value   = 62D;    temperature.time = Instant.now();   
 // 写入对象数据    writeApi.writeMeasurement(WritePrecision.NS, temperature);} catch 
(InfluxException ie) {        System.out.println("InfluxException: " + ie);}
//关闭客户端influxDBClient.close();复制
  •  数据点写入

可以通过Data Point数据点的形式同步阻塞写入数据到特定的Bucket。示例代码:

//配置客户端influxDBClient//创建WriteApiBlocking对象WriteApiBlocking writeApi 
= influxDBClient.getWriteApiBlocking();//写入数据try {    // 构建数据点Point  
  Point point = Point.measurement("temperature")        .addTag("location",
 "west")        .addField("value", 55D)        .time(Instant.now().toEpochMilli(),
 WritePrecision.MS);    // 写入Point数据    writeApi.writePoint(point);}
//关闭客户端influxDBClient.close();复制
  •  行协议写入

可以通过Line Protocol行协议的形式同步阻塞写入数据到特定的Bucket。示例代码:

//配置客户端influxDBClient//创建WriteApiBlocking对象WriteApiBlocking writeApi
 = influxDBClient.getWriteApiBlocking();//写入数据try {    // 定义record  
  String record = "temperature,location=north value=60.0";// 写入行数据writeApi
.writeRecord(WritePrecision.NS,   record);}//关闭客户端influxDBClient.close();复制

默认标签值使用

有时候在每一个测量值中都需保存部分相同的信息,如hostname、location等,这时可以通过静态值、系统变量或环境变量的属性配置来设置默认的标签值,其配置的表达式形式如下:静态值:China;系统变量:${version};环境变量:${env.hostname}。

针对示例行协议内容:mine-sensor,id=132-987-655,customer="China",hostname=example.com,sensor-version=v1.00 altitude=10,其有两种实现方式,分别如下。

  • 通过配置文件实现的示例代码:

influx2.tags.id = 132-987-655influx2.tags.customer = Chinainflux2.tags.hostname 
= ${env.hostname}influx2.tags.sensor-version = ${version}复制
  • 通过API实现的示例代码:

InfluxDBClientOptions options = InfluxDBClientOptions.builder()    .url(url) 
 .authenticateToken(token)  .addDefaultTag("id", "132-987-655")  .addDefaultTag("customer",
 "China")  .addDefaultTag("hostnamer", "${env.hostname}")    .addDefaultTag(
"sensor-version", "${version}")    .build();复制

GZip支持

InfluxDBClient默认不会为其底层调用的HTTP请求启用GZIP压缩,如果要启用GZIP以减少传输数据的大小,则可以通过如下示例代码进行配置。

示例代码:

influxDBClient.enableGzip();复制

同步查询

同步查询不适用于大型查询结果示例代码:

//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux = 
"from(bucket:\"my-bucket\") |> range(start: 0)";//创建QueryApi对象QueryApi queryApi 
= influxDBClient.getQueryApi();//查询数据List<FluxTable> tables = queryApi.query(flux);for
 (FluxTable fluxTable : tables) {    List<FluxRecord> records = fluxTable.getRecords();
    for (FluxRecord fluxRecord : records) {        System.out.println(fluxRecord.
getTime() + ": " + fluxRecord.getValueByKey("_value"));    }}//关闭客户端influxDBClient.close();复制

另外同步查询提供了FluxRecords到 POJO的可能性映射。

//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {   
 @Column(tag = true)    String location;    @Column    Double value;    
@Column(timestamp = true)    Instant time;}//配置客户端influxDBClient//定义查询Flux,
内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\") |> range(start: 
0) |> filter(fn: (r) => r._measurement == \"temperature\")";//创建QueryApi对象QueryApi 
queryApi = influxDBClient.getQueryApi();//查询数据并映射成POJO对象List<Temperature> temperatures 
= queryApi.query(flux, Temperature.class);for (Temperature temperature : temperatures) {
    System.out.println(temperature.location + ": " + temperature.value + " at " 
+ temperature.time);}//关闭客户端influxDBClient.close();复制

异步查询

异步查询提供了处理未绑定查询的可能性,并允许用户处理异常、停止接收更多结果以及成功查询的通知。示例代码:

//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\")
 |> range(start: 0)";//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据:query(String
query, BiConsumer<Cancellable, FluxRecord> onNext, Consumer<? super Throwable> onError, Runnable onComplete);
//异步查询提供多种query方法,如上方法的后两个参数可以根据实际需要灵活选用queryApi.query(flux, (cancellable, fluxRecord)
   -> {    // 回调消费FluxRecord结果,具有中断流查询的能力               
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));}, throwable -> 
{    // 回调消费所有的错误通知    System.out.println("Error occurred: " + throwable.getMessage());},
 () -> {    // 回调消费查询成功的通知    System.out.println("Query completed");});
//关闭客户端influxDBClient.close();复制

另外异步查询提供FluxRecords POJO的可能性映射。示例代码:

//定义POJO对象类@Measurement(name = "temperature")public static class Temperature {  
  @Column(tag = true)    String location;    @Column    Double value;   
 @Column(timestamp = true)    Instant time;}//配置客户端influxDBClient//定义查询Flux,
内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\") |> range(start: 0)
|>   filter(fn: (r) => r._measurement == \"temperature\")";//创建QueryApi对象QueryApi 
queryApi = influxDBClient.getQueryApi();//查询数据并映射成POJO对象queryApi.query(flux, Temperature.class,
   (cancellable, temperature) -> {  // 回调消费FluxRecord结果并映射成POJO对象,具有中断异步查询的能力
  System.out.println(temperature.location + ": " + temperature.value + " at " + temperature.time);
});//关闭客户端influxDBClient.close();复制

原生查询

原生查询允许直接处理成原始CSV响应。示例代码:

//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\")
 |> range(start: 0)";//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据String 
csv = queryApi.queryRaw(flux);System.out.println("CSV response: " + csv);//关闭客户端influxDBClient.close();复制

异步版本允许逐行处理。示例代码:

//配置客户端influxDBClient//定义查询Flux,内容可以根据查询需要自行定义。String flux = "from(bucket:\"my-bucket\")
 |> range(start: 0)";//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据queryApi.
queryRaw(flux, (cancellable, line) -> {    System.out.println("Response: " + line);});
//关闭客户端influxDBClient.close();复制

Flux-DSL构建查询

对于同步查询、异步查询或原生查询中的Flux查询语句,除了直接手动拼接成String外,还可以通过Flux-DSL来构建Flux查询。示例代码:

//配置客户端influxDBClient//构建Flux对象,内容可以根据查询需要自行定义。Flux flux = Flux.from("my-bucket") 
   .range(-30L, ChronoUnit.MINUTES)    .filter(Restrictions.and(Restrictions.measurement().equal("cpu")))
    .limit(10);//创建QueryApi对象QueryApi queryApi = influxDBClient.getQueryApi();//查询数据List<FluxTable>
 tables = queryApi.query(flux.toString());for (FluxTable fluxTable : tables) {    List<FluxRecord>
 records = fluxTable.getRecords();    for (FluxRecord fluxRecord : records) {       
 System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));    
}}//关闭客户端influxDBClient.close();复制

删除数据

删除数据需要创建DeleteApi对象,下面展示从InfluxDB删除数据的示例代码:

//配置客户端influxDBClient//创建DeleteApi   对象。DeleteApi deleteApi = influxDBClient.getDeleteApi()
;//删除数据。其中,DeleteApi   对象拥有包含不同参数的多个delete方法,起止时间以及bucket等信息参数可以根
据实际需要灵活调整。try {    OffsetDateTime start = OffsetDateTime.now().minus(1, ChronoUnit.HOURS);
    OffsetDateTime stop = OffsetDateTime.now();    deleteApi.delete(start, stop, "", "my-bucket", 
"my-org");} catch (InfluxException ie) {    System.out.println("InfluxException: " + ie);}
//关闭客户端influxDBClient.close();复制

日志等级配置

可以通过更改日志等级来记录请求和响应内容,其中,LogLevel值包括:NONE、BASIC、HEADER、BODY。

注意,当Logleve配置为BODY等级时,将在流式传输时禁用块并将整个响应加载到内存中。示例代码:

influxDBClient.setLogLevel(LogLevel.HEADERS)复制

管理API

允许客户通过SDK中的API进行管理操作,其中包含Buckets接口、org接口、sources接口。

  • Buckets接口

管理buckets相关接口需要首先创建BucketsApi对象,进而调用其新建、查询、删除等函数方法实现对buckets的数据管理。示例代码:

//配置客户端influxDBClient//创建BucketsApi对象BucketsApi bucketsApi = influxDBClient.getBucketsApi();
//说明:BucketsApi   对象中提供多种不同参数的新建、查询、删除等函数方法用于Bucket对象的管理,
下面只是择取部分方法对其进行代码示例,实际可根据需要灵活调整所调用方法。//创建Bucket(Bucket个数不超过
100)BucketRetentionRules retention = new BucketRetentionRules();retention.setEverySeconds(3600)
;Bucket createBucket= bucketsApi.createBucket("iot-bucket", retention, "xxxxxxxxx");//查询BucketBucket
 foundBucket = bucketsApi.findBucketByID(createBucket.getId());//更新BucketcreateBucket.setName("Therm
 sensor 2000");createBucket.getRetentionRules().add(new BucketRetentionRules().everySeconds(3600*2));
Bucket updatedBucket = bucketsApi.updateBucket(createBucket);//删除BucketbucketsApi.deleteBucket
(updatedBucket);//关闭客户端influxDBClient.close();复制
  • org接口

管理organizations相关接口需要首先创建OrganizationsApi对象,进而调用其查询函数方法实现对organizations数据的查询。示例代码:

//配置客户端influxDBClient//创建OrganizationsApi对象OrganizationsApi organizationsApi = influxDBClient
.getOrganizationsApi();//说明:OrganizationsApi 对象中提供多种不同参数的查询函数方法用于Organizations
对象的管理,下面只是择取部分方法对其进行代码示例,实际可根据需要灵活调整所调用方法。//查询Organizat
ionOrganization foundOrganization = organizationsApi.findOrganizationByID(createOrganization.getId
());//关闭客户端influxDBClient.close();复制
  • sources接口

管理sources相关接口需要首先创建SourcesApi对象,进而调用其新建、查询、删除等函数方法实现对sources的数据管理。示例代码:

//配置客户端influxDBClient//创建SourcesApi对象SourcesApi sourcesApi = influxDBClient.getSourcesApi();
//说明:SourcesApi 对象中提供多种不同参数的查询等函数方法用于Sources对象的管理,下面只是择取部分方法对
其进行代码示例,实际可根据需要灵活调整所调用方法。//创建SourceSource source = new Source();source.
setOrgID("02cebf26d7fc1000");source.setDefault(false);source.setName("my-source");source.setType(Sourc
e.TypeEnum.V1);source.setUrl("http://localhost:8086");source.setInsecureSkipVerify(true);source.setTel
egraf("telegraf");source.setToken(UUID.randomUUID().toString());source.setUsername("admin");source.se
tPassword("password");source.setSharedSecret(UUID.randomUUID().toString());source.setMetaUrl("/usr/lo
cal/var/influxdb/meta");source.setDefaultRP("autogen");Source createdSource = sourcesApi.createSou
rce(source); //查询SourceSource foundSource = sourcesApi.findSourceByID(createdSource.getId());/
/更新SourcecreatedSource.setInsecureSkipVerify(false);updateSource = sourcesApi.updateSource(create
dSource);//删除SourcesourcesApi.deleteSource(updateSource);//关闭客户端influxDBClient.close();复制

关闭客户端

InfluxDB Client提供了优雅的关闭功能。示例代码:

//关闭客户端influxDBClient.close();
相似文档
  • 本页介绍将本地CSV数据导入到指定的Bucket中。 1、选择Bucket管理菜单,点击【添加数据】。 2、选择CSV导入,点击【查看样例】按钮下载CSV样例数据。
  • 本页介绍Telegar将数据自动采集到指定的Bucket中。 1、选择Bucket管理菜单,点击添加数据。 2、选择telegarf采集,选择令牌或者生成新的令牌,选择input标签然后点击下载配置文件。
  • 本页介绍了时序数据库Influx版产品购买类的常见问题。 Q:如何选择时序数据库Influx的实例规格? A:您可以根据每秒写入点数、每秒查询请求、每秒写入请求三个指标作参考,实例规格对应各指标的限制。
  • 本页介绍了时序数据库Influx版产品计费类的常见问题。 Q:时序数据库Influx版的计费模式有哪几种,计费项有哪些? A:时序数据库Influx版产品的计费模式分两种:按需计费和包年/包月计费;计费项由实例规格、存储空间容量组成,实例规格包含对读写性能的要求,数据存储空间用于存放您的数据。
  • 本页介绍了时序数据库Influx版产品操作类的常见问题。 Q:时序数据库Influx有哪些使用约定? A:时序数据库Influx版有使用约定,见帮助文档->产品简介->使用限制。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部