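"""
DWT job: aggregate the last 12 months of search volume per search term for one
site, rank the terms by total volume, map each rank to a monthly search volume
via ods_rank_search_rate_repeat, and write the results to the Hive tables
dwt_st_sv_last365_month and dwt_st_sv_last365.
"""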
import os
import sys

# Make the project root importable when this script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window


class DwtStSearchVolumeLast365(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

    def run(self):
        # Collect the current month plus the 11 preceding months.
        last_12_month = []
        for i in range(0, 12):
            last_12_month.append(CommonUtil.get_month_offset(self.date_info, -i))
        print(f"Last 12 months: {last_12_month}")
        use_date_type = DateTypes.month.name if self.date_info > '2023-10' else DateTypes.month_old.name
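        # Months after 2023-10 are stored under the 'month' date_type, while
        # earlier months were loaded as 'month_old'. The query below reads a
        # single date_type, so the 12-month window is assumed to fall entirely
        # on one side of that cutover.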
        sql1 = f"""
            select
                search_term,
                total_search_volume,
                row_number() over (order by total_search_volume desc) as sv_rank,
                '{self.site_name}' as site_name,
                '{self.date_info}' as date_info
            from (
                select
                    search_term,
                    sum(st_search_num) as total_search_volume
                from (
                    select
                        search_term,
                        st_search_num
                    from dim_st_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{use_date_type}'
                      and date_info in ({CommonUtil.list_to_insql(last_12_month)})
                ) t1
                group by search_term
            ) t2
        """
        df_month = self.spark.sql(sql1).repartition(40, 'search_term').cache()
        df_save1 = df_month.repartition(1)
        hive_tb = "dwt_st_sv_last365_month"
        partition_by = ["site_name", "date_info"]
        print(f"Saving to table {hive_tb}, partitioned by {partition_by}")
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"Clearing HDFS path: {hdfs_path}")
        # Deleting the partition directory before the 'append' write emulates
        # an INSERT OVERWRITE of just this site_name/date_info partition.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        df_save1.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
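
        # Map each search term to its stable id (st_key) from ods_st_key.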
        sql2 = f"""
            select
                st_key,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        df_st_key = self.spark.sql(sql2).repartition(40, 'search_term').cache()
        # Re-derive the monthly search volume per term id and recompute
        # sv_rank: the inner join and the dedup on search_term_id can drop
        # rows, which would leave gaps in the rank from sql1.
        df_all = df_month.join(
            df_st_key, "search_term", "inner"
        ).select(
            df_st_key['st_key'].alias("search_term_id"),
            "search_term",
            "total_search_volume",
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info"),
        ).dropDuplicates(
            ['search_term_id']
        ).withColumn(
            "sv_rank",
            F.row_number().over(Window.partitionBy('site_name').orderBy(F.col("total_search_volume").desc()))
        ).repartition(40, 'sv_rank').cache()
        df_month.unpersist()
        df_st_key.unpersist()
        # Read the rank -> search-volume mapping from ods_rank_search_rate_repeat.
        sql3 = f"""
            select
                rank,
                search_num,
                date_info
            from ods_rank_search_rate_repeat
            where site_name = '{self.site_name}'
              and date_type = 'month'
        """
        df_rank_sv = self.spark.sql(sql3).cache()
        window = Window.partitionBy("rank").orderBy(df_rank_sv.date_info.desc())
        df_rank_sv = df_rank_sv.withColumn(
            "date_info_rank", F.row_number().over(window)
        ).filter(
            "date_info_rank = 1"
        ).drop(
            "date_info_rank", "date_info"
        ).repartition(40, 'rank').cache()
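        # df_rank_sv now holds one row per rank: the search_num from the most
        # recent month available for that rank.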
        df_all = df_all.join(
            df_rank_sv, df_all['sv_rank'] == df_rank_sv['rank'], "left"
        ).select(
            "search_term_id",
            "search_term",
            "total_search_volume",
            "sv_rank",
            F.col("search_num").alias("sv_month"),
            "site_name",
            "date_info",
        )
        df_rank_sv.unpersist()
        df_save2 = df_all.repartition(2)
        hive_tb = "dwt_st_sv_last365"
        partition_by = ["site_name", "date_info"]
        print(f"Saving to table {hive_tb}, partitioned by {partition_by}")
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"Clearing HDFS path: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save2.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtStSearchVolumeLast365(site_name, date_info)
    obj.run()
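
# Example launch (the script file name here is illustrative; site_name and
# date_info are read from sys.argv via CommonUtil.get_sys_arg):
#   spark-submit dwt_st_sv_last365.py us 2023-11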