dwt_fb_base_report.py
19.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
"""
@Author : HuangJian
@Description : 店铺基础信息报表
@SourceTable :
                ①ods_seller_account_feedback
                ②ods_seller_asin_account
                ③ods_asin_detail_product
                ④ods_seller_account_syn
                ⑤dim_asin_variation_info
                ⑥dim_cal_asin_history_detail
@SinkTable : dwt_fb_base_report
@SinkTable : dwt_fb_asin_info
@CreateTime : 2022/07/05 15:48
@UpdateTime : 2022/07/05 15:48
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil, DateTypes
from pyspark.sql.types import StringType, IntegerType, DoubleType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import udf_get_package_quantity
from yswg_utils.common_udf import udf_new_asin_flag
from utils.db_util import DBUtil
class DwtFbBaseReport(object):
    """
    Monthly seller ("fb" = feedback/shop) base-info report job.

    Reads seller feedback, seller-asin mappings and asin detail tables from
    Hive, plus the internal ``selection.accounts`` MySQL table (used to flag
    company-owned shops), aggregates per-seller metrics and writes two
    partitioned Hive tables:

    * ``dwt_fb_base_report`` -- one row per seller: feedback counts and
      month-over-month growth rates, top-20 asin averages, new-asin /
      multi-variant / multi-pack ratios, plus bucketed "type" columns.
    * ``dwt_fb_asin_info``   -- per-seller asin detail rows, new asins only.

    :param site_name: Amazon site code, e.g. ``us``.
    :param date_type: date granularity (week/month/quarter ...).
    :param date_info: date value matching ``date_type``, e.g. ``2022-07``.
    """

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_fb_base_report"
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # HDFS path backing the target partition; cleared before each write.
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Build the SparkSession for this job.
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # Calculation day (YYYY-MM-DD) derived from the date_type/date_info pair.
        self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
        # Previous month partition, used for month-over-month rate columns.
        self.last_month = CommonUtil.get_month_offset(date_info, -1)
        # Placeholder DataFrames; populated by read_data() / handle_*().
        self.df_fb_feedback = self.spark.sql(f"select 1+1;")
        self.df_fb_asin = self.spark.sql(f"select 1+1;")
        self.df_top20_asin = self.spark.sql(f"select 1+1;")
        self.df_asin_history = self.spark.sql(f"select 1+1;")
        self.df_asin_parent = self.spark.sql(f"select 1+1;")
        self.df_fb_agg = self.spark.sql(f"select 1+1;")
        self.df_fb_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_self_seller_id = self.spark.sql(f"select 1+1;")
        self.df_seller_account = self.spark.sql(f"select 1+1;")
        # Register UDFs: new-asin flag (launch date vs cal_date) and
        # package-quantity extraction from the asin title.
        self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType())
        self.u_judge_package_quantity = F.udf(udf_get_package_quantity, IntegerType())

    def read_data(self):
        """Load all source tables into the instance DataFrames."""
        # ods_seller_account_feedback: main monthly shop-report table.
        # Self-join current month vs previous month to derive growth rates.
        print("获取 ods_seller_account_feedback")
        sql = f"""select cur_fd.seller_id,
                         cur_fd.fb_web_asin_num,
                         cur_fd.fb_country_name,
                         cur_fd.count_30_day_num,
                         cur_fd.count_1_year_num,
                         cur_fd.count_lifetime_num,
                         cur_fd.fb_crawl_date,
                         round((count_30_day_num - last_30_day_num) / last_30_day_num, 4) as count_30_day_rate,
                         round((count_1_year_num - last_1_year_num) / last_1_year_num, 4) as count_1_year_rate,
                         round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate
                  from (select seller_id,
                               num as fb_web_asin_num,
                               count_30_day as count_30_day_num,
                               count_1_year as count_1_year_num,
                               count_lifetime as count_lifetime_num,
                               country_name as fb_country_name,
                               date_format(updated_at, 'yyyy-MM-dd HH:mm:ss') as fb_crawl_date
                        from ods_seller_account_feedback
                        where site_name = '{self.site_name}'
                          and date_type = '{self.date_type}'
                          and date_info = '{self.date_info}'
                          and length(seller_id) > 2 ) cur_fd
                  left join (select seller_id,
                                    count_30_day as last_30_day_num,
                                    count_1_year as last_1_year_num,
                                    count_lifetime as last_lifetime_num
                             from ods_seller_account_feedback
                             where site_name = '{self.site_name}'
                               and date_type = '{self.date_type}'
                               and date_info = '{self.last_month}'
                               and length(seller_id) > 2 ) last_fd
                  on cur_fd.seller_id = last_fd.seller_id"""
        self.df_fb_feedback = self.spark.sql(sqlQuery=sql)
        self.df_fb_feedback = self.df_fb_feedback.drop_duplicates(['seller_id']).cache()
        print(sql)

        # Internal seller<->asin relation table (scraped from search terms).
        print("获取 ods_seller_asin_account")
        sql = f"""
            select seller_id,asin from ods_seller_asin_account
            where site_name='{self.site_name}'
            and date_format(created_at,'yyyy-MM-dd') <= '{self.cal_date}'
        """
        self.df_fb_asin = self.spark.sql(sqlQuery=sql)
        self.df_fb_asin = self.df_fb_asin.drop_duplicates(['seller_id', 'asin'])
        print(sql)

        # Per-shop top-20 asins (row_num is the rank within the shop).
        print("获取 ods_asin_detail_product")
        sql = f"""
            select
                seller_id,
                asin,
                price,
                rating,
                total_comments,
                row_num as rank
            from ods_asin_detail_product
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
            and row_num <= 20
        """
        self.df_top20_asin = self.spark.sql(sqlQuery=sql)
        self.df_top20_asin = self.df_top20_asin.drop_duplicates(['seller_id', 'asin'])
        print(sql)

        # Asin history detail; launch_time feeds the new-asin flag.
        print("获取 dim_cal_asin_history")
        sql = f"""
            select
                asin,
                asin_title,
                asin_img_url,
                asin_price,
                asin_rating,
                asin_total_comments,
                asin_weight,
                asin_volume,
                asin_launch_time
            from dim_cal_asin_history_detail
            where site_name = '{self.site_name}'
        """
        self.df_asin_history = self.spark.sql(sqlQuery=sql)
        print(sql)

        # Variation table; parent_asin feeds the multi-variant metrics.
        # Rows where asin == parent_asin are excluded (self-parents).
        print("获取 dim_asin_variation_info")
        sql = f"select asin,parent_asin from dim_asin_variation_info " \
              f"where site_name='{self.site_name}'" \
              f" and asin != parent_asin "
        self.df_asin_parent = self.spark.sql(sqlQuery=sql)
        print(sql)

        # Seller account table, provides account_name.
        print("获取 ods_seller_account_syn")
        sql = f"select seller_id,account_name,id from ods_seller_account_syn " \
              f"where site_name='{self.site_name}'"
        self.df_seller_account = self.spark.sql(sqlQuery=sql)
        # Dedup: intent is "keep the row with the largest id per seller_id".
        # NOTE(review): orderBy + drop_duplicates does not guarantee which
        # row survives after a shuffle; a Window row_number over id desc
        # would be deterministic -- confirm before relying on "latest wins".
        self.df_seller_account = self.df_seller_account.orderBy(self.df_seller_account.id.desc())
        self.df_seller_account = self.df_seller_account.drop_duplicates(['seller_id'])
        self.df_seller_account = self.df_seller_account.drop('id')
        print(sql)

        # MySQL selection.accounts: seller_ids of company-internal shops.
        print("获取 selection.accounts")
        sql = f"""
            select seller_id, 1 as is_self_fb from
            (select distinct seller_id from selection.accounts) t1
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        self.df_self_seller_id = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        )

    def handle_fb_top_20(self):
        """Aggregate top-20 asin metrics (avg price/rating/comments, new-asin count) per seller."""
        print("处理asin_detail_product的top20指标")
        self.df_top20_asin = self.df_top20_asin. \
            join(self.df_asin_history.select('asin', 'asin_launch_time'), on='asin', how='left')
        # Flag asins launched recently relative to cal_date.
        self.df_top20_asin = self.df_top20_asin.withColumn("is_asin_new",
                                                           self.udf_new_asin_flag(F.col('asin_launch_time'),
                                                                                  F.lit(self.cal_date)))
        self.df_top20_asin = self.df_top20_asin.groupby('seller_id').agg(
            F.avg('price').alias('top_20_avg_price'),
            F.avg('rating').alias('top_20_avg_rating'),
            F.avg('total_comments').alias('top_20_avg_total_comments'),
            F.sum('is_asin_new').alias('top_20_new_asin_num')
        )

    def handle_fb_cal_agg(self):
        """Build per-seller aggregates (new-asin / multi-pack / multi-variant ratios) into df_fb_agg."""
        print("处理店铺基础报表聚合数据")
        df_fb_join = self.df_fb_feedback.select('seller_id').join(
            self.df_fb_asin, on='seller_id', how='left'
        )
        df_fb_join = df_fb_join.join(
            self.df_asin_history, on='asin', how='left'
        ).join(
            self.df_asin_parent, on='asin', how='left'
        )
        # New-asin flag from launch time vs cal_date.
        df_fb_join = df_fb_join.withColumn("is_asin_new",
                                           self.udf_new_asin_flag(F.col('asin_launch_time'), F.lit(self.cal_date)))
        # Package quantity parsed from the asin title.
        df_fb_join = df_fb_join.withColumn('asin_package_quantity', self.u_judge_package_quantity(F.col('asin_title')))
        # Multi-pack flag: 1 when package quantity >= 2 (else null, so sum() counts flagged rows).
        df_fb_join = df_fb_join.withColumn('is_pq_flag', F.when(F.col('asin_package_quantity') >= 2, F.lit(1)))
        # No variation-table match means the asin is its own parent.
        df_fb_join = df_fb_join.withColumn('parent_asin', F.when(F.col('parent_asin').isNull(), F.col('asin'))
                                           .otherwise(F.col('parent_asin')))
        self.df_fb_asin_detail = df_fb_join.cache()

        fb_counts_agg = self.df_fb_asin_detail.groupby(['seller_id']).agg(
            F.count('asin').alias('fb_asin_total'),
            F.sum('is_asin_new').alias('fb_new_asin_num'),
            F.sum('is_pq_flag').alias('fb_pq_num')
        )
        # New-asin ratio.
        fb_counts_agg = fb_counts_agg.withColumn('fb_new_asin_rate',
                                                 F.round(F.col('fb_new_asin_num') / F.col('fb_asin_total'), 4))
        # Multi-pack ratio.
        fb_counts_agg = fb_counts_agg.withColumn('fb_pq_rate',
                                                 F.round(F.col('fb_pq_num') / F.col('fb_asin_total'), 4))
        fb_counts_agg = fb_counts_agg.select('seller_id', 'fb_new_asin_num', 'fb_pq_num',
                                             'fb_asin_total', 'fb_new_asin_rate', 'fb_pq_rate')

        # Multi-variant ratio: parents with >1 child / all parents per seller.
        df_variant_radio = self.df_fb_asin_detail.groupby(['seller_id', 'parent_asin']).agg(
            F.count('asin').alias('asin_son_count')
        )
        # Flag parents that have multiple children (else null, counted via sum()).
        df_variant_radio = df_variant_radio.withColumn('is_variant_flag',
                                                       F.when(F.col('asin_son_count') > 1, F.lit(1)))
        # Re-aggregate per seller to get numerator and denominator.
        df_variant_radio = df_variant_radio.groupby(['seller_id']).agg(
            F.sum('is_variant_flag').alias('fb_more_variant_num'),
            F.count('parent_asin').alias('fb_variant_asin_total')
        )
        df_variant_radio = df_variant_radio.withColumn('fb_variant_rate',
                                                       F.round(F.col('fb_more_variant_num') / F.col(
                                                           'fb_variant_asin_total'), 4))
        df_variant_radio = df_variant_radio.select('seller_id',
                                                   'fb_more_variant_num',
                                                   'fb_variant_asin_total',
                                                   'fb_variant_rate')
        # Merge all per-seller aggregates onto the feedback base.
        self.df_fb_agg = self.df_fb_feedback.join(
            fb_counts_agg, on='seller_id', how='left'
        ).join(
            df_variant_radio, on='seller_id', how='left'
        ).join(
            self.df_top20_asin, on='seller_id', how='left'
        )
        # Mark company-internal shops.
        self.df_fb_agg = self.df_fb_agg.join(
            self.df_self_seller_id, on='seller_id', how='left'
        )
        # Unmatched sellers are external shops.
        self.df_fb_agg = self.df_fb_agg.na.fill({"is_self_fb": 0})

    # Output dataset: dwt_fb_base_report
    def save_data_report(self):
        """Write the per-seller report to Hive table dwt_fb_base_report."""
        # Inner join drops sellers with no account_name in the library.
        df_save = self.df_fb_agg.join(self.df_seller_account, on='seller_id', how='inner')
        df_save = df_save.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('fb_country_name'),
            F.col('fb_web_asin_num'),
            F.col('is_self_fb'),
            F.round('top_20_avg_price', 4).alias('top_20_avg_price'),
            F.round('top_20_avg_rating', 4).alias('top_20_avg_rating'),
            F.ceil('top_20_avg_total_comments').alias('top_20_avg_total_comments'),
            F.col('top_20_new_asin_num'),
            F.col('count_30_day_num'),
            F.col('count_1_year_num'),
            F.col('count_lifetime_num'),
            F.col('count_30_day_rate'),
            F.col('count_1_year_rate'),
            F.col('count_life_time_rate'),
            F.col('fb_new_asin_num'),
            F.col('fb_asin_total'),
            F.col('fb_new_asin_rate'),
            F.col('fb_variant_rate'),
            F.col('fb_more_variant_num'),
            F.col('fb_variant_asin_total'),
            F.col('fb_pq_rate'),
            F.col('fb_pq_num'),
            F.col('fb_crawl_date'),
            # Country bucket: 0 unknown, 1 same as site, 2 CN, 3 other.
            F.when(F.col('fb_country_name').isNull(), F.lit(0))
            .when(F.col('fb_country_name') == self.site_name.upper(), F.lit(1))
            .when(F.col('fb_country_name') == 'CN', F.lit(2))
            .otherwise(F.lit(3)).alias('fb_country_name_type'),
            # Shop-size bucket by asin count.
            F.when(F.col('fb_asin_total').isNull(), F.lit(0))
            .when(F.col('fb_asin_total') <= 300, F.lit(1))
            .when(F.col('fb_asin_total') <= 1000, F.lit(2))
            .otherwise(F.lit(3)).alias('fb_account_type'),
            # Fine-grained asin-count bucket.
            F.when(F.col('fb_asin_total').isNull(), F.lit(0))
            .when(F.col('fb_asin_total') == 0, F.lit(1))
            .when(F.col('fb_asin_total') <= 50, F.lit(2))
            .when(F.col('fb_asin_total') <= 200, F.lit(3))
            .when(F.col('fb_asin_total') <= 500, F.lit(4))
            .when(F.col('fb_asin_total') <= 1000, F.lit(5))
            .when(F.col('fb_asin_total') <= 5000, F.lit(6))
            .when(F.col('fb_asin_total') <= 10000, F.lit(7))
            .otherwise(F.lit(8)).alias('fb_asin_total_type'),
            # New-asin-count bucket.
            F.when(F.col('fb_new_asin_num').isNull(), F.lit(0))
            .when(F.col('fb_new_asin_num') == 0, F.lit(1))
            .when(F.col('fb_new_asin_num') <= 5, F.lit(2))
            .when(F.col('fb_new_asin_num') <= 10, F.lit(3))
            .when(F.col('fb_new_asin_num') <= 20, F.lit(4))
            .when(F.col('fb_new_asin_num') <= 30, F.lit(5))
            .when(F.col('fb_new_asin_num') <= 50, F.lit(6))
            .when(F.col('fb_new_asin_num') <= 100, F.lit(7))
            .otherwise(F.lit(8)).alias('fb_new_asin_num_type'),
            # New-asin-rate bucket.
            F.when(F.col('fb_new_asin_rate').isNull(), F.lit(0))
            .when(F.col('fb_new_asin_rate') == 0, F.lit(1))
            .when(F.col('fb_new_asin_rate') <= 0.05, F.lit(2))
            .when(F.col('fb_new_asin_rate') <= 0.1, F.lit(3))
            .when(F.col('fb_new_asin_rate') <= 0.2, F.lit(4))
            .when(F.col('fb_new_asin_rate') <= 0.5, F.lit(5))
            .otherwise(F.lit(6)).alias('fb_new_asin_rate_type'),
            # FIX: 'ss' = seconds; the original 'SS' is fraction-of-second in
            # Spark's datetime patterns and produced hour:minute:millis.
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(None).alias('usr_mask_type'),
            F.lit(None).alias('usr_mask_progress'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )
        # CommonUtil.check_schema(self.spark, df_save, self.hive_tb)
        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        df_save = df_save.repartition(1)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    # Output dataset: dwt_fb_asin_info
    def save_data_asin_info(self):
        """Write new-asin detail rows to Hive table dwt_fb_asin_info."""
        # Keep only new asins.
        self.df_fb_asin_detail = self.df_fb_asin_detail.filter('is_asin_new = 1')
        # Inner join drops sellers with no account_name in the library.
        df_save_asin = self.df_fb_asin_detail.join(self.df_seller_account, on='seller_id', how='inner')
        df_save_asin = df_save_asin.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('asin'),
            F.col('asin_title'),
            F.col('asin_launch_time'),
            F.col('is_asin_new'),
            F.col('asin_package_quantity'),
            F.col('is_pq_flag'),
            F.col('parent_asin'),
            F.col('asin_img_url'),
            F.col('asin_price'),
            F.col('asin_rating'),
            F.col('asin_total_comments'),
            F.col('asin_weight'),
            F.col('asin_volume'),
            # FIX: 'ss' (seconds), not 'SS' (fraction-of-second) -- see save_data_report.
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )
        hive_tb_name = 'dwt_fb_asin_info'
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(hive_tb_name, partition_dict=self.partition_dict)
        print(f"清除hdfs目录中:{hdfs_path_asin_info}")
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
        df_save_asin = df_save_asin.repartition(50)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{hive_tb_name},分区为{partition_by}", )
        df_save_asin.write.saveAsTable(name=hive_tb_name, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        """Execute the full pipeline: read, aggregate, write both tables."""
        self.read_data()
        self.handle_fb_top_20()
        self.handle_fb_cal_agg()
        self.save_data_report()
        self.save_data_asin_info()
if __name__ == '__main__':
    # CLI args: 1=site, 2=date granularity, 3=date value
    # (year-week / year-month / year-quarter / year-month-day, e.g. 2022-1).
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    job = DwtFbBaseReport(site_name=site_name, date_type=date_type, date_info=date_info)
    job.run()