"""
@Author : HuangJian
@Description : Keyword (search term) & ASIN detail dimension table
@SourceTable :
①ods_search_term_(zr,sp,sb,ac,bs,er,tr)
②ods_asin_keep_date
③ods_asin_variat
④ods_asin_detail
⑤dwd_bs_category_asin
@SinkTable : dim_st_asin_detail
@CreateTime : 2022/11/10 9:56
@UpdateTime : 2022/11/10 9:56
"""
import os
import sys
import datetime
import traceback
from datetime import date, timedelta
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class DimStAsinDetail(Templates):
def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.db_save = f'dim_st_asin_detail'
self.spark = self.create_spark_object(
app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
self.year_week = self.get_year_week()
self.year_week_tuple = self.get_last_4_week()
self.df_save = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=60)
        self.data_type_list = ['tr', 'er', 'bs', 'ac', 'sb1', 'sb2', 'sb3', 'sp', 'zr']  # union each small table into one big table
self.df_st_asin_info = self.spark.sql(
f"select search_term, asin, page, page_row, 'zr' as data_type, updated_time,site_name,date_type,date_info from ods_st_rank_zr limit 0;")
self.df_asin_keep_date = self.spark.sql(f"select 1+1;")
self.df_asin_variat = self.spark.sql(f"select 1+1;")
self.df_asin_detail = self.spark.sql(f"select 1+1;")
self.df_bs_category = self.spark.sql("select 1+1;")
        # register custom UDFs
self.u_launch_time = self.spark.udf.register("u_launch_time", self.udf_launch_time, IntegerType())
self.u_days_diff = self.spark.udf.register("u_days_diff", self.udf_days_diff, IntegerType())
self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType())
@staticmethod
def udf_page_rank(page, page_1_count, page_2_count, page_row):
"""
处理 zr, sp 的page_rank字段
:param page:
:param page_1_count:
:param page_2_count:
:param page_row:
:return: page_rank
"""
if page == 1:
return page_row
elif page == 2:
return page_1_count + page_row
else:
return page_2_count + page_row
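    # Worked example of the page_rank scheme above (illustrative values only):
    #   if max(page_row) on page 1 is 48 and on page 2 is 50, then page_1_count=48 and
    #   page_2_count=48+50=98, so a page-3 row with page_row=5 gets page_rank=98+5=103.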
def handle_data_page_rank(self, df, data_type):
print(f"{data_type}--page_rank计算")
u_page_rank = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType())
        # zr/sp contain duplicate page_row values, so take max(page_row) instead of count
df_page_1 = df.filter(f"page=1").groupBy(['search_term']).agg({f"page_row": "max"})
df_page_2 = df.filter(df[f'page'] == 2).groupBy(['search_term']).agg(
{f"page_row": "max"})
df_page_1 = df_page_1.withColumnRenamed(f'max(page_row)', 'page_1_count')
df_page_2 = df_page_2.withColumnRenamed(f'max(page_row)', 'page_2_count_old')
df = df.join(df_page_1, on='search_term', how='left'). \
join(df_page_2, on='search_term', how='left')
df = df.fillna(0)
df = df.withColumn("page_2_count", df.page_1_count + df.page_2_count_old)
df = df.withColumn(f"page_rank", u_page_rank(
df[f'page'], df.page_1_count, df.page_2_count, df[f'page_row']))
# df.show(n=10, truncate=False)
return df
def get_last_4_week(self):
        # get the most recent four weeks (the current week plus the previous three)
        print("get_last_4_week called, current year-week:", self.year_week)
self.df_week = self.spark.sql(f"select * from dim_week_20_to_30;")
df = self.df_week.toPandas()
self.year, self.week = int(self.year_week.split("-")[0]), int(self.year_week.split("-")[1])
df_week = df.loc[df.year_week == self.year_week]
current_id = list(df_week.id)[0] if list(df_week.id) else None
id_tuple = (current_id, current_id - 1, current_id - 2, current_id - 3)
df_4_week = df.loc[df.id.isin(id_tuple)]
df_4_week = tuple(df_4_week.year_week) if tuple(df_4_week.year_week) else ()
return df_4_week
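    # Illustrative example (assumes the current year_week exists in dim_week_20_to_30 and that
    # its ids are consecutive): for year_week '2022-40' this returns something like
    # ('2022-37', '2022-38', '2022-39', '2022-40').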
def get_year_week(self):
        # look up the year-week for the given date (only implemented for date_type == 'day';
        # other date types fall through and return None)
if self.date_type == "day":
sql = f"select year_week from dim_date_20_to_30 where `date`='{self.date_info}'"
df = self.spark.sql(sqlQuery=sql).toPandas()
print(list(df.year_week)[0])
return list(df.year_week)[0]
@staticmethod
def udf_launch_time(launch_time,date_type,date_info):
        # compute the number of days between launch_time and the current reporting date
if "-" in str(launch_time):
# print(DwdFeedBack.week_date)
asin_date_list = str(launch_time).split("-")
try:
asin_date = datetime.date(year=int(asin_date_list[0]),
month=int(asin_date_list[1]),
day=int(asin_date_list[2]))
week_date = '2022-10-01'
if date_type == 'week':
cur_year = str(date_info).split("-")[0]
cur_week = str(date_info).split("-")[1]
d = date(cur_year, 1, 1)
d = d - timedelta(d.weekday())
dlt = timedelta(days=(cur_week) * 7)
week_date = d + dlt
if date_type == 'day':
                    week_date = date_info
cur_date_list = str(week_date).split("-")
cur_date = datetime.date(year=int(cur_date_list[0]),
month=int(cur_date_list[1]),
day=int(cur_date_list[2]))
days_diff = (cur_date - asin_date).days
except Exception as e:
print(e, traceback.format_exc())
print(launch_time, asin_date_list)
days_diff = -2
else:
days_diff = -1
return days_diff
@staticmethod
def udf_days_diff(days_diff):
        # flag ASINs launched within the last 180 days as new
if 0 <= days_diff <= 180:
return 1
else:
return 0
@staticmethod
def udf_year_week(dt):
year, week = dt.split("-")[0], dt.split("-")[1]
if int(week) < 10:
return f"{year}-0{week}"
else:
return f"{year}-{week}"
def read_data(self):
        # build the search-term-to-ASIN mapping from ods_search_term_(zr,sp,sb,ac,bs,er,tr) in the ods layer
for data_type in self.data_type_list:
print(f"site_name: {self.site_name}, data_type: {data_type}")
if data_type in ['zr', 'sp']:
sql = f"select search_term, asin, page, page_row, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
df = self.spark.sql(sqlQuery=sql)
                # compute page_rank
df = self.handle_data_page_rank(df=df, data_type=data_type)
df = df.drop('page_1_count', 'page_2_count', 'page_2_count_old')
else:
if data_type in ['sb1', 'sb2', 'sb3']:
sql = f"select search_term, asin, page, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_sb " \
f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and data_type={int(data_type[-1])};"
else:
sql = f"select search_term, asin, page, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
df = self.spark.sql(sqlQuery=sql)
# print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
self.df_st_asin_info = self.df_st_asin_info.unionByName(df, allowMissingColumns=True)
        # add a year_week column so the weekly tables can be deduplicated later
self.df_st_asin_info = self.df_st_asin_info.withColumn("year_week", F.lit(self.year_week))
print("self.df_st_asin_info", self.df_st_asin_info.show(10, truncate=False))
# print("self.df_save.count():", self.df_save.count ())
        # read ods_asin_keep_date from the ods layer
sql = f"select asin, launch_time as keepa_launch_time, site_name from ods_asin_keep_date " \
f"where state = 3 and site_name='{self.site_name}'"
self.df_asin_keep_date = self.spark.sql(sqlQuery=sql)
print("self.df_asin_keep_date", self.df_asin_keep_date.show(10, truncate=False))
        # read ASIN variation info (dim_asin_variation_info)
sql = f"select asin,color,`size`,style,state as is_sale from dim_asin_variation_info " \
f"where state is not null and site_name='{self.site_name}'"
self.df_asin_variat = self.spark.sql(sqlQuery=sql)
print("self.df_asin_variat", self.df_asin_variat.show(10, truncate=False))
print("测试打印,self.year_week_tuple:",self.year_week_tuple)
        # read ods_asin_detail; the IN clause pulls multiple weeks at once, so deduplication is handled later
sql = f"select asin,title,title_len,price,rating,total_comments,buy_box_seller_type,page_inventory,category," \
f"volume,weight,`rank` as asin_rank,launch_time,img_num,img_type,category_state,activity_type,one_two_val," \
f"three_four_val,five_six_val,eight_val,site_name,dt from ods_asin_detail " \
f"where site_name='{self.site_name}' and dt in {self.year_week_tuple} ;"
self.df_asin_detail = self.spark.sql(sqlQuery=sql)
print("self.df_asin_detail", self.df_asin_detail.show(10, truncate=False))
        # read the dwd_bs_category_asin table
sql = f"select asin, cate_1_id as bsr_cate_1_id, dt from selection_off_line.dwd_bs_category_asin " \
f"where site='{self.site_name}' and dt= '{self.year_week}';"
self.df_bs_category = self.spark.sql(sqlQuery=sql)
print("self.df_bs_category", self.df_bs_category.show(10, truncate=False))
def handle_data(self):
        # asin_detail is read for multiple weeks, so it must be deduplicated first
self.handle_asin_detail_duplicated()
        # join the prepared search-term-to-ASIN mapping with asin_detail
self.handle_asin_detail_base()
        # derive the new-ASIN flag
self.handle_asin_is_new()
self.df_asin_detail = self.df_asin_detail.drop("dt").drop("keepa_launch_time").drop("days_diff").drop("site_name")
self.df_save = self.df_asin_detail.select("search_term", "asin", "page", "page_row", "page_rank",
"data_type", "title", "title_len", "price", "rating",
"total_comments", "buy_box_seller_type", "page_inventory",
"category", "volume", "weight", "color", "`size`", "style",
"is_sale", "asin_rank", "launch_time", "is_asin_new",
"img_num", "img_type", "category_state","bsr_cate_1_id", "activity_type",
"one_two_val", "three_four_val", "five_six_val", "eight_val",
"created_time", "updated_time")
        # handle null values
        self.handle_empty_column()
        # fill in the partition columns
self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
self.df_save.show(10, truncate=False)
print("self.df_save.columns:",self.df_save.columns)
    # deduplicate by asin, keeping the record with the largest dt
def handle_asin_detail_duplicated(self):
self.df_asin_detail = self.df_asin_detail.withColumn(
"dt_sort", self.u_year_week(self.df_asin_detail.dt)
)
        # order within the window: rows with a non-null title first, then dt descending
window = Window.partitionBy(['asin']).orderBy(
self.df_asin_detail.title.asc_nulls_last(),
self.df_asin_detail.dt_sort.desc()
)
self.df_asin_detail = self.df_asin_detail.withColumn("sort_top", F.row_number().over(window=window))
        # keep the first row of each asin group, i.e. the deduplicated latest record for that asin
self.df_asin_detail = self.df_asin_detail.filter("sort_top=1")
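        # Illustrative example of the ordering above: if an asin has a row at dt '2022-39' with a
        # null title and a row at dt '2022-38' with a title, the '2022-38' row is kept because
        # non-null titles sort first; among rows with the same title, the latest dt wins.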
def handle_asin_detail_base(self):
        # join in the base attributes; note: df_asin_detail and df_bs_category are crawled weekly,
        # so it still needs to be considered how to backfill daily data as completely as possible
self.df_asin_detail = self.df_st_asin_info. \
join(self.df_asin_detail, on='asin', how='left'). \
join(self.df_asin_variat, on='asin', how='left'). \
join(self.df_bs_category, on='asin', how='left')
print("df_asin_detail:", self.df_asin_detail.show(10, truncate=False))
        # for ASINs whose launch_time is null, backfill it from keep_date
self.df_asin_detail = self.df_asin_detail. \
join(self.df_asin_keep_date, on='asin',how='left')
        # if the row's own launch_time is null, fill it with keepa_launch_time; otherwise keep the original launch_time
print("df_asin_detail join df_asin_keep_date: ", self.df_asin_detail.show(10, truncate=False))
self.df_asin_detail = self.df_asin_detail.withColumn("launch_time_new",F.when(F.isnull("launch_time"), F.col("keepa_launch_time")))
        # drop the old launch_time and rename launch_time_new to launch_time
self.df_asin_detail = self.df_asin_detail.drop("launch_time").withColumnRenamed("launch_time_new","launch_time")
    # logic for flagging newly launched ASINs
def handle_asin_is_new(self):
        # generate the days_diff column as input for the is_asin_new flag
        print("computing days_diff for the is_asin_new flag")
self.df_asin_detail = self.df_asin_detail.withColumn("days_diff", self.u_launch_time(
self.df_asin_detail.launch_time, F.lit(self.date_type), F.lit(self.date_info)))
        # run days_diff through the UDF to produce is_asin_new (the new-ASIN flag)
        print("computing the is_asin_new flag")
self.df_asin_detail = self.df_asin_detail.withColumn("is_asin_new", self.u_days_diff(
self.df_asin_detail.days_diff))
print("self.df_asin_detail:", self.df_asin_detail.show(10, truncate=False))
    # null-value handling
    def handle_empty_column(self):
        # fill nulls in numeric columns
self.df_save = self.df_save.\
na.fill({"page_row":0,"page_rank":0, "title_len":0,"price":0.0,"rating":0.0,"buy_box_seller_type":0,"page_inventory":0,
"weight":0.0,"is_sale":-1,"asin_rank":0,"is_asin_new":-1,"img_num":0,"bsr_cate_1_id":-999999})
        # fill nulls in string columns
self.df_save = self.df_save.\
na.fill({"title":"null","category":"null","volume":"null","color":"null","size":"null","style":"null",
"launch_time":"1900-01-01","img_type":"null","activity_type":"null"})
        # special cases: normalize the literal string "None" to "null"
        self.df_save = self.df_save.withColumn("color", F.when(F.col("color") == "None", F.lit("null")).otherwise(F.col("color")))
        self.df_save = self.df_save.withColumn("size", F.when(F.col("size") == "None", F.lit("null")).otherwise(F.col("size")))
        self.df_save = self.df_save.withColumn("style", F.when(F.col("style") == "None", F.lit("null")).otherwise(F.col("style")))
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
handle_obj = DimStAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()