DAG
-
DAG:有向无环图
- 有向:有方向
- 无环:没有闭环
- 有方向,没有形成闭环的执行流程图
-
如图:
- 有方向:RDD-1 ==> RDD-2 ==> … … ==> collect 结束
- 无闭环:以action(collect)结束,没有形成闭环
- 作用:标识代码的
逻辑
执行流程
Job 和 Action
Action
-
返回值不是RDD的算子
-
它的作用是一个触发开关,会将action算子之前的
一串RDD依赖链条
执行起来
- 一个Action会将其前面一串的RDD依赖关系(Transformation)执行
- 也就是一个Action会产生一个DGA图
- 上图中,得到了2个DGA图
结论
一个action会产生一个DAG(如果代码中有3个action,就产生3个DAG)
一个action产生的DAG,会在程序中产生一个Job
所以:1个action = 1个DAG = 1个Job
如果一个代码中,写了3个action,那么这个代码运行起来会产生3个job,每个job会有自己的DAG
一个代码运行起来,在Spark中称之为:
Application
-
层级关系:
- 一个application中,可以有多个job
- 每个job内含有一个DAG
- 同时,每个job都是由一个action产生的
宽窄依赖
在SparkRDD前后之间关系,分为:
-
窄依赖:父RDD的一个分区,全部将数据发给子RDD的一个分区
-
宽依赖(
shuffle
):父RDD的一个分区,将数据分发给子RDD的多个分区
阶段划分
划分依据:从后向前,遇到
宽依赖
就划分出一个阶段,称之为stage
如图可以看到,在DAG中,基于快毅力啊,将DAG划分成了两个Stage
在stage的内部,一定都是窄依赖
内存迭代计算
如图,带有分区的DAG以及阶段划分,可与会从图中得到逻辑上最优的task分配:一个task是一个线程来具体执行
那么图中,task-1钟的rdd-1、rdd-2、rdd-3的迭代计算,都是由一个task(线程)完成==>纯内存计算
图中,task-1、task-2、task-3 就形成了三个并行的内存计算管道。
Spark默认受到全局并行度的限制,除了个别算子有特殊分区情况,大部分的算子,都会遵循全局并行度的要求,来规划自己的分区数。如果全局并行度是3,其实大部分算子分区都是3
注意:Spark一般推荐只设置全局并行度,不要在算子上设置并行度
除了一些排序算子外,计算算子就让他们默认开分区就可以了
面试题
Spark是怎么做内存计算的?
DAG的作用?
Stage阶段划分的作用?
1、Spark会产生DAG图
2、DAG图会基于分区和宽窄依赖关系划分 阶段
3、一个阶段的内部都是窄依赖
,窄依赖内,如果形成前后1:1的分区对应关系,就可以产生许多内存迭代计算的管道
4、这些内存迭代计算的管道,就是一个个具体执行的Task
5、一个Task是一个具体的线程,任务跑在一个线程内,就是走内存计算了
Sparrk为什么比MapReduce快
1、Spark的算子丰富,MapReduce算子匮乏(Map和Reduce),MapReduce这个编程模型,很难在一套MR中处理复杂任务,很多的复杂任务,是需要写多个Mapeduce进行串联,多个MR串联通过磁盘交互数据
2、Spark可以执行内存迭代,算子之间形成DAG基于依赖划分阶段后,在阶段内形成内存迭代管道,但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的
结论:
- 编程模型上Spark占优(算子多)
- 算子交互、计算上,可以尽量多的内存计算,而非磁盘迭代
并行度
概念
Spark的并行: 在同一时间内, 有多少个task在同时运行
并行度:并行能力的设置
,比如设置并行度6.其实就是要6个task并行在跑
在有了6个task并行的前提下,rdd的分区就被规划成6个分区了
优先级
可以在代码中和配置文件中以及提交程序的客户端参数中设置,优先级从高到低:
1、代码中
2、客户端提交参数中
3、配置文件中
4、默认(1,但是不会全部以1来跑,多数时候基于读取文件的分片数量来作为默认并行度)
全局并行度配置的参数:spark.default.parallelism
全局并行度
全局并行度是推荐设置,不要针对RDD改分区
可能会影响内存迭代管道的构建,或者会产生额外的Shufle
# 配置文件中
# conf/spark-defaults.conf中设置
spark.default.parallelism 100
# 在客户端提交参数中
bin/spark-submit --conf "spark.default.parallelism=100"
# 在代码中设置
conf = SaprkConf()
conf.set("spark.default.parallelism","100")
规划并行度
只看集群总CPU核数
结论:设置为CPU总核心的2-10倍
比如 集群可用CPU核心是100个,建议并行度是200~1000
确保是CPU核心的整数倍即可,最小是2倍,最大一般10倍或更高(适量)均可
为什么要设置最少2倍?
CPU的一个核心同一时间只能干一件事情
所以在100个核心的情况下,设置100个并行,就能让CPU 100%出力
这种设置下,如果task的压力不均衡,某个task先执行完了就导致某个CPU核心空闲
所以我们将Task(并行)分配的数量变多,比如800个并行同一时间只有100个在运行,700个在等待
但是可以确保,某个task运行完了.后续有task补上,不让cpu闲下来,最大程度利用集群的资源
任务调度
Spark的任务 由Driver进行调度,这个工作包含:
- 逻辑DAG产生
- 分区DAG产生
- Task划分
- 将Task分配给Executor并监控其工作
调度流程
如下,Spark程序的调度流程如图
- Driver被构建出来
- 构建SparkContext(执行环境入口对象)
- 基于DAG Scheduler(DAG调度器)构建逻辑Task分配
- 基于TaskScheduler(Task调度器)将逻辑Task 分配到各个Executor上干活,并监控它们
- Worker(Executor),被TaskScheduler管理监控,听从它们的指令工作,并定期汇报进度
两个组件
DAG调度器
将逻辑的DAG图进行处理,最终得到逻辑上的Task划分
Task调度器
基于DAGScheduler的产出,规划这些逻辑Task
,应该在哪些物理
的Executor上运行
以及监控管理它们的运行
评论区