广播变量
- 先将本地list对象,放入broadcast对象中,然后从broadcast内部取出,中间传输的就是broadcast这个对象
- 只要传输的是broadcast,spark就只会给每个Executor发送一份对象信息(不会给每个分区都发)
场景:
1、本地集合对象不是很大的时候
2、本地集合对象 和 分布式集合对象(RDD) 进行关联的时候
3、如果将本地集合对象转化为RDD,在两个RDD关联的时候必然需要使用join算子,会产生shuffle(影响性能)
好处:
1、节省网络IO的次数
2、节省Executor的内存占用
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
stu_info_list = [
(1, "张大仙", 11),
(2, "王晓晓", 13),
(3, "张甜甜", 11),
(4, "王大力", 11)
]
score_info_rdd = sc.parallelize([
(1, "语文", 99),
(2, "数学", 99),
(3, "英语", 99),
(4, "编程", 99),
(1, "语文", 99),
(2, "编程", 99),
(3, "语文", 99),
(4, "英语", 99),
(1, "语文", 99),
(3, "英语", 99),
(2, "编程", 99),
])
# TODO: 方法一
def map_func_1(data):
"""匹配本地ID和分布式RDD中的学生ID"""
id = data[0]
name = ""
for stu_info in stu_info_list:
stu_id = stu_info[0]
if id == stu_id:
# 匹配成功后,即可获得当前学生姓名
name = stu_info[1]
return (name, data[1], data[2],)
# TODO: 方法二:使用 广播变量
# 将本地list标记为 广播变量
broadcast = sc.broadcast(stu_info_list)
def map_func_2(data):
"""匹配本地ID和分布式RDD中的学生ID"""
id = data[0]
name = ""
# 使用广播变量:从broadcast对象中取出
for stu_info in broadcast.value:
stu_id = stu_info[0]
if id == stu_id:
# 匹配成功后,即可获得当前学生姓名
name = stu_info[1]
return (name, data[1], data[2],)
print(score_info_rdd.map(map_func_2).collect())
累加器
注意事项:
当我们在一个RDD中使用累加器的时候
因为RDD的计算,不会自主保存中间数据
所以每次使用过的RDD会被销毁,再次使用的时候会构建整个RDD链条
可能会出现累加器结果异常
这时候可以将使用累加器的RDD缓存下来
(cache,CheckPoint)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
# TODO: 无法实现, 分布式计算的累加问题
count = 0
# 非RDD代码,有Driver执行
def map_func(data):
global count
count += 1
print(count)
rdd.map(map_func).collect()
print(count)
# TODO: 使用spark提供的累加器功能
acmlt = sc.accumulator(0)
def map_func(data):
global acmlt
acmlt += 1
print(acmlt)
# TODO: 分布式计算的累加问题
rdd.map(map_func).collect()
print(acmlt)
综合案例
数据:
hadoop spark # hadoop spark spark
mapreduce ! spark spark hive !
hive spark hadoop mapreduce spark %
spark hive sql sql spark hive , hive spark !
! hdfs hdfs mapreduce mapreduce spark hive
#
需求:
1、对正常的单词进行单词计数
2、特殊字符,统计出现有多少个
特殊字符定义:
abnormal_char = [",", ".", "!", "#", "$", "%"]
import os
import re
from pyspark import SparkConf, SparkContext
os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'
# os.environ["HADOOP_CONF_DIR"] = "/hadoop/software/hadoop-3.3.0/etc/hadoop"
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 读取文件
file_rdd = sc.textFile("../00_data/input/accumulator_broadcast_data.txt")
# 定义特殊字符
abnormal_char = [",", ".", "!", "#", "$", "%"]
# TODO:将特殊字符list,包装成 广播变量
broadcast = sc.broadcast(abnormal_char)
# TODO:对特殊字符出现次数做累加,使用 累加器
acmlt = sc.accumulator(0)
# 数据处理,先处理数据的空行
# strip,有内容就是True,None就是False
lines_rdd = file_rdd.filter(lambda line: line.strip())
# 去除前后的空格
data_rdd = lines_rdd.map(lambda line: line.strip())
# 按照正则表达式对数据进行切分,因为某些单词之间的空格分隔符是两个或多个
words_rdd = data_rdd.flatMap(lambda line: re.split("\s+", line))
def filter_func(data):
"""过滤数据:保留正常单词用于单词计数;在过滤的过程中,对特殊字符计数"""
global acmlt
abnormal_chars = broadcast.value
if data in abnormal_chars:
# 表示这个数据是特殊字符
acmlt += 1
return False
else:
return True
# 当前RDD中有正常单词,也有特殊符号
normal_words_rdd = words_rdd.filter(filter_func)
# TODO: 正常单词
result_rdd = normal_words_rdd.map(lambda x:(x, 1)).reduceByKey(lambda a,b:a+b)
print(f"正常单词:{result_rdd.collect()}")
# TODO:特殊字符
print(f"特殊字符:{acmlt}")
总结
- 广播变量解决了什么问题?
- 分布式集合RDD和本地集合进行关联使用的时候, 降低内存占用以及减少网络IO传输, 提高性能
- 累加器解决了什么问题?
- 分布式代码执行中, 进行全局累加
评论区