import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from yswg_utils.udf_util import UdfUtil
from pyspark.sql.types import ArrayType, StringType
"""
asin 标题分词
"""
class ExportTitleTmp(object):
def __init__(self, site_name):
        app_name = self.__class__.__name__
        self.site_name = site_name
        self.spark = SparkUtil.get_spark_session(app_name)
        # Register the static tokenizer as a Spark UDF returning an array of word strings
        self.udf_word_tokenize_reg = F.udf(self.udf_word_tokenize, ArrayType(StringType()))
@staticmethod
    def udf_word_tokenize(title: str):
        """Tokenize a title string and drop whitespace/punctuation tokens."""
        if title is None:
            return None
        result = UdfUtil.word_tokenize(title)
        # Tokens to discard; "``" and "''" are the quote tokens emitted by NLTK-style tokenizers
        filter_arr = [
            " ", "\t", "\r", "\n", "(", ")", ",", ",", "[", "]", "、", "-", ":", "&", "|", "+", "``", "''",
        ]
        return [word for word in result if word not in filter_arr]
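    # Illustrative sketch only; assumes UdfUtil.word_tokenize behaves like
    # nltk.word_tokenize (the "``"/"''" filter entries suggest an NLTK-style tokenizer):
    #   udf_word_tokenize("Wireless Mouse, 2.4G (Black)")
    #   -> ["Wireless", "Mouse", "2.4G", "Black"]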
def run(self):
        assert self.site_name is not None, "site_name must not be empty!"
        # Pull non-null ASIN titles for this site crawled after 2022-06-16
        sql = f"""
select asin_title
from big_data_selection.dim_cal_asin_history_detail
where site_name = '{self.site_name}'
and asin_title is not null
and asin_crawl_date > '2022-06-16'
"""
        save_df = self.spark.sql(sql)
        # Tokenize every title and explode the word arrays so each word becomes one row
        word_df = save_df.select(
            F.explode(self.udf_word_tokenize_reg(F.col("asin_title"))).alias("word")
        )
        # Count the occurrences of each distinct word
        df_save = word_df.groupby("word") \
            .agg(
                F.count(F.col("word")).alias("count")
            )
        df_save.write.saveAsTable(name="word_count_tmp1", format='hive', mode='overwrite')
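    # Minimal standalone sketch of the same explode-then-count pipeline on toy data,
    # using a whitespace split in place of UdfUtil.word_tokenize (data is illustrative):
    #   df = self.spark.createDataFrame([("Red Mug",), ("Red Bowl",)], ["asin_title"])
    #   df.select(F.explode(F.split("asin_title", " ")).alias("word")) \
    #     .groupby("word").count().show()
    #   # word counts: Red -> 2, Mug -> 1, Bowl -> 1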
if __name__ == '__main__':
    try:
        site_name = CommonUtil.get_sys_arg(1, None)
        obj = ExportTitleTmp(site_name)
        obj.run()
        CommonUtil.send_wx_msg(['wujicang'], "Notice", "ASIN title word count finished!")
    except Exception:
        import traceback
        traceback.print_exc()
        CommonUtil.send_wx_msg(['wujicang'], "Notice", "ASIN title word count failed!")
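
# Usage sketch (script filename is hypothetical; the first positional argument is the
# site name consumed by CommonUtil.get_sys_arg(1, None)):
#   spark-submit export_title_tmp.py us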