1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.storagelevel import StorageLevel
class DwtMerchantwordsTopAsinInfo(Templates):
    """Build dwt_merchantwords_top_asin_info.

    For each MerchantWords search term, keep the latest snapshot of its
    page-1 / top-10 natural-ranking (ZR) ASINs and aggregate them into two
    comma-separated list columns, partitioned by (site_name, batch).
    """

    def __init__(self, site_name='us', batch='2024-00'):
        super().__init__()
        self.site_name = site_name
        self.batch = batch
        self.db_save = 'dwt_merchantwords_top_asin_info'
        self.spark = self.create_spark_object(
            app_name=f"DwtMerchantwordsTopAsinInfo: {self.site_name}, {self.batch}")
        # Used by the Templates base when writing the partitioned output.
        self.partitions_num = 50
        self.partitions_by = ['site_name', 'batch']
        # Placeholder DataFrames; populated in read_data()/handle_data().
        self.df_zr_asin_info = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        # Clear any previous output for this (site, batch) partition so the
        # job is idempotent on re-runs.
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/batch={self.batch}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    def read_data(self):
        """Read page-1, top-10 ZR rows from ods_merchantwords_search_term_zr."""
        print("读取ods_merchantwords_search_term_zr表")
        sql = f"""
        select
        search_term as keyword,
        asin,
        page_row,
        created_time,
        updated_time,
        date_info
        from ods_merchantwords_search_term_zr
        where site_name = '{self.site_name}'
        and page = 1
        and page_row <= 10;
        """
        self.df_zr_asin_info = self.spark.sql(sqlQuery=sql)
        # Use the configured partition count rather than a second hard-coded 50.
        self.df_zr_asin_info = self.df_zr_asin_info \
            .repartition(self.partitions_num) \
            .persist(StorageLevel.MEMORY_ONLY)
        self.df_zr_asin_info.show(10, truncate=True)

    def handle_data(self):
        """Keep only the latest snapshot per keyword, then aggregate lists."""
        # De-duplicate across batches: keep every row belonging to the newest
        # (date_info, created_time, updated_time) snapshot of each keyword.
        # rank() (not row_number) is deliberate — all top-10 rows of the
        # winning snapshot tie at rank 1 and must all be retained.
        window = Window.partitionBy('keyword').orderBy(
            F.desc_nulls_last('date_info'),
            F.desc_nulls_last('created_time'),
            F.desc_nulls_last('updated_time'),
        )
        self.df_zr_asin_info = self.df_zr_asin_info.withColumn(
            "u_rank",
            F.rank().over(window=window)
        )
        self.df_zr_asin_info = self.df_zr_asin_info.filter('u_rank=1') \
            .drop('u_rank', 'date_info', 'created_time', 'updated_time')
        # BUG FIX: the original called orderBy('keyword', 'page_row') before
        # groupBy().agg(collect_list(...)). Spark does not guarantee
        # collect_list element order after the aggregation shuffle, so the two
        # list columns could come out in a nondeterministic (and mutually
        # inconsistent) order. Sorting (page_row, asin) structs inside each
        # group is deterministic and keeps both lists aligned.
        self.df_save = self.df_zr_asin_info.groupby('keyword').agg(
            F.sort_array(F.collect_list(F.struct('page_row', 'asin'))).alias('pairs')
        ).select(
            'keyword',
            F.concat_ws(",", F.col('pairs.asin')).alias('zr_top_asin_list'),
            # Explicit cast: concat_ws requires array<string>.
            F.concat_ws(",", F.col('pairs.page_row').cast('array<string>')).alias('zr_rank_list'),
        )
        self.df_save = self.df_save.withColumn(
            'site_name',
            F.lit(self.site_name)
        ).withColumn(
            'batch',
            F.lit(self.batch)
        )
if __name__ == '__main__':
    # Usage: python <script> <site_name> <batch>, e.g. "us 2024-00".
    # Guard against a bare IndexError when arguments are missing.
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <site_name> <batch>")
        sys.exit(1)
    site_name = sys.argv[1]
    batch = sys.argv[2]
    handle_obj = DwtMerchantwordsTopAsinInfo(site_name=site_name, batch=batch)
    handle_obj.run()