" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 206 篇文章
  • 累计创建 14 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

SparkCore 之 RDD编程

PySuper
2022-04-21 / 0 评论 / 0 点赞 / 52 阅读 / 0 字
温馨提示:
本文最后更新于2024-05-28,若内容或图片失效,请留言反馈。 所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

RDD的创建

SparkContext

  • Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言)
  • 只有构建出SparkContext,基于它才能执行后续的API调用和计算
  • 本质上,SparkContext对编程来说,主要功能就是创建第一个RDD出来

并行化创建

rdd = sparkcontext.parallelize(参数1, 参数2)

  • 参数1:集合对象即可,如List
  • 参数2:分区数
import os

from pyspark import SparkConf, SparkContext

# 设置JAVA路径
os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'

# 1、初始化执行环境,创建SparkContext对象
conf = SparkConf().setAppName("create_parallelize").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 2、通过并行化集合的方式创建RDD
python_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# parallelize():本地集合 ==> 分布式对象(RDD)
rdd_1 = sc.parallelize(python_list)
print(f"默认分区数:{rdd_1.getNumPartitions()}")  # 由CPU核心数来定

rdd_2 = sc.parallelize(python_list, 3)
print(f"手动分区数:{rdd_2.getNumPartitions()}")

# collect():分布式对象(RDD) ==> 本地集合
print(f"RDD的内容是:{rdd_2.collect()}")

读取文件创建

rdd = sparkcontext.textFile(参数1,参数2,参数3)

  • 参数1:必填,文件/文件夹路径
    • 支持本地文件
    • 支持HDFS
    • 支持一些如S3协议
  • 参数2:可选,表示最小分区数量

  • 注意:参数2话语权不足,Spark有自己的判断
    • 在它允许的范围内,参数2才有效果
    • 超过Spark允许的范围,参数2则无效

import os

from pyspark import SparkConf, SparkContext

# 设置JAVA路径
os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'

# 创建SC对象
conf = SparkConf().setAppName("create_file").setMaster("local[*]")
sc = SparkContext(conf=conf)

file_rdd_1 = sc.textFile("../00_data/input/words.txt")
print(f"默认分区数:{file_rdd_1.getNumPartitions()}")
print(f"RDD内容:{file_rdd_1.collect()}")

file_rdd_2 = sc.textFile("../00_data/input/words.txt", 3)
print(f"file_rdd_2分区数:{file_rdd_2.getNumPartitions()}")

file_rdd_3 = sc.textFile("../00_data/input/words.txt", 100)
print(f"file_rdd_3分区数:{file_rdd_3.getNumPartitions()}")

rdd = sc.wholeTextFiles("…/00_data/input/tiny_files/")

  • 可以读取文件夹路径
  • 适用于多个小文件读取
import os

from pyspark import SparkConf, SparkContext

# 设置JAVA路径
os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'

# 创建SC对象
conf = SparkConf().setAppName("create_file").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 读取本地文件
whole_rdd = sc.wholeTextFiles("../00_data/input/tiny_files/")
print(f"默认分区数:{whole_rdd.getNumPartitions()}")
for tuple_obj in whole_rdd.collect():
    print(f"文件路径:{tuple_obj[0]}")
    print(f"文件内容:{tuple_obj[1]}")
    print("==" * 10)

RDD 算子

  • 算子是什么
    • 分布式集合对象上的API
    • 本地对象的API称为方法/函数
    • 而分布式对象的API称为算子
  • 算子分类
    • Transformation:转换算子
      • RDD的算子,返回值仍旧是一个RDD的,称为转换算子
      • 这类算子是懒加载,如果没有action算子,Transformation算子是不工作的
    • Action:动作(行动)算子
      • 返回值不是RDD的就是Action算子

Transformation算子相当于在构建执行计划

Action算子是一个指令让这个执行计划开始工作

没有Action算子,Transformation算子的迭代关系无法工作

Transformation算子

map

将RDD的数据一条条处理

处理的逻辑,基于map算子中接收的处理函数

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)

def parse(num):
    return num * 10

# 定义方法,作为算子的传入函数体
print(rdd.map(parse).collect())

# 使用lambda表达式
print(rdd.map(lambda num: num * 10).collect())

flatmap

对RDD执行map操作,然后解除嵌套

rdd = sc.parallelize(["python spark", "java zookper", "go serverless"])

print(rdd.map(lambda key: key.split(" ")).collect())
print(rdd.flatMap(lambda key: key.split(" ")).collect())

reduceByKey

针对K-V型RDD,自动按照key分组

然后根据提供的聚合逻辑,完成组内数据(value)的聚合操作

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 1)])

print(rdd.reduceByKey(lambda a, b: a + b).collect())

单词统计

# 1、读取文件数据,构建RDD对象
rdd_1 = sc.textFile("../00_data/input/words.txt")

# 2、使用flarMap() 去除所有单词
rdd_2 = rdd_1.flatMap(lambda x: x.split(" "))

# 3、将所有单词转为元组 (单词, 1)
rdd_3 = rdd_2.map(lambda word: (word, 1))

# 4、对单词进行分组,并按照 value 聚合
rdd_4 = rdd_3.reduceByKey(lambda a, b: a + b)

# 5、将RDD数据收集到Driver中,输出
print(rdd_4.collect())

groupBy

将RDD的数据进行分组

rdd = sc.parallelize([("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1)])

# 通过groupby分组
# groupby传入的函数:通过这个函数,确定按照谁来分组
result_rdd = rdd.groupBy(lambda x: x[0])
print(result_rdd.collect())

filter

对RDD数据进行去重,返回新的RDD

接收一个函数参数,函数的参数必须是True/False

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7])

# 使用Filter过滤
print(rdd.filter(lambda x: x % 2 == 1).collect())

distinct

对RDD算子进行去重,返回新的RDD

参数:去除分区数量,一般不用传

rdd_1 = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6])
rdd_2 = sc.parallelize([("a", 1), ("a", 1), ("a", 2)])

# 使用distinct去重
print(rdd_1.distinct().collect())
print(rdd_2.distinct().collect())

union

2个RDD合并成1个RDD返回

只合并,不去重

rdd_1 = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6])
rdd_2 = sc.parallelize([("a", 1), ("a", 1), ("a", 2)])

# 使用union合并RDD
rdd_3 = rdd_2.union(rdd_1)
print(rdd_3.collect())

join

对两个RDD执行join操作(可以实现SQL内/外连接)

join算子只能用于二元元组

rdd_1 = sc.parallelize([(1001,"张三"),(1002,"李四"),(1003,"王二"),(1004,"麻子")])
rdd_2 = sc.parallelize([(1001, "销售部"), (1002, "人事部")])

print(rdd_1.join(rdd_2).collect())
# [(1001, ('张三', '销售部')), (1002, ('李四', '人事部'))]

# 左外连接
print(rdd_1.leftOuterJoin(rdd_2).collect())
# [(1001, ('张三', '销售部')), (1002, ('李四', '人事部')), (1003, ('王二', None)), (1004, ('麻子', None))]

# 右外连接,可以更换rdd的顺序,或者调用rightOuterJoin即可

intersection

求两个RDD的交集,返回一个新RDD

rdd_1 = sc.parallelize([("a",1),("b",2)])
rdd_2 = sc.parallelize([("a",1),("c",3)])

print(rdd_1.intersection(rdd_2).collect())
# [('a', 1)]

glom

将RDD的数据,加上嵌套,这个嵌套按照分区来进行

rdd_1 = sc.parallelize([1, 2, 3, 4, 5], 2)

print(rdd_1.glom().collect())
# [[1, 2], [3, 4, 5]]

# flatmap(lambda x:x)
# 这个技巧只解嵌套,而不进行map

groupByKey

针对K-V型RDD,自动按照key分组

rdd_1 = sc.parallelize([("a",1),("b",1),("a",2),("b",2),("c",1)])
rdd_2 = rdd_1.groupByKey()  # 这里返回的还是一个RDD

print(rdd_2.map(lambda x: (x[0], list(x[1]))).collect())
# [('b', [1, 2]), ('c', [1]), ('a', [1, 2])]

sortBy

对RDD数据进行排序,基于指定的排序依据

rdd_1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)], 2)

rdd_2 = rdd_1.sortBy(lambda x: x[1], ascending=False, numPartitions=2)
# 1、函数:告诉sortBy() 按照数据的哪个列排序
# 2、布尔值:升序/降序
# 3、int:排序的分区数
# 注意:如果要全局有序,排序分区数设置为 1

print(rdd_2.collect())

sortByKey

针对K-V型RDD,按照key进行排序

  • ascending:升序/降序,True升序,False降序,默认为升序
  • numPartitions:按照几个分区进行排序,如果全局有序,设置为1
  • keyfunc:在排序钱对key进行处理
rdd = sc.parallelize([("a", 1), ("E", 1), ("b", 1), ("C", 1), ("D", 1)], 2)

print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
# [('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1)]

商品类别

import json
import os

from pyspark import SparkConf, SparkContext

os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'

conf = SparkConf().setAppName("create_file").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 读取数据文件
file_rdd = sc.textFile("../00_data/input/order.text")

# 进行rdd数据的split分割,按照 | 符号,得到一个个json数据
jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))

# 将json字符串,转换为 dict对象
dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))

# 过滤数据,只保存北京的数据
beijing_rdd = dict_rdd.filter(lambda dict_: dict_["areaName"] == "北京")

# 组合北京和商品类型,组成新的字符串
category_rdd = beijing_rdd.map(lambda x: x["areaName"] + "_" + x["category"])

# 对结果集进行去重操作
result_rdd = category_rdd.distinct()

print(result_rdd.collect())
# ['北京_平板电脑', '北京_家具', '北京_书籍', '北京_食品', '北京_服饰', '北京_手机', '北京_家电', '北京_电脑']

Demo On Yarn

#!/usr/bin/env python3.8
# -*- coding: UTF-8 -*-
# FileName :02_create_file.py
# Author   :zheng xingtao
# Date     :2022/3/30 14:01


import json
import os

from pyspark import SparkConf, SparkContext

from func_def import change

os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'

# 添加hadoop配置文件路径
os.environ["HADOOP_CONF_DIR"] = "/hadoop/software/hadoop-3.3.0/etc/hadoop"

# 提交数据到yarn集群
conf = SparkConf().setAppName("test-yarn-1").setMaster("yarn")

# 设置需要同步上传的文件路径
conf.set("spark.submit.pyFiles", "./func_def.py")

sc = SparkContext(conf=conf)

# 在集群中运行,需要使用HDFS路径
file_rdd = sc.textFile("hdfs://node-211:8020/zheng/order.text")

jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))

dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))

beijing_rdd = dict_rdd.filter(lambda dict_: dict_["areaName"] == "北京")

# 使用匿名行数
# category_rdd = beijing_rdd.map(lambda x: x["areaName"] + "_" + x["category"])

# 使用 导入函数
# 如果提交到集群运行,除了主代码以外,还依赖了其他的代码文件
# 需要设置一个参数,告知spark,还有依赖文件要同步上传的到集群
# spark.submit.pyFiles
# 参数的值可以是单个py文件,也可以是zip压缩包(有多个依赖文件的时候可以用zip压缩后上传)
category_rdd = beijing_rdd.map(change)

result_rdd = category_rdd.distinct()

print(result_rdd.collect())

# 如果使用submit提交
# /hadoop/software/spark/bin/spark-submit --master yarn --py-files ./func_def.py /root/test-file/func_def.py 

Action算子

countByKey

统计key出现的次数(一般使用与K-V型的RDD)

rdd_1 = sc.textFile("../00_data/input/words.txt")
rdd_2 = rdd_1.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))

resutl = rdd_2.countByKey()

print(resutl)

collect

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

  • 这个算子是将RDD各个分区数据,都拉取到Driver
  • 注意:RDD是分布式对象,其数据量可以很大
  • 所以用这个算子之前,要了解结果数据集不会太大,不然会把Driver内存撑爆

ruduce

对RDD数据集按照传入的逻辑进行聚合

rdd = sc.parallelize(range(10))

print(rdd.reduce(lambda a, b: a + b))
# 45

fold- 了解

和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的

初始值聚合作用在:

  • 分区内聚合
  • 分区间聚合
rdd = sc.parallelize(range(1, 10), 3)
print(rdd.glom().collect())
# [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

print(rdd.fold(10, lambda a, b: a + b))
# 85

first

取出RDD的第一个元素

take

取RDD的前N个元素,组成List返回

top

对RDD数据集进行降序排序,取前N个

count

计算RDD有多少条数据,返回一个数字

rdd = sc.parallelize(range(1, 10), 3)

print(rdd.first())
# 1

print(rdd.take(3))
# [1, 2, 3]

print(rdd.top(3))
# [9, 8, 7]

print(rdd.count())
# 9

takeSample

随机抽样RDD的数据

takeSample(参数1:true or false,参数2:采样数,参数3:随机种子数)

  • 参数1:True表示可以取同一个数据,False则不允许(指的是同一位置,和内容无关)
  • 参数2:抽样要几个
  • 参数3:随机数种子(随意给),如果给的是同一个值,那么取出的结果也是一致的
rdd = sc.parallelize(range(1, 10), 3)

print(rdd.takeSample(True, 3))
# [7, 6, 5]

takeOrdered

对RDD进行排序取前n个

使用元素自然顺序升序排序,如果要降序,可以使用参数2,更改排序数据

rdd.takeOedered(参数1:要几个数据,参数2:数据更改规则(不会更改数据本身))

rdd = sc.parallelize(range(1, 10), 3)

# 默认排序
print(rdd.takeOrdered(5))
# [1, 2, 3, 4, 5]

# 降序
print(rdd.takeOrdered(5, lambda x: -x))
# [9, 8, 7, 6, 5]

foreach

对RDD的每个元素,执行指定的业务逻辑,但是没有返回值

是由executor直接执行输出的

rdd = sc.parallelize(range(1, 10), 3)

# 这里需要注意分区的问题
rdd.foreach(lambda x: print(x * 2))
# 1428
# 16410
# 18612

saveAsTextFile

将RDD数据写入文本文件中(本地、HDFS)

由executor执行

rdd_1 = sc.parallelize(range(1, 10), 3)

rdd_1.saveAsTextFile("../00_data/input/save_file")
rdd_2 = sc.textFile("../00_data/input/save_file")
print(rdd_2.collect())

# 这里写入的文件个数,与原数据的分区数有关

分区操作算子

mapPartitions

mapPartitions一次,被传递的是一整个分区的数据,作为一个迭代器对象传入进来

rdd = sc.parallelize(range(1, 10), 3)

print(rdd.mapPartitions(lambda x: [i * 10 for i in x]).collect())
# [10, 20, 30, 40, 50, 60, 70, 80, 90]
# 这里接收的回一个 迭代器对象
# 网络、空间 方面会有较大性能提升

foreachPatition

和foreach一致,一次处理的是一整个分区的数据

rdd = sc.parallelize(range(1, 10), 3)

# 这里没有返回值的,不需要再用print输出
rdd.foreachPartition(lambda x: [print(i * 10) for i in x])

partitionBy

对RDD进行自定义分区操作

rdd.partitionBy(参数1,参数2)

  • 参数1:重新分区后有几个分区
  • 参数2:自定义分区规则,函数传入(分区编号从0开始)
  • 这里是以Key操作的
rdd = sc.parallelize([(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)])

# 自定义分区规则
print(rdd.partitionBy(2, lambda x: 0 if x % 2 == 0 else 1).glom().collect())


def change(num):
    if num % 2 == 0:
        return 0
    else:
        return 1


print(rdd.partitionBy(2, change).glom().collect())

repartition

对RDD重新执行分区(只针对于数量)

如果修改了分区:

  • 会影响并行计算(内存迭代的并行管道数量)
  • 分区如果增加,极大可能增加shuffle

coalesce

对分区 进行数量增减

建议使用,有个安全阀

rdd.coalesce(参数1,参数2)

  • 参数1:分区数
  • 参数2:True or false
    • True:允许shuffle,也就是可以加分区
    • False:不允许shuffle,也就是不能加分区,False是默认
rdd = sc.parallelize(range(0, 10))

print(rdd.repartition(3).glom().collect())
# [[], [0, 1, 5, 6], [2, 3, 4, 7, 8, 9]]

print(rdd.coalesce(2, shuffle=True).glom().collect())
# [[0, 1, 5, 6, 7, 8, 9], [2, 3, 4]]

面试题

groupByKey和reduceByKey的区别?

如果对数据执行分组+聚合,那么这两个算子的性能差别是很大的

reduceByKey的性能远大于groupByKey + 聚合逻辑

reduceByKey自带聚合逻辑,可以在分区内预聚合,然后再分组再最终聚合

而groupByKey只能分组,需要先分组(更多的网络IO),然后在Driver上再聚合

分组 + 聚合,首选reduceByKey,数据量越大,groupByKey的优势越大

总结

  • RDD创建有哪几种方法?
    • 通过并行化集合的方式(本地集合转分布式集合)
    • 或者读取数据的方式创建(TextFile\WholeTextFile)
  • RDD分区数如何查看?
    • 通过 getNumPartitions API 查看, 返回值Int
  • Transformation 和 Action的区别?
    • 转换算子的返回值100%是RDD, 而Action算子的返回值100%不是RDD.
    • 转换算子是懒加载的, 只有遇到Action才会执行. Action就是转换算子处理链条的开关.
  • 哪两个Action算子的结果不经过Driver, 直接输出?
    • foreach 和 saveAsTextFile 直接由Executor执行后输出
    • 不会将结果发送到Driver上去
  • reduceByKey 和 groupByKey的区别?
    • reduceByKey自带聚合逻辑, groupByKey不带
    • 如果做数据聚合reduceByKey的效率更好, 因为可以先聚合后shuffle再最终聚合, 传输的IO小
  • mapPartitions 和 foreachPartition 的区别?
    • mapPartitions 带有返回值 foreachPartition不带
  • 对于分区操作有什么要注意的地方?
    • 尽量不要增加分区, 可能破坏内存迭代的计算管道

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区