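Analyzing Global Video Game Sales with PySpark

Plan

Platform: Ubuntu 18.04

Software: Hadoop 3.1.3, Spark 2.4.0, Python 3.6.9

First obtain the game sales data, then upload it to HDFS, process and analyze it with PySpark, and finally visualize the results with pyecharts.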

Data Acquisition

The dataset comes from Kaggle: Sales Of Video Games.

The downloaded file vgsales.csv has 16,600 rows and 11 columns. The column names and their descriptions are:

Column         Description
Rank           Ranking of overall sales
Name           The game's name (unique values)
Platform       Platform of the game's release
Year           Year of the game's release
Genre          Genre of the game
Publisher      Publisher of the game
NA_Sales       Sales in North America (in millions)
EU_Sales       Sales in Europe (in millions)
JP_Sales       Sales in Japan (in millions)
Other_Sales    Sales in the rest of the world (in millions)
Global_Sales   Total worldwide sales (in millions)
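Before processing, it can be worth confirming that the downloaded file really has the 11 columns listed above. A minimal sketch using only the standard library `csv` module; `check_header` and `EXPECTED_COLUMNS` are hypothetical names, with the column list copied from the table:

```python
import csv
import io

# Column names as listed in the table above
EXPECTED_COLUMNS = [
    "Rank", "Name", "Platform", "Year", "Genre", "Publisher",
    "NA_Sales", "EU_Sales", "JP_Sales", "Other_Sales", "Global_Sales",
]


def check_header(stream):
    """Return the expected column names missing from the CSV header (hypothetical helper)."""
    header = next(csv.reader(stream))
    return [name for name in EXPECTED_COLUMNS if name not in header]


# Demo on an in-memory header line; an empty list means nothing is missing
sample = io.StringIO(",".join(EXPECTED_COLUMNS) + "\n")
print(check_header(sample))  # → []

# On the real file:
#     with open("vgsales.csv", newline="", encoding="utf-8") as f:
#         print(check_header(f))
```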

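Data Processing

Preprocessing

Read the .csv data with pandas, drop the rows that contain null values, and write the columns out separated by tabs: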

# CsvToTxt.py
import pandas as pd

# .csv -> .txt
# dropna() filters out rows with null values (e.g. "N/A" in Year or Publisher).
# Year is parsed as float because of those missing values, so convert it back
# to int: otherwise it is written as "2006.0" and int() fails in analyze.py.
data = pd.read_csv('vgsales.csv').dropna().astype({'Year': int})

# 'w' instead of 'a+' so that re-running the script does not append duplicate rows
with open('vgsales.txt', 'w', encoding='utf-8') as f:
    for line in data.values:
        # Separate the 11 columns with tabs, one record per line
        f.write('\t'.join(str(value) for value in line) + '\n')
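For comparison, the same conversion can be sketched with only the standard library, in case pandas is not installed. It assumes that missing values appear as empty fields or as the string `N/A` (pandas treats more markers as missing by default); `csv_to_tsv_lines` is a hypothetical name and the sample rows are made up:

```python
import csv
import io

# Values treated as missing (an assumption about how vgsales.csv marks gaps)
MISSING = {"", "N/A"}


def csv_to_tsv_lines(stream):
    """Yield one tab-separated line per CSV row that has no missing values."""
    reader = csv.reader(stream)
    next(reader)  # skip the header row
    for row in reader:
        if any(value.strip() in MISSING for value in row):
            continue  # drop rows with a null value, like the pandas version
        # Write Year as an integer ("2006", not "2006.0") so int() can parse it later
        row[3] = str(int(float(row[3])))
        yield "\t".join(row)


# Two made-up rows: the second one has a missing Year and is skipped
sample = io.StringIO(
    "Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales\n"
    "1,Game A,Wii,2006,Sports,Pub,1.0,2.0,3.0,4.0,10.0\n"
    "2,Game B,PS2,N/A,Action,Pub,1.0,1.0,1.0,1.0,4.0\n"
)
for line in csv_to_tsv_lines(sample):
    print(line)

# On the real file:
#     with open("vgsales.csv", newline="", encoding="utf-8") as src, \
#          open("vgsales.txt", "w", encoding="utf-8") as dst:
#         for line in csv_to_tsv_lines(src):
#             dst.write(line + "\n")
```

Using `csv.reader` rather than `str.split(",")` matters here because some game names contain commas and are quoted in the CSV.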

The converted data is saved in vgsales.txt; run head -5 vgsales.txt to check the first 5 lines.
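analyze.py converts Rank and Year with int() and the sales columns with float(), so one malformed line makes the Spark job fail. A quick local check before uploading can catch such lines, for example a Year written as 2006.0. A sketch, assuming the 11-column tab-separated layout produced above; `find_bad_lines` is a hypothetical helper and the sample lines are made up:

```python
import io


def find_bad_lines(stream, n_fields=11):
    """Return (line number, reason) for each line that analyze.py could not parse."""
    bad = []
    for lineno, line in enumerate(stream, start=1):
        fields = line.rstrip("\n").split("\t")
        if len(fields) != n_fields:
            bad.append((lineno, "expected %d fields, got %d" % (n_fields, len(fields))))
            continue
        try:
            int(fields[0])          # Rank
            int(fields[3])          # Year: "2006.0" would fail here
            for value in fields[6:]:
                float(value)        # the five sales columns
        except ValueError as exc:
            bad.append((lineno, str(exc)))
    return bad


# Made-up lines: the second one has a Year that int() rejects
sample = io.StringIO(
    "1\tGame A\tWii\t2006\tSports\tPub\t1\t2\t3\t4\t10\n"
    "2\tGame B\tPS2\t2006.0\tAction\tPub\t1\t1\t1\t1\t4\n"
)
print(find_bad_lines(sample))

# On the real file:
#     with open("vgsales.txt", encoding="utf-8") as f:
#         print(find_bad_lines(f))  # [] means every line parses
```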

Uploading to HDFS

  1. Start HDFS: start-dfs.sh

  2. Create a directory: hdfs dfs -mkdir vgsales

  3. Upload the file: hdfs dfs -put vgsales.txt vgsales/
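Steps 2 and 3 can also be scripted from Python with subprocess. A sketch, assuming the `hdfs` command is on PATH and HDFS is already running; it adds `-mkdir -p` and `-put -f` so that the script can be re-run without failing when the directory or the file already exists. `upload_commands` and `upload` are hypothetical helpers:

```python
import subprocess


def upload_commands(local_file="vgsales.txt", hdfs_dir="vgsales"):
    """Build the hdfs dfs commands for steps 2 and 3.

    A relative HDFS path such as "vgsales" resolves to /user/<user>/vgsales.
    """
    return [
        ["hdfs", "dfs", "-mkdir", "-p", hdfs_dir],                  # -p: no error if it exists
        ["hdfs", "dfs", "-put", "-f", local_file, hdfs_dir + "/"],  # -f: overwrite the file
    ]


def upload(dry_run=True):
    """Print each command; run it only when dry_run is False."""
    for cmd in upload_commands():
        print(" ".join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)  # raise CalledProcessError on failure


if __name__ == "__main__":
    upload(dry_run=True)  # set dry_run=False to actually execute the commands
```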

Running hdfs dfs -cat vgsales/vgsales.txt | head -5 shows the first 5 lines of the uploaded file, which match the local copy.

Analyzing the Data
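The analysis script reads vgsales.txt from HDFS, splits each line on tabs, and converts the fields to the types declared in its schema before building a DataFrame. That per-line conversion can be written as a plain function and tested without Spark. A sketch; `parse_line` is a hypothetical name, and `int(float(...))` for Year is a defensive choice so that both `2006` and `2006.0` are accepted:

```python
def parse_line(line):
    """Split one tab-separated record into typed fields, in schema order."""
    p = line.rstrip("\n").split("\t")
    return (
        int(p[0]),                 # Rank
        p[1], p[2],                # Name, Platform
        int(float(p[3])),          # Year, written as "2006" or "2006.0"
        p[4], p[5],                # Genre, Publisher
        float(p[6]), float(p[7]), float(p[8]), float(p[9]), float(p[10]),  # sales
    )


# Made-up record
print(parse_line("1\tGame A\tWii\t2006.0\tSports\tPub\t1.5\t2.0\t0.5\t0.25\t4.25"))
# → (1, 'Game A', 'Wii', 2006, 'Sports', 'Pub', 1.5, 2.0, 0.5, 0.25, 4.25)
```

In Spark, a function like this could be passed directly to `rdd0.map(parse_line)`, since `createDataFrame` accepts an RDD of tuples together with a schema.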

# analyze.py
from pyspark import SparkConf
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
import pyspark.sql.functions as func


# Define the schema fields
schema = StructType([
    StructField("Rank", IntegerType(), False),        # 0 overall sales rank
    StructField("Name", StringType(), False),         # 1 game name
    StructField("Platform", StringType(), False),     # 2 platform
    StructField("Year", IntegerType(), False),        # 3 release year
    StructField("Genre", StringType(), False),        # 4 genre
    StructField("Publisher", StringType(), False),    # 5 publisher
    StructField("NA_Sales", FloatType(), False),      # 6 North America sales
    StructField("EU_Sales", FloatType(), False),      # 7 Europe
    StructField("JP_Sales", FloatType(), False),      # 8 Japan
    StructField("Other_Sales", FloatType(), False),   # 9 other regions
    StructField("Global_Sales", FloatType(), False),  # 10 worldwide
])

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

# Read vgsales.txt from HDFS
rdd0 = spark.sparkContext.textFile("hdfs://localhost:9000/user/hadoop/vgsales/vgsales.txt")

# Split each line on tabs and convert the fields, producing rdd1
# (int(float(...)) accepts a Year written as either "2006" or "2006.0")
rdd1 = rdd0 \
    .map(lambda x: x.split('\t')) \
    .map(lambda p: Row(
        int(p[0]), p[1], p[2], int(float(p[3])), p[4], p[5],
        float(p[6]), float(p[7]), float(p[8]), float(p[9]), float(p[10])
    ))

# Build a DataFrame from rdd1 and register it as the temporary view "SalesData"
schemaSalesData = spark.createDataFrame(rdd1, schema)
schemaSalesData.createOrReplaceTempView("SalesData")


# Output path of each result, relative to the HDFS home directory (/user/hadoop/)
def json_path(index):
    return "vgsales/results/result%d" % index


# Write a result as a single JSON file. coalesce(1) avoids the shuffle of
# repartition(1), so sorted rows keep their order; "overwrite" allows re-runs.
# Each result directory then holds _SUCCESS and one part-00000-XXXXXX.json file.
def save(df, index):
    df.coalesce(1).write.mode("overwrite").json(json_path(index))


# Total sales in each region for all games released in each year
df1 = schemaSalesData \
    .groupBy('Year') \
    .agg(func.sum("NA_Sales").alias("NA_Sales"),
         func.sum("EU_Sales").alias("EU_Sales"),
         func.sum("JP_Sales").alias("JP_Sales"),
         func.sum("Other_Sales").alias("Other_Sales"),
         func.sum("Global_Sales").alias("Global_Sales")) \
    .sort('Year')
save(df1, 1)

# Top 10 publishers by total global sales
df2 = spark.sql("select Publisher, sum(Global_Sales) as allSales from SalesData group by Publisher order by allSales desc limit 10")
save(df2, 2)

# Top 10 genres by global sales
df3 = spark.sql("select Genre, sum(Global_Sales) as allSales from SalesData group by Genre order by allSales desc limit 10")
save(df3, 3)

# Top 10 platforms by global sales
df4 = spark.sql("select Platform, sum(Global_Sales) as allSales from SalesData group by Platform order by allSales desc limit 10")
save(df4, 4)

# Top 10 games by North American sales
df5 = spark.sql("select Name, NA_Sales from SalesData order by NA_Sales desc limit 10")
save(df5, 5)

# Top 10 games by European sales
df6 = spark.sql("select Name, EU_Sales from SalesData order by EU_Sales desc limit 10")
save(df6, 6)

# Top 10 games by Japanese sales
df7 = spark.sql("select Name, JP_Sales from SalesData order by JP_Sales desc limit 10")
save(df7, 7)

# Top 10 games by global sales
df8 = spark.sql("select Name, Global_Sales from SalesData order by Global_Sales desc limit 10")
save(df8, 8)

# Total sales in each region (a single row)
df9 = schemaSalesData.agg(
    func.sum("NA_Sales").alias("NA_Sales"),
    func.sum("EU_Sales").alias("EU_Sales"),
    func.sum("JP_Sales").alias("JP_Sales"),
    func.sum("Other_Sales").alias("Other_Sales"))
save(df9, 9)

spark.stop()
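The top-10 queries for publishers, genres and platforms all follow one pattern: group the rows by a column, sum Global_Sales, sort in descending order and keep ten rows. The same logic in plain Python can help when checking results on a small sample; `top_n_by_sum` is a hypothetical helper and the sample rows are made up:

```python
from collections import defaultdict


def top_n_by_sum(rows, key, value, n=10):
    """Plain-Python equivalent of:
    SELECT key, SUM(value) AS allSales ... GROUP BY key ORDER BY allSales DESC LIMIT n
    """
    totals = defaultdict(float)
    for row in rows:
        totals[row[key]] += row[value]
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]


rows = [
    {"Publisher": "A", "Global_Sales": 3.0},
    {"Publisher": "B", "Global_Sales": 5.0},
    {"Publisher": "A", "Global_Sales": 4.0},
]
print(top_n_by_sum(rows, "Publisher", "Global_Sales", n=2))  # → [('A', 7.0), ('B', 5.0)]
```

Spark distributes the grouping across partitions, but the result for a given input is the same; only the order of rows with tied totals may differ.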

Data Visualization

Copying the results to the local machine

#!/bin/bash
# GetToLocal.sh
# Copy all result directories from HDFS into the current directory
hdfs dfs -get /user/hadoop/vgsales/results .

# Give every result file the same name
for num in {1..9}
do
    mv results/result$num/*.json results/result$num/part-00000.json
done
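The renaming loop relies on each result directory containing exactly one .json part file, which the single-partition write in analyze.py normally produces. A Python sketch of the same step that fails loudly when that assumption does not hold; `normalize_part_files` is a hypothetical name and the demo file name is made up:

```python
import tempfile
from pathlib import Path


def normalize_part_files(results_dir="results", count=9):
    """Rename results/resultN/<part file>.json to results/resultN/part-00000.json."""
    for num in range(1, count + 1):
        result_dir = Path(results_dir) / ("result%d" % num)
        parts = sorted(result_dir.glob("*.json"))
        if len(parts) != 1:
            raise RuntimeError("%s: expected 1 .json file, found %d" % (result_dir, len(parts)))
        parts[0].rename(result_dir / "part-00000.json")


# Demo on a throwaway directory with a Spark-style file name
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "result1").mkdir()
    (Path(tmp) / "result1" / "part-00000-1234-c000.json").write_text("{}\n")
    normalize_part_files(tmp, count=1)
    print(sorted(p.name for p in (Path(tmp) / "result1").iterdir()))  # → ['part-00000.json']
```

The `.crc` checksum files that `hdfs dfs -get` may leave behind end in `.crc`, so the `*.json` pattern does not pick them up.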

Run ls -R results to check the files.
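The part files written by Spark's DataFrame JSON writer are in JSON Lines format: one JSON object per result row, one per line, rather than a single JSON array. That is why the charting script reads them line by line instead of calling json.load on the whole file. A minimal reader sketch; `read_json_lines` is a hypothetical helper and the demo content is made up:

```python
import json
import os
import tempfile


def read_json_lines(path):
    """Parse a JSON Lines file into a list of dicts, skipping blank lines."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


# Demo: a made-up two-row result in the same layout as results/result2
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "part-00000.json")
    with open(path, "w", encoding="utf-8") as f:
        f.write('{"Publisher":"A","allSales":7.0}\n{"Publisher":"B","allSales":5.0}\n')
    print(read_json_lines(path))
# → [{'Publisher': 'A', 'allSales': 7.0}, {'Publisher': 'B', 'allSales': 5.0}]
```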

Generating the Charts

# ViewCharts.py
import json

from pyecharts import options as opts
from pyecharts.charts import Bar, PictorialBar, Pie, WordCloud
from pyecharts.globals import SymbolType


# Path of each analysis result
def root_path(index):
    return "results/result%d/part-00000.json" % index


# Path where each chart is saved
def render_path(index):
    return "results/result%d/result.html" % index


# Read a result file; Spark writes one JSON object per line
def read_results(index):
    with open(root_path(index), 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


# Diamond-shaped word cloud of (label, value) pairs, used for charts 2, 3, 5, 6, 7 and 8
def draw_word_cloud(index, label_key, value_key, title):
    data = [(str(js[label_key]), round(float(js[value_key]), 2))
            for js in read_results(index)]
    (
        WordCloud()
        .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
        .set_global_opts(title_opts=opts.TitleOpts(title=title))
        .render(render_path(index))
    )


# Total sales in each region for all games released in each year
def drawChart1():
    rows = read_results(1)

    def column(key):
        return [round(float(js[key]), 2) for js in rows]

    (
        Bar()
        .add_xaxis([str(js['Year']) for js in rows])
        .add_yaxis("North America", column('NA_Sales'), stack="stack1")
        .add_yaxis("Europe", column('EU_Sales'), stack="stack2")
        .add_yaxis("Japan", column('JP_Sales'), stack="stack3")
        .add_yaxis("Other regions", column('Other_Sales'), stack="stack4")
        # Separate stack so the global total is not piled on top of Other regions
        .add_yaxis("Global", column('Global_Sales'), stack="stack5")
        .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
        .set_global_opts(title_opts=opts.TitleOpts(title="Total sales by release year"))
        .render(render_path(1))
    )


# Top 10 platforms by global sales
def drawChart4():
    # The file is sorted in descending order; reverse it so that, after
    # reversal_axis(), the best-selling platform is drawn at the top
    rows = list(reversed(read_results(4)))
    (
        PictorialBar()
        .add_xaxis([str(js['Platform']) for js in rows])
        .add_yaxis(
            "",
            [round(float(js['allSales']), 2) for js in rows],
            label_opts=opts.LabelOpts(is_show=False),
            symbol_size=18,
            symbol_repeat="fixed",
            symbol_offset=[0, 0],
            is_symbol_clip=True,
            symbol=SymbolType.ROUND_RECT,
        )
        .reversal_axis()
        .set_global_opts(
            title_opts=opts.TitleOpts(title="Top 10 platforms by sales"),
            xaxis_opts=opts.AxisOpts(is_show=False),
            yaxis_opts=opts.AxisOpts(
                axistick_opts=opts.AxisTickOpts(is_show=False),
                axisline_opts=opts.AxisLineOpts(
                    linestyle_opts=opts.LineStyleOpts(opacity=0)
                ),
            ),
        )
        .render(render_path(4))
    )


# Share of total sales by region
def drawChart9():
    js = read_results(9)[0]  # df9 has a single row
    na_sales = round(float(js['NA_Sales']), 2)
    eu_sales = round(float(js['EU_Sales']), 2)
    jp_sales = round(float(js['JP_Sales']), 2)
    other_sales = round(float(js['Other_Sales']), 2)
    all_sales = round(na_sales + eu_sales + jp_sales + other_sales, 2)
    values = [
        ["North America (%)", round(na_sales / all_sales * 100, 2)],
        ["Europe (%)", round(eu_sales / all_sales * 100, 2)],
        ["Japan (%)", round(jp_sales / all_sales * 100, 2)],
        ["Other regions (%)", round(other_sales / all_sales * 100, 2)],
    ]
    (
        Pie()
        .add("", values)
        .set_colors(["red", "orange", "purple", "blue"])
        .set_global_opts(title_opts=opts.TitleOpts(title="Share of sales by region"))
        .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
        .render(render_path(9))
    )


# Main program
if __name__ == "__main__":
    drawChart1()
    draw_word_cloud(2, 'Publisher', 'allSales', "Top 10 publishers by global sales")
    draw_word_cloud(3, 'Genre', 'allSales', "Top 10 genres by global sales")
    drawChart4()
    draw_word_cloud(5, 'Name', 'NA_Sales', "Top 10 games by North American sales")
    draw_word_cloud(6, 'Name', 'EU_Sales', "Top 10 games by European sales")
    draw_word_cloud(7, 'Name', 'JP_Sales', "Top 10 games by Japanese sales")
    draw_word_cloud(8, 'Name', 'Global_Sales', "Top 10 games by global sales")
    drawChart9()
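drawChart9 turns the four regional totals into percentage shares of their sum. The arithmetic on its own, as a sketch: `regional_shares` is a hypothetical helper, it does not round the inputs first as drawChart9 does, and the numbers below are illustrative round values, not the dataset's actual totals:

```python
def regional_shares(totals):
    """Map each region to its percentage of the combined total, rounded to 2 decimals."""
    all_sales = sum(totals.values())
    return {region: round(value / all_sales * 100, 2) for region, value in totals.items()}


# Illustrative values only
print(regional_shares({"NA": 50.0, "EU": 25.0, "JP": 15.0, "Other": 10.0}))
# → {'NA': 50.0, 'EU': 25.0, 'JP': 15.0, 'Other': 10.0}
```

Because each share is rounded separately, the four percentages may not add up to exactly 100 (for example 66.67 + 33.33 for totals of 2 and 1 does, but three equal regions give 33.33 each).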

Run ls -R results again to see the generated charts (a result.html in each result directory). They show:

  1. result1: total sales in each region by release year

  2. result2: top 10 publishers by global sales

    Nintendo (Japan), Electronic Arts (EA, USA), Activision (USA), Sony Computer Entertainment (Japan), Ubisoft (France), Sega (Japan), THQ (USA), Take-Two Interactive (USA), Namco Bandai Games (Japan), Konami Digital Entertainment (Japan).

  3. result3: top 10 genres by global sales

    Action, Sports, Misc (miscellaneous), Shooter, Racing, Role-Playing, Fighting, Platform (platformers), Simulation, Puzzle.

  4. result4: top 10 platforms by global sales

    Includes PS2 (Sony PlayStation 2), X360 (Microsoft Xbox 360), PS3 (Sony PlayStation 3), Wii (Nintendo home console), DS (Nintendo handheld) and PS (Sony PlayStation).

  5. result5: top 10 games by North American sales

    Wii Sports, Super Mario Bros. and Duck Hunt (all Nintendo), among others.

  6. result6: top 10 games by European sales

    Wii Sports, Mario Kart Wii and Nintendogs (all Nintendo), among others.

  7. result7: top 10 games by Japanese sales

    Includes Super Mario Bros., New Super Mario Bros., Pokemon Black/Pokemon White, Pokemon Ruby/Pokemon Sapphire and Animal Crossing: Wild World (all Nintendo).

  8. result8: top 10 games by global sales

    Wii Sports, Super Mario Bros., Mario Kart Wii and Pokemon Red/Pokemon Blue (all Nintendo), among others.

  9. result9: share of total sales by region

    North America accounts for nearly half of all sales and Europe for more than a quarter, followed by Japan and then the rest of the world.