上云无忧 > 文档中心 > 腾讯云消息队列 CKafka 实战教程 - 数据简单清洗
消息队列 CKafka
腾讯云消息队列 CKafka 实战教程 - 数据简单清洗

文档简介:
操作场景: 在使用 CKafka 连接器在进行数据流入流出服务时,可能会遇到如下情况: 需要对消息中的特定字段进行解析,得到相关信息。 需要多次迭代处理消息中某一字段。 需要处理为未经过结构化的原始消息后,才能继续使用。 需要处理多层嵌套格式的消息。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

操作场景

在使用 CKafka 连接器在进行数据流入流出服务时,可能会遇到如下情况:
需要对消息中的特定字段进行解析,得到相关信息。
需要多次迭代处理消息中某一字段。
需要处理为未经过结构化的原始消息后,才能继续使用。
需要处理多层嵌套格式的消息。
目前 CKafka 团队的技术专家经过长期的经验总结,全新推出了数据处理 V2.0 版本。 该版本在完美兼容原有数据处理 V1.0 的基础上,还增加了以下特性:
插件丰富:数据处理支持丰富的处理预制插件,利于快速生成所需的消费格式。
链式处理:数据处理支持消息链式处理,即上一次的处理结果能够作为本次处理的输入参数,利于处理复杂结构。
可视预览:数据处理支持实时 JSON 结构化预览,利于迅速定位排查问题。
类型转换:数据处理支持不同数据类型间的转换,利于校验数据格式。

运行原理

整个数据处理过程如下图所示,各个组件结构说明如下:

数据处理组件集群中,多个 worker 共同组成一个消费者组,批量读取源 topic 中的消息,并且对于每条消息顺序执行处理操作。
对于每条消息,根据控制台设置的处理链顺序,依次展开消息字段的嵌套结构,并进行替换/截取/数据转换/时间格式化等操作。
下一条处理链读取上一条处理链结果,并且进行本轮的链式处理,并将处理结果投递到下一轮。
执行完最后一轮处理链后的结果,投递到设置的目标 topic 中,至此完成该条消息的数据处理操作。

前提条件

需开通 CKafka 服务。
需消息格式为 JSON 格式或字符串格式,目前暂不支持其他编码协议。

实际应用

处理字符串类型格式日志

Nginx 格式日志通常如下所示,这种 Nginx 默认格式也被称作 combined,可以从中解析出请求者信息,报文信息,请求状态等信息,以便于对数据进行进一步分析。

		

66.249.65.159 - - [06/Nov/2014:19:10:38 +0600] "GET /news/53f8d72920ba2744fe873ebc.html

HTTP/1.1" 404 177 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML,

like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

由于 Nginx 的 combined 格式采用空格和 - 分隔符来分隔数据,因此按照下文顺序设计解析流程:
1. 首先使用 "分隔符 进行初步数据解析,此时数据处理将会自动将日志转换成 JSON 结构

		
{
"0": "66.249.65.159 - - [06/Nov/2014:19:10:38 +0600] ",
"1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
"2": " 404 177 ",

"5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26

(KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25

(compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

}

2. 从上文 JSON 结构中可知,键名为 02 的字段由于 - 和空格的影响,还存在连接的耦合数据。因此再分别对这两个键进行 - 和空格进行 分隔符 拆分。 拆分后的 JSON 结果如下所示:

		
{
"1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",

"5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26

(KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25

(compatible; Googlebot/2.1; +http://www.google.com/bot.html)",

"0.0": "66.249.65.159 ",
"0.2": " [06/Nov/2014:19:10:38 +0600] ",
"2.1": "404",
"2.2": "177"
}

3. 由于时间格式前后还带有 [] 方括号,再使用一次 分隔符 进行截取,截取后的 JSON 如下所示:

		
{
"1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",

"5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26

(KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25

(compatible; Googlebot/2.1; +http://www.google.com/bot.html)",

"0.0": "66.249.65.159 ",
"0.2": "06/Nov/2014:19:10:38 +0600",
"2.1": "404",
"2.2": "177"
}

4. 由于所有字段都已经被合适拆分,最后根据对应字段属性,给相应映射字段的 key 设置名称即可。修改后最终结果如下所示:

		
{
"request": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",

"http_user_agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X)

AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25

(compatible; Googlebot/2.1; +http://www.google.com/bot.html)",

"remote_addr": "66.249.65.159 ",
"dateTime": "06/Nov/2014:19:10:38 +0600",
"status": "404",
"body_bytes_sent ": "177"
}

处理嵌套类型格式日志

TKE 采集格式通常如下所示,TKE 采集器会将 metadata 数据放入 JSON 结构中的 kubernetes 字段中,采集到的日志放入 log 字段中。大致结构如下所示:

		
{
"@timestamp": 1648803500.63659,

"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded

/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae

74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",

"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO

com.qcloud.jgw.gateway.server.topic.TopicService",

"kubernetes": {
"pod_name": "tke-es-687995d557-n29jr",
"namespace_name": "default",
"pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"labels": {
"k8s-app": "tke-es",
"pod-template-hash": "687995d557",
"qcloud-app": "tke-es"
},
"annotations": {
"qcloud-redeploy-timestamp": "1648016531476",

"tke.cloud.tencent.com/networks-status": "[{\n \"name\": \"tke-bridge\",\n

\"interface\": \"eth0\",\n \"ips\": [\n \"172.16.0.31\"\n ],\n

\"mac\": \"ae:61:12:4a:c2:ba\",\n \"default\": true,\n \"dns\": {}\n}]"

},
"host": "10.0.96.47",
"container_name": "nginx",
"docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
"container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
"container_image": "nginx"
}
}

当日志采集后投递进 ElasticSearch 时,由于支持嵌套 JSON 结构,因此不需要对数据做出太大改动。但当需要投递到数据库时,此时就需要将数据转换成能够识别的单层 JSON 格式,因此按照下文顺序设计解析流程:
1. 使用 JSONPATH 语句处理第一层嵌套的 JSON 结构 $.kubernetes,解析模式选择 JSON。首先将嵌套 JSON 结构转换为单层 JSON 结构。测试后结果如下所示:

		
{
"@timestamp": 1.64880350063659E9,

"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/

c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae

74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",

"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO

com.qcloud.jgw.gateway.server.topic.TopicService",

"$.kubernetes.pod_name": "tke-es-687995d557-n29jr",
"$.kubernetes.namespace_name": "default",
"$.kubernetes.pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"$.kubernetes.labels": {
"k8s-app": "tke-es",
"pod-template-hash": "687995d557",
"qcloud-app": "tke-es"
},
"$.kubernetes.annotations": {
"qcloud-redeploy-timestamp": "1648016531476",

"tke.cloud.tencent.com/networks-status": "[{\n \"name\": \"tke-bridge\",\n

\"interface\": \"eth0\",\n \"ips\": [\n \"172.16.0.31\"\n ],\n

\"mac\": \"ae:61:12:4a:c2:ba\",\n \"default\": true,\n \"dns\": {}\n}]"

},
"$.kubernetes.host": "10.0.96.47",
"$.kubernetes.container_name": "nginx",
"$.kubernetes.docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
"$.kubernetes.container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
"$.kubernetes.container_image": "nginx"
}

2. 依次处理第二层嵌套结构 $.kubernetes.annotations$.kubernetes.labels。在处理链中使用 Map 方式选中这两个名称,即可将嵌套格式转换成单层 JSON 格式。处理后如下所示:

		
{
"@timestamp": 1648803500.63659,

"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded

/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae

74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",

"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO

com.qcloud.jgw.gateway.server.topic.TopicService",

"$.kubernetes.pod_name": "tke-es-687995d557-n29jr",
"$.kubernetes.namespace_name": "default",
"$.kubernetes.pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"$.kubernetes.host": "10.0.96.47",
"$.kubernetes.container_name": "nginx",
"$.kubernetes.docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
"$.kubernetes.container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
"$.kubernetes.container_image": "nginx",
"$.kubernetes.labels.k8s-app": "tke-es",
"$.kubernetes.labels.pod-template-hash": "687995d557",
"$.kubernetes.labels.qcloud-app": "tke-es",
"$.kubernetes.annotations.qcloud-redeploy-timestamp": "1648016531476",

"$.kubernetes.annotations.tke.cloud.tencent.com/networks-status": "[{\n

\"name\": \"tke-bridge\",\n \"interface\": \"eth0\",\n \"ips\": [\n

\"172.16.0.31\"\n ],\n \"mac\": \"ae:61:12:4a:c2:ba\",\n \"default\": true,\n \"dns\": {}\n}]"

}

3. 最后修改相应映射字段的 key 为所需名称,并且删去不需要的字段。此时单击 添加处理链 后,打开 处理上层所有结果 按钮,整理优化后可以参见如下所示:

		
{
"@timestamp": 1.64880350063659E9,

"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997

aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636

aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",

"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO

com.qcloud.jgw.gateway.server.topic.TopicService",

"pod_name": "tke-es-687995d557-n29jr",
"namespace_name": "default",
"pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"host": "10.0.96.47",
"container_name": "nginx",
"docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba"
}

说明
在使用 JSONPath 处理参数时,当 key 中本身带有英文句号 . 时,需要在路径中方括号和单引号进行隔离。
例如 {"key1.key2":"value1"} 中,要想取得对应字段,则需要使用 $.['key1.key2'] 进行获取相应键值。

处理字符串序列化 JSON 格式日志

有时 JSON 格式在传输过程中,由于格式或性能等需要,需要转义成字符串格式的形式,此处把这种格式叫做 Raw JSON。需要在数据处理中重新将字符串反序列化成 JSON 格式。 此处以 MongoDB 中的 Raw JSON 格式为例,大致结构如下所示:

		
{

"key": " {\n \"categories\": [\"dev\"],\n \"created_at\":

\"2020-01-05 13:42:19.324003\",\n \"icon_url\": \"https://assets

.chucknorris.host/img/avatar/chuck-norris.png\",\n \"id\":

\"elgv2wkvt8ioag6xywykbq\",\n \"updated_at\": \"2020-01-05

13:42:19.324003\",\n \"url\": \"https://api.chucknorris.io/jokes

/elgv2wkvt8ioag6xywykbq\",\n \"value\": \"Chuck Norris's keyboard

doesn't have a Ctrl key because nothing controls Chuck Norris.\"\n }\n"

}

如果在数据处理中就将 Raw JSON 转换成普通的 JSON 格式,就能按照字段格式直接投递到目标下游中。大致处理思路如下所示:
1. 首先设置 解析模式 为 JSON。这样能够将消息先预读取成内部的 MAP 格式。解析后如下所示:

		
{

"key": " {\n \"categories\": [\"dev\"],\n \"created_at\":

\"2020-01-05 13:42:19.324003\",\n \"icon_url\": \"https://assets.

chucknorris.host/img/avatar/chuck-norris.png\",\n \"id\": \"elgv2wkvt8i

oag6xywykbq\",\n \"updated_at\": \"2020-01-05 13:42:19.324003\",\n

\"url\": \"https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq\",\n

\"value\": \"Chuck Norris's keyboard doesn't have a Ctrl key because

nothing controls Chuck Norris.\"\n }\n"

}

2. 单击 添加处理链,使用 MAP 方式选中 key,解析方式选择 JSON。这样数据处理就能将 RAW JSON 自动转换成 JSON 形式。解析后如下所示:
		
{
"key.categories": [
"dev"
],
"key.created_at": "2020-01-05 13:42:19.324003",
"key.icon_url": "https://assets.chucknorris.host/img/avatar/chuck-norris.png",
"key.id": "elgv2wkvt8ioag6xywykbq",
"key.updated_at": "2020-01-05 13:42:19.324003",
"key.url": "https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq",
"key.value": "Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris."
}
3. 最后单击 添加处理链 后,打开 处理上层所有结果 按钮,修改相应映射字段的 key 为所需名称,并且删去不需要的字段。整理优化后可以参见如下所示:
		
{
"categories": [
"dev"
],
"created_at": "2020-01-05 13:42:19.324003",
"icon_url": "https://assets.chucknorris.host/img/avatar/chuck-norris.png",
"id": "elgv2wkvt8ioag6xywykbq",
"updated_at": "2020-01-05 13:42:19.324003",
"url": "https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq",
"value": "Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris."
}

说明

目前 RAW JSON 只支持解析 MAP 类型的数据。当最外层为 List 类型时,例如 "[\"test1\",\"test2\"]"

或者 "[{\"key\":\"value\"}]",由于无法解析合适的键值,因此将会提示解析失败。

相似文档
  • 操作场景: 在使用 TKE 容器服务 时,先前通常使用如下方法,来获取并查询部署的组件服务日志: 1. 从 TKE 控制台 登录容器节点,切换到日志所在文件夹并查看日志。 2. 从 TKE 控制台 查询重定向至标准输出的容器日志。 3. SSH 终端 登录集群节点,查询挂载在宿主机上的日志,或使用 Kubectl 登录容器查询日志。 4. 使用 CLS 采集器 采集日志,并在 日志服务 的控制台查询。
  • Flink 简介: Apache Flink 是一个可以处理流数据的实时处理框架,用于在无界和有界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
  • 无论是使用传统的 Avro API 自定义序列化类与反序列化类,还是使用 Twitter 的 Bijection 类库实现 Avro 的序列化与反序列化,两种方法有相同的缺点:在每条 Kafka 记录里都嵌入了 Schema,从而导致记录的大小成倍增加。但是不管怎样,在读取记录时仍然需要用到整个 Schema,所以要先找到 Schema。
  • Spark Streaming 是 Spark Core 的一个扩展,用于高吞吐且容错地处理持续性的数据,目前支持的外部输入有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。
  • Apache Flume 是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜索服务器等)。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部