1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql.types import BooleanType, IntegerType, StructType, StructField, StringType
from yswg_utils.common_df import get_self_asin_df, get_asin_unlanuch_df, get_node_first_id_df
from yswg_utils.common_udf import category_craw_flag, udf_get_package_quantity
"""
Aggregate the ASINs of the NSR ranking list.
"""
class DwtNsrAsinDetail(object):
    """Spark job that builds one partition of the ``dwt_nsr_asin_detail`` table.

    ASINs found in ``dwd_nsr_asin_rank`` during the 30 days ending at
    ``date_info`` are joined with seller, detail-history and monthly flow
    attributes, a few derived columns are added, and the result is appended
    to the Hive table under the partition ``(site_name, date_info)`` where the
    partition's ``date_info`` is the ``%Y-%m`` form of the input date.
    """

    # (age in days, launch-time type) pairs, youngest bucket first.  An ASIN
    # launched within the last 30 days is type 1, within 90 days type 2, and
    # so on; anything older (or with an unknown launch time) is type 0.
    _LAUNCH_TIME_BUCKETS = (
        (30, 1),
        (90, 2),
        (180, 3),
        (360, 4),
        (450, 5),
        (720, 6),
        (1080, 7),
    )

    def __init__(self, site_name, date_info):
        """Create the Spark session and register the UDFs used by :meth:`run`.

        :param site_name: site partition value used to filter every source table.
        :param date_info: reference day, parsed with the ``%Y-%m-%d`` format.
        """
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_nsr_asin_detail"
        # Month string (``%Y-%m``) used as the output partition value.
        self.current_month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m")

        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())
        # Registered for completeness; run() currently does not apply it and
        # takes the rank / rank date straight from the detail-history columns.
        self.udf_cal_first_category_rank = F.udf(
            self.cal_first_category_rank,
            StructType([
                StructField("first_category_rank", IntegerType(), True),
                StructField("first_category_rank_date", StringType(), True),
            ]),
        )
        self.udf_cal_package_quantity = F.udf(self.cal_package_quantity, IntegerType())
        self.udf_cal_seller_country_type = F.udf(self.cal_seller_country_type, IntegerType())

    @staticmethod
    def cal_first_category_rank(category_id, category_first_id, bsr_date_info, current_bsr_rank, flow_update_time,
                                first_category_rank):
        """Pick the first-level category rank and the date it refers to.

        :return: ``(current_bsr_rank, bsr_date_info)`` when the category is
            itself the first-level category, otherwise
            ``(first_category_rank, flow_update_time)``.
        """
        if category_id == category_first_id:
            # Already a first-level category: use the BSR rank directly.
            return (current_bsr_rank, bsr_date_info)
        # Sub-category: fall back to the rank carried by the flow data.
        return (first_category_rank, flow_update_time)

    @staticmethod
    def cal_package_quantity(title):
        """Return the package quantity derived from ``title``.

        Delegates to ``udf_get_package_quantity`` and substitutes ``1`` when
        that helper returns ``None``.
        """
        quantity = udf_get_package_quantity(title)
        return 1 if quantity is None else quantity

    @staticmethod
    def cal_seller_country_type(asin_buy_box_seller_type, seller_country_name):
        """Classify the seller's location into an integer code.

        :param asin_buy_box_seller_type: buy-box seller type of the ASIN.
        :param seller_country_name: seller country text; may be ``None``.
        :return: ``0`` when the country is missing (``None`` or the text
            ``none`` / ``null`` in any case), ``4`` when the buy-box seller
            type equals ``1``, ``1`` when the country contains ``US``,
            ``2`` when it contains ``CN``, otherwise ``3``.
        """
        country = str(seller_country_name)
        if country.lower() in ('none', 'null'):
            return 0
        if asin_buy_box_seller_type == 1:
            return 4
        country = country.upper()
        # NOTE(review): this is a substring test, so any value that merely
        # contains "US"/"CN" matches - confirm the column holds country codes.
        if 'US' in country:
            return 1
        if 'CN' in country:
            return 2
        return 3

    def _latest_flow_asin_month(self):
        """Return the newest month-type ``date_info`` partition of ``dwt_flow_asin`` for this site."""
        df_partitions = CommonUtil.select_partitions_df(self.spark, "dwt_flow_asin")
        # A global max() always yields exactly one row; its value is None when
        # no partition matches the filter.
        return df_partitions \
            .filter(f"date_type = '{DateTypes.month.name}' and site_name = '{self.site_name}' ") \
            .selectExpr("max(date_info)") \
            .first()[0]

    def _build_source_sql(self, flow_asin_month):
        """Build the SQL that gathers the raw columns for the last-30-day ASINs.

        :param flow_asin_month: ``date_info`` of the ``dwt_flow_asin`` month
            partition to read.
        """
        # NOTE(review): first(category_id) has no ordering, so which category
        # wins for an ASIN seen under several categories is not deterministic.
        # NOTE(review): the seller sub-query groups by (asin, fd_unique), so an
        # ASIN with several fd_unique values yields several output rows -
        # confirm this fan-out is intended.
        day_before_30 = CommonUtil.get_day_offset(self.date_info, -30)
        return f"""
            select tmp.asin,
                   title,
                   img_url,
                   img_type,
                   ao_val,
                   rating,
                   total_comments,
                   bsr_orders,
                   bsr_orders_change,
                   price,
                   weight,
                   launch_time,
                   brand_name,
                   buy_box_seller_type,
                   account_name,
                   volume,
                   last_update_time,
                   asin_air_freight_gross_margin,
                   asin_ocean_freight_gross_margin,
                   category_first_id,
                   first_category_rank,
                   seller_id,
                   seller_country_name,
                   asin_bought_month,
                   tmp_category_id
            from (
                     select asin , first(category_id) as tmp_category_id
                     from dwd_nsr_asin_rank
                     where site_name = '{self.site_name}'
                       and date_info <= "{self.date_info}"
                       and date_info >= "{day_before_30}"
                     group by asin
                 ) tmp
                     left join (
                select asin,
                       fd_unique              as seller_id,
                       first(fd_account_name) as account_name,
                       first(fd_country_name) as seller_country_name
                from dim_fd_asin_info
                where site_name = '{self.site_name}'
                group by asin, fd_unique
            ) tmp1 on tmp.asin = tmp1.asin
                     left join (
                select asin,
                       asin_title               as title,
                       asin_img_url             as img_url,
                       asin_img_type            as img_type,
                       asin_rating              as rating,
                       asin_total_comments      as total_comments,
                       asin_price               as price,
                       asin_weight              as weight,
                       asin_launch_time         as launch_time,
                       asin_volume              as volume,
                       asin_brand_name          as brand_name,
                       asin_buy_box_seller_type as buy_box_seller_type,
                       asin_crawl_date          as last_update_time,
                       asin_rank                as first_category_rank,
                       category_first_id        as category_first_id
                from dim_cal_asin_history_detail
                where site_name = '{self.site_name}'
            ) tmp2 on tmp.asin = tmp2.asin
                     left join (
                select asin,
                       asin_ao_val                     as ao_val,
                       bsr_orders                      as bsr_orders,
                       asin_bsr_orders_change          as bsr_orders_change,
                       asin_air_freight_gross_margin   as asin_air_freight_gross_margin,
                       asin_ocean_freight_gross_margin as asin_ocean_freight_gross_margin,
                       cast(asin_bought_month as int ) as asin_bought_month
                from dwt_flow_asin
                where site_name = '{self.site_name}'
                  and date_type = '{DateTypes.month.name}'
                  and date_info = '{flow_asin_month}'
            ) tmp3 on tmp.asin = tmp3.asin
            """

    def _launch_time_type_column(self):
        """Return the Column that buckets ``launch_time`` by age relative to ``date_info``."""
        launch_time = F.col("launch_time")
        bucket_col = None
        for days, type_code in self._LAUNCH_TIME_BUCKETS:
            boundary = CommonUtil.get_day_offset(self.date_info, -days)
            # CASE WHEN branches are tried in order, so reaching this branch
            # already implies launch_time is older than every earlier
            # boundary; only the lower bound has to be spelled out.
            matches = launch_time >= boundary
            if bucket_col is None:
                bucket_col = F.when(matches, F.lit(type_code))
            else:
                bucket_col = bucket_col.when(matches, F.lit(type_code))
        return bucket_col.otherwise(F.lit(0))

    def _enrich(self, df_all):
        """Join the lookup frames and add the derived columns used by the output."""
        # Internal ASINs, flagged with asin_type = 1.
        df_self_asin = get_self_asin_df(self.site_name, self.spark).select(
            F.col("asin"),
            F.lit(1).alias("asin_type"),
        )
        # Delisted ASINs (presumably supplies asin_unlaunch_time, which the
        # output selection reads - verify against get_asin_unlanuch_df).
        df_unlanuch = get_asin_unlanuch_df(self.site_name, self.spark)
        # Category node -> first-level category lookup.
        df_node_first_id = get_node_first_id_df(self.site_name, self.spark).select(
            F.col("node_id").alias("tmp_category_id"),
            F.col("category_first_id").alias("tmp_category_first_id"),
        )

        df_all = df_all \
            .join(df_self_asin, on=["asin"], how='left') \
            .join(df_unlanuch, on=["asin"], how='left') \
            .join(df_node_first_id, on=["tmp_category_id"], how='left')

        # New products may lack a first-level category id in the detail
        # history; fill it in from the node lookup.
        df_all = df_all.withColumn(
            "category_first_id",
            F.coalesce(F.col("category_first_id"), F.col("tmp_category_first_id")),
        )
        df_all = df_all.withColumn(
            "crawl_flag",
            self.udf_category_craw_flag(F.col("category_first_id"), F.col("asin")),
        )
        # Launch-time type.
        df_all = df_all.withColumn('asin_launch_time_type', self._launch_time_type_column())
        # Package quantity.
        df_all = df_all.withColumn("package_quantity", self.udf_cal_package_quantity(F.col("title")))
        # Seller location type.
        df_all = df_all.withColumn(
            "seller_country_type",
            self.udf_cal_seller_country_type(F.col("buy_box_seller_type"), F.col("seller_country_name")),
        )
        return df_all

    def _select_output_columns(self, df_all):
        """Project the enriched frame onto the output columns, applying defaults."""
        return df_all.select(
            F.col('asin'),
            F.col('title'),
            F.col('img_url'),
            F.col('img_type'),
            F.col('ao_val'),
            F.col('rating'),
            # Number of reviews.
            F.col('total_comments'),
            F.col('bsr_orders'),
            F.col('bsr_orders_change'),
            F.col('price'),
            F.col('weight'),
            F.col('launch_time'),
            F.trim(F.col('brand_name')).alias("brand_name"),
            F.col('buy_box_seller_type'),
            F.col('account_name'),
            F.col('volume'),
            F.col('last_update_time'),
            F.col('asin_air_freight_gross_margin'),
            F.col('asin_ocean_freight_gross_margin'),
            # First-level category defaults to the empty string.
            F.coalesce('category_first_id', F.lit("")).alias("category_first_id"),
            # 1 = internal ASIN, 2 = crawl_flag is False, 0 = everything else.
            F.when(F.col("asin_type").isNotNull(), F.lit(1))
            .when(F.col("crawl_flag") == False, F.lit(2))
            .otherwise(0).alias("asin_type"),
            # Time the ASIN was delisted.
            F.col("asin_unlaunch_time"),
            # Unique shop id.
            F.col("seller_id"),
            # Seller location.
            F.col("seller_country_name"),
            # First-level category rank.
            F.col("first_category_rank").alias('first_category_rank'),
            # Date of the first-level category rank (the detail crawl date).
            F.col("last_update_time").alias('first_category_rank_date'),
            # Package quantity.
            F.col("package_quantity"),
            # Launch-time type.
            F.col("asin_launch_time_type"),
            # Seller location type.
            F.col("seller_country_type"),
            # Monthly Amazon sales; defaults to -1 when unknown.
            F.coalesce('asin_bought_month', F.lit(-1)).alias("asin_bought_month"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.current_month).alias("date_info"),
        )

    def _save(self, df_all):
        """Align the frame with the Hive table, clear the target partition folder and append."""
        # Align columns with the table template.
        df_all = CommonUtil.format_df_with_template(self.spark, df_all, self.hive_tb, roundDouble=True)
        # Write with 6 partitions.
        df_all = df_all.repartition(6)

        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.current_month,
        }
        partition_by = list(partition_dict.keys())
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        # Files already in the partition folder are removed first so that the
        # append below does not duplicate a previous run's rows.
        print(f"清除hdfs目录中:{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

        hive_tb = self.hive_tb
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)

    def run(self):
        """Execute the job: query the sources, enrich, project and write the partition."""
        # Latest month partition of dwt_flow_asin.
        flow_asin_month = self._latest_flow_asin_month()
        # ASINs of the last 30 days with their raw attributes.
        sql = self._build_source_sql(flow_asin_month)
        print("======================查询sql如下======================")
        print(sql)
        df_all = self.spark.sql(sql)

        df_all = self._enrich(df_all)
        df_all = self._select_output_columns(df_all)
        self._save(df_all)
        print("success")
if __name__ == '__main__':
    # Script entry point: positional argument 1 is the site name and
    # argument 2 the date info; each falls back to None when not supplied.
    DwtNsrAsinDetail(
        site_name=CommonUtil.get_sys_arg(1, None),
        date_info=CommonUtil.get_sys_arg(2, None),
    ).run()