数据工厂自动触发任务的执行

批量作业管理的原型可以理解为Linux上的crontab,按照配置的调度规则自动触发任务的执行。在pingo中扩展了作业间的DAG依赖执行,重试机制,任务以及执行状态管理,以及自定义的任务扩展能力。

概念介绍

作业:执行的最小单元,如上图中的Spark、依赖检查等都是具体的一种类型的作业。

作业组:作业的集合,调度的最小单元。比如以这个cron格式"0 * * * *"调度,就意味着每小时执行一次作业组。

作业组数据工厂:一个作业组执行后的镜像。比如2018.4.17凌晨一点执行后的镜像,数据工厂中保存了作业组中每个作业的状态(成功、失败等),以及执行日志。

基准时间:还是以trade_table表为例,需要有一个任务每小时运行一次,计算这一个小时内的交易量,那么每个任务都要关联一个具体时间(比如20180417-13)的数据分片,这个时间就是基准时间,在作业中可以通过环境变量BASETIME访问到。

基准时间差:这个主要用来解决例行数据延迟的问题。还是以某电商的trade_table为例,假如每小时的数据都要延迟2小时就绪,也就是说6点的数据要等到8点才能就绪,那每个任务处理的数据都是两小时以前的。

新建作业组

由于没有作业的作业组是没有意义的,所以不可以建立一个空的作业组,因此必须先新建一个作业才可以新建一个作业组。选择任意一个类型的作业,填入有效的名称和各项参数,即可新建一个作业,该作业的名字就会同时默认成为作业组的名字。

编辑作业组

点击"作业组列表"页面中的"编辑"按钮即可进入编辑作业组页面,如下图。点击"添加依赖作业"按钮可以添加一个新的作业到该作业组,拖动作业方框中的实心方块,可以建立作业间的依赖关系,这样在依赖的作业完成之前,后续的作业是不会启动的。双击作业方框可以进入作业的编辑页面。点击更改调度即可进入概念介绍示例的调度规则编辑页面。

Spark作业

在"创建新作业"中选择"Spark",即可创建spark类型的作业,Spark Plugin在批量作业中提供了Spark例行作业的入口,允许用户编辑Spark例行作业并设置调度例行运行。

下面针对各个部分进行详细介绍。

提交模式

Spark Plugin提供了源代码和包两种提交模式

  • 源代码模式对应于Spark-Client的交互式运行模式(spark-sql、spark-shell、pyspark),可以运行SQL/Scala/Python语言的Spark代码,无需编译直接提交运行
  • 包模式对应于Spark-Client的文件提供模式(spark-submit),支持Python/Scala/Java的文件提交。两种提交模式均提供了cluster运行模式,可以直接将任务运行到集群中

注意: Spark Plugin不支持client或者local运行模式

源代码提交模式

源代码提交模式核心部分由Spark语句输入框和语言类型组成,用户需要先选择语言类型,然后在Spark语句输入框中输入对应语言的Spark代码。由于源代码提交模式是一种交互类运行模式,因此不同语言类型有一些特殊的书写规范,下面将进行一一列举。

SQL

  • SQL语句可分行写,但最后要加";",当查询语句中存在";"时要进行转义

select *from hive_tablewhere select_name="sql;";

  • SQL语句允许添加注释,SQL语句的注释是--,目前不支持一行中同时存在注释和代码

select *-- hive_table是一个hive表from hive_tablewhere select_name="sql;";

  • SQL语句中支持使用时间宏。SQL中可用{}包裹时间宏,程序会自动替换相应的日期。具体时间宏的种类请参考后面详细介绍

-- 读取hive_table某一天的数据select *from hive_tablewhere event_day="{DATE}";

Python

  • 输入框模式将自动初始化SparkSession,因此请勿重复初始化SparkContext。下面变量将默认提供:spark、sc、sqlContext,其中spark对应于SparkSession,sc对应于SparkContext,sqlContext对应于SQLContext。
  • 输入框无法执行Python文件头等相关内容,一旦输入将导致运行失败。下面代码是无法执行的,请勿添加到输入框

下面代码写入输入框会出错,请勿添加到输入框!/bin/python -*- coding: utf-8 -*-

  • 输入框是自上而下顺序执行的,因此无需添加main函数,即使添加了也不会运行。
  • 输入框中请勿添加或修改sys,添加或修改容易导致集群中Python加载异常。Spark原生是utf8编码,无需额外设置

下面代码写入输入框会出错,请勿添加到输入框reload(sys)sys.setdefaultencoding('utf8')

  • 如果需要在Spark运行过程中添加打印日志,用print或者logging(配置stdout输出情况下)均会在运行结束后打印。需要用如下方法打印日志,日志将打印到Spark运行日志的stderr中。

def getLogger(): tag="application-user-log" log4j = spark._jvm.org.apache.log4j return log4j.LogManager.getLogger(tag)logger = getLogger()logger.info("test1")

  • Python语言类型中也支持时间宏,除了像SQL一样支持时间宏替换外,还支持通过env获取时间宏。具体时间宏的种类,请参考后面的时间宏部分

下面两个等价event_day="{DATE}"import osevent_day=os.environ["DATE"]

Scala

  • 输入框模式将自动初始化SparkSession,因此请勿重复初始化SparkContext。下面变量将默认提供:spark、sc、sqlContext,其中spark对应于SparkSession,sc对应于SparkContext,sqlContext对应于SQLContext。
  • 输入框是自上而下顺序执行的,因此无需添加main函数,即使添加了也不会运行。
  • 如果需要在Spark运行过程中添加打印日志,用print会在运行结束后打印。需要用如下方法打印日志,日志将打印到Spark运行日志的stderr中。

    import org.apache.log4j.LogManagerlogger = LogManager.getLogger("application-user-log")logger.info("test1")

  • Scala语言类型中也支持时间宏,除了像SQL一样支持时间宏替换外,还支持通过env获取时间宏。具体时间宏的种类,请参考后面的时间宏部分

    下面两个等价event_day="{DATE}"event_day=sys.env("DATE")

包提交模式

包运行模式下,允许用户提交jar包或者py文件运行,其中jar包支持Scala和Java两种语言。详细样例case请参考“快速入门”的“运行代码包”部分

如何提交

  • 运行包

    • 用户需要将运行的jar或者是py文件上传到集群目录中
    • 运行包位置填写完成集群路径
  • 主类名

    • 主类名是专门为jar包提供的,需要提供运行的主类
    • py文件仅一个,会自动使用内部声明的主函数
  • 运行参数

    • 允许用户添加运行参数,每行输入一个运行参数,右侧框添加参数解释

注意事项

  • 数据工厂默认是用了2.3版本的Spark,因此使用API或添加依赖时请注意Spark版本是2.3.3版本
  • Jar包内请勿打包Spark依赖,相关依赖请注明provided
  • 关于时间宏,运行包模式仅支持从环境变量中读取。具体时间宏的种类,请参考后面的时间宏部分

    Python: import os event_day=os.environ["DATE"]Java: String event_day=System.getenv().get("DATE")Scala: val event_day=sys.env["DATE"]

高级选项

高级选项提供了对Spark作业进一步使用和优化的能力,其中依赖文件帮助用户注入运行依赖,配置信息部分帮助用户添加配置,运行状态轮询时间(默认请勿修改)调整获取Spark状态间隔

依赖文件

依赖文件中提供了jars、pyFiles、files、archives4中类型的依赖,所有的依赖类型都需要填写依赖文件完整的集群路径。下面是这几个依赖类型的区别,请根据具体需求使用。

依赖类型依赖文件类型说明jars.jar通过jars依赖的文件将部署到driver和executor运行目录中,并自动添加到Java的CLASSPATH中,需要引用时,直接import即可pyFiles.py/.egg/.zip通过pyFiles依赖的文件将部署到driver和executor运行目录中,并自动添加到Python的LD_LIBRARY_PATH中,需要引用时,直接from … import即可files任何类型通过files依赖的文件将部署到driver和executor运行目录中,需要读取时,直接从当前目录读取即可archives.tar.gz/.zip等通过archives依赖的文件将部署到driver和executor运行目录中,并自动解压,解压后的目录名默认为压缩包名,可通过#定义别名。例如java.tar.gz#java解压后目录名为java

配置信息

允许用户指定Spark配置,类似spark-defaults.conf功能,使用每行第一个空格分隔key和value,可以使用/#添加注释。配置信息中也支持时间宏替换。具体时间宏的种类,请参考后面的时间宏部分

修改executor内存和task并行spark.executor.cores 2spark.executor.memory 4g 定义APP 名字spark.app.name spark_test-{DATE}
Spark作业时间宏

Spark Plugin默认提供的时间宏如下:(宏替换指的是{DATE}替换方式;环境变量获取指的是从环境变量获取DATE的值)

关键字对应时间使用位置基准时间20190501 12:30:28BASETIME20190501 12:30:28宏替换/环境变量获取DATE20190501宏替换/环境变量获取YEAR2019宏替换/环境变量获取MONTH05宏替换/环境变量获取DAY01宏替换/环境变量获取HOUR12宏替换/环境变量获取MINUTE30宏替换/环境变量获取SECOND28宏替换/环境变量获取YESTERDAY20190430宏替换/环境变量获取FORWARD_7_DAY20130424宏替换FORWARD_30_DAY20190401宏替换FORWARD_90_DAY20190131宏替换FORWARD_365_DAY20180501宏替换FORWARD_WEEK_BEGIN20190429宏替换FORWARD_WEEK_END20190429宏替换FORWARD_MONTH_BEGIN20190401宏替换FORWARD_MONTH_END20190430宏替换FORWARD_2_MONTH_BEGIN20190301宏替换FORWARD_2_MONTH_END20190331宏替换
依赖检查作业

还是以trade_table为例,如果实际数据延迟两小时到达,那么真正的到达时间还是不可能精确到整点,那我们在10点整启动一个任务处理8点的数据,可能由于数据是10:01到达而导致任务失败或者结果非预期。这个依赖检查作业就是用来检验数据是否真的到达的,参考编辑作业组,让真正处理数据的Spark任务在这个依赖检查作业后续执行,即可保证正确的处理逻辑。下图是配置一个依赖两小时前小时级数据的依赖检查作业。

标签