import os
import sys
from pyspark.sql.types import DoubleType, StringType
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory so sibling utils packages can be imported
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil, DateTypes
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from yswg_utils.common_udf import parse_weight_str
from yswg_utils.common_udf import udf_handle_string_null_value
from yswg_utils.common_df import get_node_first_id_df
from utils.redis_utils import RedisUtils
class DimCalAsinDetail(object):
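    """
    Builds the dim_cal_asin_history_detail table: merges the latest ASIN detail rows
    (dim_asin_detail plus the self-crawled ods_self_asin_detail), deduplicates them by asin
    keeping the most recent crawl date, and writes the result back to Hive by site partition.
    """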
def __init__(self, site_name, date_type, date_info):
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
        # initialize partition and output parameters
        self.partitions_by = ['site_name']
        self.partitions_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80)
        app_name = f"{self.__class__.__name__}:{self.site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_table = 'dim_cal_asin_history_detail'
        self.partition_dict = {
            "site_name": site_name
        }
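        # register UDFs so they can be referenced by name inside the Spark SQL below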
self.udf_parse_weight_str_reg = self.spark.udf.register("udf_parse_weight_str_reg", self.udf_parse_weight_str, DoubleType())
self.udf_handle_null_value = self.spark.udf.register("udf_handle_null_value", udf_handle_string_null_value, StringType())
@staticmethod
def udf_parse_weight_str(weight_str: str, site_name: str):
"""
解析重量
:param weight_str:
:param site_name:
:return:
"""
if weight_str is None:
return None
weight_val, unit = parse_weight_str(weight_str, site_name)
if weight_val != 'none' and weight_val is not None:
return float(weight_val)
def run(self):
print(f"读取数据中.....")
if self.date_type == 'all':
# 读取dim_asin_detail
sql = f"""select
asin,
asin_img_url,
asin_title,
asin_title_len,
asin_category_desc,
asin_rank,
asin_volume,
asin_weight,
asin_color,
asin_size,
asin_style,
asin_price,
asin_rating,
asin_total_comments,
asin_material,
asin_brand_name,
asin_page_inventory,
asin_buy_box_seller_type,
asin_launch_time,
asin_img_num,
asin_img_type,
asin_is_sale,
bsr_cate_1_id,
bsr_cate_current_id,
asin_is_amazon,
asin_is_FBA,
asin_is_FBM,
asin_is_other,
udf_handle_null_value(node_id) as node_id,
asin_is_picture,
asin_is_video,
asin_is_aadd,
date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date
from dim_asin_detail
where site_name='{self.site_name}'
and date_type='month' ;
"""
self.date_type = 'day_all'
elif self.date_type in (DateTypes.week.name, DateTypes.month.name, DateTypes.month_week.name):
sql = f"""select
asin,
asin_img_url,
asin_title,
asin_title_len,
asin_category_desc,
asin_rank,
asin_volume,
asin_weight,
asin_color,
asin_size,
asin_style,
asin_price,
asin_rating,
                asin_total_comments,
                asin_material,
asin_brand_name,
asin_page_inventory,
asin_buy_box_seller_type,
asin_launch_time,
asin_img_num,
asin_img_type,
asin_is_sale,
bsr_cate_1_id,
bsr_cate_current_id,
asin_is_amazon,
asin_is_FBA,
asin_is_FBM,
asin_is_other,
udf_handle_null_value(node_id) as node_id,
asin_is_picture,
asin_is_video,
asin_is_aadd,
date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date
from dim_asin_detail
where site_name='{self.site_name}'
and date_type='{self.date_type}'
and date_info = '{self.date_info}';
"""
else:
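            # unsupported date_type: build an empty dataframe with the expected schema via "limit 0"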
sql = f"""
select
asin,
asin_img_url,
asin_title,
asin_title_len,
asin_category_desc,
asin_rank,
asin_volume,
asin_weight,
asin_color,
asin_size,
asin_style,
asin_price,
asin_rating,
                asin_total_comments,
                asin_material,
asin_brand_name,
asin_page_inventory,
asin_buy_box_seller_type,
asin_launch_time,
asin_img_num,
asin_img_type,
asin_is_sale,
bsr_cate_1_id,
bsr_cate_current_id,
asin_is_amazon,
asin_is_FBA,
asin_is_FBM,
asin_is_other,
node_id,
asin_is_picture,
asin_is_video,
asin_is_aadd,
date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date
from dim_asin_detail
where site_name='{self.site_name}'
limit 0
"""
print("======================整合搜索词 day asin 中... sql如下======================")
print(sql)
df_asin_detail = self.spark.sql(sqlQuery=sql)
self_asin_sql = None
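        # supplement with the in-house (self-crawled) ASIN details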
if self.date_type == DateTypes.day.name:
self_asin_sql = f"""
select asin as asin,
img_url as asin_img_url,
title as asin_title,
title_len as asin_title_len,
category as asin_category_desc,
rank as asin_rank,
volume as asin_volume,
udf_parse_weight_str_reg(weight_str,'{self.site_name}') as asin_weight,
null as asin_color,
null as asin_size,
null as asin_style,
price as asin_price,
rating as asin_rating,
total_comments as asin_total_comments,
material as asin_material,
brand as asin_brand_name,
page_inventory as asin_page_inventory,
buy_box_seller_type as asin_buy_box_seller_type,
launch_time as asin_launch_time,
img_num as asin_img_num,
img_type as asin_img_type,
null as asin_is_sale,
null as bsr_cate_1_id,
null as bsr_cate_current_id,
if(buy_box_seller_type == 1, 1, 0) as asin_is_amazon,
if(buy_box_seller_type == 2, 1, 0) as asin_is_FBA,
if(buy_box_seller_type == 3, 1, 0) as asin_is_FBM,
if(buy_box_seller_type == 4, 1, 0) as asin_is_other,
node_id,
if(locate(1, img_type) > 0, 1, 0) as asin_is_picture,
if(locate(2, img_type) > 0, 1, 0) as asin_is_video,
if(locate(3, img_type) > 0, 1, 0) as asin_is_aadd,
date_format(created_at, '{CommonUtil._date_time_format}') as asin_crawl_date
from ods_self_asin_detail
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}';
"""
elif self.date_type == "day_all":
self_asin_sql = f"""
select asin as asin,
img_url as asin_img_url,
title as asin_title,
title_len as asin_title_len,
category as asin_category_desc,
rank as asin_rank,
volume as asin_volume,
udf_parse_weight_str_reg(weight_str, '{self.site_name}') as asin_weight,
null as asin_color,
null as asin_size,
null as asin_style,
price as asin_price,
rating as asin_rating,
total_comments as asin_total_comments,
material as asin_material,
brand as asin_brand_name,
page_inventory as asin_page_inventory,
buy_box_seller_type as asin_buy_box_seller_type,
launch_time as asin_launch_time,
img_num as asin_img_num,
img_type as asin_img_type,
null as asin_is_sale,
null as bsr_cate_1_id,
null as bsr_cate_current_id,
if(buy_box_seller_type == 1, 1, 0) as asin_is_amazon,
if(buy_box_seller_type == 2, 1, 0) as asin_is_FBA,
if(buy_box_seller_type == 3, 1, 0) as asin_is_FBM,
if(buy_box_seller_type == 4, 1, 0) as asin_is_other,
if(locate(1, img_type) > 0, 1, 0) as asin_is_picture,
node_id,
if(locate(2, img_type) > 0, 1, 0) as asin_is_video,
if(locate(3, img_type) > 0, 1, 0) as asin_is_aadd,
date_format(created_at, '{CommonUtil._date_time_format}') as asin_crawl_date
from (
select *,
row_number() over (partition by asin order by date_info desc) as row_number
from ods_self_asin_detail
where site_name = '{self.site_name}'
and date_type = '{DateTypes.day.name}'
)
where row_number = 1;
"""
if self_asin_sql is not None:
print("======================整合day asin 中sql如下======================")
print(self_asin_sql)
df_self_asin_detail = self.spark.sql(sqlQuery=self_asin_sql)
# 合并要更新的数据
df_asin_detail = df_asin_detail.unionByName(df_self_asin_detail, allowMissingColumns=False)
pass
        # skip the job if there is no data to merge
        if df_asin_detail.first() is None:
            print("============================ no data, skipping ===================================")
            return
        print("====================== fetching first_id from dim_bsr_category_tree ======================")
df_node_cate = get_node_first_id_df(self.site_name, self.spark)
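        # attach the first-level category id (category_first_id) to each asin via its node_id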
df_asin_detail = df_asin_detail.join(
df_node_cate, on=['node_id'], how='left'
)
        # read the existing dim_cal_asin_history_detail history data for this site
sql = f"""select
asin,
asin_img_url,
asin_title,
asin_title_len,
asin_category_desc,
asin_rank,
asin_volume,
asin_weight,
asin_color,
asin_size,
asin_style,
asin_price,
asin_rating,
asin_total_comments,
asin_material,
asin_brand_name,
asin_page_inventory,
asin_buy_box_seller_type,
asin_launch_time,
asin_img_num,
asin_img_type,
asin_is_sale,
bsr_cate_1_id,
bsr_cate_current_id,
asin_is_amazon,
asin_is_FBA,
asin_is_FBM,
asin_is_other,
asin_is_picture,
asin_is_video,
asin_is_aadd,
asin_crawl_date,
node_id,
category_first_id
from dim_cal_asin_history_detail
where site_name='{self.site_name}' ;
"""
df_asin_cal_detail = self.spark.sql(sqlQuery=sql)
print("======================查询sql如下======================")
print(sql)
df_asin_detail = self.handle_df_duplicated(df_asin_detail, df_asin_cal_detail)
df_save = self.handle_column(df_asin_detail).repartition(self.partitions_num)
# print("self.df_save", df_save.show(10, truncate=False))
# quit()
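        # write the merged result back to the history table for this site partition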
CommonUtil.save_or_update_table(spark_session=self.spark, hive_tb_name=self.hive_table,
partition_dict=self.partition_dict, df_save=df_save, drop_exist_tmp_flag=False)
print("success")
    # deduplicate by asin, keeping the record with the latest crawl date
    def handle_df_duplicated(self, df_asin_detail, df_asin_cal_detail):
        print("deduplicating by asin ...")
        # merge the new data with the existing history data
        # df_asin_detail = df_asin_detail.union(df_asin_cal_detail)
        df_asin_detail = df_asin_detail.unionByName(df_asin_cal_detail, allowMissingColumns=True)
        # rank rows within each asin window by crawl date, newest first
        window = Window.partitionBy(['asin']).orderBy(
            df_asin_detail.asin_crawl_date.desc_nulls_last()
        )
        df_asin_detail = df_asin_detail.withColumn("sort_top", F.row_number().over(window=window))
        # keep the first row per asin, i.e. the latest detail record after deduplication
        df_asin_detail = df_asin_detail.filter("sort_top=1")
        # drop the helper ranking column
        df_asin_detail = df_asin_detail.drop("sort_top")
        return df_asin_detail
def handle_column(self, df_asin_detail):
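        # cast columns to their target types, normalise null-like string values and add reserved/partition fields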
df_save = df_asin_detail.select("asin",
"asin_title",
F.col("asin_title_len").cast('int').alias('asin_title_len'),
"asin_category_desc",
F.col("asin_rank").cast('int').alias('asin_rank'),
self.udf_handle_null_value("asin_volume").alias("asin_volume"),
F.col("asin_weight").cast('double').alias('asin_weight'),
self.udf_handle_null_value("asin_color").alias("asin_color"),
self.udf_handle_null_value("asin_size").alias("asin_size"),
self.udf_handle_null_value("asin_style").alias("asin_style"),
"asin_price",
"asin_rating",
F.col("asin_total_comments").cast('int').alias('asin_total_comments'),
self.udf_handle_null_value("asin_material").alias("asin_material"),
self.udf_handle_null_value("asin_brand_name").alias("asin_brand_name"),
"bsr_cate_1_id",
"bsr_cate_current_id",
F.col("asin_page_inventory").cast('int').alias('asin_page_inventory'),
F.col("asin_buy_box_seller_type").cast('int').alias('asin_buy_box_seller_type'),
"asin_is_amazon",
"asin_is_fba",
"asin_is_fbm",
"asin_is_other",
F.col("asin_is_sale").cast('int').alias('asin_is_sale'),
"asin_launch_time",
F.col("asin_img_num").cast('int').alias('asin_img_num'),
"asin_img_type",
"asin_is_picture",
F.col("asin_is_video").cast('int').alias('asin_is_video'),
F.col("asin_is_aadd").cast('int').alias('asin_is_aadd'),
self.udf_handle_null_value("asin_img_url").alias("asin_img_url"),
"asin_crawl_date",
"node_id",
"category_first_id"
)
        # fill reserved placeholder fields
df_save = df_save.withColumn("re_string_field1", F.lit(None))
df_save = df_save.withColumn("re_string_field2", F.lit(None))
df_save = df_save.withColumn("re_string_field3", F.lit(None))
df_save = df_save.withColumn("re_int_field1", F.lit(None))
df_save = df_save.withColumn("re_int_field2", F.lit(None))
df_save = df_save.withColumn("re_int_field3", F.lit(None))
        # fill the partition field
df_save = df_save.withColumn("site_name", F.lit(self.site_name))
return df_save
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)  # arg 1: site name
    date_type = CommonUtil.get_sys_arg(2, None)  # arg 2: date type: week/4_week/month/quarter/day
    date_info = CommonUtil.get_sys_arg(3, None)  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
lock_name = "dim_cal_asin_history_detail"
if date_type == "all":
# 如果执行数据为all的情况,非自然解锁情况,则需锁设定该表90分钟
lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=90 * 60, retry_flag=True, retry_time=10 * 60)
else:
lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=30 * 60, retry_flag=True, retry_time=10 * 60)
if lock_flag:
try:
obj = DimCalAsinDetail(site_name, date_type, date_info)
obj.run()
finally:
            # release the lock once the job completes
RedisUtils.release_redis_lock(lock_name)
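
# Minimal usage sketch (assumes a standard spark-submit deployment; the script file name below is illustrative):
#   spark-submit dim_cal_asin_detail.py us month 2023-01
# i.e. arg1 = site_name, arg2 = date_type, arg3 = date_info, matching the CommonUtil.get_sys_arg calls above.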