过程数据
- rdd-3被二次使用,第一次使用之后,其实rdd-3就不存在了
- 第二次用的时候,只能基于RDD的血缘关系,从rdd-1重新执行
- 重新构建rdd-3后,供rdd-5使用
RDD之间进行相互迭代计算(Transformation的转换)
这个特性可以最大化的利用资源,老旧RDD没用了,就从内存中清理,给后续的计算腾出内存空间
RDD 缓存
- 缓存技术可以将RDD过程数据,持久化到内存或者硬盘上
- 但是,这个保存在设定上认为是不安全的(缓存的数据有丢失风险)
- 所以缓存的一个特点:
保存RDD之间的依赖关系
from pyspark.storagelevel import StorageLevel
rdd_1 = sc.textFile("../00_data/input/words.txt")
rdd_2 = rdd_1.flatMap(lambda x: x.split(" "))
rdd_3 = rdd_2.map(lambda x: (x, 1))
# 将rdd_3写入缓存
rdd_3.cache()
rdd_3.persist(StorageLevel.MEMORY_ONLY) # 仅内存缓存
rdd_3.persist(StorageLevel.MEMORY_ONLY_2) # 仅内存缓存,2个副本
rdd_3.persist(StorageLevel.MEMORY_AND_DISK) # 先放内存,不够放硬盘
rdd_3.persist(StorageLevel.MEMORY_AND_DISK_2) # 先放内存,不够放硬盘,2个副本
rdd_4 = rdd_3.reduceByKey(lambda a, b: a + b)
print(rdd_4.collect())
# 清除RDD缓存
rdd_3.unpersist()
RDD将自己分区的数据,自行保存到所在Executor内存和硬盘上 ==> 分散存储
RDD CheckPoint
也是将RDD的数据保存起来,但是
仅支持硬盘存储
- 它的设计认为是安全的的
- 不保留依赖关系
集中收集各个分区数据进行存储
# 告知Spark,开启CheckPoint
sc.setCheckpointDir("hdfs://node-211:8020/zheng/checkpoint")
rdd_1 = sc.textFile("../00_data/input/words.txt")
rdd_2 = rdd_1.flatMap(lambda x: x.split(" "))
rdd_3 = rdd_2.map(lambda x: (x, 1))
# 用CheckPoint保存数据
rdd_3.cache()
rdd_4 = rdd_3.reduceByKey(lambda a, b: a + b)
print(rdd_4.collect())
区别
CheckPoint不管分区数量多少,风险都是一样的,缓存中分区越多,风险越高
CheckPoint支持写入HDFS,而缓存不行,HDFS是高可用存储,所以CheckPoint被认为是安全的的
CheckPoint不支持内存,而缓存可以,缓存方案中,如果写内存,性能要比CheckPoint好
CheckPoint因为设计认为安全,所以不保留依赖关系,而缓存中保留
注意
CheckPoint是一种重量级的使用
也就是RDD的重新计算成本很高的时候,或者数据量很大的时候,使用CheckPoint比较合适
如果数据量小,或者RDD的重新计算是比较快的,就没有使用CheckPoint的必要,直接使用缓存就可以
Cache和CheckPoint都不是Action类型的算子
所以要他们工作,后面必须接上Action
接上Action目的:让RDD有数据,而不是为了让CheckPoint和Cache工作
总结
- Cache和Checkpoint区别
- Cache是轻量化存储,可存储在内存和硬盘, 是分散存储, 设计上数据是不安全的(保留RDD血缘关系)
- CheckPoint是重量级存储, 是集中存储, 只能存储在硬盘(HDFS)上, 设计上是安全的(不保留RDD血缘关系)
- Cache 和 CheckPoint的性能对比?
- Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快
- CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本)
评论区