import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# import the shared UDF helpers
from yswg_utils.common_udf import udf_parse_bs_category
# from ..yswg_utils.common_udf import udf_parse_bs_category


class DimBsAsinInfo(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # initialize the self.spark object
        self.db_save = 'dim_asin_bs_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_node_id = self.spark.sql("select 1+1;")
        self.df_bs_asin_detail = self.spark.sql("select 1+1;")
        self.df_bs_category = self.spark.sql("select 1+1;")
        # return type of the UDF: a StructType with two StringType category-id fields
        # and two IntegerType rank fields
        schema = StructType([
            StructField('asin_bs_cate_1_id', StringType(), True),
            StructField('asin_bs_cate_current_id', StringType(), True),
            StructField('asin_bs_cate_1_rank', IntegerType(), True),
            StructField('asin_bs_cate_current_rank', IntegerType(), True),
        ])
        # self.u_parse_bs_category = F.udf(self.udf_parse_bs_category, schema)
        self.u_parse_bs_category = F.udf(udf_parse_bs_category, schema)
        # self.pattern1_dict = {
        #     "us": "(\d+).*?See Top 100 in ".lower(),
        #     "uk": "(\d+).*?See Top 100 in ".lower(),
        #     "de": "(\d+).*?Siehe Top 100 in ".lower(),
        #     "es": "(\d+).*?Ver el Top 100 en ".lower(),
        #     "fr": "(\d+).*?Voir les 100 premiers en ".lower(),
        #     "it": "(\d+).*?Visualizza i Top 100 nella categoria ".lower(),
        # }
        self.pattern1_dict = {
            "us": "See Top 100 in ".lower(),
            "uk": "See Top 100 in ".lower(),
            "de": "Siehe Top 100 in ".lower(),
            "es": "Ver el Top 100 en ".lower(),
            "fr": "Voir les 100 premiers en ".lower(),
            "it": "Visualizza i Top 100 nella categoria ".lower(),
        }
        self.pattern_current_dict = {
            "us": r"#(\d+) ",
            "uk": r"(\d+) in ",
            "de": r"(\d+) in ",
            "es": r"(\d+) en ",
            "fr": r"(\d+) en ",
            "it": r"(\d+) in ",
        }
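        # Illustration (hypothetical rank text, lowered, for readability only): for the us site,
        #   "#12 in toys & games (see top 100 in toys & games) #3 in stacking blocks"
        # matched against pattern_current_dict["us"] yields ranks [12, 3] inside udf_parse_bs_category,
        # and the presence of the pattern1_dict["us"] phrase marks the first rank as the top-level one.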
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=20)
        self.get_year_week_tuple()

    @staticmethod
    def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href, cate_current_pattern, cate_1_pattern):
        # if (site_name == 'us' and date_type in ['month', 'month_week'] and date_info >= '2023-11') or (site_name != 'us' and date_type in ['week'] and date_info >= '2023-41'):
        #     href_list = all_best_sellers_href.split("&&&&")
        # 1. decide which field to parse the category from
        if str(all_best_sellers_href).lower() not in ['', 'none', 'null']:
            bs_href = all_best_sellers_href
        elif str(last_herf).lower() not in ['', 'none', 'null']:
            bs_href = last_herf
        else:
            bs_href = ''
        href_list = bs_href.replace("?tf=1", "").split("&&&&")
        # 2. parse the top-level and current category + rank
        # 2.1 extract the category ids
        if href_list:
            if len(href_list) == 1:
                cate_list = re.findall('bestsellers/(.*)/ref', href_list[0])
                if cate_list:
                    if "/" in cate_list[0]:
                        cate_1_id, cate_current_id = cate_list[0].split("/")[0], cate_list[0].split("/")[-1]
                    else:
                        cate_1_id, cate_current_id = cate_list[0].split("/")[0], None
                else:
                    cate_1_id, cate_current_id = None, None
            else:
                cate_1_id = re.findall('bestsellers/(.*)/ref', href_list[0])[0] if re.findall('bestsellers/(.*)/ref', href_list[0]) else None
                cate_current_id = re.findall('bestsellers/(.*)/ref', href_list[-1])[0] if re.findall('bestsellers/(.*)/ref', href_list[-1]) else None
                if cate_1_id and "/" in cate_1_id:
                    cate_1_id = cate_1_id.split("/")[0]
                if cate_current_id and "/" in cate_current_id:
                    cate_current_id = cate_current_id.split("/")[-1]
        else:
            cate_1_id, cate_current_id = None, None
        # 2.2 extract the ranks
        asin_bs_sellers_rank_lower2 = asin_bs_sellers_rank_lower.replace(",", "").replace(" 100 ", "")
        rank_list = re.findall(cate_current_pattern, asin_bs_sellers_rank_lower2)  # match the ranks
        rank_list = [int(rank) for rank in rank_list]  # cast to int
        if rank_list:
            if len(rank_list) == 1:
                if cate_1_pattern in asin_bs_sellers_rank_lower:
                    cate_1_rank, cate_current_rank = rank_list[0], None
                else:
                    cate_1_rank, cate_current_rank = None, rank_list[0]
            else:
                if cate_1_pattern in asin_bs_sellers_rank_lower:
                    cate_1_rank, cate_current_rank = rank_list[0], rank_list[-1]
                else:
                    cate_1_rank, cate_current_rank = None, rank_list[0]
        else:
            cate_1_rank, cate_current_rank = None, None
        return cate_1_id, cate_current_id, cate_1_rank, cate_current_rank
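    # Illustration (hypothetical href values, not taken from real data): with
    #   all_best_sellers_href = "https://www.amazon.com/gp/bestsellers/toys-and-games/ref=zg_b_bs_tg_1"
    #                           "&&&&https://www.amazon.com/gp/bestsellers/toys-and-games/2553573011/ref=zg_b_bs_2553573011_1"
    # udf_parse_bs_category would return roughly
    #   ('toys-and-games', '2553573011', <cate_1_rank>, <cate_current_rank>),
    # with the two ranks parsed from the best-sellers rank text via the site patterns.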

    def read_data(self):
        sql = f"select asin, category_id as asin_bs_cate_current_id_node, category_first_id as asin_bs_cate_1_id_node from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"  # and date_info>='2023-15'
        print(f"1. Read node_id data from dim_asin_detail: sql -- {sql}")
        self.df_asin_node_id = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_node_id.show(10, truncate=False)
        # 2. read the detail rows of the matching period from ods_bs_category_asin_detail
        params = f" date_info <= '2022-42'" if max(self.year_week_tuple) <= '2022-42' and self.date_type == 'month' else f" date_info in {self.year_week_tuple}"
        sql = f"select asin, best_sellers_rank as asin_bs_sellers_rank, last_herf, all_best_sellers_href, date_type, date_info, created_at from ods_bs_category_asin_detail " \
              f"where site_name='{self.site_name}' and date_type='week' and {params};"
        if self.date_type in ['month', 'month_week'] and ((self.site_name == 'us' and self.date_info >= '2023-10') or (self.site_name in ['uk', 'de'] and self.date_info >= '2024-05')):
            sql = f"select asin, best_sellers_rank as asin_bs_sellers_rank, last_herf, all_best_sellers_href, date_type, date_info, created_at from ods_bs_category_asin_detail " \
                  f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        print(f"2. Read the detail rows of the matching period from ods_bs_category_asin_detail: sql -- {sql}")
        self.df_bs_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        # self.df_bs_asin_detail = self.df_bs_asin_detail.drop_duplicates(['asin', 'date_info'])
        self.df_bs_asin_detail.show(10, truncate=False)
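    # Note: year_week_tuple is assumed to be populated by get_year_week_tuple() from the Templates
    # base class, so the week-level params clause above would render roughly as
    #   date_info in ('2023-40', '2023-41', '2023-42', '2023-43')
    # (the week values shown are hypothetical).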

    def handle_df_asin_node_id(self):
        # keep the latest node_id per asin
        # self.df_asin_node_id = self.df_asin_node_id.filter("asin_bs_cate_current_id is not null")
        # window = Window.partitionBy(['asin']).orderBy(
        #     self.df_asin_node_id.date_info.desc()
        # )
        # self.df_asin_node_id = self.df_asin_node_id.withColumn(
        #     "row_number", F.row_number().over(window=window)
        # )
        # self.df_asin_node_id = self.df_asin_node_id.filter("row_number=1")
        # self.df_asin_node_id = self.df_asin_node_id.drop("row_number")
        pass  # dim_asin_detail is already deduplicated

    def handle_df_bs_asin_detail(self):
        # keep the latest asin_bs_sellers_rank_lower per asin
        window = Window.partitionBy(['asin']).orderBy(
            self.df_bs_asin_detail.created_at.desc()
        )
        self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn(
            "row_number", F.row_number().over(window=window)
        )
        self.df_bs_asin_detail = self.df_bs_asin_detail.filter("row_number=1")
        self.df_bs_asin_detail = self.df_bs_asin_detail.drop("row_number", "date_info")
        # lowercase the rank text
        self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn("asin_bs_sellers_rank_lower",
                                                                   F.lower("asin_bs_sellers_rank"))
        # self.df_bs_asin_detail.show(10, truncate=False)
        # extract asin_bs_cate_1_rank / asin_bs_cate_current_rank from the category rank string
        # build the match patterns for the current site
        cate_current_pattern = self.pattern_current_dict[self.site_name]
        cate_1_pattern = self.pattern1_dict[self.site_name]
        self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn(
            'asin_bs_cate_ranks',
            self.u_parse_bs_category('asin_bs_sellers_rank_lower', 'last_herf', 'all_best_sellers_href',
                                     F.lit(cate_current_pattern), F.lit(cate_1_pattern))
        )
        # self.df_bs_asin_detail.show(10, truncate=False)
        self.df_bs_asin_detail = self.df_bs_asin_detail \
            .withColumn('asin_bs_cate_1_id', self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_1_id')) \
            .withColumn('asin_bs_cate_current_id',
                        self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_current_id')) \
            .withColumn('asin_bs_cate_1_rank',
                        self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_1_rank')) \
            .withColumn('asin_bs_cate_current_rank',
                        self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_current_rank')) \
            .drop('asin_bs_cate_ranks')
        self.df_bs_asin_detail.show(10, truncate=False)
        # self.df_save = self.df_asin_node_id.join(
        #     self.df_bs_asin_detail, 'asin', how='left'
        # ).join(
        #     self.df_category_desc_id, 'asin_bs_cate_current_id', how='left'
        # )
        self.df_save = self.df_asin_node_id.join(
            self.df_bs_asin_detail, 'asin', how='left'
        )
        # backfill the top-level and current category ids with the node_id based categories
        self.df_save = self.df_save.withColumn(
            "asin_bs_cate_1_id",
            F.when(F.col("asin_bs_cate_1_id").isNull(), F.col("asin_bs_cate_1_id_node")).otherwise(F.col("asin_bs_cate_1_id"))
        ).withColumn(
            "asin_bs_cate_current_id",
            F.when(F.col("asin_bs_cate_current_id").isNull(), F.col("asin_bs_cate_current_id_node")).otherwise(F.col("asin_bs_cate_current_id"))
        )
        self.df_save = self.df_save.drop("asin_bs_sellers_rank_lower", "asin_bs_cate_1_id_node", "asin_bs_cate_current_id_node")
        self.df_save.show(20)
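        # Note: the when(isNull)/otherwise backfill above behaves like F.coalesce; an equivalent,
        # more compact form (not applied here) would be, for example:
        #   self.df_save = self.df_save.withColumn(
        #       "asin_bs_cate_1_id", F.coalesce("asin_bs_cate_1_id", "asin_bs_cate_1_id_node"))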

    def handle_data(self):
        self.handle_df_asin_node_id()
        self.handle_df_bs_asin_detail()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DimBsAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
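# Example invocation (hypothetical script name and argument values):
#   spark-submit dim_asin_bs_info.py us month 2023-11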