1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, explode, lit, desc, sum
from pyspark.sql.types import ArrayType, StringType
from textblob import Word
from googletrans import Translator
class ABA2023YearWordFrequency(object):
    """Spark job: word-frequency statistics over US ABA search terms for 2023.

    Pipeline (run read_data() then handle_data()):
      1. Load last-365-day search terms (2023-12 snapshot) and a set of
         excluded top-level category ids from Hive, plus an EN->CN dictionary.
      2. Drop search terms belonging to the excluded categories, tokenize the
         remaining terms, count per-word frequency, attach translations, and
         write two partitions to ``tmp_word_frequency``:
         ``date_info='2023'`` (raw word counts) and ``date_info='2023-merge'``
         (counts summed over each word's singular/lemmatized form).
    """

    def __init__(self):
        # Spark session named after this task for cluster-side identification.
        self.spark = SparkUtil.get_spark_session("spark_task: aba_2023_year_word_frequency")
        # Placeholder DataFrames (trivial `select 1+1`) so every attribute
        # exists up front; each is overwritten in read_data()/handle_data().
        self.df_aba_2023 = self.spark.sql(f"select 1+1;")
        self.df_beside_category = self.spark.sql(f"select 1+1;")
        self.df_translate = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_save1 = self.spark.sql(f"select 1+1;")
        self.df_save2 = self.spark.sql(f"select 1+1;")
        self.df_agg = self.spark.sql(f"select 1+1;")
        # Register custom UDFs (also exposed to Spark SQL by name).
        self.u_get_singular_form = self.spark.udf.register('get_singular_form', self.get_singular_form, StringType())
        self.u_word_tokenize = self.spark.udf.register('word_tokenize', self.word_tokenize, ArrayType(StringType()))
        # self.u_word_translate = self.spark.udf.register('word_translate', self.word_translate, StringType())

    @staticmethod
    def get_singular_form(word: str):
        """Return the singular (noun-lemmatized) form of *word* via TextBlob.

        Falsy input (None / empty string) is returned unchanged.
        """
        if word:
            singular_form = Word(word).lemmatize("n")
            # word_object = Word(word)
            # singular_form = word_object.singularize()
            return singular_form
        return word

    @staticmethod
    def word_tokenize(title: str):
        """Tokenize *title* into a list of words using NLTK's English tokenizer.

        Import is kept local so executors resolve nltk at call time.
        NOTE(review): assumes the NLTK 'punkt' data is present on executors.
        """
        from nltk.tokenize import word_tokenize
        result = word_tokenize(title, "english")
        return result

    # @staticmethod
    # def word_translate(word: str):
    #     if word:
    #         try:
    #             translator = Translator()
    #             result = translator.translate(word, src='en', dest='zh-cn')
    #             return result.text
    #         except Exception as e:
    #             # handle other unexpected errors
    #             print(f"An unexpected error occurred: {e}")
    #             return None
    #     return None

    def read_data(self):
        """Load the three source DataFrames from Hive and cache them.

        Populates:
          df_aba_2023        -- (search_term, category_id) for US, last 365 days,
                                2023-12 snapshot.
          df_beside_category -- top-level category ids to exclude (media-type
                                categories such as Books, Software, Video Games).
          df_translate       -- English word -> simplified-Chinese translation.

        The count() calls both report row counts and force materialization
        of the cached DataFrames.
        """
        sql1 = f"""
        select
            search_term,
            category_id
        from dwt_aba_last365
        where site_name = 'us'
          and date_type = 'last365day'
          and date_info = '2023-12';
        """
        self.df_aba_2023 = self.spark.sql(sql1).cache()
        print("df_aba_2023的数量:")
        print(self.df_aba_2023.count())
        sql2 = f"""
        select
            category_id
        from dim_bsr_category_tree
        where site_name = 'us'
          and en_name in ('Audible Books & Originals', 'Books', 'Kindle Store', 'Apps & Games', 'Movies & TV', 'CDs & Vinyl', 'Software', 'Video Games')
          and category_parent_id = 0;
        """
        self.df_beside_category = self.spark.sql(sql2).cache()
        print("df_beside_category的数量:")
        print(self.df_beside_category.count())
        sql3 = f"""
        select
            word,
            simple_cn as cn
        from tmp_en_dict;
        """
        self.df_translate = self.spark.sql(sql3).cache()
        print("df_translate的数量:")
        print(self.df_translate.count())

    def handle_data(self):
        """Tokenize, count, translate, and persist word frequencies.

        Writes two appends to ``tmp_word_frequency`` partitioned by date_info:
        raw per-word counts ('2023') and counts merged by singular form
        ('2023-merge').
        """
        # left_anti join: keep only search terms NOT in the excluded categories.
        self.df_save = self.df_aba_2023.join(
            self.df_beside_category, on='category_id', how='left_anti'
        ).select('search_term')
        # One row per token: explode the tokenizer-UDF output.
        self.df_save = self.df_save.select(explode(self.u_word_tokenize(self.df_save['search_term'])).alias('word'))
        self.df_save = self.df_save.groupby(['word']).agg(
            count('word').alias('word_frequency')
        )
        # Attach translations and the singular form; cached because df_save is
        # reused below for both output partitions.
        # NOTE(review): self.df_save['word'] on the right-hand side binds to the
        # pre-join DataFrame (RHS evaluates before reassignment); 'word' is the
        # join key so the column resolves — confirm this is intentional.
        self.df_save = self.df_save.join(
            self.df_translate, on='word', how='left'
        ).withColumn(
            'word_singular_form',
            self.u_get_singular_form(self.df_save['word'])
        ).cache()
        self.df_save1 = self.df_save.select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info',
            lit('2023')
        )
        print("df_save1的数量:")
        print(self.df_save1.count())
        self.df_save1.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save1存储完成!")
        # Aggregate frequency over each singular form (pyspark.sql.functions.sum,
        # which shadows the builtin `sum` via the top-of-file import).
        self.df_agg = self.df_save.groupby(['word_singular_form']).agg(
            sum('word_frequency').alias('word_frequency')
        )
        # Each original word carries the merged frequency of its singular form.
        self.df_save2 = self.df_save.select('word', 'cn', 'word_singular_form').join(
            self.df_agg, on='word_singular_form', how='left'
        ).select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info',
            lit('2023-merge')
        )
        print("df_save2的数量:")
        print(self.df_save2.count())
        self.df_save2.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save2存储完成!")
if __name__ == '__main__':
    # Entry point: construct the job, load the Hive inputs, then run the
    # tokenize/count/persist pipeline.
    frequency_job = ABA2023YearWordFrequency()
    frequency_job.read_data()
    frequency_job.handle_data()