代码实战
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# FileName :01_word_count.py
# Author :zheng xingtao
# Date :2022/3/11 13:35
import os
from pyspark import SparkConf, SparkContext
# 设置环境变量,解决无法找到Java
os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'
def work_count():
"""
单词计数,读取HDFS上的words.txt文件
:return: 对其内部的单词统计出现的数量
"""
conf = SparkConf().setMaster("local[*]").setAppName("word_count")
# 通过 SparkConf 对象构建 SoarkContext 对象
sc = SparkContext(conf=conf)
# 读取文件
# HDFS文件
# file_count = sc.textFile("hdfs://node-211:8020/zheng/words.txt")
# 本地文件
file_count = sc.textFile("./data/input/words.txt")
# 将单词进行切割,得到一个存储全部单词的集合对象
words_count = file_count.flatMap(lambda line: line.split(" "))
# 将单词转换成元组对象,key-单词,value-数字
words_with_one_count = words_count.map(lambda x: (x, 1))
# 将元组的value 按照key来分组,对所有的value执行聚合操作(相加)
result_rdd = words_with_one_count.reduceByKey(lambda a, b: a + b)
# 通过collect方法手机RDD的数据输出
print(result_rdd.collect())
if __name__ == '__main__':
work_count()
流程解析
集群执行
- 在代码中不要设置master,如果设置了,会以代码为准,spark-submit工具的设置就无效了
- 提交程序到集群中的时候,读取的文件一定是各个机器都能访问到的地址,如:HDFS
# 以客户端提交
# 在Local下执行
bin/spark-submit --master local[*] /root/project/PySpark/01_word_count.py
# 在yarn中执行
bin/spark-submit --master yarn /root/project/PySpark/01_word_count.py
流程详解
分布式代码执行分析
集群角色
- 资源管理层面
- Master(ResourceManager):集群大管家,整个集群的资源管理和分配
- Worker(NodeManager):单个机器的管家,负责在单个服务器上提供运行容器,管理当前机器的资源
- 任务执行层面
- Driver
- 单个 Spark 任务的管理者
- 管理 Executor 的任务执行和任务分解分配,类似YARN的 ApplicationMaster
- Executor:具体干活的进程,Spark的工作任务(Task)都由Executor来负责执行
- Driver
代码执行
Python On Spark
- Driver :Py4j模块将Python代码翻译成JVM代码
- Executor:pyspark守护进程做中转站
总结
- 分布式代码执行的重要特征是什么?
- 代码在集群上运行,是被分布式运行的
- 在Spark中,非任务处理部分由Driver执行(非RDD代码),任务处理部分由Executor执行(RDD代码)
- Executor的数量可以很多,所以任务的计算是分布式在运行的
- PySpark的架构体系
- Driver端由JVM 执行
- Executor 端由 JVM 做命令转发,底层由Python解释器进行工作
评论区