"""
@Author : HuangJian
@Description : 关键词与Asin详情维表
@SourceTable :
①dwd_st_asin_measure
②dwt_aba_st_analytics
@SinkTable : dwt_aba_last_change_rate
@CreateTime : 2022/03/13 14:55
@UpdateTime : 2022/03/13 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
class DwtAbaLastChangeRate(object):
def __init__(self, site_name, date_type, date_info):
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.hive_tb = "dwt_aba_last_change_rate"
        app_name = f"{self.hive_tb}:{site_name}:{date_type}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
self.partitions_num = CommonUtil.reset_partitions(site_name, 1)
hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
print(f"清除hdfs目录中数据:{hdfs_path}")
HdfsUtils.delete_hdfs_file(hdfs_path)
        self.last_year_index = None
        # date used for the period-over-period (chain) comparison
        self.last_date_info = self.handle_date_offset(0)
        # date used for the year-over-year comparison
        self.last_year_date_info = self.handle_date_offset(1)
        # initialise the global DataFrames with placeholders
        self.df_aba_analytics = self.spark.sql("select 1+1;")
        self.df_aba_analytics_old = self.spark.sql("select 1+1;")
        self.df_st_base_data = self.spark.sql("select 1+1;")
        self.df_st_last_data = self.spark.sql("select 1+1;")
        self.df_st_last_year_data = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
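    # Worked example for handle_date_offset (dates are illustrative only): with
    # date_type='month' and date_info='2023-05', handle_date_offset(1) replaces the year
    # and returns '2022-05', while handle_date_offset(0) returns the latest partition
    # earlier than '2023-05' (e.g. '2023-04', depending on which partitions exist).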
def handle_date_offset(self, handle_type: int):
        # handle_type = 0 -> compute the period-over-period (chain) date; handle_type = 1 -> compute the year-over-year date
handle_date = self.date_info
if handle_type == 0:
            # chain-comparison date: scan the table partitions and take the latest period before the current one
            if self.date_type == DateTypes.last365day.name:
                # when date_type is 'last365day', look up the partitions of dwt_aba_last365
                date_df = CommonUtil.select_partitions_df(self.spark, "dwt_aba_last365")
                handle_date = date_df.filter(
                    f"site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info < '{self.date_info}' "
                ).selectExpr("max(date_info)").rdd.flatMap(lambda ele: ele).collect()[0]
else:
                # for date_type in ('day', 'week', 'month', 'last30day'), look up the partitions of dwt_aba_st_analytics
date_df = CommonUtil.select_partitions_df(self.spark, "dwt_aba_st_analytics")
handle_date = date_df.filter(
f"site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info < '{self.date_info}' "
).selectExpr("max(date_info)").rdd.flatMap(lambda ele: ele).collect()[0]
else:
            # year-over-year date (for every date_type the first '-'-separated token of date_info is the year)
year_int = int(CommonUtil.safeIndex(handle_date.split("-"), 0, None))
last_year_int = year_int - 1
self.last_year_index = last_year_int
            # replace the current year with the previous year in the date string
handle_date = handle_date.replace(str(year_int), str(last_year_int))
print("计算处理之后的日期:", handle_date)
return handle_date
def run(self):
        # for last365day the source table differs from the other date types (day, week, month, last30day)
if self.date_type == DateTypes.last365day.name:
self.handle_365_data()
else:
self.read_data()
self.handle_base()
self.handle_year_ratio()
self.save_data()
def read_data(self):
sql1 = f"""
select
id,
search_term,
rank,
bsr_orders,
asin_cn_count,
asin_fbm_count,
asin_amazon_count,
search_volume
from dwt_aba_st_analytics
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_aba_analytics = self.spark.sql(sql1).repartition(40, 'id').cache()
self.df_aba_analytics.show(10, truncate=True)
sql2 = f"""
select
id,
rank as last_rank,
bsr_orders as last_bsr_orders,
asin_cn_count as last_asin_cn_count,
asin_fbm_count as last_asin_fbm_count,
asin_amazon_count as last_asin_amazon_count
from dwt_aba_st_analytics
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.last_date_info}'
"""
self.df_aba_analytics_old = self.spark.sql(sql2).repartition(40, 'id').cache()
self.df_aba_analytics_old.show(10, truncate=True)
        # fetch the year-over-year data (for 2022 periods the search-term rank is only available from dim_st_detail)
if self.last_year_date_info <= '2022-09':
sql = f"""
select
search_term,
st_rank as last_year_rank,
null as last_year_bsr_orders,
null as last_year_asin_cn_count,
null as last_year_asin_fbm_count,
null as last_year_asin_amazon_count,
st_search_num as last_year_search_volume
from dim_st_detail
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.last_year_date_info}'
"""
else:
sql = f"""
select
search_term,
rank as last_year_rank,
bsr_orders as last_year_bsr_orders,
asin_cn_count as last_year_asin_cn_count,
asin_fbm_count as last_year_asin_fbm_count,
asin_amazon_count as last_year_asin_amazon_count,
search_volume as last_year_search_volume
from dwt_aba_st_analytics
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.last_year_date_info}'
"""
self.df_st_last_year_data = self.spark.sql(sql).repartition(40, 'search_term').cache()
self.df_st_last_year_data.show(10, truncate=True)
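    # Note: the previous-period frame is joined back on `id`, while the previous-year frame
    # is joined on `search_term` (the dim_st_detail query used for older periods does not
    # select the analytics id).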
def handle_base(self):
self.df_st_base_data = self.df_aba_analytics.join(
self.df_aba_analytics_old, on='id', how='left'
)
self.df_st_base_data = self.df_st_base_data.withColumn(
'rank_rate_of_change',
F.round((F.col('rank') - F.col('last_rank')) / F.col('last_rank'), 3)
).withColumn(
'bsr_orders_rate_of_change',
F.round((F.col('bsr_orders') - F.col('last_bsr_orders')) / F.col('last_bsr_orders'), 3)
).withColumn(
'cn_seller_rate_of_change',
F.round((F.col('asin_cn_count') - F.col('last_asin_cn_count')) / F.col('last_asin_cn_count'), 3)
).withColumn(
'fbm_rate_of_change',
F.round((F.col('asin_fbm_count') - F.col('last_asin_fbm_count')) / F.col('last_asin_fbm_count'), 3)
).withColumn(
'amazon_rate_of_change',
F.round((F.col('asin_amazon_count') - F.col('last_asin_amazon_count')) / F.col('last_asin_amazon_count'), 3)
).select(
'id', 'search_term', 'rank', 'bsr_orders', 'asin_cn_count', 'asin_fbm_count', 'asin_amazon_count',
'rank_rate_of_change', 'bsr_orders_rate_of_change', 'cn_seller_rate_of_change',
'fbm_rate_of_change', 'amazon_rate_of_change', 'search_volume'
).repartition(40, 'search_term').cache()
self.df_aba_analytics.unpersist()
self.df_aba_analytics_old.unpersist()
def handle_year_ratio(self):
        # year-over-year calculation
df_year_ratio = self.df_st_base_data.join(
self.df_st_last_year_data, on='search_term', how='left'
)
df_year_ratio = df_year_ratio.withColumn(
"rank_change_rate",
F.round(F.expr("(rank - last_year_rank) / last_year_rank"), 3)
).withColumn(
"bsr_orders_change_rate",
F.round(F.expr("(bsr_orders - last_year_bsr_orders) / last_year_bsr_orders"), 3)
).withColumn(
"cn_seller_change_rate",
F.round(F.expr("(asin_cn_count - last_year_asin_cn_count) / last_year_asin_cn_count"), 3)
).withColumn(
"fbm_change_rate",
F.round(F.expr("(asin_fbm_count - last_year_asin_fbm_count) / last_year_asin_fbm_count"), 3)
).withColumn(
"amazon_change_rate",
F.round(F.expr("(asin_amazon_count - last_year_asin_amazon_count) / last_year_asin_amazon_count"), 3)
).withColumn(
"search_volume_change_rate",
F.round(F.expr("(search_volume - last_year_search_volume) / last_year_search_volume"), 3)
).na.fill({
            # default ±1000: the current period has data but the comparison period does not, so treat it as an increase;
            # for rank a negative value means the rank moved up (improved), hence -1000 for the rank defaults
"rank_change_rate": -1000.000,
"rank_rate_of_change": -1000.000,
"bsr_orders_change_rate": 1000.000,
"bsr_orders_rate_of_change": 1000.000,
"cn_seller_change_rate": 1000.000,
"cn_seller_rate_of_change": 1000.000,
"fbm_change_rate": 1000.000,
"fbm_rate_of_change": 1000.000,
"amazon_change_rate": 1000.000,
"amazon_rate_of_change": 1000.000,
"search_volume_change_rate": 1000.000
})
self.df_save = df_year_ratio
def handle_365_data(self):
sql = f"""
with base_data as (
select
id,
search_term,
rank,
bsr_orders
from dwt_aba_last365
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
),
chain_ratio_data as (
select
id,
rank as last_rank,
bsr_orders as last_bsr_orders
from dwt_aba_last365
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.last_date_info}'
),
year_ratio_data as (
select
id,
rank as last_year_rank,
bsr_orders as last_year_bsr_orders
from dwt_aba_last365
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.last_year_date_info}'
)
select
base.id,
base.search_term,
base.rank,
base.bsr_orders,
round((base.rank - chain.last_rank)/chain.last_rank,3) as rank_rate_of_change,
round((base.bsr_orders - chain.last_bsr_orders)/chain.last_bsr_orders,3) as bsr_orders_rate_of_change,
round((base.rank - year.last_year_rank)/year.last_year_rank,3) as rank_change_rate,
round((base.bsr_orders - year.last_year_bsr_orders)/year.last_year_bsr_orders,3) as bsr_orders_change_rate
from base_data base left join chain_ratio_data chain
on base.id = chain.id
left join year_ratio_data year
on base.id = year.id
"""
print("动态365的同比、环比计算sql语句:", sql)
self.df_save = self.spark.sql(sqlQuery=sql)
        # fill in the remaining columns so this path matches the schema expected by save_data
        self.df_save = self.df_save.withColumn(
            "cn_seller_rate_of_change", F.lit(None)
        ).withColumn(
            "cn_seller_change_rate", F.lit(None)
        ).withColumn(
            "fbm_rate_of_change", F.lit(None)
        ).withColumn(
            "fbm_change_rate", F.lit(None)
        ).withColumn(
            "amazon_rate_of_change", F.lit(None)
        ).withColumn(
            "amazon_change_rate", F.lit(None)
        ).withColumn(
            "search_volume_change_rate", F.lit(None)
        )
def save_data(self):
self.df_save = self.df_save.select(
F.col("id").alias("search_term_id"),
F.col("search_term"),
F.col("rank_rate_of_change"),
F.col("rank_change_rate"),
F.col("bsr_orders_rate_of_change"),
F.col("bsr_orders_change_rate"),
F.col("cn_seller_rate_of_change"),
F.col("cn_seller_change_rate"),
F.col("fbm_rate_of_change"),
F.col("fbm_change_rate"),
F.col("amazon_rate_of_change"),
F.col("amazon_change_rate"),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("created_time"),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("updated_time"),
F.col("search_volume_change_rate"),
F.lit(self.site_name).alias("site_name"),
F.lit(self.date_type).alias("date_type"),
F.lit(self.date_info).alias("date_info")
)
        # column type conversion
self.df_save = CommonUtil.auto_transfer_type(self.spark, self.df_save, self.hive_tb)
self.df_save = self.df_save.repartition(self.partitions_num)
partition_by = ["site_name", "date_type", "date_info"]
print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
self.df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success")
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
obj = DwtAbaLastChangeRate(site_name, date_type, date_info)
obj.run()
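
# Example invocation (script name and argument values are illustrative, not taken from the repo):
#   spark-submit dwt_aba_last_change_rate.py us month 2023-05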