" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper博主等级

千里之行,始于足下

  • 累计撰写 203 篇文章
  • 累计创建 14 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

SparkCore 之 共享变量

PySuper
2022-05-15 / 0 评论 / 0 点赞 / 22 阅读 / 5397 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

广播变量

  • 先将本地list对象,放入broadcast对象中,然后从broadcast内部取出,中间传输的就是broadcast这个对象
  • 只要传输的是broadcast,spark就只会给每个Executor发送一份对象信息(不会给每个分区都发)

场景:
1、本地集合对象不是很大的时候
2、本地集合对象 和 分布式集合对象(RDD) 进行关联的时候
3、如果将本地集合对象转化为RDD,在两个RDD关联的时候必然需要使用join算子,会产生shuffle(影响性能)
好处:
1、节省网络IO的次数
2、节省Executor的内存占用

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

stu_info_list = [
    (1, "张大仙", 11),
    (2, "王晓晓", 13),
    (3, "张甜甜", 11),
    (4, "王大力", 11)
]

score_info_rdd = sc.parallelize([
    (1, "语文", 99),
    (2, "数学", 99),
    (3, "英语", 99),
    (4, "编程", 99),
    (1, "语文", 99),
    (2, "编程", 99),
    (3, "语文", 99),
    (4, "英语", 99),
    (1, "语文", 99),
    (3, "英语", 99),
    (2, "编程", 99),
])


# TODO: 方法一
def map_func_1(data):
    """匹配本地ID和分布式RDD中的学生ID"""
    id = data[0]
    name = ""

    for stu_info in stu_info_list:
        stu_id = stu_info[0]
        if id == stu_id:
            # 匹配成功后,即可获得当前学生姓名
            name = stu_info[1]
    return (name, data[1], data[2],)


# TODO: 方法二:使用 广播变量
# 将本地list标记为 广播变量
broadcast = sc.broadcast(stu_info_list)


def map_func_2(data):
    """匹配本地ID和分布式RDD中的学生ID"""
    id = data[0]
    name = ""

    # 使用广播变量:从broadcast对象中取出
    for stu_info in broadcast.value:
        stu_id = stu_info[0]
        if id == stu_id:
            # 匹配成功后,即可获得当前学生姓名
            name = stu_info[1]
    return (name, data[1], data[2],)


print(score_info_rdd.map(map_func_2).collect())

累加器

注意事项:
当我们在一个RDD中使用累加器的时候
因为RDD的计算,不会自主保存中间数据
所以每次使用过的RDD会被销毁,再次使用的时候会构建整个RDD链条
可能会出现累加器结果异常
这时候可以将使用累加器的RDD缓存下来(cache,CheckPoint)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

# TODO: 无法实现, 分布式计算的累加问题
count = 0

# 非RDD代码,有Driver执行
def map_func(data):
    global count
    count += 1
    print(count)

rdd.map(map_func).collect()
print(count)

# TODO: 使用spark提供的累加器功能
acmlt = sc.accumulator(0)

def map_func(data):
    global acmlt
    acmlt += 1
    print(acmlt)

# TODO: 分布式计算的累加问题
rdd.map(map_func).collect()
print(acmlt)

综合案例

数据:

   hadoop spark # hadoop spark spark
mapreduce ! spark spark hive !
hive spark hadoop mapreduce spark %
   spark hive sql sql spark hive , hive spark !
!  hdfs hdfs mapreduce mapreduce spark hive
  #

需求:
1、对正常的单词进行单词计数
2、特殊字符,统计出现有多少个

特殊字符定义:

abnormal_char = [",", ".", "!", "#", "$", "%"] 
import os
import re

from pyspark import SparkConf, SparkContext

os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'
# os.environ["HADOOP_CONF_DIR"] = "/hadoop/software/hadoop-3.3.0/etc/hadoop"

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("../00_data/input/accumulator_broadcast_data.txt")

# 定义特殊字符
abnormal_char = [",", ".", "!", "#", "$", "%"]

# TODO:将特殊字符list,包装成 广播变量
broadcast = sc.broadcast(abnormal_char)

# TODO:对特殊字符出现次数做累加,使用 累加器
acmlt = sc.accumulator(0)

# 数据处理,先处理数据的空行
# strip,有内容就是True,None就是False
lines_rdd = file_rdd.filter(lambda line: line.strip())

# 去除前后的空格
data_rdd = lines_rdd.map(lambda line: line.strip())

# 按照正则表达式对数据进行切分,因为某些单词之间的空格分隔符是两个或多个
words_rdd = data_rdd.flatMap(lambda line: re.split("\s+", line))


def filter_func(data):
    """过滤数据:保留正常单词用于单词计数;在过滤的过程中,对特殊字符计数"""
    global acmlt
    abnormal_chars = broadcast.value
    if data in abnormal_chars:
        # 表示这个数据是特殊字符
        acmlt += 1
        return False
    else:
        return True

# 当前RDD中有正常单词,也有特殊符号
normal_words_rdd = words_rdd.filter(filter_func)

# TODO: 正常单词
result_rdd = normal_words_rdd.map(lambda x:(x, 1)).reduceByKey(lambda a,b:a+b)
print(f"正常单词:{result_rdd.collect()}")

# TODO:特殊字符
print(f"特殊字符:{acmlt}")

总结

  • 广播变量解决了什么问题?
    • 分布式集合RDD和本地集合进行关联使用的时候, 降低内存占用以及减少网络IO传输, 提高性能
  • 累加器解决了什么问题?
    • 分布式代码执行中, 进行全局累加
0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区