2021-11 Spark 课程设计 | 笔记

引言

题目1

题目1:针对电影数据集,进行Spark中的RDD编程,具体要求如下:

1) 求取并显示 电影的总数量

2) 求取并显示 用户评价的总数量

3) 求取并显示 电影的总类数

4) 求取并显示电影 平均评分为5分的电影数

5) 求取并显示电影 平均评分大于4分的电影数

6) 求取并显示 每个用户的平均评分值

7) 求取并显示 每部电影的评价次数

1.求取并显示 电影的总数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#coding=utf-8

from __future__ import print_function

import sys
import time
import re
from datetime import datetime

from pyspark import SparkContext


def get_column(line, column_index1,column_index2):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), parts[column_index2])

if __name__ == "__main__":
# movies
sc = SparkContext(appName="movies")

movie_id_index_movie = 0
movie_title_index = 1
movie_lines = sc.textFile("/data-1/movies.csv")
movie_rdd = movie_lines.map(lambda x: get_column(x, movie_id_index_movie ,movie_title_index))

print("movies.csv: \n")
print(movie_rdd.take(5))
print("\n")

movie_count = movie_rdd.count()
print("电影总数量: %s" % movie_count)
print("\n")

sc.stop()

2.求取并显示 用户评价的总数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#coding=utf-8

from __future__ import print_function

import sys
import time
import re
from datetime import datetime

from pyspark import SparkContext


def get_column(line, column_index1,column_index2):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), parts[column_index2])

if __name__ == "__main__":
# ratings
sc = SparkContext(appName="ratings")

movie_id_index_rating = 1
rating_index = 2
rating_lines = sc.textFile("/data-1/ratings.csv")
rating_rdd = rating_lines.map(lambda x: get_column(x, movie_id_index_rating ,rating_index))

print("ratings.csv: \n")
print(rating_rdd.take(5))
print("\n")

rating_count = rating_rdd.count()
print("用户评价的总数量: %s" % rating_count)
print("\n")

sc.stop()

3.求取并显示 电影的总类数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#coding=utf-8

from __future__ import print_function

import sys
import time
import re
from datetime import datetime

from pyspark import SparkContext

# import sys
# reload(sys)
# sys.setdefaultencoding('utf-8')


def get_column(line, column_index1,column_index2):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), parts[column_index2])

#genres_list = []

def genres_count(line):
if -1 in line: # 跳过 (-1,-1)
return

global genres_list

#print(line)
genres = line[1]
#print(genres)
genres_list = genres_list + genres.split("|")
# 去重
genres_list = list(set(genres_list))

print("电影的总类数: %s" % len(genres_list))
print("\n")


if __name__ == "__main__":
genres_list = []

# movies
sc = SparkContext(appName="movies")

movie_id_index_movie = 0
movie_genres_index = 2
movie_lines = sc.textFile("/data-1/movies.csv")
movie_rdd = movie_lines.map(lambda x: get_column(x, movie_id_index_movie ,movie_genres_index))

movie_rdd.foreach(genres_count)

# TODO: Global 失效,这里为0
# movie_genres_count = len(genres_list)

# print("电影的总类数: %s" % movie_genres_count)
# print("\n")

sc.stop()

4.求取并显示电影 平均评分为5分的电影数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#coding=utf-8

from __future__ import print_function

import sys
import time
import re
from datetime import datetime

from pyspark import SparkContext


def get_column(line, column_index1,column_index2):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), parts[column_index2])

def filter_rating(line, column_index,threshold):
if 'userId' in line or 'movieId' in line:
return False
parts = line.split(',')
part = parts[column_index]
if float(part) >= threshold:
return True
else:
return False

def seq_op(r, rating):
total_rating = r[0] + float(rating)
count = r[1] + 1
return (total_rating, count)

def comb_op(p1,p2):
return (p1[0]+p2[0],p1[1]+p2[1])

def my_avg(x):
(key, (total, count)) = x
return (key,total/count)

if __name__ == "__main__":
# ratings
sc = SparkContext(appName="movies")

movie_id_index_rating = 1
rating_index = 2
# 电影 平均评分为 5
threshold = 5

rating_lines = sc.textFile("/data-1/ratings.csv")
rating_rdd = rating_lines.map(lambda x: get_column(x, movie_id_index_rating ,rating_index))

#print(rating_rdd.take(5))

agg_rating_rdd = rating_rdd.aggregateByKey ((0, 0), seq_op , comb_op)

#print(agg_rating_rdd.take(5))

avg_rating_rdd = agg_rating_rdd.map(lambda x: my_avg(x))
# 电影 平均评分为 5
filtered_rating_rdd = avg_rating_rdd.filter(lambda x: x[1] == threshold)

#print(filtered_rating_rdd.take(5))

# movies
movie_id_index_movie = 0
movie_title_index = 1
movie_lines = sc.textFile("/data-1/movies.csv")
movie_rdd = movie_lines.map(lambda x: get_column(x, movie_id_index_movie ,movie_title_index))

#print(movie_rdd.take(5))

result_rdd = movie_rdd.join(filtered_rating_rdd)

print(result_rdd.take(5))

result_rdd_count = result_rdd.count()
print("电影 平均评分为5分的电影数 %s" % result_rdd_count)

sc.stop()

5.求取并显示电影 平均评分大于4分的电影数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#coding=utf-8

from __future__ import print_function

import sys
import time
import re
from datetime import datetime

from pyspark import SparkContext


def get_column(line, column_index1,column_index2):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), parts[column_index2])

def filter_rating(line, column_index,threshold):
if 'userId' in line or 'movieId' in line:
return False
parts = line.split(',')
part = parts[column_index]
if float(part) >= threshold:
return True
else:
return False

def seq_op(r, rating):
total_rating = r[0] + float(rating)
count = r[1] + 1
return (total_rating, count)

def comb_op(p1,p2):
return (p1[0]+p2[0],p1[1]+p2[1])

def my_avg(x):
(key, (total, count)) = x
return (key,total/count)

if __name__ == "__main__":
# ratings
sc = SparkContext(appName="movies")

movie_id_index_rating = 1
rating_index = 2
# 电影 平均评分大于4分
threshold = 4

rating_lines = sc.textFile("/data-1/ratings.csv")
rating_rdd = rating_lines.map(lambda x: get_column(x, movie_id_index_rating ,rating_index))

#print(rating_rdd.take(5))

agg_rating_rdd = rating_rdd.aggregateByKey ((0, 0), seq_op , comb_op)

#print(agg_rating_rdd.take(5))

avg_rating_rdd = agg_rating_rdd.map(lambda x: my_avg(x))
# 电影 平均评分大于4分
filtered_rating_rdd = avg_rating_rdd.filter(lambda x: x[1] > threshold)

#print(filtered_rating_rdd.take(5))

# movies
movie_id_index_movie = 0
movie_title_index = 1
movie_lines = sc.textFile("/data-1/movies.csv")
movie_rdd = movie_lines.map(lambda x: get_column(x, movie_id_index_movie ,movie_title_index))

#print(movie_rdd.take(5))

result_rdd = movie_rdd.join(filtered_rating_rdd)

print(result_rdd.take(5))

result_rdd_count = result_rdd.count()
print("电影 平均评分大于4分的电影数 %s" % result_rdd_count)

sc.stop()

6.求取并显示 每个用户的平均评分值

参考:

ratings.csv

userId, rating

reduceByKey

key: userId

平均值

方法1 ( scala )

1
2
rating_rdd.reduceByKey((x,y) => (x._1 + y._1,x._2 + y._2))
.map(a => (a._1,a._2._1 / a._2._2)).sortByKey()

python

1
rating_rdd.reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1])).map(lambda a: (a[0], a[1][0] / a[1][1])).sortByKey()

注意:此种方法并不会去重,如下

平均值: 方法2 ( python )

好处:已去重

失败: 报错

1
rating_rdd.groupByKey().mapValues(list).map(lambda x: (x[0], sum(x[1])/len(x[1])))

平均值: 方法3 ( python )

好处: 已去重

1
2
3
rating_rdd.combineByKey(lambda x: (x, 1),
lambda x, y: (x[0]+y, x[1]+1),
lambda x, y: (x[0]+y[0], x[1]+y[1])).map(lambda x: (x[0], x[1][0]/x[1][1]))

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#coding=utf-8

from __future__ import print_function

import sys
import time
import re
from datetime import datetime

from pyspark import SparkContext

def get_column(line, column_index1,column_index2):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), parts[column_index2])

if __name__ == "__main__":
# ratings
sc = SparkContext(appName="ratings")

userId_index = 0
rating_index = 2
rating_lines = sc.textFile("/data-1/ratings.csv")
rating_rdd = rating_lines.map(lambda x: get_column(x, userId_index ,rating_index))

#avg_rating_rdd = rating_rdd.reduceByKey(lambda x,y: (x[1] + y[1], x[2] + y[2])).map(lambda a: (a[1], a[2][1] / a[2][2])).sortByKey()
avg_rating_rdd = rating_rdd.combineByKey(lambda x: (x, 1),
lambda x, y: (x[0]+y, x[1]+1),
lambda x, y: (x[0]+y[0], x[1]+y[1])).map(lambda x: (x[0], x[1][0]/x[1][1]))

print("每个用户的平均评分值: \n")
print(avg_rating_rdd.collect())

sc.stop()

报错:

1
TypeError: unsupported operand type(s) for /: 'unicode' and 'int'

7.求取并显示 每部电影的评价次数

参考:

countByKey

每部电影 分组,对评价条数计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#coding=utf-8

from __future__ import print_function

import sys
import time
import re
from datetime import datetime

from pyspark import SparkContext

def get_column_count(line, column_index1):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), 1)

def get_column(line, column_index1,column_index2):
if 'userId' in line or 'movieId' in line:
return (-1,-1)
parts = line.split(',')
return (int(parts[column_index1]), parts[column_index2])

if __name__ == "__main__":
sc = SparkContext(appName="ratings")

# movies
movieId_index = 0
movie_title_index = 1
movie_lines = sc.textFile("/data-1/movies.csv")
movie_rdd = rating_lines.map(lambda x: get_column(x, movieId_index, movie_title_index))

# ratings
movieId_index = 1
rating_lines = sc.textFile("/data-1/ratings.csv")
# [(movieId, 1), ...]
rating_rdd = rating_lines.map(lambda x: get_column_count(x, movieId_index))

# movie_rating_count_rdd
# key: movieId, 按 moviedId 计数
movie_rating_count_rdd = rating_rdd.reduceByKey(lambda a, b: a + b)

# join: result_rdd
result_rdd = movie_rating_count_rdd.join(movie_rdd)

print("每部电影的评价次数: \n")
print(result_rdd.collect())

sc.stop()

题目2

题目2:根据已给的一个文本

  1. 写一个程序,随机的产生单词

  2. 基于此程序, 编写Spark Streaming程序

  1. 统计每个时间段(自定)每个单词出现的总次数

  2. 统计每个时间段,所有单词出现的总次数

  3. 统计每个时间段的热词

参考:

1.写一个程序C,随机的产生单词

2.基于程序C, 编写Spark Streaming程序

2.1 统计每个时间段(自定)每个单词出现的总次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# syan改为相应的主机名,SparkStreaming是自定义的程序名字
sc = SparkContext(appName='SparkStreaming')

# 生成流计算上下文,30秒计算/批处理一次
ssc = StreamingContext(sc,30)

# IP对应自己的地址,9999是监听端口
# 绑定数据端口9999(从socker中获取数据,即获取数据)
lines = ssc.socketTextStream("127.0.0.1",9999)

# 解析单词出现的频率
counts = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)

# 打印输出到控制台
counts.pprint()

ssc.start();ssc.awaitTermination()

2.2 统计每个时间段,所有单词出现的总次数

2.3 统计每个时间段的热词

题目3

题目3:根据给定的文本集

  1. 利用Spark SQL 完成数据的查询与统计

  2. 利用SparkSQL 对数据的特征进行删减

  3. 利用Spark 的Kmeans算法,编写程序对数据进行聚类并调优

  1. 要求掌握TF-IDF的计算原理,并在程序中显示其值

  2. 要求能够调整Kmeans的相关参数,并分析出参数对最终结果的影响

参考:

题目4

题目4:根据给定的数据集

  1. 利用Spark SQL 完成数据的查询与统计

  2. 利用SparkSQL 对数据的特征进行删减

  3. 利用Spark 的决策树算法,编写程序对数据进行分类并调优

  1. 要求能够调整决策树的相关参数,并分析出参数对最终结果的影响

参考:

1. 利用Spark SQL 完成数据的查询与统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer


# 读取数据,简要分析

def f(x):
rel = {}
rel['features'] = Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
rel['label'] = str(x[4])
return rel


from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

data = spark.sparkContext.textFile("/data-4/iris.txt").map(lambda line: line.split(',')).map(lambda p: Row(**f(p))).toDF()

data.createOrReplaceTempView("iris")

df = spark.sql("select * from iris")

rel = df.rdd.map(lambda t : str(t[1])+":"+str(t[0])).collect()
for item in rel:
print(item)

df_temp_1 = spark.sql("select count(*) from iris group by label")
df_temp_1.show()

2. 利用SparkSQL 对数据的特征进行删减

3. 利用Spark 的决策树算法,编写程序对数据进行分类并调优

3.1 要求能够调整决策树的相关参数,并分析出参数对最终结果的影响

参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#coding=utf-8

from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

def f(x):
rel = {}
rel['features'] = Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
rel['label'] = str(x[4])
return rel


from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

# 读取数据, 简要分析
data = spark.sparkContext.textFile("/data-4/iris.txt").map(lambda line: line.split(',')).map(lambda p: Row(**f(p))).toDF()

data.createOrReplaceTempView("iris")

df = spark.sql("select * from iris")


# 进一步处理特征和标签,以及数据分组

## 分别获取标签列和特征列,进行索引,并进行了重命名。
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)

featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(df)

## 这里我们设置一个labelConverter,目的是把预测的类别重新转化成字符型的。
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
## 接下来,我们把数据集随机分成训练集和测试集,其中训练集占70%。
trainingData, testData = data.randomSplit([0.7, 0.3])

# 构建决策树分类模型

## 导入所需要的包
from pyspark.ml.classification import DecisionTreeClassificationModel,DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
## 训练决策树模型,这里我们可以通过 setter 的方法来设置决策树的参数,也可以用ParamMap来设置(具体的可以查看spark mllib的官网)。具体的可以设置的参数可以通过explainParams()来获取。
dtClassifier = DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
dtClassifier = dtClassifier.setImpurity( "entropy" ) # 不纯度
.setMaxBins( 100 ) # 离散化"连续特征"的最大划分数
.setMaxDepth( 5 ) # 树的最大深度
.setMinInfoGain( 0.01 ) # 一个节点分裂的最小信息增益,值为[0,1]
.setMinInstancesPerNode( 10 ) # 每个节点包含的最小样本数
.setSeed( 123456 )

## 在pipeline中进行设置
pipelinedClassifier = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
## 训练决策树模型
modelClassifier = pipelinedClassifier.fit(trainingData)
## 进行预测
predictionsClassifier = modelClassifier.transform(testData)
## 查看部分预测的结果
predictionsClassifier.select("predictedLabel", "label", "features").show(20)

# 评估决策树分类模型

evaluatorClassifier = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")

accuracy = evaluatorClassifier.evaluate(predictionsClassifier)

print("Test Error = " + str(1.0 - accuracy))


treeModelClassifier = modelClassifier.stages[2]

print("Learned classification tree model:\n" + str(treeModelClassifier.toDebugString))



从上述结果可以看到模型的预测准确率为 0.9以及训练的决策树模型结构。

Q&A

补充

利用Spark SQL 完成数据的查询与统计

加载包
1
2
3
4
5
6
7
8
from __future__ import print_function
import pandas as pd
from pyspark.sql import HiveContext
from pyspark import SparkContext,SparkConf
from sqlalchemy import create_engine
import datetime
import pyspark.sql.functions as F
12345671234567
1
2
3
conf = SparkConf().setAppName(“abc”)
sc = SparkContext(conf=conf)
hiveCtx = HiveContext(sc)
手工创建一个DataFrame
1
2
3
d = [{'name': 'Alice', 'age': 1},{'name': 'Bob', 'age': 5}]
df = sqlContext.createDataFrame(d)
df.show()

ee

数据探索
展示
1
2
df.show() # 不加参数默认展示前20行
11
统计行数
1
2
df.count() 
11
查看schema
1
2
df.printSchema() 
11
查看字段
1
2
df.columns
11
查看字段类型
1
2
df.dtypes
11

数据处理
查询
1
2
df.select('age','name') # 带show才能看到结果
11
别名
1
2
df.select(df.age.alias('age_value'),'name')
11
筛选
1
2
df.filter(df.name=='Alice')
11
增加列

增加列有2种方法,一种是基于现在的列计算;一种是用pyspark.sql.functions的lit()增加常数列。

1
2
3
df.select(df.age+1,'age','name')
df.select(F.lit(0).alias('id'),'age','name')
12
增加行
1
2
df.unionAll(df2)
11
删除重复记录
1
2
df.drop_duplicates()
11
去重
1
2
df.distinct()
11
删除列
1
2
df.drop('id')
1
删除存在缺失值的记录
1
2
df.dropna(subset=['age', 'name'])  # 传入一个list,删除指定字段中存在缺失的记录
1
填补缺失值
1
2
df.fillna({'age':10,'name':'abc'})  # 传一个dict进去,对指定的字段填充
1
分组计算
1
2
df.groupby('name').agg(F.max(df['age']))
1
join
1
2
df.groupby('name').agg(F.max(df['age']))
1

[Spark]如何设置使得spark程序不输出 INFO级别的内容

参考:

Spark程序在运行的时候,总是输出很多INFO级别内容

发现在 $SPARK_HOME/conf 目录下,有一个 log4j.properties.template

1
2
cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
vim $SPARK_HOME/conf/log4j.properties

然后,修改下面的这一个行:

1
log4j.rootCategory=INFO, console

改成:

1
log4j.rootCategory=ERROR, console

再次运行时,不再出现大量的INFO 信息了。

UnicodeEncodeError: 'ascii' codec can't encode character u'9' in position 9: ordinal not in range(128)

参考:

1
2
3
import sys
reload(sys) # Python2.5 + 初始化后会删除 sys.setdefaultencoding 这个方法,我们需要重新载入
sys.setdefaultencoding('utf-8')

失败: 还是会报错

ImportError: No module named numpy

参考:

1
apt install python3-pip

Spark 中 ImportError: No module named numpy

参考:

1
2
apt-get update
apt-get install python-numpy

NameError: name 'spark' is not defined

参考

1
2
3
4
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

参考

感谢帮助!