Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
a843829a
Commit
a843829a
authored
Apr 13, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
流量选品30天增加组合颜色判断
parent
9a8892d9
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
238 additions
and
4 deletions
+238
-4
kafka_flow_asin_detail.py
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
+116
-2
kafka_rank_asin_detail.py
Pyspark_job/my_kafka/kafka_rank_asin_detail.py
+116
-2
es_util.py
Pyspark_job/utils/es_util.py
+6
-0
No files found.
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
View file @
a843829a
import
os
import
re
import
sys
import
time
import
traceback
...
...
@@ -107,6 +108,7 @@ class KafkaFlowAsinDetail(Templates):
self
.
df_asin_source_flag
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_keepa_asin
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_asin_profit_rate
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
color_set
=
set
()
# udf函数注册
package_schema
=
StructType
([
StructField
(
"parse_package_quantity"
,
IntegerType
(),
True
),
...
...
@@ -450,6 +452,11 @@ class KafkaFlowAsinDetail(Templates):
withColumn
(
"title_package_quantity_is_abnormal"
,
df
.
title_parse
.
getField
(
"is_package_quantity_abnormal"
))
.
\
withColumn
(
"variat_package_quantity_is_abnormal"
,
df
.
variat_parse
.
getField
(
"is_package_quantity_abnormal"
))
.
\
drop
(
"title_parse"
,
"variat_parse"
,
"variat_attribute"
)
# 从 product_json 额外提取 Color 字段作为颜色来源备用
df
=
df
.
withColumn
(
"_product_json_color"
,
F
.
lower
(
F
.
get_json_object
(
F
.
col
(
"product_json"
),
"$.Color"
))
)
# Number of Items:从 product_json 提取,cast 失败(脏数据)自动为 null,提取后立即 drop
df
=
df
.
withColumn
(
"number_of_items"
,
...
...
@@ -471,6 +478,8 @@ class KafkaFlowAsinDetail(Templates):
)
.
drop
(
"number_of_items"
,
"title_package_quantity"
,
"variat_package_quantity"
,
"title_package_quantity_is_abnormal"
,
"variat_package_quantity_is_abnormal"
)
df
=
df
.
withColumn
(
"title"
,
F
.
lower
(
F
.
col
(
"title"
)))
# color 优先使用变体属性颜色,fallback 到 product_json 中的 Color 字段
df
=
df
.
withColumn
(
"color"
,
F
.
coalesce
(
F
.
col
(
"color"
),
F
.
col
(
"_product_json_color"
)))
.
drop
(
"_product_json_color"
)
df
=
df
.
join
(
self
.
df_user_package_num
,
on
=
[
'asin'
,
'title'
],
how
=
'left'
)
df
=
df
.
withColumn
(
"package_quantity"
,
F
.
coalesce
(
F
.
col
(
"user_package_num"
),
F
.
col
(
"package_quantity"
)))
.
\
withColumn
(
...
...
@@ -523,6 +532,101 @@ class KafkaFlowAsinDetail(Templates):
"title_len_rating"
,
"title_brand_rating"
,
"img_num_rating"
,
"img_enlarge_rating"
)
return
df
def
handle_multi_color_flag
(
self
,
df
):
"""判断 ASIN 是否为颜色组合产品
multi_color_flag:
0 = 非多色
1 = 从颜色变体属性字段(color)解析为多色
2 = 从标题或五点描述解析为多色(降级 fallback)
颜色变体属性三层判断:
1. 命中颜色组合关键词
2. 含分隔符 (/ + & ; , and) 或 "数字 color"
3. 含 2 个及以上颜色表中的单色词
"""
# ── 第1层:关键词正则(支持 color/colors/colour/colours)──
KEYWORD_PATTERN
=
(
r"(?i)("
r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
r"\bassorted\b|\bmorandi\b|\bpastel\b|\bvibrant\b|\bvintage\b|\bboho\b|\bgradient\b|"
r"\bcandy\b|\bdusty\b|\bfluorescent\b|\bgentle\b|\bneutral\b|\bsoft\b|\bmuted\b|"
r"\brainbow\b|\bmaillard\b|\bcolorful\b|\bcolourful\b|\bmulti\b|"
r"\baesthetic colou?rs?\b|\bdreamy colou?rs?\b|\bearthy colou?rs?\b|\bshades of\b|"
r"\bvarious colou?rs?\b|\bsolid colou?rs?\b|\bmix colou?rs?\b|\bmixed colou?rs?\b|"
r"\bbasic colou?rs?\b|\blightcolor\b|\bdarkcolor\b|\battractive colou?rs?\b|"
r"\bmultiple colou?rs?\b|\bbright colorful\b|\bdifferent colou?rs?\b|\bclassic colou?rs?\b|"
r"\bfriendly colou?rs?\b|\bwarm colou?rs?\b|\bfun colou?rs?\b|\bmetallic colou?rs?\b|"
r"\bbright colou?rs?\b|\bdark colou?rs?\b|\blight colou?rs?\b|"
r"\bautumn colou?rs?\b|\bsummer colou?rs?\b|\bwinter colou?rs?\b|\bspring colou?rs?\b"
r")"
)
# ── 第2层:分隔符正则(/ + & ; 逗号 and 数字+color)──
SEPARATOR_PATTERN
=
r"(?i)[/+&;;,,]|\band\b|\d+\s*colou?rs?"
# ── 第3层:颜色表多色词检测 UDF ──
single_colors
=
sorted
(
[
c
for
c
in
self
.
color_set
if
c
.
strip
()
and
not
re
.
search
(
r'[/+&;;,,]|\band\b'
,
c
)],
key
=
len
,
reverse
=
True
)
if
single_colors
:
color_regex
=
re
.
compile
(
r'(?i)\b('
+
'|'
.
join
(
re
.
escape
(
c
)
for
c
in
single_colors
)
+
r')\b'
)
else
:
color_regex
=
None
def
_get_matched_colors
(
color_str
):
if
not
color_str
or
color_regex
is
None
:
return
None
matched
=
sorted
({
m
.
group
(
1
)
.
lower
()
for
m
in
color_regex
.
finditer
(
color_str
)})
return
"/"
.
join
(
matched
)
if
len
(
matched
)
>=
2
else
None
udf_matched_colors
=
F
.
udf
(
_get_matched_colors
,
StringType
())
# ── 降级:标题 + 五点描述关键词正则 ──
FALLBACK_PATTERN
=
(
r"(?i)("
r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
r"\bassorted colou?rs?\b|\bfluorescent colou?rs?\b|\bdifferent colou?rs?\b|"
r"\bbright colou?rs?\b|\bcolorful\b|\bcolourful\b|\battractive colou?rs?\b|"
r"\bvibrant colou?rs?\b|\d+\s*colou?rs?"
r")"
)
# 提前计算 UDF 结果,避免 multi_color_flag / multi_color_str 各调用一次(性能优化)
df
=
df
.
withColumn
(
"_matched_colors_str"
,
udf_matched_colors
(
F
.
col
(
"color"
)))
df
=
df
.
withColumn
(
"multi_color_flag"
,
F
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
KEYWORD_PATTERN
),
1
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
SEPARATOR_PATTERN
),
1
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"_matched_colors_str"
)
.
isNotNull
(),
1
)
.
when
(
F
.
lower
(
F
.
concat_ws
(
" "
,
F
.
col
(
"title"
),
F
.
col
(
"describe"
)))
.
rlike
(
FALLBACK_PATTERN
),
2
)
.
otherwise
(
0
)
)
.
withColumn
(
"multi_color_str"
,
F
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
KEYWORD_PATTERN
),
F
.
regexp_extract
(
F
.
col
(
"color"
),
KEYWORD_PATTERN
,
1
)
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
SEPARATOR_PATTERN
),
F
.
col
(
"color"
)
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"_matched_colors_str"
)
.
isNotNull
(),
F
.
col
(
"_matched_colors_str"
)
)
.
when
(
F
.
lower
(
F
.
concat_ws
(
" "
,
F
.
col
(
"title"
),
F
.
col
(
"describe"
)))
.
rlike
(
FALLBACK_PATTERN
),
F
.
regexp_extract
(
F
.
lower
(
F
.
concat_ws
(
" "
,
F
.
col
(
"title"
),
F
.
col
(
"describe"
))),
FALLBACK_PATTERN
,
1
)
)
.
otherwise
(
F
.
lit
(
None
))
)
.
drop
(
"_matched_colors_str"
)
return
df
@staticmethod
def
build_time_interval_type_expr
(
col_name
,
interval_dict
):
one_month
=
interval_dict
[
'one_month'
]
...
...
@@ -723,7 +827,8 @@ class KafkaFlowAsinDetail(Templates):
"collapse_asin"
,
F
.
col
(
"follow_sellers"
)
.
alias
(
"follow_sellers_count"
),
"seller_json"
,
F
.
col
(
"describe"
)
.
alias
(
"asin_describe"
),
F
.
round
(
"fbm_delivery_price"
,
2
)
.
alias
(
"fbm_price"
),
"asin_source_flag"
,
"bsr_last_seen_at"
,
"bsr_seen_count_30d"
,
"nsr_last_seen_at"
,
"nsr_seen_count_30d"
,
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"profit_key"
,
"profit_rate_extra"
,
"img_type_arr"
)
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"profit_key"
,
"profit_rate_extra"
,
"img_type_arr"
,
"multi_color_flag"
,
"multi_color_str"
)
df_save
=
df_save
.
na
.
fill
(
{
"zr_counts"
:
0
,
"sp_counts"
:
0
,
"sb_counts"
:
0
,
"vi_counts"
:
0
,
"bs_counts"
:
0
,
"ac_counts"
:
0
,
"tr_counts"
:
0
,
"er_counts"
:
0
,
"title_len"
:
0
,
"total_comments"
:
0
,
"variation_num"
:
0
,
"img_num"
:
0
,
...
...
@@ -733,7 +838,7 @@ class KafkaFlowAsinDetail(Templates):
"ao_val_type"
:
0
,
"rank_type"
:
0
,
"price_type"
:
0
,
"quantity_variation_type"
:
0
,
"package_quantity"
:
1
,
"is_movie_label"
:
0
,
"is_brand_label"
:
0
,
"is_alarm_brand"
:
0
,
"asin_lqs_rating"
:
0.0
,
"follow_sellers_count"
:
-
1
,
"bsr_last_seen_at"
:
"1970-01-01"
,
"bsr_seen_count_30d"
:
0
,
"nsr_last_seen_at"
:
"1970-01-01"
,
"nsr_seen_count_30d"
:
0
,
"describe_len"
:
0
}
"describe_len"
:
0
,
"multi_color_flag"
:
0
}
)
print
(
"asin的标准信息:"
)
df_save
.
show
(
10
,
truncate
=
False
)
...
...
@@ -934,6 +1039,13 @@ class KafkaFlowAsinDetail(Templates):
self
.
df_asin_profit_rate
=
self
.
df_asin_profit_rate
.
repartition
(
self
.
repartition_num
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_asin_profit_rate
.
show
(
10
,
truncate
=
False
)
print
(
"18. 读取颜色词表 dim_asin_color_info"
)
color_rows
=
self
.
spark
.
sql
(
f
"SELECT lower(en_name) as en_name FROM dim_asin_color_info WHERE site_name='{self.site_name}'"
)
.
collect
()
self
.
color_set
=
{
row
.
en_name
for
row
in
color_rows
}
print
(
f
"颜色词表共 {len(self.color_set)} 条"
)
# 字段处理逻辑综合
def
handle_all_field
(
self
,
df
):
# 1. 处理asin分类及排名以及排名类型字段
...
...
@@ -954,6 +1066,8 @@ class KafkaFlowAsinDetail(Templates):
df
=
self
.
handle_asin_measure
(
df
)
# 9. 提取打包数量字段
df
=
self
.
handle_asin_package_quantity
(
df
)
# 9.5. 多色判断(依赖 color 字段已从变体属性+product_json 补全)
df
=
self
.
handle_multi_color_flag
(
df
)
# 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息
df
=
self
.
handle_asin_lqs_and_brand
(
df
)
# 11.通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、ASIN类型、有效类型)
...
...
Pyspark_job/my_kafka/kafka_rank_asin_detail.py
View file @
a843829a
import
os
import
re
import
sys
import
time
import
traceback
...
...
@@ -106,6 +107,7 @@ class KafkaRankAsinDetail(Templates):
self
.
df_asin_source_flag
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_keepa_asin
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
df_asin_profit_rate
=
self
.
spark
.
sql
(
"select 1+1;"
)
self
.
color_set
=
set
()
# udf函数注册
package_schema
=
StructType
([
StructField
(
"parse_package_quantity"
,
IntegerType
(),
True
),
...
...
@@ -449,6 +451,11 @@ class KafkaRankAsinDetail(Templates):
withColumn
(
"title_package_quantity_is_abnormal"
,
df
.
title_parse
.
getField
(
"is_package_quantity_abnormal"
))
.
\
withColumn
(
"variat_package_quantity_is_abnormal"
,
df
.
variat_parse
.
getField
(
"is_package_quantity_abnormal"
))
.
\
drop
(
"title_parse"
,
"variat_parse"
,
"variat_attribute"
)
# 从 product_json 额外提取 Color 字段作为颜色来源备用
df
=
df
.
withColumn
(
"_product_json_color"
,
F
.
lower
(
F
.
get_json_object
(
F
.
col
(
"product_json"
),
"$.Color"
))
)
# Number of Items:从 product_json 提取,cast 失败(脏数据)自动为 null,提取后立即 drop
df
=
df
.
withColumn
(
"number_of_items"
,
...
...
@@ -470,6 +477,8 @@ class KafkaRankAsinDetail(Templates):
)
.
drop
(
"number_of_items"
,
"title_package_quantity"
,
"variat_package_quantity"
,
"title_package_quantity_is_abnormal"
,
"variat_package_quantity_is_abnormal"
)
df
=
df
.
withColumn
(
"title"
,
F
.
lower
(
F
.
col
(
"title"
)))
# color 优先使用变体属性颜色,fallback 到 product_json 中的 Color 字段
df
=
df
.
withColumn
(
"color"
,
F
.
coalesce
(
F
.
col
(
"color"
),
F
.
col
(
"_product_json_color"
)))
.
drop
(
"_product_json_color"
)
df
=
df
.
join
(
self
.
df_user_package_num
,
on
=
[
'asin'
,
'title'
],
how
=
'left'
)
df
=
df
.
withColumn
(
"package_quantity"
,
F
.
coalesce
(
F
.
col
(
"user_package_num"
),
F
.
col
(
"package_quantity"
)))
.
\
withColumn
(
...
...
@@ -522,6 +531,101 @@ class KafkaRankAsinDetail(Templates):
"title_len_rating"
,
"title_brand_rating"
,
"img_num_rating"
,
"img_enlarge_rating"
)
return
df
def
handle_multi_color_flag
(
self
,
df
):
"""判断 ASIN 是否为颜色组合产品
multi_color_flag:
0 = 非多色
1 = 从颜色变体属性字段(color)解析为多色
2 = 从标题或五点描述解析为多色(降级 fallback)
颜色变体属性三层判断:
1. 命中颜色组合关键词
2. 含分隔符 (/ + & ; , and) 或 "数字 color"
3. 含 2 个及以上颜色表中的单色词
"""
# ── 第1层:关键词正则(支持 color/colors/colour/colours)──
KEYWORD_PATTERN
=
(
r"(?i)("
r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
r"\bassorted\b|\bmorandi\b|\bpastel\b|\bvibrant\b|\bvintage\b|\bboho\b|\bgradient\b|"
r"\bcandy\b|\bdusty\b|\bfluorescent\b|\bgentle\b|\bneutral\b|\bsoft\b|\bmuted\b|"
r"\brainbow\b|\bmaillard\b|\bcolorful\b|\bcolourful\b|\bmulti\b|"
r"\baesthetic colou?rs?\b|\bdreamy colou?rs?\b|\bearthy colou?rs?\b|\bshades of\b|"
r"\bvarious colou?rs?\b|\bsolid colou?rs?\b|\bmix colou?rs?\b|\bmixed colou?rs?\b|"
r"\bbasic colou?rs?\b|\blightcolor\b|\bdarkcolor\b|\battractive colou?rs?\b|"
r"\bmultiple colou?rs?\b|\bbright colorful\b|\bdifferent colou?rs?\b|\bclassic colou?rs?\b|"
r"\bfriendly colou?rs?\b|\bwarm colou?rs?\b|\bfun colou?rs?\b|\bmetallic colou?rs?\b|"
r"\bbright colou?rs?\b|\bdark colou?rs?\b|\blight colou?rs?\b|"
r"\bautumn colou?rs?\b|\bsummer colou?rs?\b|\bwinter colou?rs?\b|\bspring colou?rs?\b"
r")"
)
# ── 第2层:分隔符正则(/ + & ; 逗号 and 数字+color)──
SEPARATOR_PATTERN
=
r"(?i)[/+&;;,,]|\band\b|\d+\s*colou?rs?"
# ── 第3层:颜色表多色词检测 UDF ──
single_colors
=
sorted
(
[
c
for
c
in
self
.
color_set
if
c
.
strip
()
and
not
re
.
search
(
r'[/+&;;,,]|\band\b'
,
c
)],
key
=
len
,
reverse
=
True
)
if
single_colors
:
color_regex
=
re
.
compile
(
r'(?i)\b('
+
'|'
.
join
(
re
.
escape
(
c
)
for
c
in
single_colors
)
+
r')\b'
)
else
:
color_regex
=
None
def
_get_matched_colors
(
color_str
):
if
not
color_str
or
color_regex
is
None
:
return
None
matched
=
sorted
({
m
.
group
(
1
)
.
lower
()
for
m
in
color_regex
.
finditer
(
color_str
)})
return
"/"
.
join
(
matched
)
if
len
(
matched
)
>=
2
else
None
udf_matched_colors
=
F
.
udf
(
_get_matched_colors
,
StringType
())
# ── 降级:标题 + 五点描述关键词正则 ──
FALLBACK_PATTERN
=
(
r"(?i)("
r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
r"\bassorted colou?rs?\b|\bfluorescent colou?rs?\b|\bdifferent colou?rs?\b|"
r"\bbright colou?rs?\b|\bcolorful\b|\bcolourful\b|\battractive colou?rs?\b|"
r"\bvibrant colou?rs?\b|\d+\s*colou?rs?"
r")"
)
# 提前计算 UDF 结果,避免 multi_color_flag / multi_color_str 各调用一次(性能优化)
df
=
df
.
withColumn
(
"_matched_colors_str"
,
udf_matched_colors
(
F
.
col
(
"color"
)))
df
=
df
.
withColumn
(
"multi_color_flag"
,
F
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
KEYWORD_PATTERN
),
1
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
SEPARATOR_PATTERN
),
1
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"_matched_colors_str"
)
.
isNotNull
(),
1
)
.
when
(
F
.
lower
(
F
.
concat_ws
(
" "
,
F
.
col
(
"title"
),
F
.
col
(
"describe"
)))
.
rlike
(
FALLBACK_PATTERN
),
2
)
.
otherwise
(
0
)
)
.
withColumn
(
"multi_color_str"
,
F
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
KEYWORD_PATTERN
),
F
.
regexp_extract
(
F
.
col
(
"color"
),
KEYWORD_PATTERN
,
1
)
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"color"
)
.
rlike
(
SEPARATOR_PATTERN
),
F
.
col
(
"color"
)
)
.
when
(
F
.
col
(
"color"
)
.
isNotNull
()
&
F
.
col
(
"_matched_colors_str"
)
.
isNotNull
(),
F
.
col
(
"_matched_colors_str"
)
)
.
when
(
F
.
lower
(
F
.
concat_ws
(
" "
,
F
.
col
(
"title"
),
F
.
col
(
"describe"
)))
.
rlike
(
FALLBACK_PATTERN
),
F
.
regexp_extract
(
F
.
lower
(
F
.
concat_ws
(
" "
,
F
.
col
(
"title"
),
F
.
col
(
"describe"
))),
FALLBACK_PATTERN
,
1
)
)
.
otherwise
(
F
.
lit
(
None
))
)
.
drop
(
"_matched_colors_str"
)
return
df
@staticmethod
def
build_time_interval_type_expr
(
col_name
,
interval_dict
):
one_month
=
interval_dict
[
'one_month'
]
...
...
@@ -722,7 +826,8 @@ class KafkaRankAsinDetail(Templates):
"collapse_asin"
,
F
.
col
(
"follow_sellers"
)
.
alias
(
"follow_sellers_count"
),
"seller_json"
,
F
.
col
(
"describe"
)
.
alias
(
"asin_describe"
),
F
.
round
(
"fbm_delivery_price"
,
2
)
.
alias
(
"fbm_price"
),
"asin_source_flag"
,
"bsr_last_seen_at"
,
"bsr_seen_count_30d"
,
"nsr_last_seen_at"
,
"nsr_seen_count_30d"
,
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"profit_key"
,
"profit_rate_extra"
,
"img_type_arr"
)
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"profit_key"
,
"profit_rate_extra"
,
"img_type_arr"
,
"multi_color_flag"
,
"multi_color_str"
)
df_save
=
df_save
.
na
.
fill
(
{
"zr_counts"
:
0
,
"sp_counts"
:
0
,
"sb_counts"
:
0
,
"vi_counts"
:
0
,
"bs_counts"
:
0
,
"ac_counts"
:
0
,
"tr_counts"
:
0
,
"er_counts"
:
0
,
"title_len"
:
0
,
"total_comments"
:
0
,
"variation_num"
:
0
,
"img_num"
:
0
,
...
...
@@ -732,7 +837,7 @@ class KafkaRankAsinDetail(Templates):
"ao_val_type"
:
0
,
"rank_type"
:
0
,
"price_type"
:
0
,
"quantity_variation_type"
:
0
,
"package_quantity"
:
1
,
"is_movie_label"
:
0
,
"is_brand_label"
:
0
,
"is_alarm_brand"
:
0
,
"asin_lqs_rating"
:
0.0
,
"follow_sellers_count"
:
-
1
,
"bsr_last_seen_at"
:
"1970-01-01"
,
"bsr_seen_count_30d"
:
0
,
"nsr_last_seen_at"
:
"1970-01-01"
,
"nsr_seen_count_30d"
:
0
,
"describe_len"
:
0
}
"describe_len"
:
0
,
"multi_color_flag"
:
0
}
)
print
(
"asin的标准信息:"
)
df_save
.
show
(
10
,
truncate
=
False
)
...
...
@@ -933,6 +1038,13 @@ class KafkaRankAsinDetail(Templates):
self
.
df_asin_profit_rate
=
self
.
df_asin_profit_rate
.
repartition
(
self
.
repartition_num
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_asin_profit_rate
.
show
(
10
,
truncate
=
False
)
print
(
"18. 读取颜色词表 dim_asin_color_info"
)
color_rows
=
self
.
spark
.
sql
(
f
"SELECT lower(en_name) as en_name FROM dim_asin_color_info WHERE site_name='{self.site_name}'"
)
.
collect
()
self
.
color_set
=
{
row
.
en_name
for
row
in
color_rows
}
print
(
f
"颜色词表共 {len(self.color_set)} 条"
)
# 字段处理逻辑综合
def
handle_all_field
(
self
,
df
):
# 1. 处理asin分类及排名以及排名类型字段
...
...
@@ -953,6 +1065,8 @@ class KafkaRankAsinDetail(Templates):
df
=
self
.
handle_asin_measure
(
df
)
# 9. 提取打包数量字段
df
=
self
.
handle_asin_package_quantity
(
df
)
# 9.5. 多色判断(依赖 color 字段已从变体属性+product_json 补全)
df
=
self
.
handle_multi_color_flag
(
df
)
# 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息
df
=
self
.
handle_asin_lqs_and_brand
(
df
)
# 11.通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、ASIN类型、有效类型)
...
...
Pyspark_job/utils/es_util.py
View file @
a843829a
...
...
@@ -1053,6 +1053,12 @@ class EsUtils(object):
},
"date_info_del"
:
{
"type"
:
"keyword"
},
"multi_color_flag"
:
{
"type"
:
"integer"
},
"multi_color_str"
:
{
"type"
:
"keyword"
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment