# dwt_nsr_asin_detail_all.py
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F, Window
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql.types import BooleanType
from yswg_utils.common_df import get_self_asin_df
from yswg_utils.common_df import get_bsr_category_tree_df, get_asin_unlanuch_df
from yswg_utils.common_udf import category_craw_flag
"""
聚合nsr榜单Asin
"""
class DwtBsrAsinDetailAll(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_nsr_asin_detail_all"
        self.current_month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m")
        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())
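
    # run() proceeds in three steps: build the source SQL (a legacy schema for
    # runs on or before 2023-08-14, split CTEs afterwards), enrich the result
    # with self-ASIN / category-tree / unlaunch lookups, then rewrite the
    # site_name/date_info partition of dwt_nsr_asin_detail_all.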
    def run(self):
        # resolve the most recent monthly partition of dwt_flow_asin for this site
        df_dwt_flow_asin_part = CommonUtil.select_partitions_df(self.spark, "dwt_flow_asin")
        dwt_flow_asin_last_month = df_dwt_flow_asin_part \
            .filter(f"date_type = '{DateTypes.month.name}' and site_name = '{self.site_name}'") \
            .selectExpr("max(date_info)").rdd.flatMap(lambda it: it).collect()[0]
        if self.date_info <= '2023-08-14':
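            # Legacy path: the monthly snapshot dwt_nsr_asin_detail already
            # carries every attribute column, so one left join suffices.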
            sql = f"""
                with asin_all as (
                    select *,
                           row_number() over (order by site_name) as id
                    from dwd_nsr_asin_rank
                    where site_name = '{self.site_name}'
                      and date_type = 'last30day'
                      and date_info = '{self.date_info}'
                ),
                asin_detail as (
                    select *
                    from dwt_nsr_asin_detail
                    where site_name = '{self.site_name}'
                      and date_info = '{self.current_month}'
                )
                select asin_all.asin,
                       category_id,
                       bsr_rank,
                       is_1_day_flag,
                       is_7_day_flag,
                       is_30_day_flag,
                       bsr_count,
                       is_asin_new,
                       is_asin_bsr_new,
                       last_bsr_day,
                       title,
                       img_url,
                       img_type,
                       ao_val,
                       rating,
                       total_comments,
                       bsr_orders,
                       bsr_orders_change,
                       price,
                       weight,
                       launch_time,
                       brand_name,
                       buy_box_seller_type,
                       account_name,
                       volume,
                       last_update_time,
                       asin_air_freight_gross_margin,
                       asin_ocean_freight_gross_margin
                from asin_all
                left join asin_detail on asin_all.asin = asin_detail.asin
            """
        else:
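            # Current path: attributes now live in separate tables, so pull
            # seller account, history detail, and flow metrics through their
            # own CTEs and join them onto the ranked ASIN set.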
            sql = f"""
                with asin_all as (
                    select asin,
                           category_id,
                           bsr_rank,
                           is_1_day_flag,
                           is_7_day_flag,
                           is_30_day_flag,
                           bsr_count,
                           is_asin_new,
                           is_asin_bsr_new,
                           last_bsr_day,
                           row_number() over (order by site_name) as id
                    from dwd_nsr_asin_rank
                    where site_name = '{self.site_name}'
                      and date_type = 'last30day'
                      and date_info = '{self.date_info}'
                ),
                account_name_tb as (
                    select asin,
                           first(fd_account_name) as account_name
                    from dim_fd_asin_info
                    where site_name = '{self.site_name}'
                    group by asin
                ),
                asin_his as (
                    select asin,
                           asin_title as title,
                           asin_img_url as img_url,
                           asin_img_type as img_type,
                           asin_rating as rating,
                           asin_total_comments as total_comments,
                           asin_price as price,
                           asin_weight as weight,
                           asin_launch_time as launch_time,
                           asin_volume as volume,
                           asin_brand_name as brand_name,
                           asin_buy_box_seller_type as buy_box_seller_type,
                           asin_crawl_date as last_update_time
                    from dim_cal_asin_history_detail
                    where site_name = '{self.site_name}'
                ),
                flow_asin as (
                    select asin,
                           asin_ao_val as ao_val,
                           bsr_orders,
                           asin_bsr_orders_change as bsr_orders_change,
                           asin_air_freight_gross_margin,
                           asin_ocean_freight_gross_margin
                    from dwt_flow_asin
                    where site_name = '{self.site_name}'
                      and date_type = '{DateTypes.month.name}'
                      and date_info = '{dwt_flow_asin_last_month}'
                )
                select asin_all.asin,
                       category_id,
                       bsr_rank,
                       is_1_day_flag,
                       is_7_day_flag,
                       is_30_day_flag,
                       bsr_count,
                       is_asin_new,
                       is_asin_bsr_new,
                       last_bsr_day,
                       title,
                       img_url,
                       img_type,
                       ao_val,
                       rating,
                       total_comments,
                       bsr_orders,
                       bsr_orders_change,
                       price,
                       weight,
                       launch_time,
                       brand_name,
                       buy_box_seller_type,
                       account_name,
                       volume,
                       last_update_time,
                       asin_air_freight_gross_margin,
                       asin_ocean_freight_gross_margin
                from asin_all
                left join account_name_tb on asin_all.asin = account_name_tb.asin
                left join flow_asin on asin_all.asin = flow_asin.asin
                left join asin_his on asin_all.asin = asin_his.asin
            """
print("======================查询sql如下======================")
print(sql)
df_all = self.spark.sql(sql)
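
        # Enrichment lookups: company-owned ASINs (flagged asin_type = 1), the
        # BSR category tree (for category_first_id), and ASIN unlaunch times.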
        df_self_asin = get_self_asin_df(self.site_name, self.spark).select(
            F.col("asin"),
            F.lit(1).alias("asin_type"),
        )
        category_df = get_bsr_category_tree_df(self.site_name, self.spark).select(
            F.col("category_id"),
            F.col("category_first_id"),
        )
        df_unlanuch = get_asin_unlanuch_df(self.site_name, self.spark)
        df_all = df_all \
            .join(df_self_asin, on=["asin"], how='left') \
            .join(df_unlanuch, on=["asin"], how='left') \
            .join(category_df, on=["category_id"], how='left')
        df_all = df_all.withColumn("crawl_flag", self.udf_category_craw_flag(F.col("category_first_id"), F.col("asin")))
        # generate a sequential id
        df_all = df_all.withColumn("id", F.row_number().over(window=Window.orderBy(F.lit(1))))
        df_all = df_all.select(
            F.col("id"),
            F.col('asin'),
            F.col('category_id'),
            F.col('bsr_rank'),
            F.col('is_1_day_flag'),
            F.col('is_7_day_flag'),
            F.col('is_30_day_flag'),
            F.col('bsr_count'),
            F.col('is_asin_new'),
            F.col('is_asin_bsr_new'),
            F.col('last_bsr_day'),
            F.col('title'),
            F.col('img_url'),
            F.col('img_type'),
            F.col('ao_val'),
            F.col('rating'),
            F.col('total_comments'),
            F.col('bsr_orders'),
            F.col('bsr_orders_change'),
            F.col('price'),
            F.col('weight'),
            F.col('launch_time'),
            F.trim(F.col('brand_name')).alias("brand_name"),
            F.col('buy_box_seller_type'),
            F.col('account_name'),
            F.col('volume'),
            F.col('last_update_time'),
            F.col('asin_air_freight_gross_margin'),
            F.col('asin_ocean_freight_gross_margin'),
            F.col("category_first_id"),
            # asin_type: 1 = self ASIN, 2 = crawl_flag is false, otherwise 0
            F.when(F.col("asin_type").isNotNull(), F.lit(1))
                .when(F.col("crawl_flag") == False, F.lit(2))
                .otherwise(0).alias("asin_type"),
            # ASIN unlaunch (delisting) time
            F.col("asin_unlaunch_time"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info")
        )
        df_all = df_all.repartition(15)
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info
        }
        partition_by = list(partition_dict.keys())
        # auto-align columns with the target Hive table schema
        df_all = CommonUtil.format_df_with_template(self.spark, df_all, self.hive_tb, roundDouble=True)
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
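
        # Clearing the partition's HDFS directory before appending makes the
        # write an idempotent partition overwrite.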
print(f"清除hdfs目录中:{hdfs_path}")
HdfsUtils.delete_file_in_folder(hdfs_path)
hive_tb = self.hive_tb
print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtBsrAsinDetailAll(site_name=site_name, date_info=date_info)
    obj.run()