上云无忧 > 文档中心 > 百度智能云流式计算 BSC 实践 - API 日志调用统计
百度流式计算 BSC
百度智能云流式计算 BSC 实践 - API 日志调用统计

文档简介:
概览: 用户拥有多台服务器,托管了一些 API 调用服务,现在想统计 API 的调用情况,形成图表。 需求场景: 所有机器的 API 调用日志通过 自定义日志采集程序 进行日志采集后推送到 百度消息服务(BKAFKA)中作为流式计算 source , 在我们 BSC 中创建 FLINK_STREAM/SQL 类型的作业用于 API 日志的聚合统计 【百度智能云】流式计算BSC
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

概览

用户拥有多台服务器,托管了一些 API 调用服务,现在想统计 API 的调用情况,形成图表。

需求场景

所有机器的 API 调用日志通过 自定义日志采集程序 进行日志采集后推送到 百度消息服务(BKAFKA)中作为流式计算 source , 在我们 BSC 中创建 FLINK_STREAM/SQL 类型的作业用于 API 日志的聚合统计,并实时将聚合结果写到 时序时空数据库(TSDB)当中,用户可以通过 TSDB 的可视化面板或者利用 数据可视化工具(如 Sugar BI)等调用 TSDB 的数据 API 完成数据展示。

方案概述

服务器 → 自定义日志采集程序 → BKAFKA → BSC → TSDB → Sugar BI

配置步骤

一个完整的 Flink SQL 作业由 source 表、sink 表和 DML 语句构成。

定义 BKAFKA Source 表

CREATE TABLE source_kafka_table ( `timestamp` BIGINT, `status` INTEGER, 
`contentLength` BIGINT, `latency` BIGINT, `groupUuid` STRING, `apiUuid` STRING )
 WITH ( 'connector.type' = 'BKAFKA', 'format.encode' = 'JSON', 'connector.topic' 
= 'xxxxxxxxx__bsc-source', 'connector.properties.bootstrap.servers' = 
'kafka.bd.baidubce.com:9071', 'connector.properties.ssl.filename' = 
'kafka-key_bd.zip', 'connector.properties.group.id' = 'test_group', 
'connector.read.startup.mode' = 'latest', 'watermark.field' = 
'timestamp', 'watermark.threshold' = '1 minutes' );

定义 TSDB Sink 表

CREATE TABLE sink_tsdb_table ( `datapoints` ARRAY < ROW( `timestamp` BIGINT, `metric`
 STRING, `value` BIGINT, `tags` MAP < STRING, STRING > ) > ) WITH ( 'connector.type' = 'TSDB',
 'format.encode' = 'JSON', 'connector.emit' = 'BATCH', 'connector.url' = 'http://xxxxxxx.
tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com', 'connector.write.max-message-num-per-batch' = '2000' );

编写数据统计DML语句

统计每分钟按照 apiUuid、groupUuid、status 进行聚合的结果,每个 Query 产生3个 TSDB datapoints,并实时写入到 TSDB 中。这里通过嵌套子查询的方式来使SQL结构更加清晰。选取 timestamp 字段作为 Eventtime 的watermark,延迟设置为1分钟。聚合时采用滚动窗口,窗口大小为1分钟。

INSERT INTO sink_tsdb_table SELECT ARRAY [ ROW(`timestamp`, `count_name` , `count`, `common_tags`), 
ROW(`timestamp`, `traffic_name`, `traffic`, `common_tags`), ROW(`timestamp`, `latency_name`, 
`latency`, `common_tags`) ] FROM ( SELECT `timestamp`, 'count' AS `count_name`, `count`, 
'traffic' AS `traffic_name`, `traffic`, 'latency' AS `latency_name`, `latency`, MAP ['apiUuid', 
`apiUuid`, 'groupUuid', `groupUuid`, 'status', `status`] AS `common_tags` FROM ( SELECT TO_BIGINT
(TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE)) AS `timestamp`, COUNT(1) AS `count`, 
SUM(contentLength) AS `traffic`, SUM(latency) AS `latency`, `apiUuid` AS `apiUuid`, `groupUuid` AS 
`groupUuid`, `status` AS `status` FROM ( SELECT `timestamp`, `contentLength`, `latency`, 
`apiUuid`, `groupUuid`, CASE WHEN status >= 200 AND status < 300 THEN '2xx' WHEN status >= 300
 AND status < 200 THEN '3xx' WHEN status >= 400 AND status < 500 THEN '4xx' WHEN status >= 500
 AND status < 600 THEN '5xx' ELSE 'oth' END AS `status` FROM source_kafka_table ) AS taba GROUP
 BY TUMBLE(`timestamp`, INTERVAL '1' MINUTE), `apiUuid`, `groupUuid`, `status` ) AS tabb ) AS tabc

相关产品

消息服务 for Kafka、时序时空数据库TSDB、数据可视化Sugar BI

相似文档
  • 概览: 统计每个设备每分钟报警次数。 需求场景: 用户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上的传感器大概每5秒采集并上传数据到 物联网核心套件(IoT Core)或 物接入(IoT Hub) 的 MQTT 当中作为第一个 source 【百度智能云】流式计算BSC
  • 流计算作业中的代码包括几部分? 流计算作业中的代码包括DDL语句(用于创建输入端、输出端)、DML语句(业务逻辑的执行语句)。目前,同一个作业内支持多个输入端、一个输出端。 【百度智能云】流式计算BSC
  • Sugar BI是百度智能云推出的敏捷 BI 和数据可视化平台,通过拖拽图表组件可实现 5 分钟搭建数据可视化页面,组件丰富,开箱即用,无需SQL和任何编码。通过可视化图表及强大的交互分析能力,企业可使用 Sugar BI有效助力自己的业务决策。
  • V4.6.1 / 2022-10-10: 新功能:历史版本、回收站。 新功能:私有部署中支持「数据服务」。 API 方式绑定图表数据时,支持API 认证。 数据模型的自定义 SQL 视图中支持嵌入 URL 参数等。 数据模型中整数类型的字段支持转字符串。 报表中也支持隐藏图表的 loading 效果。
  • 组织: 一般指中小型企业,事业单位,学校院系或大型公司的部门。Sugar BI将会按照组织进行费用的收取,一个用户可以属于多个组织,一个组织下有多个用户。详见什么是组织。 空间: 组织下面可以创建多个空间,一般是建议按照项目或者团队来划分空间,空间之间数据不共享,每个空间下都有自己独立的权限管理。详见什么是空间。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部