滴滴云 | 基于 Binlog 的采集架构与实践
桔妹导读:大数据是这个时代赋予我们的强大引擎,在数字化大潮中 ,借助数据驱动的方法推动业务乘风破浪,几乎是每家公司的核心战略。数据驱动的落脚点是数据,能否将组织或业务运行过程中的信息,进行有效收集并组织成信息流,是数据驱动的基石所在。本文分享了滴滴数据体系建设过程中,MySQL这一类数据源的采集架构和应用实践。 1. 背景 关系模型构建起整个数据分析的基石,关系型数据库作为具体实现、采集MySQL数据接入Hive是很多企业进行数据分析的前提。如何及时、准确的把MySQL数据同步到Hive呢? 一般解决方案是使用类似Sqoop的工具,直连MySQL去Select数据存储到HDFS,然后把HDFS数据Load到Hive中。这种方法简单易操作,但随着业务规模扩大,不足之处也逐步暴露出来: 直连MySQL查询,对于数据库压力较大(如订单表、支付表等),可能直接影响在线业务 数据整体就位时间(尤其大表)不满足下游生产需求 扩展性较差,对于分表、字段增减、变更等的支持较弱 拉取的数据是该时刻的镜像,无法获取中间变化情况 为解决上述问题,我们引入Binlog实时采集 + 离线还原的解决方案,本文将从这两个方面介绍整个数据的接入流程。 2. 整体数据流程 整体数据流程如上图所示,数据收集部分使用定制化Canal组件(基于阿里开源项目)收集binlog日志并做格式转换,然后通过消息队列传输并落地到HDFS,最后对HDFS上的binlog进行清洗还原入库。 如果是增量接入,上述操作就完成了一次入库流程。针对全量接入或者回溯历史数据,因为缺少历史binlog日志(发起采集时才开始收集)无法还原历史数据,此时需要借助离线一次性拉取,流程如下: 按照上述流程采集binlog日志增量入HDFS 使用离线一次性拉取一份历史全量数据,按字段还原到Hive作为基点(即第一个接入周期的数据) 使用前一个接入周期的全量数据和本周期的增量binlog做merge形成该周期内的数据。 相比一般解决方案,其优点比较明显,主要表现在: 基于Binlog日志的数据还原,与在线业务解耦 采集通过分布式队列实时传递,还原操作在集群上实现,及时性及可扩展性强 Binlog日志包括了增、删、改等明细动作,支持定制化的ETL 3. Binlog
一共有两种类型二进制记录方式: Statement模式:每一条会修改数据的sql都会被记录在binlog中,如inserts, updates, deletes。 Row模式: 每一行的具体变更事件都会被记录在binlog中。 Mixed模式是以上两种level的混合使用,默认使用Statement模式,根据具体场景自动切换到Row模式,Row(Mixed)模式从MySQL 5.1版本起可用。 滴滴的MySQL Binlog使用Row模式,记录了每次对数据进行增删改查时,一行数据在变更前后的值,同时无论单列是否被改动,都会记录一行数据的完整信息。
4. Canal
Canal主要运作方式如下: canal模拟mysql slave的交互协议,伪装自己为mysql slave,向master发送dump协议 mysql master收到dump请求,开始推送binary log到canal canal解析binary log对象,并将解析的结果编码成JSON格式的文本串 把解析后的文本串发送到消息队列并上报发送情况(如Kafka、DDMQ) 格式化后的单条记录新增消息示例如下: { "binlog": "25521@mysql-bin.000070", "time": 1450236307000, "canalTime": 1450236308279, "db": "TestCanal", "table": "g_order_010", "event": "u", "columns": [ {"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false}, {"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false}, { "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false}, {"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false}, {"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false}, { "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false}, { "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false}, { "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true} ], "keys": ["order_id"] }{ "binlog": "25521@mysql-bin.000070", "time": 1450236307000, "canalTime": 1450236308279, "db": "TestCanal", "table": "g_order_010", "event": "u", "columns": [ {"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false}, {"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false}, { "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false}, {"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false}, {"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false}, { "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false}, { "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false}, { "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true} ], "keys": ["order_id"] } 为保障整个Binlog链路中数据完整性,我们引入了Dquality服务。Dquality是数据通道中非常重要的一个环节,记录着整个数据通道每一个流程的数据信息,如某一段时间内的数据总和等。Dquality主要包含以下功能: 为数据回溯提供元数据支持 校验数据丢失与延迟情况 校验数据完整性 简单流程为数据链路上的各发送方在成功传递数据后,把投递结果以及时间信息发送到Dquality,Dquality统一汇总,分析判定每个时间段内数据是否完成及时准确传输,并把分析结果存储下来。下游数据使用方通过接口从Dquality查询该结果。 以Binlog链路为例,在Binlog流程中有两个环节Canal->MQ、MQ->HDFS,上报数据发送情况到Dquality。下游ETL环节使用Dquality接口查询数据就位情况,比如对于小时粒度任务,查询该小时的0分0秒到59分59秒之间的数据是否已经完成写入,如果已经完成写入,那么ETL任务就可以启动执行。 基于此,天或小时采集周期内的数据是固定的(幂等),以该时间段内的数据作为清洗基础,无论什么时候执行其结果不会改变。但在Canal上报环节,目前无法有效判定较小数据量场景和同步异常场景,一定程度上影响数据就位时间。 5. 一次性拉取&初始化 Binlog从发起采集的一刻起才会在整个链路上存在,即以增量的方式传递,那么对于历史数据如何获取?实际场景中包括全量接入或增量历史数据回溯。 目前实现方式为通过DataX工具直连MySQL离线库,拉取一份截至到当前时间的全量数据,然后按列还原到Hive表的首个分区中。 全量采集场景下,下个分区的数据基于上个分区的数据和当前周期内的增量Binlog日志merge,即可产生该分区内的数据。 上面介绍了基于Binlog数据接入的整体流程,下面列举两个实际解决的业务问题。 6. 场景一:数据飘移的支持 在实际业务中,存在很多类似的两种case,其采集周期存在一定的不确定性。 case 1:订单的Binlog日志中,当订单事件的更新时间在59分59秒左右时,数据有可能会落在下一个小时的分区,以至于当前小时数据没有统计到该条订单,同时下一个小时分区的数据也没有打上相应的事件标签。 case 2:支付结算系统,当天所有交易记录会在次日凌晨后结算完成,按照默认采集逻辑,当天的记录落在次日的变更内,无法有效支持当天核算。 以上两个case的常规解决方案可能是把下个小时的数据也囊括到本采集周期内,但会导致数据就位时间延迟一个小时,扩散到数据下游,时间会更长,可能不满足实际需求。采集平台提供数据漂移的功能,即按需配置偏移量。比如小时粒度默认为00:00 - 59:59之间的数据,配置5min的偏移,那么数据区间为00:00 - 04:59(次小时),多出来的部分可以有效解决数据漂移功能,同时为及时性提供了有效支撑。 该功能在专快订单、财务应付应收以及国际化部分都有应用。但需要注意的是,下个采集周期内也包含了这部分实际发生在该区间内的数据。 7. 场景二:分库分表的支持 业务发展,不可避免会有分库分表的诉求,其规则也可能多种多样,如table_{城市区号},table_{连续数字},table_{日期},如果逐个抽取并聚合,上下游的成本巨大。因此我们需要在数据规范层面,数据链路上保障能自动化收集这类数据。 1. 统一MySQL使用规范,明确分库分表的命名规则,做到规则内自动化识别,同时完成全量元数据信息的收集,非规范化的命名规则无法自动化支持。 2. 默认情况下一个库的数据会收集到一个topic内,如果有分库存在也可以一并收集到一个topic内,保证逻辑上分库分表的数据物理上收集到一起。 3. 按照/{db}/{table}/{year}/{month}/{day}/{hour}的路径结构(其中日期由Binlog时间格式化生成)落地到HDFS上,一个逻辑表的数据存储在一起。 4. ETL处理阶段,取出上述路径下的Binlog日志,还原到Hive中。 为用户更好使用分库分表数据以及获取中间变化过程,ETL阶段额外再Hive表中写入三个字段: system_rule_etl_update_field 记录更新时间,更新晚的对应该字段的值更大,前十位是时间戳信息 system_rule_etl_delete_flag 标识本条记录是否在上游数据库中被删除,0-正常记录,1-删除记录 system_rule_etl_uniq_key 全局主键,由mysql库名+表名+主键拼接而成
图片作为数据建设的基础,数据平台提供的基于Binlog的MySQL入Hive服务,覆盖公司内部各个业务线,日1.9w+同步任务,近50T数据同步量,实时层面毫秒级别延迟,实现了及时、准确、定制化的同步需求。但在个性化ETL、性能优化、内容建设等方面还存在未解决的问题,后续我们会在这些方面重点发力,更好的助力业务发展。 |
全部评论
暂无评论
最新文章
热点排行
-
1文件存储 NAS 和 对象存储 OSS 的区别
-
2天翼云云电脑 | 让电视一秒变身电脑
-
3阿里工程师太凶残了,竟把服务器泡在“水里”!
-
4腾讯云 | 想在微信群里发起9人以上的语音聊天怎么办?
-
5数据仓库终结者:Dremio
-
62020年云服务器哪家强:阿里云、腾讯云、华为云、UCloud测评报告
-
72020 年 Q1 中国云市场份额:阿里云第一、华为云跃居第二、腾讯云下降为第三
-
8阿里云 web 应用防火墙(WAF)价格:179元/年
-
92020 Q1 中国公有云市场份额 TOP3:阿里云、腾讯云、华为云
-
10郑大一附院系统瘫痪 2 小时,运维人员被判 5 年半:破坏计算机信息系统罪
有话要说