2021-11 Spark 课程设计 | 笔记
引言
题目1
题目1:针对电影数据集,进行Spark中的RDD编程,具体要求如下:
1) 求取并显示 电影的总数量
2) 求取并显示 用户评价的总数量
3) 求取并显示 电影的总类数
4) 求取并显示电影 平均评分为5分的电影数
5) 求取并显示电影 平均评分大于4分的电影数
6) 求取并显示 每个用户的平均评分值
7) 求取并显示 每部电影的评价次数
python
注意:此种方法并不会去重,如下
平均值: 方法2 ( python )
好处:已去重
失败: 报错
平均值: 方法3 ( python )
好处: 已去重
完整代码
报错:
题目2
题目2:根据已给的一个文本
从上述结果可以看到模型的预测准确率为 0.9以及训练的决策树模型结构。
Q&A
补充
利用Spark SQL 完成数据的查询与统计
ee
发现在 $SPARK_HOME/conf 目录下,有一个 log4j.properties.template
然后,修改下面的这一个行:
改成:
再次运行时,不再出现大量的INFO 信息了。
失败: 还是会报错
Spark 中 ImportError: No module named numpy
参考:
spark中ImportError: No module named numpy原因和解决方法_DayOneMore的博客-CSDN博客
参考
感谢帮助!
独家 | PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码) - 云+社区 - 腾讯云
1.求取并显示 电影的总数量
1 | #coding=utf-8 |

2.求取并显示 用户评价的总数量
1 | #coding=utf-8 |

3.求取并显示 电影的总类数
1 | #coding=utf-8 |

4.求取并显示电影 平均评分为5分的电影数
1 | #coding=utf-8 |

5.求取并显示电影 平均评分大于4分的电影数
1 | #coding=utf-8 |

6.求取并显示 每个用户的平均评分值
参考: 新手向,从用Spark求平均值到reduceByKey详解_Tai_Park-CSDN博客 rdd利用reducebykey计算平均值_柯希莫的博客-CSDN博客 pyspark计算最大值、最小值、平均值 - skaarl - 博客园 ratings.csv userId, rating reduceByKey key: userId 平均值 方法1 ( scala )1 | rating_rdd.reduceByKey((x,y) => (x._1 + y._1,x._2 + y._2)) |
1 | rating_rdd.reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1])).map(lambda a: (a[0], a[1][0] / a[1][1])).sortByKey() |

1 | rating_rdd.groupByKey().mapValues(list).map(lambda x: (x[0], sum(x[1])/len(x[1]))) |
1 | rating_rdd.combineByKey(lambda x: (x, 1), |
1 | #coding=utf-8 |
1 | TypeError: unsupported operand type(s) for /: 'unicode' and 'int' |
7.求取并显示 每部电影的评价次数
参考: pyspark单词计数_醉糊涂仙的博客-CSDN博客_pyspark 单词统计 countByKey 每部电影 分组,对评价条数计数1 | #coding=utf-8 |
-
写一个程序,随机的产生单词
基于此程序, 编写Spark Streaming程序
-
统计每个时间段(自定)每个单词出现的总次数
统计每个时间段,所有单词出现的总次数
统计每个时间段的热词
1.写一个程序C,随机的产生单词
2.基于程序C, 编写Spark Streaming程序
2.1 统计每个时间段(自定)每个单词出现的总次数
1 | from pyspark import SparkContext |
2.2 统计每个时间段,所有单词出现的总次数
2.3 统计每个时间段的热词
题目3 题目3:根据给定的文本集-
利用Spark SQL 完成数据的查询与统计
利用SparkSQL 对数据的特征进行删减
利用Spark 的Kmeans算法,编写程序对数据进行聚类并调优
-
要求掌握TF-IDF的计算原理,并在程序中显示其值
要求能够调整Kmeans的相关参数,并分析出参数对最终结果的影响
-
利用Spark SQL 完成数据的查询与统计
利用SparkSQL 对数据的特征进行删减
利用Spark 的决策树算法,编写程序对数据进行分类并调优
-
要求能够调整决策树的相关参数,并分析出参数对最终结果的影响
1. 利用Spark SQL 完成数据的查询与统计
1 | from pyspark.ml.linalg import Vector,Vectors |

2. 利用SparkSQL 对数据的特征进行删减
3. 利用Spark 的决策树算法,编写程序对数据进行分类并调优
3.1 要求能够调整决策树的相关参数,并分析出参数对最终结果的影响
参考: Spark2 ML包之决策树分类Decision tree classifier详细解说_MONKEYMOMO的博客-CSDN博客1 | #coding=utf-8 |

加载包
1 | from __future__ import print_function |
1 | conf = SparkConf().setAppName(“abc”) |
手工创建一个DataFrame
1 | d = [{'name': 'Alice', 'age': 1},{'name': 'Bob', 'age': 5}] |
数据探索
展示
1 | df.show() # 不加参数默认展示前20行 |
统计行数
1 | df.count() |
查看schema
1 | df.printSchema() |
查看字段
1 | df.columns |
查看字段类型
1 | df.dtypes |
数据处理
查询
1 | df.select('age','name') # 带show才能看到结果 |
别名
1 | df.select(df.age.alias('age_value'),'name') |
筛选
1 | df.filter(df.name=='Alice') |
增加列
增加列有2种方法,一种是基于现在的列计算;一种是用pyspark.sql.functions的lit()增加常数列。1 | df.select(df.age+1,'age','name') |
增加行
1 | df.unionAll(df2) |
删除重复记录
1 | df.drop_duplicates() |
去重
1 | df.distinct() |
删除列
1 | df.drop('id') |
删除存在缺失值的记录
1 | df.dropna(subset=['age', 'name']) # 传入一个list,删除指定字段中存在缺失的记录 |
填补缺失值
1 | df.fillna({'age':10,'name':'abc'}) # 传一个dict进去,对指定的字段填充 |
分组计算
1 | df.groupby('name').agg(F.max(df['age'])) |
join
1 | df.groupby('name').agg(F.max(df['age'])) |
[Spark]如何设置使得spark程序不输出 INFO级别的内容
参考: Spark 通过 spark-submit 设置日志级别 - 简书 Spark程序在运行的时候,总是输出很多INFO级别内容
1 | cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties |

1 | log4j.rootCategory=INFO, console |
1 | log4j.rootCategory=ERROR, console |
UnicodeEncodeError: 'ascii' codec can't encode character u'9' in position 9: ordinal not in range(128)
参考: 为什么有时候必须添加sys.setdefaultencoding('utf-8')_crazyhacking的专栏-CSDN博客1 | import sys |
ImportError: No module named numpy
参考: 技术|如何在 Ubuntu 上安装 pip1 | apt install python3-pip |
1 | apt-get update |
NameError: name 'spark' is not defined
参考 pyspark : NameError: name 'spark' is not defined_Solar's Blog-CSDN博客1 | from pyspark.context import SparkContext |