上云无忧 > 文档中心 > 百度智能云数据工厂Pingo使用教程 - 批量作业
数据工厂Pingo
百度智能云数据工厂Pingo使用教程 - 批量作业

文档简介:
批量作业管理的原型可以理解为Linux上的crontab,按照配置的调度规则自动触发任务的执行。在pingo中扩展了作业间的DAG依赖执行,重试机制,任务以及执行状态管理,以及自定义的任务扩展能力。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

批量作业管理的原型可以理解为Linux上的crontab,按照配置的调度规则自动触发任务的执行。在pingo中扩展了作业间的DAG依赖执行,重试机制,任务以及执行状态管理,以及自定义的任务扩展能力。

概念介绍

作业:执行的最小单元,如上图中的Spark、依赖检查等都是具体的一种类型的作业。

作业组:作业的集合,调度的最小单元。比如以这个cron格式"0 * * * *"调度,就意味着每小时执行一次作业组。

作业组实例:一个作业组执行后的镜像。比如2018.4.17凌晨一点执行后的镜像,实例中保存了作业组中每个作业的状态(成功、失败等),以及执行日志。

基准时间:还是以trade_table表为例,需要有一个任务每小时运行一次,计算这一个小时内的交易量,那么每个任务都要关联一个具体时间(比如20180417-13)的数据分片,这个时间就是基准时间,在作业中可以通过环境变量BASETIME访问到。

基准时间差:这个主要用来解决例行数据延迟的问题。还是以某电商的trade_table为例,假如每小时的数据都要延迟2小时就绪,也就是说6点的数据要等到8点才能就绪,那每个任务处理的数据都是两小时以前的。

新建作业组

由于没有作业的作业组是没有意义的,所以不可以建立一个空的作业组,因此必须先新建一个作业才可以新建一个作业组。选择任意一个类型的作业,填入有效的名称和各项参数,即可新建一个作业,该作业的名字就会同时默认成为作业组的名字。

编辑作业组

点击"作业组列表"页面中的"编辑"按钮即可进入编辑作业组页面,如下图。点击"添加依赖作业"按钮可以添加一个新的作业到该作业组,拖动作业方框中的实心方块,可以建立作业间的依赖关系,这样在依赖的作业完成之前,后续的作业是不会启动的。双击作业方框可以进入作业的编辑页面。点击更改调度即可进入概念介绍示例的调度规则编辑页面。

Spark作业

在"创建新作业"中选择"Spark",即可创建spark类型的作业,Spark Plugin在批量作业中提供了Spark例行作业的入口,允许用户编辑Spark例行作业并设置调度例行运行。

下面针对各个部分进行详细介绍。

提交模式

Spark Plugin提供了源代码和包两种提交模式

  • 源代码模式对应于Spark-Client的交互式运行模式(spark-sql、spark-shell、pyspark),可以运行SQL/Scala/Python语言的Spark代码,无需编译直接提交运行
  • 包模式对应于Spark-Client的文件提供模式(spark-submit),支持Python/Scala/Java的文件提交。两种提交模式均提供了cluster运行模式,可以直接将任务运行到集群中

注意: Spark Plugin不支持client或者local运行模式

源代码提交模式

源代码提交模式核心部分由Spark语句输入框和语言类型组成,用户需要先选择语言类型,然后在Spark语句输入框中输入对应语言的Spark代码。 由于源代码提交模式是一种交互类运行模式,因此不同语言类型有一些特殊的书写规范,下面将进行一一列举。

SQL

  • SQL语句可分行写,但最后要加";",当查询语句中存在";"时要进行转义
select *
from hive_table
where select_name="sql\;";
  • SQL语句允许添加注释,SQL语句的注释是--,目前不支持一行中同时存在注释和代码
select *
-- hive_table是一个hive表
from hive_table
where select_name="sql\;";
  • SQL语句中支持使用时间宏。SQL中可用{}包裹时间宏,程序会自动替换相应的日期。具体时间宏的种类请参考后面详细介绍
-- 读取hive_table某一天的数据
select *
from hive_table
where event_day="{DATE}";

Python

  • 输入框模式将自动初始化SparkSession,因此请勿重复初始化SparkContext。下面变量将默认提供:spark、sc、sqlContext,其中spark对应于SparkSession,sc对应于SparkContext,sqlContext对应于SQLContext。
  • 输入框无法执行Python文件头等相关内容,一旦输入将导致运行失败。下面代码是无法执行的,请勿添加到输入框
下面代码写入输入框会出错,请勿添加到输入框
!/bin/python
 -*- coding: utf-8 -*-
  • 输入框是自上而下顺序执行的,因此无需添加main函数,即使添加了也不会运行。
  • 输入框中请勿添加或修改sys,添加或修改容易导致集群中Python加载异常。Spark原生是utf8编码,无需额外设置
下面代码写入输入框会出错,请勿添加到输入框
reload(sys)
sys.setdefaultencoding('utf8')
  • 如果需要在Spark运行过程中添加打印日志,用print或者logging(配置stdout输出情况下)均会在运行结束后打印。需要用如下方法打印日志,日志将打印到Spark运行日志的stderr中。
def getLogger():
    tag="application-user-log"
    log4j = spark._jvm.org.apache.log4j
    return log4j.LogManager.getLogger(tag)
logger = getLogger()
logger.info("test1")
  • Python语言类型中也支持时间宏,除了像SQL一样支持时间宏替换外,还支持通过env获取时间宏。具体时间宏的种类,请参考后面的时间宏部分
下面两个等价
event_day="{DATE}"
import os
event_day=os.environ["DATE"]

Scala

  • 输入框模式将自动初始化SparkSession,因此请勿重复初始化SparkContext。下面变量将默认提供:spark、sc、sqlContext,其中spark对应于SparkSession,sc对应于SparkContext,sqlContext对应于SQLContext。
  • 输入框是自上而下顺序执行的,因此无需添加main函数,即使添加了也不会运行。
  • 如果需要在Spark运行过程中添加打印日志,用print会在运行结束后打印。需要用如下方法打印日志,日志将打印到Spark运行日志的stderr中。

    import org.apache.log4j.LogManager
    logger = LogManager.getLogger("application-user-log")
    logger.info("test1")
  • Scala语言类型中也支持时间宏,除了像SQL一样支持时间宏替换外,还支持通过env获取时间宏。具体时间宏的种类,请参考后面的时间宏部分

    下面两个等价
    event_day="{DATE}"
    event_day=sys.env("DATE")

包提交模式

包运行模式下,允许用户提交jar包或者py文件运行,其中jar包支持Scala和Java两种语言。 详细样例case请参考“快速入门”的“运行代码包”部分

如何提交

  • 运行包

    • 用户需要将运行的jar或者是py文件上传到集群目录中
    • 运行包位置填写完成集群路径
  • 主类名

    • 主类名是专门为jar包提供的,需要提供运行的主类
    • py文件仅一个,会自动使用内部声明的主函数
  • 运行参数

    • 允许用户添加运行参数,每行输入一个运行参数,右侧框添加参数解释

注意事项

  • Pingo默认是用了2.3版本的Spark,因此使用API或添加依赖时请注意Spark版本是2.3.3版本
  • Jar包内请勿打包Spark依赖,相关依赖请注明provided
  • 关于时间宏,运行包模式仅支持从环境变量中读取。具体时间宏的种类,请参考后面的时间宏部分

    Python:
           import os
           event_day=os.environ["DATE"]
    Java:
           String event_day=System.getenv().get("DATE")
    Scala:
           val event_day=sys.env["DATE"]

高级选项

高级选项提供了对Spark作业进一步使用和优化的能力,其中依赖文件帮助用户注入运行依赖,配置信息部分帮助用户添加配置,运行状态轮询时间(默认请勿修改)调整获取Spark状态间隔

依赖文件

依赖文件中提供了jars、pyFiles、files、archives4中类型的依赖,所有的依赖类型都需要填写依赖文件完整的集群路径。下面是这几个依赖类型的区别,请根据具体需求使用。

依赖类型 依赖文件类型 说明
jars .jar 通过jars依赖的文件将部署到driver和executor运行目录中,并自动添加到Java的CLASSPATH中,需要引用时,直接import即可
pyFiles .py/.egg/.zip 通过pyFiles依赖的文件将部署到driver和executor运行目录中,并自动添加到Python的LD_LIBRARY_PATH中,需要引用时,直接from … import即可
files 任何类型 通过files依赖的文件将部署到driver和executor运行目录中,需要读取时,直接从当前目录读取即可
archives .tar.gz/.zip等 通过archives依赖的文件将部署到driver和executor运行目录中,并自动解压,解压后的目录名默认为压缩包名,可通过#定义别名。例如java.tar.gz#java解压后目录名为java

配置信息

允许用户指定Spark配置,类似spark-defaults.conf功能,使用每行第一个空格分隔key和value,可以使用/#添加注释。 配置信息中也支持时间宏替换。具体时间宏的种类,请参考后面的时间宏部分

修改executor内存和task并行
spark.executor.cores 2
spark.executor.memory 4g

 定义APP 名字
spark.app.name spark_test-{DATE}

Spark作业时间宏

Spark Plugin默认提供的时间宏如下:(宏替换指的是{DATE}替换方式;环境变量获取指的是从环境变量获取DATE的值)

关键字 对应时间 使用位置
基准时间 20190501 12:30:28
BASETIME 20190501 12:30:28 宏替换/环境变量获取
DATE 20190501 宏替换/环境变量获取
YEAR 2019 宏替换/环境变量获取
MONTH 05 宏替换/环境变量获取
DAY 01 宏替换/环境变量获取
HOUR 12 宏替换/环境变量获取
MINUTE 30 宏替换/环境变量获取
SECOND 28 宏替换/环境变量获取
YESTERDAY 20190430 宏替换/环境变量获取
FORWARD_7_DAY 20130424 宏替换
FORWARD_30_DAY 20190401 宏替换
FORWARD_90_DAY 20190131 宏替换
FORWARD_365_DAY 20180501 宏替换
FORWARD_WEEK_BEGIN 20190429 宏替换
FORWARD_WEEK_END 20190429 宏替换
FORWARD_MONTH_BEGIN 20190401 宏替换
FORWARD_MONTH_END 20190430 宏替换
FORWARD_2_MONTH_BEGIN 20190301 宏替换
FORWARD_2_MONTH_END 20190331 宏替换

依赖检查作业

还是以trade_table为例,如果实际数据延迟两小时到达,那么真正的到达时间还是不可能精确到整点,那我们在10点整启动一个任务处理8点的数据,可能由于数据是10:01到达而导致任务失败或者结果非预期。这个依赖检查作业就是用来检验数据是否真的到达的,参考编辑作业组,让真正处理数据的Spark任务在这个依赖检查作业后续执行,即可保证正确的处理逻辑。 下图是配置一个依赖两小时前小时级数据的依赖检查作业。

相似文档
  • 交互分析概述: Pingo提供Notebook式的交互分析环境,基于开源的jupyter项目,进行了一系列优化、适配。具备以下特性: 用户隔离:使用普通账号启动jupyter hub,然后通过一个设置了SUID位的程序将每位用户都映射到不同的uid,实现了Linux账号级别的安全控制,并且比原生的sudo-spawner资源隔离方案更加安全、可扩展。
  • 管理计算资源: 点击计算资源按钮进入计算资源管理页面。 创建队列组: 队列组是用户整理自己的队列计算资源的聚合单位,通常建议一个产品线维护一个队列组即可,组内用户可以共享队列的权限。
  • 概述: 在Pingo中,永久性UDF不使用SQL语句创建,而是使用文件系统来管理。此管理方式,相比于使用SQL的方式由如下几个显著的优点: UDF的文件与元数据统一存储,不会出现已经在元数据系统中注册了UDF,但错误删除了对应资源的情况。
  • 概述: 数据交换和数据联合查询是大数据分析中比较常见的使用场景,跨产品、跨部门、跨公司的数据联合查询可以弥补彼此数据使用的缺少。但是联合查询也引来一个很关键的问题:数据安全问题。以往的数据安全是通过账号控制(包括权限账号和权限授权等),但这都存在安全泄露的风险,且权限一旦授予后,用户就可以直接获得明文数据,能够对数据做任意拷贝。
  • 传输管理: 传输管理主要解决什么问题? 传输支持哪些数据源? 支持从数据库到数据库之间的同步吗? 传输定期调度需要哪些前提条件?支持以哪种字段来做增量?
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部