" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper博主等级

千里之行,始于足下

  • 累计撰写 203 篇文章
  • 累计创建 14 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

SparkCore 之 RDD案例分析

PySuper
2022-05-07 / 0 评论 / 0 点赞 / 29 阅读 / 5845 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

jieba 库

import jieba

content = "今天天气真好!"

result_1 = jieba.cut(content, True)
print(result_1)

result_2 = jieba.cut(content, False)
print(list(result_2))

# 搜索引擎模式,等同于 允许二次组合的场景
result_3 = jieba.cut_for_search(content)
print(",".join(result_3))

搜索引擎日志分析

main.py

#!/usr/bin/env python3.8
# -*- coding: UTF-8 -*-
# FileName :01_jieba.py
# Author   :zheng xingtao
# Date     :2022/4/4 8:58


import os
from operator import add

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

from example_utils import *

os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'
# os.environ["HADOOP_CONF_DIR"] = "/hadoop/software/hadoop-3.3.0/etc/hadoop"

conf = SparkConf().setAppName("test-main").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 读取数据文件
file_rdd = sc.textFile("../00_data/input/SogouQ.txt")

# 对数据进行切分
split_rdd = file_rdd.map(lambda x: x.split("\t"))

# 因为做多个需求,split_rdd 作为基础的RDD,会被多次使用
split_rdd.persist(StorageLevel.DISK_ONLY)

# TODO: 需求1:用户搜索的关键“词”分析
# 主要分析热点词
# 将所有的搜索内容取出

# 先随机取出几条数据查看
# print(split_rdd.takeSample(True, 3))

# 取出所有数据内容
context_rdd = split_rdd.map(lambda x: x[2])

# 对搜索的内容进行分词分析
word_rdd = context_rdd.flatMap(context_jieba)

# print(word_rdd.collect())
# 院校 帮 ==> 院校帮
# 博学 谷 ==> 博学谷
# 传智播 客 ==> 传智播客
filtered_rdd = word_rdd.filter(filter_word)

# 转换关键词
final_word_rdd = filtered_rdd.map(append_word)

# 对单词进行 分组 聚合 排序,求出前5名
result_1 = final_word_rdd.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
print(f"需求1结果:{result_1}")

# TODO:需求2:用户和关键词组合分析
# 1、我喜欢传智播客
# 1+我 1+喜欢 1+传智播客
user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))

# 对用户的搜索内容进行分词,分词后和用户ID再次组合
user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)

# 对内容进行:分组 聚合 排序 求前5
result_2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
print(f"需求2结果:{result_2}")

# TODO: 需求3:热门搜索时间段分析
# 取出所有时间
time_rdd = split_rdd.map(lambda x: x[0])

# 对时间进行处理,只保留时间精度
hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))

# 分组 聚合 排序
result_3 = hour_with_one_rdd.reduceByKey(add).sortBy(lambda x: x[1], ascending=False, numPartitions=1).collect()
print(f"需求3结果:{result_3}")

example_utils.py

#!/usr/bin/env python3.8
# -*- coding: UTF-8 -*-
# FileName :utils.py
# Author   :zheng xingtao
# Date     :2022/4/4 18:27


import jieba


def context_jieba(data):
    """分词操作"""
    seg = jieba.cut_for_search(data)
    word_list = list()
    for word in seg:
        word_list.append(word)
    return word_list


def filter_word(data):
    """过滤不要的 帮、谷、客"""
    return data not in ["帮", "谷", "客"]


def append_word(data):
    """修订某些关键词的内容"""
    if data == "院校":
        data = "院校帮"
    elif data == "博学":
        data = "博学谷"
    elif data == "传智播":
        data = "传智播客"
    return (data, 1)


def extract_user_and_word(data):
    """传入数据是元组:(1, 我喜欢传智播客)"""
    user_id = data[0]
    content = data[1]

    # 对content进行分词
    words = context_jieba(content)

    return_list = []
    for word in words:
        # 不要忘记过: \谷 \帮 \客
        if filter_word(word):
            return_list.append((user_id + "_" + append_word(word)[0], 1))
    return return_list

提交到集群运行

普通提交

注意:

  • 删除指定master的部分
  • 使用HDFS文件路径
/hadoop/software/spark/bin/spark-submit \
--master yarn \
--py-files ./example_utils.py \
./main.py

榨干集群性能提交

  • 查看集群的资源:
    • 查看CPU有几核:cat /proc/cpuinfo | grep processor | wc -l
    • 查看内存有多大:free -g
  • 这个Spark任务需要多少资源,简单规划:
    • 需要6核CPU
    • 需要12G内存
    • 希望使用6个Executor,每个Executor用1核CPU+2G内存
/hadoop/software/spark/bin/spark-submit \
--master yarn \
--py-files ./example_utils.py \
--executor-memory 2g \
--executor-cores 1 \
--num-executors 6 \
./main.py

总结

  • 为什么要在全部的服务器安装jieba库?
    • 因为YARN是集群运行,
    • Executor可以在所有服务器上执行, 所以每个服务器都需要有jieba库提供支撑
  • 如何尽量提高任务计算的资源?
    • 计算CPU核心和内存量
    • –executor-memory 指定executor内存
    • –executor-cores 指定executor的核心数
    • –num-executors 指定总executor数量

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区