分布式计算需要:
- 分区控制
- Shuffle 控制
- 数据存储/序列化/发送
- 数据计算API
- 等一系列功能
这些功能, 不能简单的通过Python内置的本地集合对象(如 List\ 字典等)去完成;
在分布式框架中,需要有一个统一的数据抽象对象,来实现上述分布式计算所需功能;
这个抽象对象,就是RDD!
什么是RDD
Resilient Distributed Dataset,弹性分布式数据集
Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算
的集合
- Dataset:一个数据集合,用于存放数据的
- Distributed:RDD中的数据是分布式存储的,可用于分布式计算
- Resilient:RDD中的数据
可以存储在内存中或者磁盘
中
RDD的5大特性
- RDD是有
分区
的- RDD的分区是RDD数据存储的最小单位
- 一份RDD的数据,本质上是分隔成了多个分区
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3).glom().collect()
# [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 6).glom().collect()
# [[1], [2, 3], [4], [5, 6], [7], [8, 9]]
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2).glom().collect()
# [[1, 2, 3, 4], [5, 6, 7, 8, 9]]
- RDD 的方法会作用在其
所有的分区
上
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2).map(lambda x: x * (10)).glom().collect()
# [[10, 20, 30, 40], [50, 60, 70, 80, 90]]
- RDD 之间是有
依赖关系
的(迭代计算)
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("word_count")
sc = SparkContext(conf=conf)
# 层层依赖,形成一个依赖链条
rdd_1 = sc.textFile("./data/input/words.txt")
rdd_2 = rdd_1.flatMap(lambda x: x.split(" "))
rdd_3 = rdd_2.map(lambda x: (x, 1))
rdd_4 = rdd_3.reduceByKey(lambda a, b: a + b)
- Key-Value型的RDD可以有
分区器
- 默认分区器:Hash分区规则(相同数据分到一起),可以手动设置一个分区器(rdd_4.partitionBy()设置)
- 非必选,因为不是所有的RDD都是KV型
- KV RDD:RDD中存储的是 二元元组(只有两个元素的数组),这就是KV型的RDD
- RDD的分区规划,会尽量
靠近数据
所在的服务器- 在初始RDD(读取数据的时候)规划的时候,分区会尽量规划到存储数据所在的服务器上(尽量本地读取)
- 本地读取:
- Executor所在服务器,同样是一个DataNode
- 同时这个DataNode上有它要读的数据
- 所以可以直接读取机器硬盘,无需走网络传输
- 网络读取:
- 需要经过网络的传输才能读取到
- 本地读取:
- 性能:本地读取 >>> 网络读取
- 在确保并行计算能力的前提下,尽量确保本地读取,但不是一定
- 在初始RDD(读取数据的时候)规划的时候,分区会尽量规划到存储数据所在的服务器上(尽量本地读取)
评论区