import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.templates import Templates
"""
获取搜索词 top100 相关指标
依赖 dwd_asin_measure 表 dwd_st_asin_measure 表 dim_asin_detail 表
输出为 Dws_top100_asin_info
"""


class DwsTop100AsinInfo(Templates):
    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.db_save = "dws_top100_asin_info"
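        # Placeholder DataFrames; the real ones are assigned in read_data() and handle_data().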
        self.df_search_term_asin = self.spark.sql("select 1+1;")
        self.df_search_term_id = self.spark.sql("select 1+1;")
        self.df_asin_bsr_orders = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=25)
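        # Build this run's partition path and clear it up front so a rerun overwrites cleanly.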
        partition_dict = {
            "site_name": self.site_name,
            "date_type": self.date_type,
            "date_info": self.date_info,
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=partition_dict)
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def read_data(self):
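        # Search-term -> ASIN pairs observed in the current partition.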
        sql1 = f"""
            select
                search_term,
                asin
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_search_term_asin = self.spark.sql(sql1).repartition(40, 'search_term', 'asin').cache()
        self.df_search_term_asin.show(10, truncate=True)
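        # Numeric key (search_term_id) for each search term.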
        sql2 = f"""
            select
                cast(st_key as integer) as search_term_id,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.df_search_term_id = self.spark.sql(sql2).repartition(40, 'search_term').cache()
        self.df_search_term_id.show(10, truncate=True)
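        # BSR-derived order estimates per ASIN.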
        sql3 = f"""
            select
                asin,
                asin_bsr_orders as orders
            from dwd_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_asin_bsr_orders = self.spark.sql(sql3).repartition(40, 'asin').cache()
        self.df_asin_bsr_orders.show(10, truncate=True)
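        # Launch time and new-product flag per ASIN.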
        sql4 = f"""
            select
                asin,
                asin_launch_time,
                asin_is_new
            from dim_asin_detail
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_asin_detail = self.spark.sql(sql4).repartition(40, 'asin').cache()
        self.df_asin_detail.show(10, truncate=True)

    def handle_data(self):
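        # Attach the term id, order estimate, and detail fields to each (search_term, asin) pair.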
        self.df_save = self.df_search_term_asin.join(
            self.df_search_term_id, on='search_term', how='left'
        ).join(
            self.df_asin_bsr_orders, on='asin', how='left'
        ).join(
            self.df_asin_detail, on='asin', how='left'
        )
        # Keep only the top 100 ASINs per search term, ranked by estimated orders.
        self.df_save = self.df_save.withColumn(
            "row_number",
            F.row_number().over(
                Window.partitionBy(F.col('search_term_id')).orderBy(F.col("orders").desc_nulls_last())
            )
        )
        self.df_save = self.df_save.filter("row_number <= 100")
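        # Total orders across the retained top-100 rows of each term (computed after the
        # filter, so it is the denominator for share within the top 100, not the whole market).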
        self.df_save = self.df_save.withColumn(
            "group_sum",
            F.sum(F.col('orders')).over(
                Window.partitionBy(F.col('search_term_id'))
            )
        )
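        # Caution: collect_list gives no ordering guarantee after a shuffle; the comma-joined
        # lists below assume rows arrive in row_number order, which Spark does not promise.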
        self.df_save = self.df_save.groupby(F.col("search_term_id")).agg(
            F.first("search_term").alias("search_term"),
            F.concat_ws(',', F.collect_list("asin")).alias("top100_asin"),
            F.concat_ws(',', F.collect_list(F.coalesce('orders', F.lit(0)))).alias("top100_orders"),
            # market share within the top 100
            F.concat_ws(',', F.collect_list(F.coalesce(F.round(F.expr('orders / group_sum'), 4), F.lit(0))))
            .alias("top100_market_share"),
            F.concat_ws(',', F.collect_list(F.coalesce(F.col("asin_is_new"), F.lit('-')))).alias("top100_is_new"),
            F.concat_ws(',', F.collect_list("row_number")).alias("top100_rank"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info"),
        )
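

# Example invocation (script name and argument values below are hypothetical):
#   spark-submit dws_top100_asin_info.py us month 2024-01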
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsTop100AsinInfo(site_name, date_type, date_info)
    obj.run()