dws_st_num_stats.py
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from utils.templates import Templates
"""
获取搜索词对应asin标题最大最小数量等其他指标
依赖 dwd_asin_title_number 表 dwd_st_asin_measure 表
输出为 Dws_st_num_stats
"""
class DwsStNumStats(Templates):
    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.reset_partitions(partitions_num=1)
        # Placeholder DataFrames; they are filled in read_data()
        self.asin_title_num = self.spark.sql("select 1+1;")
        self.st_asin_count = self.spark.sql("select 1+1;")
        self.self_asin = self.spark.sql("select 1+1;")
        self.search_term_id = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.db_save = "dws_st_num_stats"
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dws/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"Clearing HDFS directory.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
    def read_data(self):
        print("Reading dwd_asin_title_number:")
        sql1 = f"""
            select
                asin,
                value
            from dwd_asin_title_number
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.asin_title_num = self.spark.sql(sql1).repartition(40, 'asin').cache()
        self.asin_title_num.show(10, truncate=True)
        print("Reading dwd_st_asin_measure:")
        sql2 = f"""
            select
                search_term,
                asin,
                count(asin) over (partition by search_term) as asin_count
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
            group by search_term, asin
        """
        self.st_asin_count = self.spark.sql(sql2).repartition(40, 'search_term', 'asin').cache()
        self.st_asin_count.show(10, truncate=True)
        print("Reading ods_self_asin:")
        sql3 = f"""
            select
                asin as max_num_asin,
                1 as is_self_max_num_asin
            from ods_self_asin
            where site_name = '{self.site_name}'
        """
        # Repartition on the renamed column; the source column 'asin' no longer exists here
        self.self_asin = self.spark.sql(sql3).repartition(40, 'max_num_asin').cache()
        self.self_asin.show(10, truncate=True)
        print("Reading ods_st_key:")
        sql4 = f"""
            select
                cast(st_key as integer) as search_term_id,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.search_term_id = self.spark.sql(sql4).repartition(40, 'search_term').cache()
        self.search_term_id.show(10, truncate=True)
    def handle_data(self):
        # Join the (search_term, asin) pairs with the title pack number;
        # asins without a parsed number default to 1
        self.df_save = self.st_asin_count.join(
            self.asin_title_num, on='asin', how='left'
        ).fillna(
            {'value': 1}
        )
        # Flag asins whose pack number is greater than 1, to be counted per search term
        self.df_save = self.df_save.withColumn(
            'num_flag',
            F.when(F.col('value') > 1, 1).otherwise(0)
        )
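        # The max(struct(value, asin)) in the aggregation below relies on Spark's struct
        # ordering (field by field), so it keeps the (value, asin) pair with the largest
        # pack number, with asin as the tie-breaker; max_row.asin is therefore the asin
        # that carries that maximum.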
        # Aggregate per search term: distinct asin count, multi-pack asin count,
        # and the row holding the maximum pack number
        self.df_save = self.df_save.groupby('search_term').agg(
            F.max('asin_count').alias('asin_count'),
            F.sum('num_flag').alias('num_count'),
            F.max(F.struct('value', 'asin')).alias('max_row')
        ).repartition(30, 'search_term')
        # Attach the search term id
        self.df_save = self.df_save.join(
            self.search_term_id, on='search_term', how='inner'
        )
        # Compute the multi-pack proportion
        self.df_save = self.df_save.withColumn(
            'max_num_asin',
            self.df_save.max_row.asin
        ).withColumn(
            'max_num',
            self.df_save.max_row.value
        ).withColumn(
            'most_proportion',
            F.round(F.col('num_count') / F.col('asin_count'), 3)
        )
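        # Illustrative numbers only: if a search term has asin_count = 10 distinct asins
        # and num_count = 3 of them have a pack number > 1, then
        # most_proportion = round(3 / 10, 3) = 0.3.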
        # Flag whether the max-pack asin is one of our own asins
        self.df_save = self.df_save.join(
            self.self_asin, on='max_num_asin', how='left'
        ).fillna(
            {'is_self_max_num_asin': 0}
        )
        # Final column layout before writing out
        self.df_save = self.df_save.select(
            'search_term',
            'asin_count',
            'num_count',
            'max_num',
            'max_num_asin',
            'most_proportion',
            'search_term_id',
            'is_self_max_num_asin',
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsStNumStats(site_name, date_type, date_info)
    obj.run()
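# Note: run() is inherited from the Templates base class; it presumably drives the
# read_data() -> handle_data() flow and writes self.df_save into dws_st_num_stats,
# partitioned by site_name / date_type / date_info (see self.partitions_by above).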