import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, IntegerType
"""
计算asin某个时间纬度内是否上个bsr榜单
"""
class DwdBsrAsinRank(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        # the default data window is the last 30 days
        self.date_type = "last30day"
        # 30 days before date_info
        self.last_30_day = CommonUtil.get_day_offset(date_info, -30)
        # 7 days before date_info
        self.last_7_day = CommonUtil.get_day_offset(date_info, -7)
        self.current_day = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # register the local static method as a UDF; the returned callable is used in DataFrame expressions below
        self.udf_calc_rank_reg = self.spark.udf.register("udf_calc_rank", self.udf_calc_rank, MapType(StringType(), IntegerType()))
        self.hive_tb = "dwd_bsr_asin_rank"
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    @staticmethod
    def udf_calc_rank(dayArr: list, rankArr: list):
        """
        Compute per-asin flags such as whether the asin is newly listed on the BSR.
        dayArr: [last_7_day, start_day (30 days before date_info), end_day (current day)]
        rankArr: collected [bsr_rank, date_info] pairs for the asin within the window
        :return: map of flag name -> value
        """
        bsr_count = len(rankArr)
        # the source query already limits rows to the last 30 days, so this flag is always 1
        is_30_day_flag = 1
        is_1_day_flag = 0
        is_7_day_flag = 0
        last_7_day = dayArr[0]
        start_day = dayArr[1]
        end_day = dayArr[2]
        for item in rankArr:
            date_info = str(item[1])
            if date_info >= last_7_day:
                is_7_day_flag = 1
            if date_info >= start_day:
                is_30_day_flag = 1
            if date_info >= end_day:
                is_1_day_flag = 1
        return {
            "bsr_count": bsr_count,
            "is_30_day_flag": is_30_day_flag,
            "is_1_day_flag": is_1_day_flag,
            "is_7_day_flag": is_7_day_flag,
        }
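
    # Illustrative example of udf_calc_rank above (the dates are made up purely for illustration):
    # for dayArr = ["2023-09-24", "2023-09-01", "2023-10-01"] (last_7_day, last_30_day, current_day)
    # and rankArr = [[15, "2023-09-30"], [18, "2023-09-10"]], it returns
    # {"bsr_count": 2, "is_30_day_flag": 1, "is_1_day_flag": 0, "is_7_day_flag": 1}.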

    def run(self):
        sql = f"""
        select dbarh.asin,
               dbarh.category_id as category_id,
               dbarh.bsr_rank,
               dbarh.date_info,
               tmp.asin_launch_time
        from dim_bsr_asin_rank_history dbarh
                 left join (
            select asin, asin_launch_time
            from dim_cal_asin_history_detail
            where site_name = "{self.site_name}"
        ) tmp on tmp.asin = dbarh.asin
        where dbarh.site_name = "{self.site_name}"
          and dbarh.date_info >= "{self.last_30_day}"
          and dbarh.date_info <= "{self.current_day}"
        """
        print("====================== query sql ======================")
        print(sql)
        last_90_day = CommonUtil.get_day_offset(self.date_info, -90)
        df_all = self.spark.sql(sql)
        df_all = df_all \
            .groupBy(["category_id", "asin"]) \
            .agg(
                self.udf_calc_rank_reg(
                    F.array(F.lit(self.last_7_day), F.lit(self.last_30_day), F.lit(self.current_day)),
                    F.collect_set(F.array([df_all['bsr_rank'], df_all['date_info']]))
                ).alias("resultMap"),
                # latest bsr_rank
                F.max(F.struct("date_info", "bsr_rank")).alias("last_row"),
                # among products that entered the BSR list in the last 30 days, flag those launched within
                # the last 90 days; a null asin_launch_time is treated as newly launched
                F.when(F.first("asin_launch_time") < F.lit(last_90_day), F.lit(0)).otherwise(1).alias("is_asin_new"),
            )
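        # Note: Spark compares structs field by field in declaration order, so
        # F.max(F.struct("date_info", "bsr_rank")) above keeps the pair with the most recent
        # date_info, i.e. the latest observed BSR rank for each (category_id, asin).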
        # TODO: products whose rank in the current category rose by more than 50% within the last
        #  30 days (an improvement of at least 20 places) and whose resulting rank is better than 20000.
        df_save = df_all \
            .select(
                F.col("asin"),
                F.col("category_id"),
                F.col("last_row").getField("bsr_rank").alias("bsr_rank"),
                F.col("resultMap").getField("bsr_count").alias("bsr_count"),
                F.col("resultMap").getField("is_1_day_flag").alias("is_1_day_flag"),
                F.col("resultMap").getField("is_7_day_flag").alias("is_7_day_flag"),
                F.col("resultMap").getField("is_30_day_flag").alias("is_30_day_flag"),
                F.col("is_asin_new"),
                # among products that entered the BSR list in the last 30 days, flag those appearing
                # for the first time within the window, i.e. the 30-day appearance count is 1
                F.when(
                    F.col("resultMap").getField("bsr_count") == 1, F.lit(1)
                ).otherwise(0).alias("is_asin_bsr_new"),
                # most recent date the asin appeared on the BSR list
                F.col("last_row").getField("date_info").alias("last_bsr_day"),
                F.lit(self.site_name).alias("site_name"),
                F.lit(self.date_type).alias("date_type"),
                F.lit(self.date_info).alias("date_info")
            )
        # reduce the output to 2 partitions
        df_save = df_save.repartition(2)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"Saving to table: {self.hive_tb}, partitioned by {partition_by}")
        # the target partition directory was cleared in __init__, so appending here effectively replaces the partition
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwdBsrAsinRank(site_name, date_info)
    obj.run()
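
# Example invocation (the script name, site code, and date below are hypothetical, shown only to
# illustrate the expected positional arguments: site_name followed by date_info):
#   spark-submit dwd_bsr_asin_rank.py us 2023-10-01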