当前位置: 首页>后端>正文

大数据定时调用azkaban

目录

  • 本地调试
    • 本地启动SingleServer模式
    • 本地启动分开配置模式
  • 定时任务调度使用场景
    • 场景
    • 使用
  • 总架构图
    • 原始架构图
    • 处理后架构图
  • 源码
    • web server
    • exec server
  • elasticjob-lite对比azkaban
  • 参考文章

本地调试

  • 注意本文是win10系统方案
  • github地址azkaban

本地启动SingleServer模式

  • azkaban有几种启动模式,一种是web server和exec server分开部署的一般用于线上,一种是SingleServer模式部署,一般用于调试,数据库用的H2
  1. gradle build -x test
  2. AzkabanSingleServer里面设置工作目录, 就是下载的idea目录


    大数据定时调用azkaban,第1张
    idea edit.png
  3. 在源码目录下新建个conf目录,放置azkaban conf文件,可以复制***\azkaban-web-server\src\main\resources\conf\这里面conf文件到新建的conf目录里,log配置可以改成
log4j.rootLogger=INFO,C
log4j.appender.C=org.apache.log4j.ConsoleAppender
log4j.appender.C.Target=System.err
log4j.appender.C.layout=org.apache.log4j.PatternLayout
log4j.appender.C.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
大数据定时调用azkaban,第2张
conf目录.png
  1. conf/azkaban.properties里面的数据库连接记得修改成你自己的mysql数据库连接
  2. azkaban.properties文件里面 记得配置下web路径
web.resource.dir=D:\idea_repo\azkaban-master\azkaban-web-server\build\install\azkaban-web-server\web

本地启动分开配置模式

  • 先按照本地启动SingleServer模式来一波,然后数据库执行doc底下那个all
  • AzkabanExecutorServer配置
-conf D:\idea_repo\azkaban-master\conf
-Dlog4j.configuration=file:D:\idea_repo\azkaban-master\conf\log4j.properties -Dserverpath=D:\idea_repo\azkaban-master\azkaban-exec-server\build\install\azkaban-exec-server
大数据定时调用azkaban,第3张
AzkabanExecutorServer.png
  • AzkabanWebServer配置
-conf D:\idea_repo\azkaban-master\conf
-Dlog4j.configuration=file:D:\idea_repo\azkaban-master\conf\log4j.properties -Dserverpath=file:D:\idea_repo\azkaban-master\azkaban-web-server\build\install\azkaban-web-server
大数据定时调用azkaban,第4张
AzkabanWebServer.png
  • 第一次执行失败,要创建目录,再执行就好了

定时任务调度使用场景

场景

  • Azkaban开源的工作流管理器,轻量级调度框架, 支持执行Shell, python等脚本任务,python脚本做数据清洗,比java传统的crud, dao映射更快捷,python第三方依赖包更强大。一些有依赖的任务,Azkaban支持的很好
  • 前面分析过elastic-job系列分布式定时任务elastic-job(一), ElasticJob是分布式无作业中心化调度,支持分片,失败重试,错过重新支持,高可用分片执行,无单点问题,适合C端B端复杂业务需要定时执行,保障性能和可用性的任务

使用

  • 使用UI界面创建项目,上传项目


    大数据定时调用azkaban,第5张
    UI界面之创建项目.png

    大数据定时调用azkaban,第6张
    上传项目.png
  • 项目结构打成zip包然后上传


    大数据定时调用azkaban,第7张
    test项目.png
  • test.job, 里面***要替换成自己用户名,这个是python安装地址
type=command
command=C:\Users\***\AppData\Local\Programs\Python\Python311\python.exe test.py
  • test.py, win10 Python安装自行百度
if __name__ == '__main__':
   print("test12138")

  • 点执行


    大数据定时调用azkaban,第8张
    执行.png
  • 立即执行或者定时执行


    大数据定时调用azkaban,第9张
    立即执行或者定时执行.png

总架构图

原始架构图

  • 可以看到ui界面点执行或者定时执行是到web server服务这边,web server服务是单点的
  • 大部分线上场景是定时执行,azkaban定时任务默认是自己起线程扫描,当然也可以配置quartz(功能更强大些,有错过重新执行等,但开启之后还要配置一些东西)。但这两者在web server分布式环境都会有重复执行任务的问题。所以azkaban这边设置成单点的
  • web server选择exec server具体执行任务时,如果处于web server分布式环境也需要做分布式一致性处理
  • 当初设计时,由于azkaban的任务不会太重,web server也相对轻量,所以,web server设计成单点的,本身azkaban使用场景也不是那种重业务的C端场景,可以作为数据清洗,数据分析之类的
  • exec server是分布式的集群部署,web server通过一定的机制比如轮训等选择exec server, 主要是从数据库获取exec server状态,当然这边执行任务时候也可以指定选择某台exec server


    大数据定时调用azkaban,第10张
    原始架构图.png

处理后架构图

  • 这边做个拓展,并不是说业务一定要二开进行改造,只是讨论说可以进去分布式处理的方案
  • web server分布式集群部署,需要保持集群状态决定谁是主避免定时任务重复执行,可以使用zk决定谁是主,如果集群机器下线,zk listen可以动态感知变化重新选主,这个在elasticjob-lite文章有分析过,zk脑裂问题也可以解决
  • 在决定出谁是主之后,定时任务执行时判断不是主则不执行,是主则执行分配到exec server。为避免故障发生,建议这里的任务做成幂等的合适些


    大数据定时调用azkaban,第11张
    处理后架构图.png

源码

web server

  • web server之默认定时调度是自己实现的起线程,一个项目一个项目轮训,如果符合core条件则执行,这就是为什么我们定的某个时间点执行,但是好像都会稍微晚一点点时间。如果开启quartz,那quartz定时和默认轮训会并存,代码使用sync锁加数据库状态避免重复
  • 默认使用轮训线程轮训,启动时启动TriggerManager.TriggerScannerThread
  • TriggerManager.TriggerScannerThread不断执行run方法,run方法中checkAllTriggers方法for循环所有project,一个个判断是否满足corn条件,满足执行条件则去ExecuteFlowAction.doAction
  • 再去ExecutorManagerAdapter.submitExecutableFlow, 这边适配器模式把几个功能做适配
  • 具体执行ExecutorManager.submitExecutableFlow放入队列,这边是做一个轻量的生产者消费者模式,如果宕机了就会丢失,这次任务执行不了


    大数据定时调用azkaban,第12张
    web server之默认定时调度.png
  • 启动时启动了消费者线程ExecutorManager.QueueProcessorThread
  • ExecutorManager.QueueProcessorThread中run轻量级的生产消费模式,wait避免浪费空转,notifyAll是在dispatch到exec server后,线程select一个合适之后,http远程调用exec server


    大数据定时调用azkaban,第13张
    web server之默认定时调1.png

exec server

  • 通过实现servlet提供http服务,servlet到FlowRunnerManager.submitFlowRunner提交到线程池
  • FlowRunner.run线程池执行任务,观察者模式发送开始事件,执行任务封装JobRunner,提交到全局线程池,所以如果依赖任务多可以考虑设置大一点线程数。然后如果有失败的任务会重新执行,最后会关闭这个局部线程池。两个线程池目的是解耦


    大数据定时调用azkaban,第14张
    exec server.png
  • 全局线程池JobRunner执行到AbstractProcessJob根据你配置的common选择实现类python是ProcessJob。执行脚本,多个依赖脚本for循环本exec server执行


    大数据定时调用azkaban,第15张
    exec server 1.png

elasticjob-lite对比azkaban

  • elastic-job系列分析分布式定时任务elastic-job(一)。这边elasticjob-lite就是使用在分布式场景
  • App1就是项目的应用,elastic-job就是通过jar引入,依赖zookeeper,分布式部署的,具体细节可以看析分布式定时任务elastic-job(一)
    大数据定时调用azkaban,第16张
    对比.png

参考文章

  • 大数据之工作流调度器Azkaban
  • 大数据Azkaban快速入门
  • 大数据调度平台 azkaban windows IDEA/eclipse debug 环境搭建
  • Azkaban源码分析(1)——Executor选择
  • Azkaban 源码分析之作业执行篇
  • Azkaban工作流调度器(二)执行python工作流
  • Azkaban 单个Flow 任务执行流程 源码解读
  • Azkaban 内置和 Quartz 任务调度核心源码分析
  • azkaban web-server单点问题

https://www.xamrdz.com/backend/3uw1916436.html

相关文章: