上云无忧 > 文档中心 > 腾讯云消息队列 CKafka - 数据处理规则说明
消息队列 CKafka
腾讯云消息队列 CKafka - 数据处理规则说明

文档简介:
概览: 在通过 CKafka 连接器处理数据流入流出任务时,通常需要对数据进行简易的清洗操作,如格式化原始数据、解析特定字段、数据格式转换等等。开发者往往需要自己搭建一套数据清洗的服务(ETL)。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

概览

在通过 CKafka 连接器处理数据流入流出任务时,通常需要对数据进行简易的清洗操作,如格式化原始数据、解析特定字段、数据格式转换等等。开发者往往需要自己搭建一套数据清洗的服务(ETL)。
Logstash 是一款免费且开放的服务器端数据处理管道,能够从多个数据源采集数据,转换数据,然后将数据发送到相应的“存储库”中。 logstash 拥有丰富的过滤器插件,这使得 logstash 成为了被广泛使用的一款功能强大的数据转换工具。
然而搭建、配置、维护自己的 logstash 服务会增大开发和运维的难度,为此 Ckafka 提供了一套对标 logstash 的数据处理服务,开发者仅需通过控制台交互界面就可以新建自己的数据处理任务。数据处理服务允许用户编辑相应的数据处理规则,支持构建链式处理,同时可以预览数据处理的结果,帮助用户轻松高效的构建一套数据处理服务,满足数据清洗和转换的需求。

功能对标清单

Logstash
连接器数据处理服务
功能
Codec.json
数据解析(JSON)
Filter.grok
数据解析(正则匹配)
Filter.mutate.split
数据解析(字符分割)
Filter.date
日期格式处理
Filter.json
解析内部json结构
Filter.mutate.convert
数据修改(格式转换)
Filter.mutate.gsub
数据修改(字符替换)
Filter.mutate.strip
数据修改(去除首尾空格)
Filter.mutate.join
数据修改(拼接字段)
Filter.mutate.rename
字段修改(更改字段名)
Filter.mutate.update
字段修改(更新字段)
Filter.mutate.replace
字段修改(替换字段)
Filter.mutate.add_field
字段修改(添加字段)
Filter.mutate.remove_field
字段修改(删除字段)
Filter.mutate.copy
字段修改(复制字段值)
Filter.mutate.merge

TODO
Filter.mutate.uppercase

TODO
Filter.mutate.lowercase

TODO

操作方法介绍

数据解析

logstash 处理方式:
		
// Codec.json
input {
file {
path => "/var/log/nginx/access.log_json""
codec => "json"
}
}
// Filter.grok
filter {
grok {
match => {
"message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
}
}
}
// Filter.mutate.split
filter {
split {
field => "message"
terminator => "#"
}
}
连接器处理方式: 通过选择相应的数据解析模式 ,并一键点击即可预览:

日期格式处理

logstash 处理方式:
		
// Filter.date
filter {
date {
match => ["client_time", "yyyy-MM-dd HH:mm:ss"]
}
}
连接器处理方式:
1.1 可以通过预设系统当前时间给某字段赋值:

1.2 通过 处理 value 功能来对数据内容进行处理:

解析内部 JSON 结构

logstash 处理方式:
		
// Filter.json
filter {
json {
source => "message"
target => "jsoncontent"
}
}
连接器处理方式: 通过对某字段选择 MAP 操作来对其进行解析,从而把特定字段解析为 JSON 格式:

数据修改

logstash 处理方式:
		
// Filter.mutate.convert
filter {
mutate {
convert => ["request_time", "float"]
}
}
// Filter.mutate.gsub
filter {
mutate {
gsub => ["urlparams", ",", "_"]
}
}
// Filter.mutate.strip
filter {
mutate {
strip => ["field1", "field2"]
}
}
// Filter.mutate.join
filter {
mutate {
join => { "fieldname" => "," }
}
}
连接器处理方式: 通过选择相应的处理 value 功能一键定义规则:

1.1 通过选择数据格式一键更改相应字段的数据格式:
1.2 通过 JSONPATH 语法实现 join 的拼接功能,如是用 $.concat($.data.Response.SubnetSet[0].VpcId,&quot;#&quot;,$.data.Response.SubnetSet[0].SubnetId,&quot;#&quot;,$.data.Response.SubnetSet[0].CidrBlock)) 语法拼接 Vpc 和子网的属性,并且通过 # 字符加以分割。
1.3 结果如下:

字段修改

logstash 处理方式:
		
// Filter.mutate.rename
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
// Filter.mutate.update
filter {
mutate {
update => { "sample" => "My new message" }
}
}
// Filter.mutate.replace
filter {
mutate {
replace => { "message" => "%{source_host}: My new message" }
}
}
// Filter.mutate.add_field
filter {
mutate {
split => { "hostname" => "." }
add_field => { "shortHostname" => "%{[hostname][0]}" }
}
}
// Filter.mutate.remove_field
filter {
mutate {
remove_field => ["field_name"]
}
}
// Filter.mutate.copy
filter {
mutate {
copy => { "source_field" => "dest_field" }
}
}
连接器处理方式:

实际案例演示

案例1:多级字段解析

输入 message:
		
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\"userId\":888,\"userName\":\"testUser\"}",
"offset": 3030131,
}
目标 message :
		
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"hostname": "test-server",
"ip": "6.6.6.6",
"userId": 888,
"userName": "testUser"
}
连接器配置方法:
1.1 处理链 1 配置如下:

1.2 处理链 1 结果如下:
		
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"message": "{\"userId\":888,\"userName\":\"testUser\"}",
"hostname": "test-server",
"ip": "6.6.6.6"
}
1.3 处理链 2 配置如下:

1.4 处理链 2 结果如下:
		
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"hostname": "test-server",
"ip": "6.6.6.6",
"userId": 888,
"userName": "testUser"
}

案例2:非 JSON 数据解析

输入 message :
		
region=Shanghai$area=a1$server=6.6.6.6$user=testUser$timeStamp=2022-02-26T22:25:33.210Z
目标 message:
		
{
"region": "Shanghai",
"area": "a1",
"server": "6.6.6.6",
"user": "testUser",
"timeStamp": "2022-02-27 06:25:33",
"processTimeStamp": "2022-06-27 11:14:49"
}
连接器配置方法:
1.1 使用分隔符 $ 对原始 message 进行解析

1.2 初步解析结果:
		
{
"0": "region=Shanghai",
"1": "area=a1",
"2": "server=6.6.6.6",
"3": "user=testUser",
"4": "timeStamp=2022-02-26T22:25:33.210Z"
}
1.3 使用分隔符 = 对结果二次解析:

1.4 二次解析结果:
		
{
"0": "region=Shanghai",
"1": "area=a1",
"2": "server=6.6.6.6",
"3": "user=testUser",
"4": "timeStamp=2022-02-26T22:25:33.210Z",
"0.region": "Shanghai",
"1.area": "a1",
"2.server": "6.6.6.6",
"3.user": "testUser",
"4.timeStamp": "2022-02-26T22:25:33.210Z"
}
1.5 对字段进行编辑、删减,调整时间戳格式,并新增当前系统时间字段:


最终结果:
{
"region": "Shanghai",
"area": "a1",
"server": "6.6.6.6",
"user": "testUser",
"timeStamp": "2022-02-27 06:25:33",
"processTimeStamp": "2022-06-27 11:14:49"
}
相似文档
  • Ckafka 连接器的数据处理功能提供了根据正则表达式提取消息内容的能力,正则提取采用的是开源的正则提取包 re2 。 Java 的标准正则表达式包 java.util.regex 以及其他被广泛使用的正则表达式包如 PCRE、Perlre和 Python(re),都使用回溯实现策略,即当一个 pattern 出现两个替代方案a|b 的时候,引擎将首先尝试匹配子模式a,如果匹配失败,它将重置输入流并尝试匹配子模式 b。
  • 操作场景: JSON 目前是互联网信息传递中最通用的格式协议之一。目前数据处理也主要围绕 JSON 数据格式进行解析处理。 JSONPath 是针对 JSON 格式推出的消息查询语法规范。在数据处理中,不仅能够使用简单的 JSONPath 语法,快速获取复杂嵌套 JSON 结构体的某一成员的值;还能使用 JayWay 库的扩展函数,聚合或操作某一类型的成员字段。
  • 操作背景: 通过 CKafka 连接器连接 CVM 自建的服务时,根据腾讯云网络团队制定的标准跨 VPC 资源访问方案,需要先将自建服务挂载到 CLB(负载均衡)上,才能实现跨 VPC 的资源访问。
  • 操作背景: 用户在使用 CKafka 连接器访问 CLS、COS 等服务时,需要授权连接器访问用户账号下 CLS、COS 等服务的权限。如果使用 CKafka 的子账号具备访问管理的策略权限(QcloudCamRoleFullAccess),在创建 CKafka 任务时勾选 角色授权,连接器将自动为您完成授权。否则,需要拥有管理员权限(AdministratorAccess)的用户进行相应的授权后,再使用子账号创建 连接器 任务。
  • 背景: debezium connector 最初仅在创建连接任务时会同步表的存量数据,后续新增的表无法触发存量数据的同步。为了支持新增表的存量数据同步功能,debezium 采用了“信号”的模式通知 connector 进行触发新增表的存量数据同步。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部