"""
author: wangrui
description: Summarize minimum product-line market data based on ods_keepa_asin_bsr_rank
table_read_name: ods_keepa_asin_bsr_rank\dim_asin_detail
table_save_name: dwt_keepa_asin_bsr_rank
table_save_level: dwt
version: 2.0
created_date: 2023-08-22
updated_date: 2023-08-22
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# window functions for group-wise ranking
from pyspark.sql import functions as F
from pyspark.sql import Window
from yswg_utils.common_df import get_node_first_id_df
from pyspark.sql.types import IntegerType
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from yswg_utils.common_udf import udf_parse_amazon_orders
class DwtAsinBsrRank(Templates):
def __init__(self, site_name="us", date_type="month", date_info="2023-01", run_type=1):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.run_type = int(run_type)
        self.db_save_summary = "dwt_keepa_asin_bsr_rank"
        self.db_save_detail = "dws_keepa_asin_bsr_rank"
self.spark = self.create_spark_object(
app_name=f"keepa_asin_bsr_rank {self.site_name} {self.date_type} {self.date_info}")
self.year, self.month = self.date_info.split('-')
self.get_year_month_days_dict(year=int(self.year))
self.complete_date_info_tuple = self.get_complete_week_tuple()
self.orders_transform_rate = self.get_orders_transform_rate()
        # Placeholder DataFrames; each is populated in read_data() / handle_* steps
        self.df_save_summary = self.spark.sql("select 1+1;")
        self.df_save_detail = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_keepa_asin = self.spark.sql("select 1+1;")
        self.df_asin_new_cate = self.spark.sql("select 1+1;")
        self.df_self_asin = self.spark.sql("select 1+1;")
        self.df_inv_asin = self.spark.sql("select 1+1;")
        self.df_inv_asin_detail = self.spark.sql("select 1+1;")
        self.df_complete_inv_asin = self.spark.sql("select 1+1;")
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(60)
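        # UDF that parses the buy_sales text into an integer order count
        # (applied in handle_asin_detail)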
self.u_parse_amazon_orders = F.udf(udf_parse_amazon_orders, IntegerType())
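    # Collect the year_week values whose week start (week_day == 1) falls inside
    # the target month, taken from the dim_date_20_to_30 calendar table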
def get_complete_week_tuple(self):
        self.df_date = self.spark.sql("select * from dim_date_20_to_30;")
df = self.df_date.toPandas()
df_loc = df.loc[(df.year_month == f"{self.date_info}") & (df.week_day == 1)]
return tuple(df_loc.year_week)
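    # Factor that scales the period's orders to a monthly equivalent,
    # e.g. for 2023-01 (31 days): day -> 1/31, week -> 7/31, month -> 1.
    # Note: computed in __init__ but not referenced later in this job.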
def get_orders_transform_rate(self):
month_days = self.year_month_days_dict[int(self.month)]
if self.date_type in ['day', 'week']:
if self.date_type == 'day':
return 1 / month_days
if self.date_type == 'week':
return 7 / month_days
else:
return 1
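    # Load all inputs: ods_asin_detail from the Hive warehouse, plus the
    # self-ASIN, inv-ASIN and self-ASIN-detail tables from MySQL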
def read_data(self):
print("1. 读取dim_asin_detail, 获取上个月所有asin信息")
        # run_type 1 reads the monthly crawl; anything else reads the month_week crawl
        detail_date_type = 'month' if self.run_type == 1 else 'month_week'
        sql = f"""
        select
            asin,
            rank,
            price,
            node_id as cat_id,
            category,
            buy_sales,
            created_at as dt
        from ods_asin_detail
        where site_name = '{self.site_name}'
          and date_type = '{detail_date_type}'
          and date_info = '{self.date_info}'
          and node_id is not null;
        """
print("sql:", sql)
self.df_asin_detail = self.spark.sql(sqlQuery=sql).repartition(60).cache()
self.df_asin_detail.show(20, truncate=False)
print("2. 读取内部asin信息")
sql = f"select asin, 1 as is_self_asin from us_self_asin where delete_time is null group by asin "
print("sql:", sql)
mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name=self.site_name)
if mysql_con_info is not None:
self.df_self_asin = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con_info['url'],
pwd=mysql_con_info['pwd'],
username=mysql_con_info['username'],
query=sql).cache()
self.df_self_asin.show(20, truncate=False)
        # Read dim_bsr_category_tree (the new category tree) to get first-level category ids
self.df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark)
self.df_asin_new_cate = self.df_asin_new_cate.withColumnRenamed("node_id", "cat_id")
self.df_asin_new_cate = self.df_asin_new_cate.withColumnRenamed("category_first_id", "cate_1_id").cache()
self.df_asin_new_cate.show(20, truncate=False)
print("3. 读取inv_asin数据")
sql = f"""
select asin, 1 as is_inner from us_inv_asin group by asin
"""
if mysql_con_info is not None:
self.df_inv_asin = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con_info['url'],
pwd=mysql_con_info['pwd'],
username=mysql_con_info['username'],
query=sql).cache()
self.df_inv_asin.show(20, truncate=False)
print("4. 读取self_asin_detail")
sql = f"""
        select asin, node_id as cat_id from us_self_asin_detail where asin_type like '%%11%%' and created_at >= DATE_FORMAT(CURDATE(), '%Y-%m-01') group by asin, node_id
"""
if mysql_con_info is not None:
self.df_inv_asin_detail = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con_info['url'],
pwd=mysql_con_info['pwd'],
username=mysql_con_info['username'],
query=sql).cache()
self.df_inv_asin_detail.show(20, truncate=False)
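    # Resolve category ids for inventory ASINs and keep the (cat_id, is_inner)
    # pairs that mark categories containing in-house inventory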
def handle_inv_asin_cat(self):
self.df_inv_asin = self.df_inv_asin.join(
self.df_inv_asin_detail, on=['asin'], how='left'
)
self.df_inv_asin = self.df_inv_asin.filter((F.col("cat_id").isNotNull()) & (F.col("cat_id") != '')).cache()
self.df_keepa_asin = self.df_inv_asin.select("cat_id", "is_inner")
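    # Keep only the latest crawl per ASIN, parse orders from buy_sales, and drop
    # the company's own ASINs (is_self_asin = 1)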
def handle_asin_detail(self):
self.df_asin_detail = self.df_asin_detail.repartition(60)
window = Window.partitionBy(['asin']).orderBy(
self.df_asin_detail.dt.desc()
)
self.df_asin_detail = self.df_asin_detail.withColumn('dt_rank', F.row_number().over(window=window))
self.df_asin_detail = self.df_asin_detail.filter("dt_rank = 1")
self.df_asin_detail = self.df_asin_detail.drop("dt_rank")
self.df_asin_detail = self.df_asin_detail.withColumn("orders",
self.u_parse_amazon_orders(self.df_asin_detail.buy_sales))
self.df_asin_detail = self.df_asin_detail.join(
self.df_self_asin, on=['asin'], how='left'
)
self.df_asin_detail = self.df_asin_detail.filter((F.col("is_self_asin") != 1) | (F.col("is_self_asin").isNull())).drop("is_self_asin").cache()
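    # Join the in-house flag and first-level category id onto the ASIN detail;
    # categories without inventory default to is_inner = 2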
def handle_data_join(self):
self.df_asin_detail = self.df_asin_detail.repartition(60)
self.df_asin_detail = self.df_asin_detail.join(
self.df_keepa_asin, on=['cat_id'], how='left'
).join(
self.df_asin_new_cate, on=['cat_id'], how='left'
)
self.df_asin_detail = self.df_asin_detail.drop_duplicates(['asin']).cache()
self.df_asin_detail = self.df_asin_detail.na.fill({"is_inner": 2})
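    # type = 1 ("successful" ASIN) when monthly orders clear a price-tiered
    # threshold (cheaper items require more orders), otherwise type = 2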
def handle_keepa_asin_detail(self):
type_expr = """
CASE
WHEN
(price is not null and price < 7 and orders >= 150) or
(price >= 7 and price < 15 and orders >= 100) or
(price >= 15 and price < 20 and orders >= 80) or
(price >= 20 and price < 35 and orders >= 40) or
(price >= 35 and price < 50 and orders >= 30) or
(price >= 50 and orders >= 20) or
(price is null and orders >=50)
THEN
1
ELSE
2
END
"""
self.df_asin_detail = self.df_asin_detail.withColumn('type', F.expr(type_expr))
self.df_save_detail = self.df_asin_detail.select("cat_id", "asin", "rank", "price", "cate_1_id", "orders",
"type", "category").cache()
def handle_keepa_asin_summary(self):
self.df_save_summary = self.df_asin_detail.select("cat_id", "orders", "type", "is_inner").cache()
self.df_save_summary = self.df_save_summary.groupby(['cat_id']).agg(
F.first(F.col("is_inner")).alias("is_inner"),
F.sum(F.col("orders")).alias("orders_sum"),
F.sum(F.when(F.col('type') == 1, F.col("orders")).otherwise(F.lit(0))).alias("success_orders_sum"),
F.sum(F.when(F.col('type') == 2, F.col("orders")).otherwise(F.lit(0))).alias("fail_orders_sum"),
F.sum(F.when(F.col('type') == 1, 1).otherwise(F.lit(0))).alias("success_num"),
F.sum(F.when(F.col('type') == 2, 1).otherwise(F.lit(0))).alias("fail_num"),
F.sum(F.when(F.col("orders") >= 50, 1).otherwise(0)).alias("50_qty_asin_count")
)
df_category_info = self.df_asin_detail.select("cat_id", "category", "dt")
df_with_category_info = df_category_info.filter(F.col("category").isNotNull())
window = Window.partitionBy(['cat_id']).orderBy(
df_with_category_info.dt.desc()
)
df_category_info = df_with_category_info.withColumn("c_rank", F.row_number().over(window=window))
df_new_category_info = df_category_info.filter("c_rank = 1")
df_new_category_info = df_new_category_info.drop("c_rank", "dt")
self.df_save_summary = self.df_save_summary.join(
df_new_category_info, on=['cat_id'], how='left'
).cache()
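    # Append audit timestamps, reserved filler columns, and the partition columns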
def handle_data_group(self):
        self.df_save_detail = self.df_save_detail.withColumn("created_time", F.date_format(F.current_timestamp(),
                                                                                            'yyyy-MM-dd HH:mm:ss')). \
            withColumn("updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')). \
withColumn("string_field1", F.lit("null")). \
withColumn("string_field2", F.lit("null")). \
withColumn("string_field3", F.lit("null")). \
withColumn("int_field1", F.lit(0)). \
withColumn("int_field2", F.lit(0)). \
withColumn("int_field3", F.lit(0)). \
withColumn("site_name", F.lit(self.site_name)). \
withColumn("date_type", F.lit(self.date_type)). \
withColumn("date_info", F.lit(self.date_info))
        self.df_save_summary = self.df_save_summary.withColumn("create_time", F.date_format(F.current_timestamp(),
                                                                                             'yyyy-MM-dd HH:mm:ss')). \
            withColumn("updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')). \
withColumn("string_field1", F.lit("null")). \
withColumn("string_field2", F.lit("null")). \
withColumn("string_field3", F.lit("null")). \
withColumn("int_field1", F.lit(0)). \
withColumn("int_field2", F.lit(0)). \
withColumn("int_field3", F.lit(0)). \
withColumn("site_name", F.lit(self.site_name)). \
withColumn("date_type", F.lit(self.date_type)). \
withColumn("date_info", F.lit(self.date_info))
def handle_data(self):
self.read_data()
self.handle_inv_asin_cat()
self.handle_asin_detail()
self.handle_data_join()
self.handle_keepa_asin_detail()
self.handle_keepa_asin_summary()
self.handle_data_group()
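    # Persist both outputs, partitioned by site_name / date_type / date_info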
def save_data(self):
self.save_data_common(
df_save=self.df_save_detail,
db_save=self.db_save_detail,
partitions_num=self.partitions_num,
partitions_by=self.partitions_by
)
self.save_data_common(
df_save=self.df_save_summary,
db_save=self.db_save_summary,
partitions_num=self.partitions_num,
partitions_by=self.partitions_by
)
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    run_type = sys.argv[4]  # arg 4: 1 -> read 'month' partitions of ods_asin_detail, otherwise 'month_week'
handle_obj = DwtAsinBsrRank(site_name=site_name, date_type=date_type, date_info=date_info, run_type=run_type)
handle_obj.run()
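# Example invocation (the script name below is illustrative):
#   spark-submit dwt_keepa_asin_bsr_rank.py us month 2023-01 1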