Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
af235b1f
Commit
af235b1f
authored
Apr 03, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
流量选品增加:组合颜色筛选
parent
85c8807f
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
116 additions
and
2 deletions
+116
-2
dwt_flow_asin.py
Pyspark_job/dwt/dwt_flow_asin.py
+109
-1
es_flow_asin.py
Pyspark_job/export_es/es_flow_asin.py
+1
-1
es_util.py
Pyspark_job/utils/es_util.py
+6
-0
No files found.
Pyspark_job/dwt/dwt_flow_asin.py
View file @
af235b1f
...
...
@@ -15,6 +15,7 @@
@UpdateTime : 2023/01/10 07:55
"""
import
os
import
re
import
sys
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
# 上级目录
...
...
@@ -77,6 +78,7 @@ class DwtFlowAsin(Templates):
self
.
df_flow_asin_last_year
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_keepa_asin
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_asin_source_flag
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
color_set
=
set
()
# 颜色词表,read_data 阶段填充
@staticmethod
def
udf_get_previous_last_30_day
(
self
):
...
...
@@ -297,6 +299,13 @@ class DwtFlowAsin(Templates):
self
.
df_asin_source_flag
=
self
.
df_asin_source_flag
.
repartition
(
60
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_asin_source_flag
.
show
(
10
,
truncate
=
False
)
print
(
"11.读取颜色词表 dim_asin_color_info"
)
color_rows
=
self
.
spark
.
sql
(
f
"SELECT lower(en_name) as en_name FROM dim_asin_color_info WHERE site_name='{self.site_name}'"
)
.
collect
()
self
.
color_set
=
{
row
.
en_name
for
row
in
color_rows
}
print
(
f
"颜色词表共 {len(self.color_set)} 条"
)
# 处理asin基础属性信息(体积重量相关)及bsr销售额相关信息
def
handle_asin_basic_attribute
(
self
):
# 处理重量类型
...
...
@@ -599,6 +608,103 @@ class DwtFlowAsin(Templates):
)
.
fillna
({
"asin_source_flag"
:
"0"
,
"bsr_last_seen_at"
:
"1970-01-01"
,
"nsr_last_seen_at"
:
"1970-01-01"
})
self
.
df_asin_source_flag
.
unpersist
()
def handle_multi_color_flag(self):
    """Flag ASINs that are sold as a colour combination.

    Adds two columns to ``self.df_asin_detail``:

    * ``multi_color_flag``
        0 = not multi-colour;
        1 = detected from the colour variation attribute (``asin_color``);
        2 = detected from the title / description text (fallback).
    * ``multi_color_str`` - the text that triggered the flag, or NULL.

    ``asin_color`` is checked in three tiers, first hit wins:

    1. it contains one of the colour-combination keywords;
    2. it contains a separator (``/ + & ; ,`` or the word "and") or
       "<digits> color(s)";
    3. it contains two or more distinct single-colour words taken from
       ``self.color_set``.

    Only when none of the tiers hit is the lower-cased concatenation of
    ``asin_title`` and ``asin_describe`` tested against the fallback pattern.
    """
    # Tier 1: keyword pattern (accepts color/colors/colour/colours).
    KEYWORD_PATTERN = (
        r"(?i)("
        r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
        r"\bassorted\b|\bmorandi\b|\bpastel\b|\bvibrant\b|\bvintage\b|\bboho\b|\bgradient\b|"
        r"\bcandy\b|\bdusty\b|\bfluorescent\b|\bgentle\b|\bneutral\b|\bsoft\b|\bmuted\b|"
        r"\brainbow\b|\bmaillard\b|\bcolorful\b|\bcolourful\b|\bmulti\b|"
        r"\baesthetic colou?rs?\b|\bdreamy colou?rs?\b|\bearthy colou?rs?\b|\bshades of\b|"
        r"\bvarious colou?rs?\b|\bsolid colou?rs?\b|\bmix colou?rs?\b|\bmixed colou?rs?\b|"
        r"\bbasic colou?rs?\b|\blightcolor\b|\bdarkcolor\b|\battractive colou?rs?\b|"
        r"\bmultiple colou?rs?\b|\bbright colorful\b|\bdifferent colou?rs?\b|\bclassic colou?rs?\b|"
        r"\bfriendly colou?rs?\b|\bwarm colou?rs?\b|\bfun colou?rs?\b|\bmetallic colou?rs?\b|"
        r"\bbright colou?rs?\b|\bdark colou?rs?\b|\blight colou?rs?\b|"
        r"\bautumn colou?rs?\b|\bsummer colou?rs?\b|\bwinter colou?rs?\b|\bspring colou?rs?\b"
        r")"
    )

    # Tier 2: separator pattern (/ + & ; , "and", or "<digits> color").
    # NOTE(review): ';' and ',' each appear twice in the character class;
    # presumably one of each was a full-width variant - confirm against the
    # repository copy. The literal is kept exactly as found.
    SEPARATOR_PATTERN = r"(?i)[/+&;;,,]|\band\b|\d+\s*colou?rs?"

    # Fallback: keyword pattern applied to title + description.
    FALLBACK_PATTERN = (
        r"(?i)("
        r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
        r"\bassorted colou?rs?\b|\bfluorescent colou?rs?\b|\bdifferent colou?rs?\b|"
        r"\bbright colou?rs?\b|\bcolorful\b|\bcolourful\b|\battractive colou?rs?\b|"
        r"\bvibrant colou?rs?\b|\d+\s*colou?rs?"
        r")"
    )

    # Tier 3: build one alternation of the single-colour words.
    # Entries that are not strings (e.g. None produced by a NULL en_name) or
    # that are blank are skipped instead of crashing on .strip(); words are
    # stripped before use so padded table values can still match.
    compound_word = re.compile(r'[/+&;;,,]|\band\b')
    cleaned_words = {
        word.strip()
        for word in self.color_set
        if isinstance(word, str) and word.strip()
    }
    # Longest first so a short word does not pre-empt a longer one; the
    # alphabetical tie-break keeps the compiled pattern reproducible.
    single_colors = sorted(
        (word for word in cleaned_words if not compound_word.search(word)),
        key=lambda word: (-len(word), word),
    )
    color_regex = None
    if single_colors:
        alternation = '|'.join(re.escape(word) for word in single_colors)
        color_regex = re.compile(r'(?i)\b(' + alternation + r')\b')

    def _get_matched_colors(color_str):
        # Returns the distinct matched colour words joined by "/", or None
        # when fewer than two distinct words are found.
        if not color_str or color_regex is None:
            return None
        matched = sorted({m.group(1).lower() for m in color_regex.finditer(color_str)})
        return "/".join(matched) if len(matched) >= 2 else None

    udf_matched_colors = F.udf(_get_matched_colors, StringType())

    # Column expressions shared by both output columns; defining them once
    # keeps the flag and the string guaranteed to use identical conditions.
    color_col = F.col("asin_color")
    matched_col = F.col("_matched_colors_str")
    title_desc = F.lower(F.concat_ws(" ", F.col("asin_title"), F.col("asin_describe")))

    hit_keyword = color_col.isNotNull() & color_col.rlike(KEYWORD_PATTERN)
    hit_separator = color_col.isNotNull() & color_col.rlike(SEPARATOR_PATTERN)
    hit_color_words = color_col.isNotNull() & matched_col.isNotNull()
    hit_fallback = title_desc.rlike(FALLBACK_PATTERN)

    # Materialise the UDF result as a helper column that both output columns
    # reference; it is dropped again at the end.
    self.df_asin_detail = self.df_asin_detail.withColumn(
        "_matched_colors_str", udf_matched_colors(color_col)
    )

    flag_expr = (
        F.when(hit_keyword, 1)
        .when(hit_separator, 1)
        .when(hit_color_words, 1)
        .when(hit_fallback, 2)
        .otherwise(0)
    )
    str_expr = (
        F.when(hit_keyword, F.regexp_extract(color_col, KEYWORD_PATTERN, 1))
        .when(hit_separator, color_col)
        .when(hit_color_words, matched_col)
        .when(hit_fallback, F.regexp_extract(title_desc, FALLBACK_PATTERN, 1))
        .otherwise(F.lit(None))
    )

    self.df_asin_detail = (
        self.df_asin_detail
        .withColumn("multi_color_flag", flag_expr)
        .withColumn("multi_color_str", str_expr)
        .drop("_matched_colors_str")
    )
def
handle_other_new_col
(
self
):
# 处理五点描述长度
self
.
df_asin_detail
=
self
.
df_asin_detail
.
withColumn
(
...
...
@@ -661,6 +767,7 @@ class DwtFlowAsin(Templates):
"matrix_ao_val"
,
"follow_sellers_count"
,
"seller_json"
,
"asin_describe"
,
"asin_fbm_price"
,
"asin_bought_mom"
,
"asin_bought_yoy"
,
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"asin_source_flag"
,
"bsr_last_seen_at"
,
"bsr_seen_count_30d"
,
"nsr_last_seen_at"
,
"nsr_seen_count_30d"
,
"multi_color_flag"
,
"multi_color_str"
,
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
F
.
lit
(
self
.
date_info
)
.
alias
(
"date_info"
))
self
.
df_save
=
self
.
df_save
.
na
.
fill
(
...
...
@@ -673,7 +780,7 @@ class DwtFlowAsin(Templates):
"asin_ao_val_type"
:
0
,
"asin_rank_type"
:
0
,
"asin_price_type"
:
0
,
"asin_quantity_variation_type"
:
0
,
"package_quantity"
:
1
,
"is_movie_label"
:
0
,
"is_brand_label"
:
0
,
"is_alarm_brand"
:
0
,
"title_matching_degree"
:
0.0
,
"asin_lqs_rating"
:
0.0
,
"follow_sellers_count"
:
-
1
,
"describe_len"
:
0
,
"bsr_seen_count_30d"
:
0
,
"nsr_seen_count_30d"
:
0
})
"bsr_seen_count_30d"
:
0
,
"nsr_seen_count_30d"
:
0
,
"multi_color_flag"
:
0
})
self
.
df_save
=
self
.
df_save
.
repartition
(
60
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_save
=
self
.
df_save
.
drop_duplicates
([
'asin'
])
.
filter
((
F
.
col
(
"asin"
)
.
isNotNull
())
&
(
F
.
col
(
"asin"
)
!=
""
)
&
(
F
.
length
(
F
.
col
(
"asin"
))
<=
10
))
print
(
"数据量为:"
,
self
.
df_save
.
count
())
...
...
@@ -737,6 +844,7 @@ class DwtFlowAsin(Templates):
self
.
handle_title_matching_degree
()
self
.
handle_change_rate
()
self
.
handle_other_new_col
()
self
.
handle_multi_color_flag
()
self
.
handle_asin_different_source
()
self
.
handle_column
()
...
...
Pyspark_job/export_es/es_flow_asin.py
View file @
af235b1f
...
...
@@ -102,7 +102,7 @@ class EsStDetail(TemplatesMysql):
describe_len, asin_bought_mom as bought_month_mom, asin_bought_yoy as bought_month_yoy, tracking_since, tracking_since_type,
asin_rank_yoy as rank_yoy, asin_ao_yoy as ao_yoy, asin_price_yoy as price_yoy, asin_rating_yoy as rating_yoy,
asin_comments_yoy as comments_yoy, asin_bsr_orders_yoy as bsr_orders_yoy, asin_sales_yoy as sales_yoy, asin_variation_yoy as variation_yoy,
asin_source_flag, bsr_last_seen_at, bsr_seen_count_30d, nsr_last_seen_at, nsr_seen_count_30d
asin_source_flag, bsr_last_seen_at, bsr_seen_count_30d, nsr_last_seen_at, nsr_seen_count_30d
, multi_color_flag, multi_color_str
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print
(
"sql:"
,
sql
)
...
...
Pyspark_job/utils/es_util.py
View file @
af235b1f
...
...
@@ -541,6 +541,12 @@ class EsUtils(object):
},
"nsr_seen_count_30d"
:
{
"type"
:
"integer"
},
"multi_color_flag"
:
{
"type"
:
"integer"
},
"multi_color_str"
:
{
"type"
:
"keyword"
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment