springboot项目集成大数据第三方dolphinscheduler调度器

举报
刘大猫 发表于 2025/07/18 21:31:51 2025/07/18
【摘要】 springboot项目集成大数据第三方dolphinscheduler调度器

image.png
@[TOC]

摘要

项目背景

使大数据平台具备更见健全及多种脚本模型等支持,需要搭载dolphinscheduler调度器,以DAG图的方式将Task按照任务的依赖关系关联起来,可实时可视化监控任务的运行状态,支持丰富的任务类型:Shell、MR、Spark、SQL(mysql、postgresql、hive、sparksql),Python,Sub_Process、Procedure等,支持工作流定时调度、依赖调度、手动调度、手动暂停/停止/恢复,同时支持失败重试/告警、- 从指定节点恢复失败、Kill任务等操作,支持任务日志在线查看及滚动、在线下载日志等。

功能要求

大数据平台可视化配置spark执行任务、datax同步任务 -> 通过接入dolphinscheduler调度器 管理、执行。

功能说明

(大数据平台页面可显示的)大数据平台接入dolphinscheduler功能如下:项目管理操作、spark任务功能管理操作、datax数据同步任务管理、执行、停止任务操作。
(大数据平台页面不可见的)工作流管理操作,包括工作流创建、更新、删除操作页面。工作流实例页面、任务实例页面。

1.1用例图

image.png

1.2业务流程分析

image.png

1.3业务ER图

dolphinscheduler中项目、工作流、节点任务ER图
image.png

1.4 管理任务流程图

image.png

1.5功能设计详细说明点

说明:
① “项目定义”:由大数据平台所有用户所共享

② “工作流定义”:每个用户单独创建一类工作流,spark任务工作流叫userId-dragSparkTask、数据同步任务工作流叫:userId-dataxTask

③ “创建或更新工作流时更新参数locations”得x,y坐标,
第一次创建工作流节点x=0,y=0,更新节点时获取坐标y最大且离远点最远节点判断临界条件(如果x<1500 -> x+80,如果x>=1500 -> y+80),确保两节点不重叠且在同一页面

④ 
==1)针对spark任务:==
“创建或更新工作流时更新参数processDefinitionJson”时,需传转义字符()和字符串的双引号("),不然工作流节点的主程序参数无法设置且在dolphinscheduler节点中无法执行,其中参数“mainArgs”中“}}”替换为“} }”,且需要拼接“\”转义字符,另外任务参数需要“\\”,不然接收方节点得main的args参数格式无法设置进去、解析及使用。
==2)针对数据同步任务:==
“创建或更新工作流时更新参数processDefinitionJson”时,需传转字符串的双引号("),且params参数的json参数需要转义字符(),不然工作流节点的主程序参数无法设置且在dolphinscheduler节点中无法执行。
⑤ “创建或更新工作流”时,参数locations和processDefinitionJson中的两个参数(id、name)要保持一致,其中“spark-任务ID”和“datax-任务ID”,“name”:“xxx”->代表工作流中节点名字,举例说明:

locations {“spark-110”:{“name”:“spark-110” 、 {“datax-110”:{“name”:“datax-110”
processDefinitionJson “id”:“spark-110”,“name”:“spark-110” 、 “id”:“datax-110”,“name”:“datax-110”


1)创建spark任务节点参数说明

参数 说明
connects [] 流程定义节点图标连接信息(json格式)
locations {“spark-100”:{“name”:“spark-100”,“targetarr”:"",“nodenumber”:“0”,“x”:50,“y”:50}} 流程定义节点坐标位置信息(json格式)
name spark-任务ID 流程定义名称
id 94 流程定义ID
processDefinitionJson {“globalParams”:[],“tasks”:[{“type”:“SPARK”,“id”:“spark-100”,“name”:“spark-100”,“params”:{“mainClass”:“Test2”,“mainJar”:{“id”:64},“deployMode”:“cluster”,“resourceList”:[],“localParams”:[],“driverCores”:1,“driverMemory”:“1G”,“numExecutors”:“1”,“executorMemory”:“1G”,“executorCores”:“1”,“mainArgs”:""{\“nodes\”:[{\“outputs\”:[\“9e3be626\”],\“color\”:\"#1890FF\",\“shape\”:\“flow-circle\”,\“inputs\”:[],\“index\”:0,\“label\”:\“dataSource_8548f316\”,\“modelType\”:\“dataSource\”,\“type\”:\“node\”,\“size\”:\“7272\",\“x\”:227.05078125,\“name\”:\“数据源_8548f316\”,\“y\”:131.0625,\“id\”:\“8548f316\”,\“config\”:{\“targetTable\”:\“machine_learning_house_info2\”,\“sourceType\”:\“mysql\”} },{\“outputs\”:[],\“color\”:\"#cba022\",\“shape\”:\“flow-rect\”,\“inputs\”:[\“8548f316\”],\“index\”:1,\“label\”:\“simpleSampling_9e3be626\”,\“modelType\”:\“simpleSampling\”,\“type\”:\“node\”,\“size\”:\"15033\”,\“x\”:228.05078125,\“name\”:\“采样《随机采样》_9e3be626\”,\“y\”:346.0625,\“id\”:\“9e3be626\”,\“config\”:{\“seed\”:\"\",\“withReplacement\”:false,\“countOrRatio\”:\“200\”,\“sampleType\”:0} }],\“name\”:\“da’s13\”,\“id\”:44,\“describeInfo\”:\“sdasad\”}"",“others”:"",“programType”:“JAVA”,“sparkVersion”:“SPARK2”},“description”:"",“timeout”:{“strategy”:"",“interval”:null,“enable”:false},“runFlag”:“NORMAL”,“conditionResult”:{“successNode”:[""],“failedNode”:[""]},“dependence”:{},“maxRetryTimes”:“0”,“retryInterval”:“1”,“taskInstancePriority”:“MEDIUM”,“workerGroup”:“default”,“preTasks”:[]}],“tenantId”:3,“timeout”:0} ==流程定义详细信息(json格式),type:节点类型,id:tasks-任务ID,代表节点ID,name,spark-任务ID,代表节点名称,params:节点所有参数,mainClass:spark任务jar得主函数,mainJar:代dolphinscheduler调度器得资源中心上得资源ID,mainArgs:代表spark任务jar所需要的参数String[] args tenantId:租户ID,也就是hdfs租户ID==

2)创建datax任务节点参数说明

参数 说明
connects [] 流程定义节点图标连接信息(json格式)
locations {“datax-100”:{“name”:“datax-100”,“targetarr”:"",“nodenumber”:“0”,“x”:50,“y”:50}} 流程定义节点坐标位置信息(json格式)
name datax-任务ID 流程定义名称
id 95 流程定义ID
processDefinitionJson {“tenantId”:3,“globalParams”:[],“timeout”:0,“tasks”:[{“conditionResult”:{“successNode”:[],“failedNode”:[]},“description”:"",“runFlag”:“NORMAL”,“type”:“DATAX”,“params”:{“customConfig”:1,“json”:"{“job”:{“content”:[{“reader”:{“parameter”:{“password”:“gee123456”,“connection”:[{“querySql”:[“SELECT id, name FROM test_test”],“jdbcUrl”:[“jdbc:mysql://192.168.20.75:9950/geespace_bd_platform_dev”]}],“username”:“geespace”},“name”:“mysqlreader”},“writer”:{“parameter”:{“password”:“gee123456”,“column”:[“id”,“name”],“connection”:[{“jdbcUrl”:“jdbc:mysql://192.168.20.75:9950/geespace_bd_platform_dev”,“table”:[“test_test_1”]}],“writeMode”:“insert”,“username”:“geespace”},“name”:“mysqlwriter”}}],“setting”:{“errorLimit”:{“record”:0,“percentage”:0.02},“speed”:{“channel”:1}}}}",“localParams”:[]},“timeout”:{“enable”:false,“strategy”:""},“maxRetryTimes”:“0”,“taskInstancePriority”:“MEDIUM”,“name”:“datax-144”,“dependence”:{},“retryInterval”:“1”,“preTasks”:[],“id”:“datax-144”,“workerGroup”:“default”}]} ==流程定义详细信息(json格式),type:节点类型,id:datax-任务ID,代表节点ID,name:datax-任务ID,代表节点名称,params:节点所有参数,customConfig:代表数据同步类型,json:数据同步datax的json,localParams:[],tenantId:租户ID,也就是hdfs租户ID==

1.6页面原型

image.png
image.png
image.png
image.png
image.png
image.png

三、本人相关其他文章链接

1.springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理:
https://blog.csdn.net/a924382407/article/details/117119831

2.springboot项目集成dolphinscheduler调度器 实现datax数据同步任务:
https://blog.csdn.net/a924382407/article/details/120951230

3.springboot项目集成dolphinscheduler调度器 项目管理:
https://blog.csdn.net/a924382407/article/details/117118931

4.springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
https://blog.csdn.net/a924382407/article/details/117121181

5.springboot项目集成大数据第三方dolphinscheduler调度器
https://blog.csdn.net/a924382407/article/details/117113848

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。