" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper博主等级

千里之行,始于足下

  • 累计撰写 203 篇文章
  • 累计创建 14 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

SparkCore 之 内核调度

PySuper
2022-05-23 / 0 评论 / 0 点赞 / 38 阅读 / 6711 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

DAG

  • DAG:有向无环图

    • 有向:有方向
    • 无环:没有闭环
    • 有方向,没有形成闭环的执行流程图

    image-20220705113753451

  • 如图:

    • 有方向:RDD-1 ==> RDD-2 ==> … … ==> collect 结束
    • 无闭环:以action(collect)结束,没有形成闭环
    • 作用:标识代码的逻辑执行流程
Job 和 Action

Action

  • 返回值不是RDD的算子

  • 它的作用是一个触发开关,会将action算子之前的一串RDD依赖链条执行起来

image-20220705114833561

  • 一个Action会将其前面一串的RDD依赖关系(Transformation)执行
  • 也就是一个Action会产生一个DGA图
  • 上图中,得到了2个DGA图

结论

一个action会产生一个DAG(如果代码中有3个action,就产生3个DAG)

一个action产生的DAG,会在程序中产生一个Job

所以:1个action = 1个DAG = 1个Job

如果一个代码中,写了3个action,那么这个代码运行起来会产生3个job,每个job会有自己的DAG

一个代码运行起来,在Spark中称之为:Application

  • 层级关系:

    • 一个application中,可以有多个job
    • 每个job内含有一个DAG
    • 同时,每个job都是由一个action产生的

宽窄依赖

在SparkRDD前后之间关系,分为:

  • 窄依赖:父RDD的一个分区,全部将数据发给子RDD的一个分区

    image-20230228091710909

  • 宽依赖(shuffle):父RDD的一个分区,将数据分发给子RDD的多个分区

    image-20230228092116720

阶段划分

划分依据:从后向前,遇到宽依赖就划分出一个阶段,称之为stage

如图可以看到,在DAG中,基于快毅力啊,将DAG划分成了两个Stage

在stage的内部,一定都是窄依赖

image-20230228100002615

内存迭代计算

如图,带有分区的DAG以及阶段划分,可与会从图中得到逻辑上最优的task分配:一个task是一个线程来具体执行

那么图中,task-1钟的rdd-1、rdd-2、rdd-3的迭代计算,都是由一个task(线程)完成==>纯内存计算

图中,task-1、task-2、task-3 就形成了三个并行的内存计算管道。

image-20230228100821309

Spark默认受到全局并行度的限制,除了个别算子有特殊分区情况,大部分的算子,都会遵循全局并行度的要求,来规划自己的分区数。如果全局并行度是3,其实大部分算子分区都是3

注意:Spark一般推荐只设置全局并行度,不要在算子上设置并行度

除了一些排序算子外,计算算子就让他们默认开分区就可以了

面试题

Spark是怎么做内存计算的?

DAG的作用?

Stage阶段划分的作用?

1、Spark会产生DAG图

2、DAG图会基于分区和宽窄依赖关系划分 阶段

3、一个阶段的内部都是窄依赖,窄依赖内,如果形成前后1:1的分区对应关系,就可以产生许多内存迭代计算的管道

4、这些内存迭代计算的管道,就是一个个具体执行的Task

5、一个Task是一个具体的线程,任务跑在一个线程内,就是走内存计算了

Sparrk为什么比MapReduce快

1、Spark的算子丰富,MapReduce算子匮乏(Map和Reduce),MapReduce这个编程模型,很难在一套MR中处理复杂任务,很多的复杂任务,是需要写多个Mapeduce进行串联,多个MR串联通过磁盘交互数据

2、Spark可以执行内存迭代,算子之间形成DAG基于依赖划分阶段后,在阶段内形成内存迭代管道,但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的

结论:

  • 编程模型上Spark占优(算子多)
  • 算子交互、计算上,可以尽量多的内存计算,而非磁盘迭代

并行度

概念

Spark的并行: 在同一时间内, 有多少个task在同时运行

并行度:并行能力的设置,比如设置并行度6.其实就是要6个task并行在跑

在有了6个task并行的前提下,rdd的分区就被规划成6个分区了

优先级

可以在代码中和配置文件中以及提交程序的客户端参数中设置,优先级从高到低:

1、代码中

2、客户端提交参数中

3、配置文件中

4、默认(1,但是不会全部以1来跑,多数时候基于读取文件的分片数量来作为默认并行度)

全局并行度配置的参数:spark.default.parallelism

全局并行度

全局并行度是推荐设置,不要针对RDD改分区

可能会影响内存迭代管道的构建,或者会产生额外的Shufle

# 配置文件中
# conf/spark-defaults.conf中设置
spark.default.parallelism 100

# 在客户端提交参数中
bin/spark-submit --conf "spark.default.parallelism=100"

# 在代码中设置
conf = SaprkConf()
conf.set("spark.default.parallelism","100")

规划并行度

只看集群总CPU核数

结论:设置为CPU总核心的2-10倍

比如 集群可用CPU核心是100个,建议并行度是200~1000

确保是CPU核心的整数倍即可,最小是2倍,最大一般10倍或更高(适量)均可

为什么要设置最少2倍?

CPU的一个核心同一时间只能干一件事情

所以在100个核心的情况下,设置100个并行,就能让CPU 100%出力

这种设置下,如果task的压力不均衡,某个task先执行完了就导致某个CPU核心空闲

所以我们将Task(并行)分配的数量变多,比如800个并行同一时间只有100个在运行,700个在等待

但是可以确保,某个task运行完了.后续有task补上,不让cpu闲下来,最大程度利用集群的资源

任务调度

Spark的任务 由Driver进行调度,这个工作包含:

  • 逻辑DAG产生
  • 分区DAG产生
  • Task划分
  • 将Task分配给Executor并监控其工作

调度流程

如下,Spark程序的调度流程如图

image-20230228110431884

  • Driver被构建出来
  • 构建SparkContext(执行环境入口对象)
  • 基于DAG Scheduler(DAG调度器)构建逻辑Task分配
  • 基于TaskScheduler(Task调度器)将逻辑Task 分配到各个Executor上干活,并监控它们
  • Worker(Executor),被TaskScheduler管理监控,听从它们的指令工作,并定期汇报进度

两个组件

DAG调度器

将逻辑的DAG图进行处理,最终得到逻辑上的Task划分

Task调度器

基于DAGScheduler的产出,规划这些逻辑Task,应该在哪些物理的Executor上运行

以及监控管理它们的运行

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区