Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
4d168d08
Commit
4d168d08
authored
Sep 29, 2025
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ABA主题标签-过滤模板词
parent
4b76286f
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
13 additions
and
28 deletions
+13
-28
dwt_st_theme_agg.py
Pyspark_job/dwt/dwt_st_theme_agg.py
+13
-28
No files found.
Pyspark_job/dwt/dwt_st_theme_agg.py
View file @
4d168d08
...
@@ -40,7 +40,7 @@ class DwtStThemeAgg(object):
...
@@ -40,7 +40,7 @@ class DwtStThemeAgg(object):
self
.
u_theme_pattern
=
F
.
udf
(
udf_ele_mattch
,
StringType
())
self
.
u_theme_pattern
=
F
.
udf
(
udf_ele_mattch
,
StringType
())
self
.
u_theme_contain_judge
=
F
.
udf
(
self
.
udf_theme_contain_judge
,
IntegerType
())
self
.
u_theme_contain_judge
=
F
.
udf
(
self
.
udf_theme_contain_judge
,
IntegerType
())
self
.
u_judge_twin_words
=
F
.
udf
(
self
.
udf_judge_twin_words
,
IntegerType
())
self
.
u_judge_twin_words
=
F
.
udf
(
self
.
udf_judge_twin_words
,
IntegerType
())
self
.
u_filter_
sec_pattern_words
=
F
.
udf
(
self
.
udf_filter_sec
_pattern_words
,
IntegerType
())
self
.
u_filter_
pattern_words
=
F
.
udf
(
self
.
udf_filter
_pattern_words
,
IntegerType
())
# 全局df初始化
# 全局df初始化
self
.
df_st_base
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_st_base
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
...
@@ -180,8 +180,7 @@ class DwtStThemeAgg(object):
...
@@ -180,8 +180,7 @@ class DwtStThemeAgg(object):
return
F
.
udf
(
udf_filter_blacklist
,
IntegerType
())
return
F
.
udf
(
udf_filter_blacklist
,
IntegerType
())
@staticmethod
@staticmethod
def
udf_filter_sec_pattern_words
(
st_word
,
pattern_list
):
def
udf_filter_pattern_words
(
st_word
,
pattern_list
):
# 标记一些特殊情况指定的二级词,方便后期过滤
filter_flag
=
0
filter_flag
=
0
theme_list
=
[
'combination'
,
'size'
]
theme_list
=
[
'combination'
,
'size'
]
if
pattern_list
:
if
pattern_list
:
...
@@ -191,7 +190,7 @@ class DwtStThemeAgg(object):
...
@@ -191,7 +190,7 @@ class DwtStThemeAgg(object):
# 进行单项 数字+month/months的所有二级词 和 数字连接t+ boys/girls的二级词特殊匹配
# 进行单项 数字+month/months的所有二级词 和 数字连接t+ boys/girls的二级词特殊匹配
date_pattern
=
re
.
compile
(
r"(\d+(?:\.\d+)?) +(month|months)\b"
,
flags
=
re
.
IGNORECASE
)
date_pattern
=
re
.
compile
(
r"(\d+(?:\.\d+)?) +(month|months)\b"
,
flags
=
re
.
IGNORECASE
)
numt_pattern
=
re
.
compile
(
r"((?:\d+)t)(?: +)(boys|girls|boy|girl)\b"
,
flags
=
re
.
IGNORECASE
)
numt_pattern
=
re
.
compile
(
r"((?:\d+)t)(?: +)(boys|girls|boy|girl)\b"
,
flags
=
re
.
IGNORECASE
)
other_pattern
=
re
.
compile
(
r"\b(women
|men|man|woman|for|cute|fashion
|kids?|adults?|girls?|boys?)\b"
,
flags
=
re
.
IGNORECASE
)
other_pattern
=
re
.
compile
(
r"\b(women
s?|mens?|mans?|womans?|fors?|cutes?|fashions?
|kids?|adults?|girls?|boys?)\b"
,
flags
=
re
.
IGNORECASE
)
if
re
.
search
(
date_pattern
,
st_word
):
if
re
.
search
(
date_pattern
,
st_word
):
return
1
return
1
if
re
.
search
(
numt_pattern
,
st_word
):
if
re
.
search
(
numt_pattern
,
st_word
):
...
@@ -350,8 +349,6 @@ class DwtStThemeAgg(object):
...
@@ -350,8 +349,6 @@ class DwtStThemeAgg(object):
self
.
read_data
()
self
.
read_data
()
# 模板词归一化处理
# 模板词归一化处理
self
.
handle_base_pattern_data
()
self
.
handle_base_pattern_data
()
# 二级词单独处理
self
.
handle_sec_st
()
# 将一级二级模板词和搜索词进行匹配,做中间存储
# 将一级二级模板词和搜索词进行匹配,做中间存储
self
.
handle_st_filter_table
()
self
.
handle_st_filter_table
()
# 统计各模板词的指标 pattern_type=0
# 统计各模板词的指标 pattern_type=0
...
@@ -399,23 +396,18 @@ class DwtStThemeAgg(object):
...
@@ -399,23 +396,18 @@ class DwtStThemeAgg(object):
'st_blacklist_flag'
,
self
.
filter_blacklist_words
(
pd_match_blacklist
)(
"search_term"
)
'st_blacklist_flag'
,
self
.
filter_blacklist_words
(
pd_match_blacklist
)(
"search_term"
)
)
.
filter
(
'st_blacklist_flag != 1'
)
.
cache
()
)
.
filter
(
'st_blacklist_flag != 1'
)
.
cache
()
# 处理二级词
def
handle_st_filter_table
(
self
):
def
handle_sec_st
(
self
):
# 过滤特殊词
self
.
df_sec_words
=
self
.
df_base_filter_date
.
filter
(
'st_word_num = 2'
)
self
.
df_base_filter_date
=
self
.
df_base_filter_date
.
join
(
self
.
df_sec_words
=
self
.
df_sec_words
.
join
(
self
.
df_theme
,
on
=
[
'search_term'
],
how
=
'left'
self
.
df_theme
,
on
=
[
'search_term'
],
how
=
'left'
)
)
.
withColumn
(
self
.
df_sec_words
=
self
.
df_sec_words
.
withColumn
(
"filter_flag"
,
self
.
u_filter_pattern_words
(
F
.
col
(
"search_term"
),
F
.
col
(
"pattern_list"
))
"filter_flag"
,
self
.
u_filter_sec_pattern_words
(
F
.
col
(
"search_term"
),
F
.
col
(
"pattern_list"
))
)
.
filter
(
)
"filter_flag != 1"
# 过滤掉被标记为1的数据
)
.
select
(
self
.
df_sec_words
=
self
.
df_sec_words
.
filter
(
"filter_flag != 1"
)
'search_term'
,
'st_word_num'
,
'st_bsr_cate_1_id_new'
,
'st_bsr_cate_current_id_new'
,
'rank'
,
'rank_change_rate'
,
'rank_rate_of_change'
self
.
df_sec_words
=
self
.
df_sec_words
.
select
(
)
.
cache
()
'search_term'
,
'st_word_num'
,
'st_bsr_cate_1_id_new'
,
'st_bsr_cate_current_id_new'
,
'rank'
,
'rank_change_rate'
,
'rank_rate_of_change'
)
def
handle_st_filter_table
(
self
):
df_st_filter_base
=
self
.
df_st_base
.
select
(
df_st_filter_base
=
self
.
df_st_base
.
select
(
F
.
col
(
'st_key'
),
F
.
col
(
'st_key'
),
F
.
col
(
'search_term'
),
F
.
col
(
'search_term'
),
...
@@ -425,12 +417,6 @@ class DwtStThemeAgg(object):
...
@@ -425,12 +417,6 @@ class DwtStThemeAgg(object):
F
.
lit
(
self
.
date_info
)
.
alias
(
'date_info'
)
F
.
lit
(
self
.
date_info
)
.
alias
(
'date_info'
)
)
.
cache
()
)
.
cache
()
# 将处理后的二级词和一级词合并
df_one_word
=
self
.
df_base_filter_date
.
filter
(
'st_word_num = 1'
)
.
select
(
'search_term'
,
'st_word_num'
,
'st_bsr_cate_1_id_new'
,
'st_bsr_cate_current_id_new'
,
'rank'
,
'rank_change_rate'
,
'rank_rate_of_change'
)
self
.
df_base_filter_date
=
self
.
df_sec_words
.
unionByName
(
df_one_word
)
.
cache
()
pattern_words
=
self
.
df_base_filter_date
.
select
(
'search_term'
)
pattern_words
=
self
.
df_base_filter_date
.
select
(
'search_term'
)
# 将数据转换成pandas_df
# 将数据转换成pandas_df
dict_df
=
pattern_words
.
toPandas
()
dict_df
=
pattern_words
.
toPandas
()
...
@@ -461,7 +447,6 @@ class DwtStThemeAgg(object):
...
@@ -461,7 +447,6 @@ class DwtStThemeAgg(object):
df_list
.
append
(
df_union_filter
)
df_list
.
append
(
df_union_filter
)
for
i
in
range
(
0
,
len
(
df_list
),
batch_size
):
for
i
in
range
(
0
,
len
(
df_list
),
batch_size
):
print
(
f
"当前是word_batches的轮回:f{word_batches.index(word_batch)},当前写入表的df索引位置:{i + 1}"
)
print
(
f
"当前是word_batches的轮回:f{word_batches.index(word_batch)},当前写入表的df索引位置:{i + 1}"
)
tmp_df
=
[]
tmp_df
=
df_list
[
i
:
i
+
batch_size
]
tmp_df
=
df_list
[
i
:
i
+
batch_size
]
result_df
=
self
.
udf_unionAll
(
*
tmp_df
)
result_df
=
self
.
udf_unionAll
(
*
tmp_df
)
result_df
=
result_df
.
repartition
(
1
)
result_df
=
result_df
.
repartition
(
1
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment