Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
146441b7
Commit
146441b7
authored
Sep 28, 2025
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
流量选品-最小排名解析规则优化
parent
d75ed4cd
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
85 additions
and
36 deletions
+85
-36
dim_asin_bs_info.py
Pyspark_job/dim/dim_asin_bs_info.py
+85
-36
No files found.
Pyspark_job/dim/dim_asin_bs_info.py
View file @
146441b7
...
@@ -3,14 +3,13 @@ import sys
...
@@ -3,14 +3,13 @@ import sys
import
re
import
re
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
# 上级目录
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
# 上级目录
from
utils.templates
import
Templates
from
utils.templates
import
Templates
# from ..utils.templates import Templates
from
pyspark.sql
import
functions
as
F
from
pyspark.sql
import
functions
as
F
from
pyspark.sql.window
import
Window
from
pyspark.sql.window
import
Window
from
pyspark.sql.types
import
StructType
,
StructField
,
IntegerType
,
StringType
from
pyspark.sql.types
import
StructType
,
StructField
,
IntegerType
,
StringType
# 导入udf公共方法
# 导入udf公共方法
from
yswg_utils.common_udf
import
udf_parse_bs_category
# from yswg_utils.common_udf import udf_parse_bs_category
# from ..yswg_utils.common_udf import udf_parse_bs_category
class
DimBsAsinInfo
(
Templates
):
class
DimBsAsinInfo
(
Templates
):
...
@@ -20,31 +19,19 @@ class DimBsAsinInfo(Templates):
...
@@ -20,31 +19,19 @@ class DimBsAsinInfo(Templates):
self
.
site_name
=
site_name
self
.
site_name
=
site_name
self
.
date_type
=
date_type
self
.
date_type
=
date_type
self
.
date_info
=
date_info
self
.
date_info
=
date_info
# 初始化self.spark对
self
.
db_save
=
'dim_asin_bs_info'
self
.
db_save
=
'dim_asin_bs_info'
self
.
spark
=
self
.
create_spark_object
(
self
.
spark
=
self
.
create_spark_object
(
app_name
=
f
"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}"
)
app_name
=
f
"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}"
)
self
.
df_save
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_save
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_asin_node_id
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_asin_node_id
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_bs_asin_detail
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_bs_asin_detail
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_bs_category
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_bs_category
=
self
.
spark
.
sql
(
"select 1+1;"
)
# 定义 UDF 的返回类型,即一个包含三个 DoubleType 字段的 StructType
schema
=
StructType
([
schema
=
StructType
([
StructField
(
'asin_bs_cate_1_id'
,
StringType
(),
True
),
StructField
(
'asin_bs_cate_1_id'
,
StringType
(),
True
),
StructField
(
'asin_bs_cate_current_id'
,
StringType
(),
True
),
StructField
(
'asin_bs_cate_current_id'
,
StringType
(),
True
),
StructField
(
'asin_bs_cate_1_rank'
,
IntegerType
(),
True
),
StructField
(
'asin_bs_cate_1_rank'
,
IntegerType
(),
True
),
StructField
(
'asin_bs_cate_current_rank'
,
IntegerType
(),
True
),
StructField
(
'asin_bs_cate_current_rank'
,
IntegerType
(),
True
),
])
])
# self.u_parse_bs_category = F.udf(self.udf_parse_bs_category, schema)
self
.
u_parse_bs_category
=
F
.
udf
(
self
.
udf_parse_bs_category
,
schema
)
self
.
u_parse_bs_category
=
F
.
udf
(
udf_parse_bs_category
,
schema
)
# self.pattern1_dict = {
# "us": "(\d+).*?See Top 100 in ".lower(),
# "uk": "(\d+).*?See Top 100 in ".lower(),
# "de": "(\d+).*?Siehe Top 100 in ".lower(),
# "es": "(\d+).*?Ver el Top 100 en ".lower(),
# "fr": "(\d+).*?Voir les 100 premiers en ".lower(),
# "it": "(\d+).*?Visualizza i Top 100 nella categoria ".lower(),
# }
self
.
pattern1_dict
=
{
self
.
pattern1_dict
=
{
"us"
:
"See Top 100 in "
.
lower
(),
"us"
:
"See Top 100 in "
.
lower
(),
"uk"
:
"See Top 100 in "
.
lower
(),
"uk"
:
"See Top 100 in "
.
lower
(),
...
@@ -66,11 +53,15 @@ class DimBsAsinInfo(Templates):
...
@@ -66,11 +53,15 @@ class DimBsAsinInfo(Templates):
self
.
get_year_week_tuple
()
self
.
get_year_week_tuple
()
@staticmethod
@staticmethod
def
udf_parse_bs_category
(
asin_bs_sellers_rank_lower
,
last_herf
,
all_best_sellers_href
,
cate_current_pattern
,
cate_1_pattern
):
def
udf_parse_bs_category
(
asin_bs_sellers_rank_lower
,
last_herf
,
all_best_sellers_href
,
cate_current_pattern
,
cate_1_pattern
,
node_id
):
"""
# if (site_name == 'us' and date_type in ['month', 'month_week'] and date_info >= '2023-11') or (site_name != 'us' and date_type in ['week'] and date_info >= '2023-41'):
asin_bs_sellers_rank_lower: 底部分类字符串
# href_list = all_best_sellers_href.split("&&&&")
last_herf: 最后一级分类链接
all_best_sellers_href: 所有分类链接
cate_current_pattern: 当前分类排名匹配规则
cate_1_pattern: 一级分类排名匹配规则
node_id: 页面头部抓取分类id
"""
# 1. 判断用哪个字段来解析分类
# 1. 判断用哪个字段来解析分类
if
str
(
all_best_sellers_href
)
.
lower
()
not
in
[
''
,
'none'
,
'null'
]:
if
str
(
all_best_sellers_href
)
.
lower
()
not
in
[
''
,
'none'
,
'null'
]:
bs_href
=
all_best_sellers_href
bs_href
=
all_best_sellers_href
...
@@ -80,8 +71,54 @@ class DimBsAsinInfo(Templates):
...
@@ -80,8 +71,54 @@ class DimBsAsinInfo(Templates):
bs_href
=
''
bs_href
=
''
href_list
=
bs_href
.
replace
(
"?tf=1"
,
""
)
.
split
(
"&&&&"
)
href_list
=
bs_href
.
replace
(
"?tf=1"
,
""
)
.
split
(
"&&&&"
)
# 新增climate-pledge分类优化--若最后一级是climate-pledge的分类,则向前取
rank_flag
=
None
while
True
:
if
'/climate-pledge'
in
href_list
[
-
1
]
and
len
(
href_list
)
>=
2
:
href_list
.
pop
()
rank_flag
=
True
else
:
break
# 2. 解析一级和当前 分类 + 排名
# 2. 解析一级和当前 分类 + 排名
# 2.1 提取分类
# 2.1 先检查 node_id 是否在 href_list 中
cate_1_id
,
cate_current_id
,
cate_1_rank
,
cate_current_rank
=
None
,
None
,
None
,
None
if
node_id
and
len
(
href_list
)
>
1
:
node_id_str
=
str
(
node_id
)
matched_idx
=
None
for
i
,
href
in
enumerate
(
href_list
):
if
node_id_str
in
href
:
# 判断node_id是否在url中出现
matched_idx
=
i
break
if
matched_idx
is
not
None
:
# 提取对应分类ID
cate_current_id
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
matched_idx
])
cate_current_id
=
cate_current_id
[
0
]
.
split
(
"/"
)[
-
1
]
if
cate_current_id
else
None
# 一级分类还是取第一个
cate_1_id
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
0
])
cate_1_id
=
cate_1_id
[
0
]
.
split
(
"/"
)[
0
]
if
cate_1_id
else
None
# 解析排名
if
asin_bs_sellers_rank_lower
is
not
None
:
asin_bs_sellers_rank_lower2
=
asin_bs_sellers_rank_lower
.
replace
(
"."
,
""
)
.
replace
(
","
,
""
)
.
replace
(
" 100 "
,
""
)
else
:
asin_bs_sellers_rank_lower2
=
''
rank_list
=
re
.
findall
(
cate_current_pattern
,
asin_bs_sellers_rank_lower2
)
rank_list
=
[
int
(
rank
)
for
rank
in
rank_list
]
# 如果 rank_list 长度和 href_list 对齐,则取对应位置的排名
if
matched_idx
<
len
(
rank_list
):
cate_current_rank
=
rank_list
[
matched_idx
]
# 一级分类排名
if
rank_list
and
cate_1_pattern
in
asin_bs_sellers_rank_lower
:
cate_1_rank
=
rank_list
[
0
]
return
cate_1_id
,
cate_current_id
,
cate_1_rank
,
cate_current_rank
# 2.2 提取分类
if
href_list
:
if
href_list
:
if
len
(
href_list
)
==
1
:
if
len
(
href_list
)
==
1
:
cate_list
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
0
])
cate_list
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
0
])
...
@@ -93,20 +130,32 @@ class DimBsAsinInfo(Templates):
...
@@ -93,20 +130,32 @@ class DimBsAsinInfo(Templates):
else
:
else
:
cate_1_id
,
cate_current_id
=
None
,
None
cate_1_id
,
cate_current_id
=
None
,
None
else
:
else
:
cate_1_id
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
0
])[
0
]
if
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
0
])
else
None
cate_1_id
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
0
])[
0
]
if
re
.
findall
(
'bestsellers/(.*)/ref'
,
cate_current_id
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
-
1
])[
0
]
if
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
-
1
])
else
None
href_list
[
0
])
else
None
cate_current_id
=
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
-
1
])[
0
]
if
re
.
findall
(
'bestsellers/(.*)/ref'
,
href_list
[
-
1
])
else
None
if
"/"
in
cate_1_id
:
if
"/"
in
cate_1_id
:
cate_1_id
=
cate_1_id
.
split
(
"/"
)[
0
]
cate_1_id
=
cate_1_id
.
split
(
"/"
)[
0
]
if
"/"
in
cate_current_id
:
if
"/"
in
cate_current_id
:
cate_current_id
=
cate_current_id
.
split
(
"/"
)[
-
1
]
cate_current_id
=
cate_current_id
.
split
(
"/"
)[
-
1
]
else
:
else
:
cate_1_id
,
cate_current_id
=
None
,
None
cate_1_id
,
cate_current_id
=
None
,
None
# 2.2 提取排名
# 2.3 提取排名
asin_bs_sellers_rank_lower2
=
asin_bs_sellers_rank_lower
.
replace
(
","
,
""
)
.
replace
(
" 100 "
,
""
)
if
asin_bs_sellers_rank_lower
is
not
None
:
asin_bs_sellers_rank_lower2
=
asin_bs_sellers_rank_lower
.
replace
(
"."
,
""
)
.
replace
(
","
,
""
)
.
replace
(
" 100 "
,
""
)
else
:
asin_bs_sellers_rank_lower2
=
''
rank_list
=
re
.
findall
(
cate_current_pattern
,
asin_bs_sellers_rank_lower2
)
# 匹配排名
rank_list
=
re
.
findall
(
cate_current_pattern
,
asin_bs_sellers_rank_lower2
)
# 匹配排名
rank_list
=
[
int
(
rank
)
for
rank
in
rank_list
]
# 转换成int类型
rank_list
=
[
int
(
rank
)
for
rank
in
rank_list
]
# 转换成int类型
# print("rank_list:", rank_list)
if
rank_flag
:
if
len
(
rank_list
)
>
len
(
href_list
):
rank_list
=
rank_list
[:
len
(
href_list
)]
if
rank_list
:
if
rank_list
:
if
len
(
rank_list
)
==
1
:
if
len
(
rank_list
)
==
1
:
if
cate_1_pattern
in
asin_bs_sellers_rank_lower
:
if
cate_1_pattern
in
asin_bs_sellers_rank_lower
:
...
@@ -170,6 +219,10 @@ class DimBsAsinInfo(Templates):
...
@@ -170,6 +219,10 @@ class DimBsAsinInfo(Templates):
# 小写
# 小写
self
.
df_bs_asin_detail
=
self
.
df_bs_asin_detail
.
withColumn
(
"asin_bs_sellers_rank_lower"
,
self
.
df_bs_asin_detail
=
self
.
df_bs_asin_detail
.
withColumn
(
"asin_bs_sellers_rank_lower"
,
F
.
lower
(
"asin_bs_sellers_rank"
))
F
.
lower
(
"asin_bs_sellers_rank"
))
# 关联node_id
self
.
df_bs_asin_detail
=
self
.
df_asin_node_id
.
join
(
self
.
df_bs_asin_detail
,
'asin'
,
how
=
'left'
)
# self.df_bs_asin_detail.show(10, truncate=False)
# self.df_bs_asin_detail.show(10, truncate=False)
# 提取分类字符串中的asin_bs_cate_1_rank, asin_bs_cate_current_rank
# 提取分类字符串中的asin_bs_cate_1_rank, asin_bs_cate_current_rank
# 生成当前分类匹配规则
# 生成当前分类匹配规则
...
@@ -178,7 +231,7 @@ class DimBsAsinInfo(Templates):
...
@@ -178,7 +231,7 @@ class DimBsAsinInfo(Templates):
self
.
df_bs_asin_detail
=
self
.
df_bs_asin_detail
.
withColumn
(
self
.
df_bs_asin_detail
=
self
.
df_bs_asin_detail
.
withColumn
(
'asin_bs_cate_ranks'
,
'asin_bs_cate_ranks'
,
self
.
u_parse_bs_category
(
'asin_bs_sellers_rank_lower'
,
'last_herf'
,
'all_best_sellers_href'
,
self
.
u_parse_bs_category
(
'asin_bs_sellers_rank_lower'
,
'last_herf'
,
'all_best_sellers_href'
,
F
.
lit
(
cate_current_pattern
),
F
.
lit
(
cate_1_pattern
))
F
.
lit
(
cate_current_pattern
),
F
.
lit
(
cate_1_pattern
)
,
'asin_bs_cate_current_id_node'
)
)
)
# self.df_bs_asin_detail.show(10, truncate=False)
# self.df_bs_asin_detail.show(10, truncate=False)
self
.
df_bs_asin_detail
=
self
.
df_bs_asin_detail
\
self
.
df_bs_asin_detail
=
self
.
df_bs_asin_detail
\
...
@@ -190,18 +243,15 @@ class DimBsAsinInfo(Templates):
...
@@ -190,18 +243,15 @@ class DimBsAsinInfo(Templates):
.
withColumn
(
'asin_bs_cate_current_rank'
,
.
withColumn
(
'asin_bs_cate_current_rank'
,
self
.
df_bs_asin_detail
.
asin_bs_cate_ranks
.
getField
(
'asin_bs_cate_current_rank'
))
\
self
.
df_bs_asin_detail
.
asin_bs_cate_ranks
.
getField
(
'asin_bs_cate_current_rank'
))
\
.
drop
(
'asin_bs_cate_ranks'
)
.
drop
(
'asin_bs_cate_ranks'
)
self
.
df_bs_asin_detail
.
show
(
10
,
truncate
=
False
)
#
self.df_bs_asin_detail.show(10, truncate=False)
# self.df_save = self.df_asin_node_id.join(
# self.df_save = self.df_asin_node_id.join(
# self.df_bs_asin_detail, 'asin', how='left'
# self.df_bs_asin_detail, 'asin', how='left'
# ).join(
# ).join(
# self.df_category_desc_id, 'asin_bs_cate_current_id', how='left'
# self.df_category_desc_id, 'asin_bs_cate_current_id', how='left'
# )
# )
self
.
df_save
=
self
.
df_asin_node_id
.
join
(
self
.
df_bs_asin_detail
,
'asin'
,
how
=
'left'
)
# 用node_id的分类去补充一级分类和当前分类
# 用node_id的分类去补充一级分类和当前分类
self
.
df_save
=
self
.
df_
save
.
withColumn
(
self
.
df_save
=
self
.
df_
bs_asin_detail
.
withColumn
(
"asin_bs_cate_1_id"
,
"asin_bs_cate_1_id"
,
F
.
when
(
F
.
col
(
"asin_bs_cate_1_id"
)
.
isNull
(),
F
.
col
(
"asin_bs_cate_1_id_node"
))
.
otherwise
(
F
.
col
(
"asin_bs_cate_1_id"
))
F
.
when
(
F
.
col
(
"asin_bs_cate_1_id"
)
.
isNull
(),
F
.
col
(
"asin_bs_cate_1_id_node"
))
.
otherwise
(
F
.
col
(
"asin_bs_cate_1_id"
))
)
.
withColumn
(
)
.
withColumn
(
...
@@ -225,4 +275,4 @@ if __name__ == '__main__':
...
@@ -225,4 +275,4 @@ if __name__ == '__main__':
date_type
=
sys
.
argv
[
2
]
# 参数2:类型:week/4_week/month/quarter
date_type
=
sys
.
argv
[
2
]
# 参数2:类型:week/4_week/month/quarter
date_info
=
sys
.
argv
[
3
]
# 参数3:年-周/年-月/年-季, 比如: 2022-1
date_info
=
sys
.
argv
[
3
]
# 参数3:年-周/年-月/年-季, 比如: 2022-1
handle_obj
=
DimBsAsinInfo
(
site_name
=
site_name
,
date_type
=
date_type
,
date_info
=
date_info
)
handle_obj
=
DimBsAsinInfo
(
site_name
=
site_name
,
date_type
=
date_type
,
date_info
=
date_info
)
handle_obj
.
run
()
handle_obj
.
run
()
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment