jieba 库
import jieba
content = "今天天气真好!"
result_1 = jieba.cut(content, True)
print(result_1)
result_2 = jieba.cut(content, False)
print(list(result_2))
# 搜索引擎模式,等同于 允许二次组合的场景
result_3 = jieba.cut_for_search(content)
print(",".join(result_3))
搜索引擎日志分析
main.py
#!/usr/bin/env python3.8
# -*- coding: UTF-8 -*-
# FileName :01_jieba.py
# Author :zheng xingtao
# Date :2022/4/4 8:58
import os
from operator import add
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from example_utils import *
os.environ['JAVA_HOME'] = '/hadoop/software/jdk1.8.0_241'
# os.environ["HADOOP_CONF_DIR"] = "/hadoop/software/hadoop-3.3.0/etc/hadoop"
conf = SparkConf().setAppName("test-main").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 读取数据文件
file_rdd = sc.textFile("../00_data/input/SogouQ.txt")
# 对数据进行切分
split_rdd = file_rdd.map(lambda x: x.split("\t"))
# 因为做多个需求,split_rdd 作为基础的RDD,会被多次使用
split_rdd.persist(StorageLevel.DISK_ONLY)
# TODO: 需求1:用户搜索的关键“词”分析
# 主要分析热点词
# 将所有的搜索内容取出
# 先随机取出几条数据查看
# print(split_rdd.takeSample(True, 3))
# 取出所有数据内容
context_rdd = split_rdd.map(lambda x: x[2])
# 对搜索的内容进行分词分析
word_rdd = context_rdd.flatMap(context_jieba)
# print(word_rdd.collect())
# 院校 帮 ==> 院校帮
# 博学 谷 ==> 博学谷
# 传智播 客 ==> 传智播客
filtered_rdd = word_rdd.filter(filter_word)
# 转换关键词
final_word_rdd = filtered_rdd.map(append_word)
# 对单词进行 分组 聚合 排序,求出前5名
result_1 = final_word_rdd.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
print(f"需求1结果:{result_1}")
# TODO:需求2:用户和关键词组合分析
# 1、我喜欢传智播客
# 1+我 1+喜欢 1+传智播客
user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
# 对用户的搜索内容进行分词,分词后和用户ID再次组合
user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)
# 对内容进行:分组 聚合 排序 求前5
result_2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
print(f"需求2结果:{result_2}")
# TODO: 需求3:热门搜索时间段分析
# 取出所有时间
time_rdd = split_rdd.map(lambda x: x[0])
# 对时间进行处理,只保留时间精度
hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))
# 分组 聚合 排序
result_3 = hour_with_one_rdd.reduceByKey(add).sortBy(lambda x: x[1], ascending=False, numPartitions=1).collect()
print(f"需求3结果:{result_3}")
example_utils.py
#!/usr/bin/env python3.8
# -*- coding: UTF-8 -*-
# FileName :utils.py
# Author :zheng xingtao
# Date :2022/4/4 18:27
import jieba
def context_jieba(data):
"""分词操作"""
seg = jieba.cut_for_search(data)
word_list = list()
for word in seg:
word_list.append(word)
return word_list
def filter_word(data):
"""过滤不要的 帮、谷、客"""
return data not in ["帮", "谷", "客"]
def append_word(data):
"""修订某些关键词的内容"""
if data == "院校":
data = "院校帮"
elif data == "博学":
data = "博学谷"
elif data == "传智播":
data = "传智播客"
return (data, 1)
def extract_user_and_word(data):
"""传入数据是元组:(1, 我喜欢传智播客)"""
user_id = data[0]
content = data[1]
# 对content进行分词
words = context_jieba(content)
return_list = []
for word in words:
# 不要忘记过: \谷 \帮 \客
if filter_word(word):
return_list.append((user_id + "_" + append_word(word)[0], 1))
return return_list
提交到集群运行
普通提交
注意:
- 删除指定master的部分
- 使用HDFS文件路径
/hadoop/software/spark/bin/spark-submit \
--master yarn \
--py-files ./example_utils.py \
./main.py
榨干集群性能提交
- 查看集群的资源:
- 查看CPU有几核:cat /proc/cpuinfo | grep processor | wc -l
- 查看内存有多大:free -g
- 这个Spark任务需要多少资源,简单规划:
- 需要6核CPU
- 需要12G内存
- 希望使用6个Executor,每个Executor用1核CPU+2G内存
/hadoop/software/spark/bin/spark-submit \
--master yarn \
--py-files ./example_utils.py \
--executor-memory 2g \
--executor-cores 1 \
--num-executors 6 \
./main.py
总结
- 为什么要在全部的服务器安装jieba库?
- 因为YARN是集群运行,
- Executor可以在所有服务器上执行, 所以每个服务器都需要有jieba库提供支撑
- 如何尽量提高任务计算的资源?
- 计算CPU核心和内存量
- –executor-memory 指定executor内存
- –executor-cores 指定executor的核心数
- –num-executors 指定总executor数量
评论区