上云无忧 > 文档中心 > 腾讯云消息队列 CKafka - JsonPath 说明
消息队列 CKafka
腾讯云消息队列 CKafka - JsonPath 说明

文档简介:
操作场景: JSON 目前是互联网信息传递中最通用的格式协议之一。目前数据处理也主要围绕 JSON 数据格式进行解析处理。 JSONPath 是针对 JSON 格式推出的消息查询语法规范。在数据处理中,不仅能够使用简单的 JSONPath 语法,快速获取复杂嵌套 JSON 结构体的某一成员的值;还能使用 JayWay 库的扩展函数,聚合或操作某一类型的成员字段。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

操作场景

JSON 目前是互联网信息传递中最通用的格式协议之一。目前数据处理也主要围绕 JSON 数据格式进行解析处理。
JSONPath 是针对 JSON 格式推出的消息查询语法规范。在数据处理中,不仅能够使用简单的 JSONPath 语法,快速获取复杂嵌套 JSON 结构体的某一成员的值;还能使用 JayWay 库的扩展函数,聚合或操作某一类型的成员字段。

基础功能

基础语法

$ 根结点操作符,表示当前 JSON 结构体的根结点。
.<childName> 点操作符或 ['<childName>'] 方括号操作符,表示选取当前作用对象名为 childName 的子成员。
.. 递归操作符,表示 递归 获取当前作用对象的所有子成员。
[<index>] 片选操作符,表示获取当前可迭代对象的第 index 个子成员。

获取嵌套 JSON 特定成员变量

下图为 TKE 采集的容器标准输出日志结构:

		
{
"@timestamp": 1648803500.63659,

"@filepath": "/var/log/tke-log-agent/test7/xxxxxxxx-adfe-4617-8cf3-9997aea90ded

/c_tke-es-xxxxxxxx57-n29jr_default_nginx-xxxxxxxx49626ef42d5615a636aae

74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",

"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
"kubernetes": {
"pod_name": "tke-es-xxxxxxxxxx-n29jr",
"namespace_name": "default",
"pod_id": "xxxxxxxx-adfe-4617-8cf3-9997aea90ded",
"labels": {
"k8s-app": "tke-es",
"pod-template-hash": "xxxx95d557",
"qcloud-app": "tke-es"
},
"annotations": {
"qcloud-redeploy-timestamp": "1648016531476",

"tke.cloud.tencent.com/networks-status": "[{\n \"name\": \"tke-bridge\",\n

\"interface\": \"eth0\",\n \"ips\": [\n \"172.16.x.xx\"\n ],\n

\"mac\": \"xx:xx:xx:4a:c2:ba\",\n \"default\": true,\n \"dns\": {}\n}]"

},
"host": "10.0.xx.xx",
"container_name": "nginx",
"docker_id": "xxxxxxxx49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
"container_hash": "nginx@sha256:xxxxxxxx7b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
"container_image": "nginx"
}
}

当用户需要获取当前 pod 名称,即获取 qcloud-app 成员字段时,可以在数据处理中使用 $.kubernetes.labels.qcloud-app,或 $.['kubernetes'].['labels'].['qcloud-app'] JSONPath 语法获取。
运行结果如下所示,可以看出在测试结果中已经成功读取到 JSONPath 对应的日志:



说明
在使用 JSONPath 处理参数时,当 JSON 变量名称本身带有英文句号 . 等特殊符号时,就只能使用方括号操作符进行包装。
例如 {"key1.key2":"value1"} 的 JSON 结构体,要想取得对应成员字段,则需要使用 $.['key1.key2'] 进行获取。

进阶功能

进阶语法

* 通配操作符,表示获取当前作用对象的 所有 子成员。
*~ 内置函数,表示获取当前可迭代对象所有子对象的名称。
min() 内置函数,表示获取当前可迭代对象子对象的最小值。
max() 内置函数,表示获取当前可迭代对象子对象的最大值。
sum() 内置函数,表示获取当前可迭代对象子对象之和。
concat() 内置函数,表示连接多个对象并生成字符串。

聚合特定字段数据

当 JSON 结构体中存在对象列表时,通常列表为变长形式,以下图中的请求返回日志为例:
		
{
"data": {
"Response": {
"Result": {
"Routers": [
{
"AccessType": 0,
"RouteId": 81111,
"VpcId": "vpc-xxxxxxxx",
"VipType": 3,
"VipList": [
{
"Vip": "10.0.0.189",
"Vport": "9xxx"
}
]
},
{
"AccessType": 0,
"RouteId": 81112,
"VpcId": "vpc-r5sbavzp",
"VipType": 3,
"VipList": [
{
"Vip": "10.0.0.248",
"Vport": "9xxx"
}
]
},
{
"AccessType": 0,
"RouteId": 81113,
"VpcId": "vpc-xxxxxxxx",
"VipType": 3,
"VipList": [
{
"Vip": "10.0.0.210",
"Vport": "9xxx"
}
]
}
]
},
"RequestId": "20e74750-ca40-403d-9ea9-d3f63b5415d2"
}
},
"code": 0
}
当需要聚合变长列表的成员属性时,此时无法使用处理链形式聚合处理。此时可以使用 JSONPath 中的 * 语法匹配列表中所有元素。
例如当需要得到所有的 VipList 中的 Vip 时,可以使用 $.data.Response.Result.Routers[*].VipList[0].Vip 的 JSONPath 语法获取。
运行结果如下所示,可以看出在测试结果中已经成功获取到了所有结构体中的 Vip:


合并修改结构体成员

在某些场景下,需要在数据处理中对 JSON 结构体的多个对象进行合并整合处理,以便投递到下游进行下一步操作。考虑如下格式:
		
{
"data": {
"Response": {
"SubnetSet": [
{
"VpcId": "vpc-xxxxxxxx",
"SubnetId": "subnet-xxxxxxxx",
"SubnetName": "ckafka_cloud_subnet-1",
"CidrBlock": "10.0.0.0/19",
"Ipv6CidrBlock": "",
"IsDefault": false,
"IsRemoteVpcSnat": false,
"EnableBroadcast": false,
"Zone": "ap-changsha-ec-1",
"RouteTableId": "rtb-xxxxxxxx",
"NetworkAclId": "",
"TotalIpAddressCount": 8189,
"AvailableIpAddressCount": 8033,
"CreatedTime": "2021-01-25 17:31:00",
"TagSet": [],
"CdcId": "",
"IsCdcSubnet": 0,
"LocalZone": false,
"IsShare": false
}
],
"TotalCount": 1,
"RequestId": "705c4955-0cd9-48b2-9132-79eadae2e3e6"
}
},
"code": 0
}
当下游不具有计算功能,需要在数据处理中聚合 Vpc 以及子网属性时,可以使用 JSONPath 中的 concat() 函数进行多个字段的聚合,并且在此基础上对字符串进行修改。
例如可以使用 $.concat($.data.Response.SubnetSet[0].VpcId,"#",$.data.Response.SubnetSet[0].SubnetId,"#",$.data.Response.SubnetSet[0].CidrBlock)) 语法拼接 VPC 和子网的属性,并且通过 # 字符加以分割。
运行结果如下所示,可以看出在测试结果中已经成功获取整合了 VPC 相关的资源信息:


相似文档
  • 操作背景: 通过 CKafka 连接器连接 CVM 自建的服务时,根据腾讯云网络团队制定的标准跨 VPC 资源访问方案,需要先将自建服务挂载到 CLB(负载均衡)上,才能实现跨 VPC 的资源访问。
  • 操作背景: 用户在使用 CKafka 连接器访问 CLS、COS 等服务时,需要授权连接器访问用户账号下 CLS、COS 等服务的权限。如果使用 CKafka 的子账号具备访问管理的策略权限(QcloudCamRoleFullAccess),在创建 CKafka 任务时勾选 角色授权,连接器将自动为您完成授权。否则,需要拥有管理员权限(AdministratorAccess)的用户进行相应的授权后,再使用子账号创建 连接器 任务。
  • 背景: debezium connector 最初仅在创建连接任务时会同步表的存量数据,后续新增的表无法触发存量数据的同步。为了支持新增表的存量数据同步功能,debezium 采用了“信号”的模式通知 connector 进行触发新增表的存量数据同步。
  • 如何选择实例的规格? 可以借助控制台的 "迁移上云 - 规格计算器" 来计算所需的产品规格:
  • 如何选取 CKafka 副本数? 建议您创建 Topic 时选择2副本或3副本存储数据,保障数据可靠性。默认创建的 Topic 是2副本,如果业务需要更高的可用性,则可以指定选择3副本运行。如果 Topic 需要更多的副本数,可以 提交工单 进行处理。新建 Topic 时副本选择方式如下图所示。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部