上云无忧 > 文档中心 > 天翼云数据湖探索使用教程 - 创建并提交Flink SQL作业
数据湖探索
天翼云数据湖探索使用教程 - 创建并提交Flink SQL作业

文档简介:
本章节主要介绍创建并提交Flink SQL作业 。 使用DLI提交Flink SQL作业进行实时计算。基本流程如下: 1.登录云。 2.准备数据源和数据输出通道。 3.创建OBS桶保存输出数据。 4.登录DLI管理控制台。 5.创建队列。 6.创建增强型跨源连接。 7.创建跨源认证。 8.配置安全组规则和测试地址连通性。 9.创建FlinkSQL作业。
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

使用DLI提交Flink SQL作业进行实时计算。基本流程如下:

1.登录云

2.准备数据源和数据输出通道

3.创建OBS桶保存输出数据

4.登录DLI管理控制台

5.创建队列

6.创建增强型跨源连接

7.创建跨源认证

8.配置安全组规则和测试地址连通性

9.创建FlinkSQL作业

样例场景需要创建一个FlinkSQL作业,并且该作业有一个输入流和一个输出流。输入流用于从DIS读取数据,输出流用于将数据写入到Kafka中。

登录云

使用DLI服务,首先要登录云。

  1. 打开产品首页。
  2. 在登录页面输入“帐号名”和“密码”,单击“登录”。

准备数据源和数据输出通道

DLI Flink作业支持其他服务作为数据源和数据输出通道,具体内容请参见《数据湖探索用户指南》>《Flink作业管理》>《准备数据》。

本样例中,假设作业名称为“JobSample”,采用DIS服务作为数据源,开通数据接入服务(DIS),具体操作请参见《数据接入服务用户指南》中的“开通DIS通道”章节。采用分布式消息服务Kafka作为数据输出通道,创建Kafka专享版实例,具体操作请参见《分布式消息服务Kafka用户指南》中的“购买实例”章节。

1.创建用于作业输入流的DIS通道:

a. 登录DIS管理控制台。

b. 在管理控制台左上角选择区域和项目。

c. 单击“购买接入通道”配置相关参数。通道信息如下:

  • 区域:选择与DLI服务相同的区域
  • 通道名称:csinput
  • 通道类型:普通
  • 分区数量:1
  • 生命周期(小时):24
  • 源数据类型:BLOB
  • 自动扩缩容:关闭
  • 企业项目:default
  • 高级配置:暂不配置

d. 单击“立即购买”,进入“规格确认”页面。

e. 单击“提交”,完成通道接入。

2.创建用于作业输出流的Kafka专享版实例:

a. 在创建Kafka实例前您需要提前准备相关依赖资源,包括VPC、子网和安全组,并配置安全组。

  • 创建VPC和子网的操作指导请参考《虚拟私有云》帮助文档>创建虚拟私有云和子网,若需要在已有VPC上创建和使用新的子网,请参考《虚拟私有云》帮助文档>为虚拟私有云创建新的子网。
说明
  •  创建的VPC与使用的Kafka服务应在相同的区域。

  •  创建VPC和子网时,如无特殊需求,配置参数使用默认配置即可。

  • 创建安全组的操作指导请参考《[虚拟私有云](https://www.ctyun.cn/document/10026755)》帮助文档>创建安全组,为安全组添加规则的操作指导请参考《[虚拟私有云](https://www.ctyun.cn/document/10026755)》帮助文档>添加安全组规则。更多信息请参考《[分布式消息服务Kafka用户指南](https://www.ctyun.cn/document/10029624/10030981)》中的“准备实例依赖资源”章节。


b. 登录分布式消息服务Kafka管理控制台。

c. 在管理控制台左上角选择区域。

d. 在“Kafka专享版”页面,单击右上角“购买Kafka实例”配置相关参数。实例信息如下:

  • 区域:选择与DLI服务相同的区域
  • 项目:默认
  • 可用区:默认
  • 实例名称:kafka-dliflink
  • 企业项目:default
  • 版本:默认
  • CPU架构:默认
  • 规格:选择对应的规格
  • 代理个数:默认
  • 存储空间:默认
  • 容量阈值策略:默认
  • 虚拟私有云,子网:选择a中创建的虚拟私有云和子网。
  • 安全组:选择a中创建的安全组。
  • Manager用户名:dliflink(用于登录实例管理页面)
  • 密码:****(请妥善管理密码,系统无法获取您设置的密码内容)
  • 确认密码:****
  • 更多配置:开启参数“Kafka SASL_SSL”,根据界面提示配置SSL认证的用户名和密码。其他参数可暂不配置。

e. 单击“立即购买”,弹出“规格确认”页面。

f.单击“提交”,完成实例创建。

g. 在分布式消息服务Kafka管理,单击“Kafka专享版”,单击已创建的Kafka实例名称,例如kafka-dliflink,进入实例详情页面。

h. 在“基本信息 > 高级配置 > SSL 证书”所在行,单击下载按钮。下载压缩包到本地并解压,获取压缩包中的客户端证书文件:client.truststore.jks,给后续步骤做准备。

创建OBS桶保存输出数据

在本样例中,需要为作业“JobSample”开通对象存储服务(OBS),为DLI Flink作业提供Checkpoint、保存作业日志和调试测试数据的存储功能。

具体操作请参见《对象存储服务控制台指南》中的“创建桶”章节。

  1. 在OBS管理控制台左侧导航栏选择“对象存储”。
  2. 在页面右上角单击“创建桶”,配置桶参数。
  • 区域:选择与DLI服务相同的区域
  • 桶名称:具体根据实际情况选择桶名,例如当前选择:smoke-test
  • 存储类别:标准存储
  • 桶策略:私有
  • 默认加密:关闭
  • 归档数据直读:关闭
  • 企业项目:default
  • 标签:不填写
  1. 单击“立即创建”。

登录DLI管理控制台

  1. 在列表中,选择“数据湖探索 DLI”。
  2. 进入DLI管理控制台页面。第一次进入数据湖探索管理控制台需要进行授权,以获取访问OBS的权限。

创建队列

创建DLI FlinkSQL作业,不能使用系统已有的default队列,需要您创建队列,例如创建名为“Flinktest”的队列。创建队列详细介绍请参考《数据湖探索用户指南》>《创建队列》。

  1. 在DLI管理控制台总览页,单击右上角“购买队列”进入购买队列页面。
  2. 配置参数。
  • 队列名称:Flinktest
  • 队列类型:通用队列。勾选“专属资源模式”。
  • 队列规格:16CUs企业项目:default
  • 描述:不填
  • 高级选项:自定义配置
  • 网段:配置的网段不能与Kafka的子网网段冲突
  1. 单击“立即购买”,确认配置。
  2. 配置确认无误,提交请求。

创建增强型跨源连接

创建DLI Flink作业,还需要创建增强型跨源连接。具体操作请参考《数据湖探索用户指南》>《跨源连接》>《增强型跨源连接》。

说明
  • 绑定跨源的DLI队列网段和数据源网段不能重合。

  • 系统default队列不支持创建跨源连接。

  • 访问跨源表需要使用已经创建跨源连接的队列。


  1. 在DLI管理控制台左侧导航栏中,选择“跨源连接”。
  2. 选择“增强型跨源”页签,单击左上角的“创建”按钮。配置参数:
  • 连接名称:diskafka
  • 绑定队列:Flinktest
  • 虚拟私有云:vpc-dli
  • 子网:dli-subnet

说明

创建跨源连接的虚拟私有云和子网需要和Kafka实例保持一致。

  1. 单击“确定”,完成创建增强型跨源连接。
  2. 在“增强型跨源”页签,单击创建的连接名称:diskafka,查看对等连接ID及连接状态,连接状态为“已激活”表示连接成功。

创建跨源认证

创建跨源认证的具体操作请参考《数据湖探索用户指南》>《跨源连接》>《跨源认证》。

  1. 将步骤2:准备数据源和数据输出通道中获取的kafka认证文件“client.truststore.jks”上传到步骤3:创建OBS桶保存输出数据中的OBS桶“smoke-test”下。
  2. 在DLI管理控制台选择“跨源连接”。
  3. 在“跨源认证”页签,单击“创建”,创建认证信息。配置参数:
  • 认证信息名称:Flink
  • 类型:Kafka_SSL
  • Truststore路径:obs://smoke-test/client.truststore.jks
  • Truststore密码:dms@kafka

其余参数可不用配置。

  1. 单击“确定”,完成创建跨源认证。

配置安全组规则和测试地址连通性

  1. DLI管理控制台,单击“队列管理”,选择绑定的队列,点开队列左边的箭头,查看队列详情,获取队列的网段信息。
  2. 登录分布式消息服务Kafka管理控制台,单击“Kafka专享版”,单击已创建的Kafka实例名称,例如kafka-dliflink,进入实例基本信息页面。
  3. 在实例基本信息页面,在“连接地址”配置下的获取Kafka的连接地址和端口。
  4. 在实例基本信息页面,在“网络”配置下的“安全组”,单击安全组名称,进入安全组配置页面。
  5. 在Kafka实例对应的安全组配置页面,单击“入方向规则 > 添加规则”,协议选择“TCP”,端口选择“9093”,源地址填写DLI队列的网段。单击“确定”完成配置。
  6. 登录DLI管理控制台,选择“队列管理”,在所在Flink队列行,单击“更多 >测试地址连通性”,在“地址”参数中按照“ IP: 端口 ”的格式输入Kafka的连接地址和端口,单击“测试”,返回地址可达后进行后续操作步骤。注意多个地址要分开单独测试。

创建Flink SQL作业

准备好数据源和数据输出通道之后,就可以创建Flink SQL作业了。

  1. 在DLI管理控制台的左侧导航栏中,单击“作业管理”>“Flink作业”,进入“Flink作业”页面。
  2. 在“Flink作业”页面右上角单击“创建作业”,弹出“创建作业”对话框。配置参数:
  • 类型:Flink SQL
  • 名称:DIS-Flink-Kafka
  • 描述:不填
  • 模板名称:不选择
  1. 单击“确定”,进入作业“编辑”页面。
  2. 编辑SQL作业。

在SQL语句编辑区域,输入详细的SQL语句。具体如下,注意以下加粗的参数值都需要根据注释提示修改。

CREATE SOURCE STREAM car_info ( 
 a1 string, 
 a2 string, 
 a3 string, 
 a4 INT 
) 
WITH ( 
 type = "dis",
 region = "",//需要修改为当前 DLI 队列所在的 region
 channel = "csinput", 
 encode = "csv", 
 FIELD_DELIMITER = ";" 
); 
CREATE SINK STREAM kafka_sink ( 
 a1 string, 
 a2 string, 
 a3 string, 
 a4 INT 
2022-05-09 32
) // 输出字段
WITH ( 
 type="kafka", 
 kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 
192.x.x.x:9093",//需要修改为 kafka 实例的连接地址
 kafka_topic = "testflink", // 要写入 kafka 的 topic,进入 kafka 控制台,单击已创建的
Kafka 实例名称,在 Topic 管理查看 Topic 名称
 encode = "csv", // 编码格式,支持 json/csv 
 kafka_certificate_name = "Flink", 
 kafka_properties_delimiter = ",", 
 //kafka_properties 中的 username 和 password 的值 xxx 需要替换为步骤 2 中 kafka 创建 SSL
认证的用户名和密码
 kafka_properties = 
"sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username=\"xxx\" 
password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL" 
); 
INSERT INTO kafka_sink 
SELECT * FROM car_info; 
CREATE sink STREAM car_info1 ( 
 a1 string, 
 a2 string, 
 a3 string, 
 a4 INT 
) 
WITH ( 
 type = "dis", 
 region = "",//需要修改为当前 DLI 队列所在的 region
 channel = "csinput", 
 encode = "csv", 
 FIELD_DELIMITER = ";" 
); 
insert into car_info1 select 'id','owner','brand',1; 
insert into car_info1 select 'id','owner','brand',2; 
insert into car_info1 select 'id','owner','brand',3; 
insert into car_info1 select 'id','owner','brand',4; 
insert into car_info1 select 'id','owner','brand',5; 
insert into car_info1 select 'id','owner','brand',6; 
insert into car_info1 select 'id','owner','brand',7; 
insert into car_info1 select 'id','owner','brand',8; 
insert into car_info1 select 'id','owner','brand',9; 
insert into car_info1 select 'id','owner','brand',10;复制
  1. 单击“语义校验”,确保语义校验成功。
  2. 设置作业运行参数。配置必选参数:
  • 所属队列:Flinktest
  • CU数量:2
  • 管理单元:1
  • 并行数:1
  • 保存作业日志:勾选
  • OBS桶:选择作业日志保存的OBS桶,并进行授权。

其余参数可不用配置。

  1. 单击“保存”,保存作业和相关参数。
  2. 单击“启动”,进入“启动Flink作业”页面,确认作业规格和费用后,单击“立即启动”,启动作业。

启动作业后,系统将自动跳转到Flink作业管理页面,新创建的作业将显示在作业列表中,在“状态”列中可以查看作业状态。作业提交成功后,状态将由“提交中”变为“运行中”。

如果作业状态为“提交失败”或“运行异常”,表示作业提交或运行失败。用户可以在作业列表中的“状态”列中,将鼠标移动到状态图标上查看错误信息,单击可以复制错误信息。根据错误信息解决故障后,重新提交。

  1. 作业运行完成后,可登录分布式消息服务Kafka管理控制台,查看对应的Kafka专享实例。单击实例名称,选择“消息查询”页签,选择Flink SQL作业中写入的kafka的Topic名称,单击“搜索”,在操作列单击“查看消息正文”查看写入的消息内容。
相似文档
  • 本章节主要介绍数据湖探索(DLI)的权限管理。 DLI服务不仅在服务本身有一套完善的权限控制机制,同时还支持通过统一身份认证服务(Identity and Access Management,简称IAM)细粒度鉴权,可以通过在IAM创建策略来管理DLI的权限控制。
  • 本章节主要介绍数据湖探索(DLI)如何创建用户并授权使用。 本章节通过简单的用户组授权方法,将DLI服务的策略授予用户组,并将用户添加至用户组中,从而使用户拥有对应的DLI权限。操作流程如下图所示。
  • 本章节主要介绍数据湖探索(DLI)如何创建自定义策略。 如果系统预置的DLI权限,不满足您的授权要求,可以创建自定义策略。 目前云支持以下两种方式创建自定义策略: 可视化视图创建自定义策略:无需了解策略语法,按可视化视图导航栏选择云服务、操作、资源、条件等策略内容,可自动生成策略。
  • 本章节主要介绍数据湖探索(DLI)如何选择特定资源。 资源是服务中存在的对象。在DLI中,资源如下,您可以在创建自定义策略时,通过指定资源路径来选择特定资源。
  • 本章节主要介绍数据湖探索(DLI)如何添加“请求条件”(Condition元素)来控制策略何时生效。 您可以在创建自定义策略时,通过添加“请求条件”(Condition元素)来控制策略何时生效。请求条件包括条件键和运算符,条件键表示策略语句的 Condition 元素,分为全局级条件键和服务级条件键。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部