Commit fa8a7fbb, authored Sep 05, 2025 by chenyuanjie
ABA theme label optimization
parent 63a52979
Showing 2 changed files with 227 additions and 217 deletions:

Pyspark_job/dwt/dwt_st_theme_agg.py (+217 −208)
Pyspark_job/sqoop_export/dwt_st_theme_agg.py (+10 −9)
Pyspark_job/dwt/dwt_st_theme_agg.py
@@ -27,15 +27,11 @@ class DwtStThemeAgg(object):
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_st_theme_agg"
        # self.hive_tb = "tmp_st_theme_agg"
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Landing-path validation
        # self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Create the spark_session object
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
@@ -46,10 +42,9 @@ class DwtStThemeAgg(object):
        self.u_judge_twin_words = F.udf(self.udf_judge_twin_words, IntegerType())
        self.u_filter_sec_pattern_words = F.udf(self.udf_filter_sec_pattern_words, IntegerType())
        # self.u_split_words = F.udf(self.udf_split_words, StringType())
        # Global DataFrame initialization
        self.df_st_base = self.spark.sql(f"select 1+1;")
        self.df_usr_mask = self.spark.sql(f"select 1+1;")
        self.df_base_filter_date = self.spark.sql(f"select 1+1;")
        self.df_pattern_words_base = self.spark.sql(f"select 1+1;")
        self.df_sec_words = self.spark.sql(f"select 1+1;")
@@ -57,33 +52,25 @@ class DwtStThemeAgg(object):
        self.df_theme = self.spark.sql(f"select 1+1;")
        self.df_st_theme = self.spark.sql(f"select 1+1;")
        self.df_st_theme_base = self.spark.sql(f"select 1+1;")
        self.df_st_theme_vertical = self.spark.sql(f"select 1+1;")
        self.df_st_filter = self.spark.sql(f"select 1+1;")
        self.df_pattern_st_agg = self.spark.sql(f"select 1+1;")
        self.df_pattern_st_words = self.spark.sql(
            f"select null as pattern_st,id as st_key,search_term,bsr_orders from dwt_aba_st_analytics limit 0;")
        self.combine_df = self.spark.sql(
-           f"select id as st_key,search_term,bsr_orders,'' as pattern_st from dwt_aba_st_analytics limit 0;")
+           f"select null as pattern_st,id as st_key,search_term,bsr_orders from dwt_aba_st_analytics limit 0;")
        self.df_st_theme_agg = self.spark.sql(f"select 1+1;")
        self.df_st_topic_base = self.spark.sql(f"select 1+1;")
        self.df_st_match_topic_detail = self.spark.sql(f"select 1+1;")
        self.df_st_match_topic_agg = self.spark.sql(f"select 1+1;")
        self.df_match_brand = self.spark.sql(f"select 1+1;")
        self.df_match_blacklist = self.spark.sql(f"select 1+1;")
        # Other variables
        self.brand_pattern = str()  # regex pattern
        self.theme_list_str = str()  # regex pattern
        self.st_word_list = []

    @staticmethod
    def udf_unionAll(*dfs):
        return reduce(DataFrame.unionAll, dfs)

    @staticmethod
    def udf_theme_contain_judge(pattern_word, pattern_list):
        count = sum(1 for word in pattern_list if pattern_word in word)
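A side note on udf_unionAll above: functools.reduce folds a list of frames pairwise through DataFrame.unionAll, which matches columns by position rather than by name — which is also why this commit aligns combine_df's projection with df_pattern_st_words. A minimal local sketch, with toy frames and hypothetical column names borrowed from the job:

from functools import reduce
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.master("local[1]").appName("union_demo").getOrCreate()

# Three small frames with identical schemas, stand-ins for the per-batch frames in df_list.
dfs = [spark.createDataFrame([(i, f"word{i}")], ["st_key", "search_term"]) for i in range(3)]

# unionAll is positional: every frame must project its columns in the same order.
combined = reduce(DataFrame.unionAll, dfs)
combined.show()  # 3 rows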
@@ -126,7 +113,6 @@ class DwtStThemeAgg(object):
            judge_flag = 1
        return judge_flag

    @staticmethod
    def udf_theme_regex(pattern):
@@ -204,65 +190,93 @@ class DwtStThemeAgg(object):
            return 1
        # Special matching for two-word terms of the form "number + month/months" and "number-joined t + boys/girls"
        date_pattern = re.compile(r"(\d+(?:\.\d+)?) +(month|months)\b", flags=re.IGNORECASE)
-       numt_pattern = re.compile("r'((?:\d+)t)(?: +)(boys|girls|boy|girl)\b'", flags=re.IGNORECASE)
        for_pattern = re.compile(r"\bfor\b", flags=re.IGNORECASE)
+       numt_pattern = re.compile(r"((?:\d+)t)(?: +)(boys|girls|boy|girl)\b", flags=re.IGNORECASE)
        other_pattern = re.compile(r"\b(women|men|man|woman|for|cute|fashion|kids?|adults?|girls?|boys?)\b", flags=re.IGNORECASE)
        if re.search(date_pattern, st_word):
            return 1
        if re.search(numt_pattern, st_word):
            return 1
        if re.search(for_pattern, st_word):
            if re.search(other_pattern, st_word):
                return 1
        return filter_flag
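The removed numt_pattern line above is the bug this hunk fixes: the raw-string prefix was accidentally quoted inside an ordinary string, so the compiled regex expected a literal r'...' wrapper and never matched terms like "4t boys". A quick standalone check with Python's re:

import re

# The old pattern, as it was actually compiled (the r'...' is part of the pattern text).
broken = re.compile("r'((?:\\d+)t)(?: +)(boys|girls|boy|girl)\\b'", flags=re.IGNORECASE)
# The corrected raw-string pattern from this commit.
fixed = re.compile(r"((?:\d+)t)(?: +)(boys|girls|boy|girl)\b", flags=re.IGNORECASE)

term = "4t boys pajamas"
print(bool(broken.search(term)))  # False: the old pattern wants a literal r'...' wrapper
print(bool(fixed.search(term)))   # True: captures ("4t", "boys")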
    def read_data(self):
        print("====================== query SQL below ======================")
        # Base search-term data: monthly ABA terms
        sql = f"""
            select
                id as st_key,
-               lower(search_term) search_term,
+               search_term,
                bsr_orders
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
              and st_bsr_cate_1_id_new is not null
-             and st_bsr_cate_1_id_new not in ("audible", "books","digital-text","dmusic","mobile-apps","movies-tv","music","software","videogames")
+             and st_bsr_cate_1_id_new not in ("audible", "books", "digital-text", "dmusic", "mobile-apps", "movies-tv",
+                                              "music", "software", "videogames")
        """
        print(sql)
-       self.df_st_base = self.spark.sql(sql).cache()
-       # self.df_st_base.show(10, truncate=False)
-       # Raw filter data for the two- and three-word search terms
+       self.df_st_base = self.spark.sql(sql)
+       # Compute the matching template words from the ABA terms. Requirements:
+       # 1. the ABA term has 1 or 2 words
+       # 2. consistent with the default type of the monthly ABA data (handled by 济苍)
+       # 3. composed of digits and letters only, and not purely numeric
+       # 4. must have a category
+       # 5. normalize singular/plural and word-order variants, keeping the best-ranked term
        sql = f"""
            select search_term,
-                  st_word_num,rank,st_brand_label
+                  st_word_num, rank, st_brand_label, st_bsr_cate_1_id_new, st_bsr_cate_current_id_new
            from (
                select search_term,
                       regexp_replace(search_term,' ','') as search_term_without_space,
                       st_word_num,
                       rank,
                       st_movie_label,
-                      st_brand_label
+                      st_brand_label,
+                      st_bsr_cate_1_id_new,
+                      st_bsr_cate_current_id_new
                from dwt_aba_st_analytics
                where site_name = '{site_name}'
                  and date_type = '{date_type}'
                  and date_info = '{date_info}'
                  and st_bsr_cate_1_id_new is not null
-                 and st_bsr_cate_1_id_new not in
-                     ("audible", "books", "digital-text", "dmusic", "mobile-apps", "movies-tv", "music", "software",
-                      "videogames")
-                 and st_word_num <= 3
-                 and st_word_num >= 2
-                 and st_movie_label < 3
+                 and st_bsr_cate_1_id_new not in ("audible", "books", "digital-text", "dmusic", "mobile-apps",
+                                                  "movies-tv", "music", "software", "videogames")
+                 and st_word_num <= 2
                  and st_brand_label <= 1
            ) t1
-           where search_term_without_space rlike '^[0-9a-zA-Z]*$'
+           where search_term_without_space rlike '^(?![0-9]+$)[0-9a-zA-Z]+$'
        """
        self.df_pattern_words_base = self.spark.sql(sql)
-       # Tag twin words up front and filter out the unwanted ones directly
-       self.df_pattern_words_base = self.df_pattern_words_base.withColumn('twin_words_flag', self.u_judge_twin_words(F.col('search_term')))
-       self.df_pattern_words_base = self.df_pattern_words_base.filter(" twin_words_flag == 0").cache()
+       # Read the ABA rank-change data
+       sql = f"""
+           select
+               search_term,
+               rank_change_rate,
+               rank_rate_of_change
+           from dwt_aba_last_change_rate
+           where site_name = '{self.site_name}'
+             and date_type = '{self.date_type}'
+             and date_info = '{self.date_info}'
+       """
+       df_st_rank = self.spark.sql(sql).repartition(40, 'search_term')
+       # Filter out twin-word noise such as "gun gun"
+       self.df_pattern_words_base = self.df_pattern_words_base.withColumn(
+           "twin_words_flag",
+           F.when(F.size(F.split(F.col("search_term"), " ")) == 1, F.lit(0))
+           .otherwise(self.u_judge_twin_words(F.col("search_term")))
+       ).filter("twin_words_flag = 0").repartition(40, "search_term")
+       # Join the rank-change data
+       self.df_pattern_words_base = self.df_pattern_words_base.join(df_st_rank, on=['search_term'], how='left').cache()
+       print('Base data for one- and two-word terms:')
+       self.df_pattern_words_base.show(10, True)
        # Read the ABA theme-label data
        sql = f"""
            select
                st_key,
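The tightened rlike above swaps '^[0-9a-zA-Z]*$' for '^(?![0-9]+$)[0-9a-zA-Z]+$': the negative lookahead rejects purely numeric terms (requirement 3), and '+' rejects the empty string the old '*' allowed. Spark's rlike uses Java regex, which supports the same lookahead; a quick check with Python's re, which behaves identically here:

import re

pattern = re.compile(r"^(?![0-9]+$)[0-9a-zA-Z]+$")

for term in ["yogamat", "4tboys", "2024", ""]:
    print(repr(term), bool(pattern.match(term)))
# 'yogamat' True; '4tboys' True (mixes digits and letters)
# '2024' False (purely numeric, rejected by the lookahead)
# '' False (the old '*' pattern would have accepted the empty string)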
@@ -279,24 +293,27 @@ class DwtStThemeAgg(object):
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
-       self.df_st_theme_base = self.spark.sql(sql).cache()
+       self.df_st_theme_base = self.spark.sql(sql).repartition(40, 'search_term').cache()
+       print('ABA theme-label data:')
+       self.df_st_theme_base.show(10, True)
        # Get the theme words
        # Aggregate search-term + theme data, used later to filter the two-word terms
        sql = f"""
            select
                search_term,
-               concat_ws(",",collect_list(theme_label_en)) as pattern_list
+               concat_ws(",", collect_list(theme_en)) as pattern_list
            from big_data_selection.dws_st_theme
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
            group by st_key,
                     search_term
        """
-       self.df_theme = self.spark.sql(sql).cache()
+       self.df_theme = self.spark.sql(sql).repartition(40, 'search_term').cache()
        # SQL for the brand-word dictionary to keep for the final brand-word match
        pg_sql = f"""
-           select lower(trim(character_name)) as st_brand_name_lower
+           select
+               lower(trim(character_name)) as st_brand_name_lower
            from match_character_dict where match_type = '二三级词专用品牌词库'
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
@@ -307,15 +324,17 @@ class DwtStThemeAgg(object):
            username=conn_info["username"],
            query=pg_sql
        )
        pdf_match_brand = self.df_match_brand.toPandas()
        match_brand = list(set(pdf_match_brand.st_brand_name_lower))
        self.brand_pattern = re.compile(r'(?<!\+|\*|\-|\%|\.)\b({})\b'.format('|'.join([re.escape(x) for x in match_brand])),
                                        flags=re.IGNORECASE)
        # SQL for the two-/three-word blacklist dictionary
        pg_sql = f"""
-           select lower(trim(character_name)) as st_blacklist_word_lower,specical_match_type
+           select
+               lower(trim(character_name)) as st_blacklist_word_lower,
+               specical_match_type
            from match_character_dict where match_type = '二三级词匹配黑名单'
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
@@ -325,87 +344,76 @@ class DwtStThemeAgg(object):
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=pg_sql
-       )
+       ).cache()

    def handle_data(self):
        self.read_data()
        # Normalize the template words
        self.handle_base_pattern_data()
        # Handle the two-word terms separately
        self.handle_sec_st()
        self.handle_third_st()
        # Match the one- and two-word template words against the search terms and store the intermediate result
        self.handle_st_filter_table()
        # Compute per-template-word metrics, pattern_type=0
        self.handle_st_pattern_common_agg()
        # Compute per-template-word metrics, pattern_type=1
        self.handle_st_pattern_special_agg()
        self.save_data()
    # Shared logic for the two- and three-word terms
    def handle_base_pattern_data(self):
        # Handles the logic where the two- and three-word conditions are identical
        self.df_base_filter_date = self.df_pattern_words_base
+       # Normalization: standardize singular/plural and word order, keep the smallest rank
        self.df_base_filter_date = self.df_base_filter_date.withColumn(
            'similar_word_list', self.udf_inflect_word()(F.col('search_term'))
        )
        similar_words_window = Window.partitionBy(["similar_word_list"]).orderBy(
            self.df_base_filter_date.rank.asc_nulls_last()
        )
-       self.df_base_filter_date = self.df_base_filter_date.withColumn('row_num', F.row_number().over(window=similar_words_window))
-       # CommonUtil.df_export_csv(self.spark, self.df_sec_words, 'export_sec_words_2023_10_26_detail', 100 * 10000)
-       self.df_base_filter_date = self.df_base_filter_date.filter("row_num=1")
-       self.df_base_filter_date = self.df_base_filter_date.drop(*['similar_word_list', 'row_num'])
+       self.df_base_filter_date = self.df_base_filter_date.withColumn(
+           'row_num', F.row_number().over(window=similar_words_window)
+       ).filter('row_num = 1').drop(*['similar_word_list', 'row_num'])
        # Second similar-word pass: lemmatization via the textblob dictionary
        self.df_base_filter_date = self.df_base_filter_date.withColumn(
            'similar_word_list', self.udf_word_restoration()(F.col('search_term'))
        )
        similar_words_window = Window.partitionBy(["similar_word_list"]).orderBy(
            self.df_base_filter_date.rank.asc_nulls_last()
        )
-       self.df_base_filter_date = self.df_base_filter_date.withColumn('row_num', F.row_number().over(window=similar_words_window))
-       # CommonUtil.df_export_csv(self.spark, self.df_sec_words, 'export_sec_words_2023_10_26_detail', 100 * 10000)
-       self.df_base_filter_date = self.df_base_filter_date.filter("row_num=1").cache()
-       df_without_brand_words = self.df_base_filter_date.filter("st_brand_label = 0")
-       # Handle the brand-word rows separately
-       df_brand_words = self.df_base_filter_date.filter("st_brand_label = 1")
-       df_brand_words = df_brand_words.withColumn("brand_match_detail", self.udf_theme_regex(self.brand_pattern)(F.col("search_term")))
-       df_brand_words = df_brand_words.filter("brand_match_detail is not null")
-       df_brand_words = df_brand_words.drop('brand_match_detail')
+       self.df_base_filter_date = self.df_base_filter_date.withColumn(
+           'row_num', F.row_number().over(window=similar_words_window)
+       ).filter('row_num = 1').drop(*['similar_word_list', 'row_num']).cache()
+       # Brand-word logic
+       df_without_brand_words = self.df_base_filter_date.filter('st_brand_label = 0')
+       df_brand_words = self.df_base_filter_date.filter('st_brand_label = 1')
+       df_brand_words = df_brand_words.withColumn(
+           'brand_match_detail', self.udf_theme_regex(self.brand_pattern)(F.col("search_term"))
+       ).filter('brand_match_detail is not null').drop('brand_match_detail')
        # Merge the processed brand words back with the non-brand words
        self.df_base_filter_date = df_without_brand_words.unionByName(df_brand_words)
-       # Filtering logic for contained words and for the two-/three-word blacklist
+       # Blacklist logic
        pd_match_blacklist = self.df_match_blacklist.toPandas()
-       self.df_base_filter_date = self.df_base_filter_date.withColumn("st_blacklist_flag", self.filter_blacklist_words(pd_match_blacklist)("search_term"))
-       # Keep only the rows not flagged as blacklisted
-       self.df_base_filter_date = self.df_base_filter_date.filter("st_blacklist_flag != 1")
+       self.df_base_filter_date = self.df_base_filter_date.withColumn(
+           'st_blacklist_flag', self.filter_blacklist_words(pd_match_blacklist)("search_term")
+       ).filter('st_blacklist_flag != 1').cache()
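The two normalization passes above share one idiom: within each similar_word_list group, rank rows by ascending rank and keep only the best one. A minimal standalone sketch of that keep-best-per-group step, with a local session, toy data, and column names mirroring the job's:

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[1]").appName("dedup_demo").getOrCreate()

df = spark.createDataFrame(
    [("dog toy", "dog toy", 12), ("dog toys", "dog toy", 3), ("toy dog", "dog toy", 50)],
    ["search_term", "similar_word_list", "rank"],
)

# Number the rows inside each normalization group by ascending rank (nulls last),
# then keep only the best-ranked representative of each group.
w = Window.partitionBy("similar_word_list").orderBy(F.col("rank").asc_nulls_last())
best = (df.withColumn("row_num", F.row_number().over(w))
          .filter("row_num = 1")
          .drop("similar_word_list", "row_num"))
best.show()  # keeps "dog toys" (rank 3)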
    # Handle the two-word terms
    def handle_sec_st(self):
        self.df_sec_words = self.df_base_filter_date.filter('st_word_num = 2')
        self.df_sec_words = self.df_sec_words.join(self.df_theme, on=['search_term'], how='left')
        self.df_sec_words = self.df_sec_words.withColumn(
            "filter_flag",
            self.u_filter_sec_pattern_words(F.col("search_term"), F.col("pattern_list"))
        )
        # Drop the rows flagged as 1
        self.df_sec_words = self.df_sec_words.filter("filter_flag != 1")
-       self.df_sec_words = self.df_sec_words.select("search_term").cache()
-       # CommonUtil.df_export_csv(self.spark, self.df_sec_words, 'export_sec_words_2023_11_30', 100 * 10000)
+       self.df_sec_words = self.df_sec_words.select('search_term', 'st_word_num', 'st_bsr_cate_1_id_new',
+                                                    'st_bsr_cate_current_id_new', 'rank', 'rank_change_rate',
+                                                    'rank_rate_of_change')

    # Handle the three-word terms
    def handle_third_st(self):
        self.df_third_words = self.df_base_filter_date.filter("st_word_num = 3")
        self.df_third_words = self.df_third_words.join(self.df_theme, on=['search_term'], how='left')
        # Drop the three-word terms that matched a function word
        self.df_third_words = self.df_third_words.filter("pattern_list is null")
        self.df_third_words = self.df_third_words.select("search_term").cache()
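The flag-and-filter shape used in handle_sec_st above — wrap a plain Python function in F.udf with an IntegerType result, add it as a column, then filter on it — in a self-contained toy sketch; filter_flag_fn here is a stand-in, not the repo's actual udf_filter_sec_pattern_words:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local[1]").appName("udf_flag_demo").getOrCreate()

# Hypothetical rule: flag a term whose text already appears in its matched-pattern list.
def filter_flag_fn(search_term, pattern_list):
    if pattern_list and search_term in pattern_list.split(","):
        return 1
    return 0

u_filter = F.udf(filter_flag_fn, IntegerType())

df = spark.createDataFrame(
    [("yoga mat", "yoga mat,yoga"), ("dog bed", None)],
    ["search_term", "pattern_list"],
)
df.withColumn("filter_flag", u_filter("search_term", "pattern_list")) \
  .filter("filter_flag != 1").show()  # keeps only "dog bed"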
    def handle_st_filter_table(self):
        df_st_filter_base = self.df_st_base.select(
@@ -415,15 +423,20 @@ class DwtStThemeAgg(object):
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
-       ).coalesce(1).cache()
-       # Merge the two-word and three-word terms
-       pattern_words = self.df_sec_words.unionByName(self.df_third_words)
+       ).cache()
+       # Merge the processed two-word terms with the one-word terms
+       df_one_word = self.df_base_filter_date.filter('st_word_num = 1').select(
+           'search_term', 'st_word_num', 'st_bsr_cate_1_id_new', 'st_bsr_cate_current_id_new',
+           'rank', 'rank_change_rate', 'rank_rate_of_change')
+       self.df_base_filter_date = self.df_sec_words.unionByName(df_one_word).cache()
+       pattern_words = self.df_base_filter_date.select('search_term')
        # Convert the data to a pandas DataFrame
        dict_df = pattern_words.toPandas()
        # Extract the terms into list[dict]
        self.st_word_list = dict_df.to_dict(orient='records')
        # self.st_word_list = dict_df["search_term"].values.tolist()
        row_size = 40000
        batch_size = 200
        # Landing-path validation
@@ -433,17 +446,18 @@ class DwtStThemeAgg(object):
        partition_by = ["site_name", "date_type", "date_info"]
        word_batches = [self.st_word_list[i: i + row_size] for i in range(0, len(self.st_word_list), row_size)]
        for word_batch in word_batches:
        # for word_batch in word_batches[:1]:
            df_list = []  # holds the per-word DataFrames
            for row in word_batch:
                # print(f"self.st_word_list.index(word):{self.st_word_list.index(word)}, word:{word}")
                # The processed multi-word template term
                pattern_st = row["search_term"]
                # Split the term to build the exact-match filter condition
                filter_condition = self.st_word_filter_condition(pattern_st)
                filter_condition_expr = F.expr(filter_condition)
-               df_union_filter = df_st_filter_base.filter(filter_condition_expr)
-               df_union_filter = df_union_filter.withColumn("pattern_st", F.lit(pattern_st))
+               df_union_filter = df_st_filter_base.filter(filter_condition_expr).withColumn(
+                   "pattern_st", F.lit(pattern_st)
+               )
                df_list.append(df_union_filter)
            for i in range(0, len(df_list), batch_size):
                print(f"word_batches round: {word_batches.index(word_batch)}, current df index being written: {i + 1}")
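The loop above shards st_word_list into 40000-term batches and unions the per-term filtered frames in groups of 200 before each write. A compact sketch of just that slicing arithmetic, in plain Python with toy sizes:

# Slicing a long list into fixed-size batches, as the job does with
# row_size=40000 and batch_size=200 (toy sizes here).
st_word_list = [{"search_term": f"term {i}"} for i in range(10)]
row_size, batch_size = 4, 2

word_batches = [st_word_list[i: i + row_size] for i in range(0, len(st_word_list), row_size)]
print([len(b) for b in word_batches])  # [4, 4, 2]

for batch in word_batches:
    for i in range(0, len(batch), batch_size):
        chunk = batch[i: i + batch_size]  # at most batch_size frames unioned per write
        print(i, [r["search_term"] for r in chunk])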
@@ -452,7 +466,7 @@ class DwtStThemeAgg(object):
                result_df = self.udf_unionAll(*tmp_df)
                result_df = result_df.repartition(1)
                result_df.write.saveAsTable(name='tmp_pattern_st_info', format='hive', mode='append', partitionBy=partition_by)
                # print(f"test_df:{len(test_df)}")
        sql = f"""
            select
                st_key,
@@ -465,49 +479,24 @@ class DwtStThemeAgg(object):
              and date_info = '{self.date_info}'
        """
        self.df_pattern_st_words = self.spark.sql(sql).cache()
-       self.df_pattern_st_words.show(20, truncate=False)
-       # print(f"combined_df:{combined_df.count()}")
-       self.df_pattern_st_words = self.df_pattern_st_words.cache()
-       # self.df_pattern_st_words.show(20, truncate=False)
-       # print("rows after matching:", self.df_pattern_st_words.count())
-       # Total bsr_orders and matched-ABA-term count per two-word term
-       self.df_pattern_st_agg = self.df_pattern_st_words.groupBy(['pattern_st']).agg(
-           F.sum("bsr_orders").alias("pattern_bsr_orders_total"),
-           F.count("search_term").alias("pattern_st_count")
-       ).cache()
+       self.df_pattern_st_words.show(10, truncate=True)

    def handle_st_pattern_common_agg(self):
        # # Temporary addition
        # sql = f"""
        # select
        #     st_key,
        #     search_term,
        #     bsr_orders,
        #     pattern_st
        # from big_data_selection.tmp_pattern_st_info
        # where site_name = '{self.site_name}'
        #   and date_type = '{self.date_type}'
        #   and date_info = '{self.date_info}'
        # """
        # self.df_pattern_st_words = self.spark.sql(sql).cache()
        # self.df_pattern_st_words.show(20, truncate=False)
        # print("rows after matching:", self.df_pattern_st_words.count())
        # Total bsr_orders and matched-ABA-term count per template word
        self.df_pattern_st_agg = self.df_pattern_st_words.groupBy(['pattern_st']).agg(
            F.sum("bsr_orders").alias("pattern_bsr_orders_total"),
            F.count("search_term").alias("pattern_st_count")
        ).join(
            self.df_base_filter_date,
            on=self.df_pattern_st_words.pattern_st == self.df_base_filter_date.search_term
        ).cache()
        # Join the template words with the search-term theme labels; the pattern_type=0 case
        df_common_st_theme = self.df_st_theme_base.filter("pattern_type = 0")
        self.df_st_theme_agg = self.df_pattern_st_words.join(
            df_common_st_theme, on=['st_key', 'search_term'], how='left'
        ).filter("theme_en is not null")  # search terms that match no function word are dropped
        # Group and accumulate (by the Chinese label text; the business counts by the Chinese meaning)
        self.df_st_theme_agg = self.df_st_theme_agg.groupBy(['pattern_st', 'theme_label_ch', 'theme_en', 'theme_ch']).agg(
            F.count("st_key").alias("theme_label_counts"),
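The common aggregation above is a plain groupBy over pattern_st with one sum and one count. A minimal sketch of the same shape, with a local session and toy rows:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("agg_demo").getOrCreate()

df = spark.createDataFrame(
    [("dog toy", "dog toy for puppies", 100), ("dog toy", "squeaky dog toy", 40), ("yoga mat", "yoga mat thick", 70)],
    ["pattern_st", "search_term", "bsr_orders"],
)

agg = df.groupBy("pattern_st").agg(
    F.sum("bsr_orders").alias("pattern_bsr_orders_total"),  # total orders under the template word
    F.count("search_term").alias("pattern_st_count"),       # number of matched ABA terms
)
agg.show()  # dog toy -> 140 / 2, yoga mat -> 70 / 1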
@@ -515,27 +504,27 @@ class DwtStThemeAgg(object):
            F.collect_set("theme_label_en").alias("theme_label_en_list")
        )
        # Concatenate into a "/"-joined string
-       self.df_st_theme_agg = self.df_st_theme_agg.withColumn('label_en_str', F.concat_ws("/", F.col('theme_label_en_list')))
-       # Attach pattern_st's total bsr_orders and st_count
+       self.df_st_theme_agg = self.df_st_theme_agg.withColumn(
+           'label_en_str', F.concat_ws("/", F.col('theme_label_en_list'))
+       )
+       # Join the template word's aggregate data
        self.df_st_theme_agg = self.df_st_theme_agg.join(self.df_pattern_st_agg, on=['pattern_st'], how='left')
        # Compute the share ratios
-       self.df_st_theme_agg = self.df_st_theme_agg.withColumn('pattern_bsr_orders_rate', F.when(F.col('pattern_bsr_orders_total') > 0, F.round(F.col('theme_label_bsr_orders') / F.col('pattern_bsr_orders_total'), 4)).otherwise(F.lit(0.0)))
-       self.df_st_theme_agg = self.df_st_theme_agg.withColumn('pattern_num_rate', F.when(F.col('pattern_st_count') > 0, F.round(F.col('theme_label_counts') / F.col('pattern_st_count'), 4)).otherwise(F.lit(0.0)))
-       self.df_st_theme_agg.show(10, truncate=False)
+       self.df_st_theme_agg = self.df_st_theme_agg.withColumn(
+           'pattern_bsr_orders_rate',
+           F.when(F.col('pattern_bsr_orders_total') > 0,
+                  F.round(F.col('theme_label_bsr_orders') / F.col('pattern_bsr_orders_total'), 4)).otherwise(F.lit(0.0))
+       ).withColumn(
+           'pattern_num_rate',
+           F.when(F.col('pattern_st_count') > 0,
+                  F.round(F.col('theme_label_counts') / F.col('pattern_st_count'), 4)).otherwise(F.lit(0.0))
+       )
        self.df_st_theme_agg = self.df_st_theme_agg.select(
            F.col('pattern_st'),
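The rate columns above guard against division by zero with F.when(total > 0, round(x / total, 4)).otherwise(0.0). A standalone sketch of that guarded-ratio idiom:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("rate_demo").getOrCreate()

df = spark.createDataFrame([(30, 140), (5, 0)], ["theme_label_bsr_orders", "pattern_bsr_orders_total"])

# Guarded ratio: avoid division by zero, round to 4 decimals, default to 0.0.
df = df.withColumn(
    "pattern_bsr_orders_rate",
    F.when(F.col("pattern_bsr_orders_total") > 0,
           F.round(F.col("theme_label_bsr_orders") / F.col("pattern_bsr_orders_total"), 4))
     .otherwise(F.lit(0.0)),
)
df.show()  # 0.2143 for the first row, 0.0 for the zero-total row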
@@ -548,68 +537,80 @@ class DwtStThemeAgg(object):
            F.col('theme_label_bsr_orders'),
            F.col('theme_label_counts'),
            F.col('pattern_bsr_orders_rate'),
-           F.col('pattern_num_rate')
-       )
-       pass
+           F.col('pattern_num_rate'),
+           F.col('st_word_num'),
+           F.col('st_bsr_cate_1_id_new'),
+           F.col('st_bsr_cate_current_id_new'),
+           F.col('rank'),
+           F.col('rank_change_rate'),
+           F.col('rank_rate_of_change')
+       ).cache()

    def handle_st_pattern_special_agg(self):
-       # Match the two-word detail against the theme/function-word label detail; the pattern_type=1 case
+       # Join the template words with the search-term theme labels; the pattern_type=1 case
        df_special_st_theme = self.df_st_theme_base.filter("pattern_type = 1")
-       self.df_st_match_topic_detail = self.df_pattern_st_words.join(df_special_st_theme, on=['st_key', 'search_term'], how='left')
-       self.df_st_match_topic_detail = self.df_st_match_topic_detail.filter("theme_label_en is not null")
+       self.df_st_match_topic_detail = self.df_pattern_st_words.join(
+           df_special_st_theme, on=['st_key', 'search_term'], how='left'
+       ).filter("theme_label_en is not null")
        df_st_match_agg = self.df_st_match_topic_detail.groupby(
            ['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_num_info', 'theme_label_unit_info']
        ).agg(
            F.count('bsr_orders').alias("same_info_count"),
            F.sum('bsr_orders').alias("same_info_bsr_orders")
-       )
+       ).cache()
        df_st_match_no_num_agg = df_st_match_agg.filter("theme_label_num_info is null")
        df_st_match_no_num_info = df_st_match_no_num_agg.groupby(
            ['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_unit_info']
        ).agg(
            F.sum('same_info_count').alias("st_label_num"),
            F.sum('same_info_bsr_orders').alias("st_label_bsr_orders"),
            F.col('theme_label_unit_info').alias("label_info")
        ).drop("theme_label_unit_info")
        df_st_match_no_unit_agg = df_st_match_agg.filter("theme_label_unit_info in ('x', 'by')")
        df_st_match_no_unit_info = df_st_match_no_unit_agg.groupby(
            ['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_num_info']
        ).agg(
            F.sum('same_info_count').alias("st_label_num"),
            F.sum('same_info_bsr_orders').alias("st_label_bsr_orders"),
            F.col("theme_label_num_info").alias("label_info")
        ).drop("theme_label_num_info")
        df_st_match_complete_agg = df_st_match_agg.filter(
            (F.col("theme_label_num_info").isNotNull()) &
            (F.col("theme_label_unit_info").isNotNull()) &
            (F.col("theme_label_unit_info") != 'x') &
            (F.col("theme_label_unit_info") != 'by')
        )
        df_st_match_complete_agg = df_st_match_complete_agg.withColumn(
            "complete_info",
            F.concat_ws(' ', F.col("theme_label_num_info"), F.col("theme_label_unit_info"))
        )
        df_st_match_complete_info = df_st_match_complete_agg.groupby(
            ['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_num_info']
        ).agg(
            F.sum('same_info_count').alias("st_label_num"),
            F.sum('same_info_bsr_orders').alias("st_label_bsr_orders"),
            F.concat_ws("/", F.collect_set(F.col("complete_info"))).alias("label_info")
        ).drop("theme_label_num_info")
-       self.df_st_match_topic_agg = df_st_match_no_num_info.unionByName(df_st_match_no_unit_info).unionByName(df_st_match_complete_info)
-       self.df_st_match_topic_agg = self.df_st_match_topic_agg.join(
+       self.df_st_match_topic_agg = df_st_match_no_num_info.unionByName(df_st_match_no_unit_info).unionByName(
+           df_st_match_complete_info
+       ).join(
            self.df_pattern_st_agg, on=['pattern_st'], how='left'
        )
-       self.df_st_match_topic_agg = self.df_st_match_topic_agg.withColumn("pattern_bsr_orders_rate", F.when(F.col("pattern_bsr_orders_total") > 0, F.round((F.col("st_label_bsr_orders") / F.col("pattern_bsr_orders_total")), 4)).otherwise(F.lit(0.0)))
-       self.df_st_match_topic_agg = self.df_st_match_topic_agg.withColumn("pattern_num_rate", F.when(F.col("pattern_st_count") > 0, F.round((F.col("st_label_num") / F.col("pattern_st_count")), 4)).otherwise(F.lit(0.0)))
+       self.df_st_match_topic_agg = self.df_st_match_topic_agg.withColumn(
+           "pattern_bsr_orders_rate",
+           F.when(F.col("pattern_bsr_orders_total") > 0,
+                  F.round((F.col("st_label_bsr_orders") / F.col("pattern_bsr_orders_total")), 4)).otherwise(F.lit(0.0))
+       ).withColumn(
+           "pattern_num_rate",
+           F.when(F.col("pattern_st_count") > 0,
+                  F.round((F.col("st_label_num") / F.col("pattern_st_count")), 4)).otherwise(F.lit(0.0))
+       )
        self.df_st_match_topic_agg = self.df_st_match_topic_agg.select(
            F.col('pattern_st'),
            F.col('pattern_bsr_orders_total'),
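The "complete" branch above builds label_info by collecting the distinct "num unit" strings per group and joining them with "/". A small sketch of that concat_ws-over-collect_set idiom, with toy rows:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("collect_demo").getOrCreate()

df = spark.createDataFrame(
    [("crib sheet", "2", "pack"), ("crib sheet", "4", "pack"), ("crib sheet", "2", "pack")],
    ["pattern_st", "theme_label_num_info", "theme_label_unit_info"],
)

df = df.withColumn("complete_info",
                   F.concat_ws(" ", F.col("theme_label_num_info"), F.col("theme_label_unit_info")))
out = df.groupBy("pattern_st").agg(
    # collect_set de-duplicates, so the result is e.g. "2 pack/4 pack"
    F.concat_ws("/", F.collect_set("complete_info")).alias("label_info")
)
out.show(truncate=False)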
@@ -621,15 +622,21 @@ class DwtStThemeAgg(object):
            F.col('st_label_bsr_orders').alias('theme_label_bsr_orders'),
            F.col('st_label_num').alias('theme_label_counts'),
            F.col('pattern_bsr_orders_rate'),
-           F.col('pattern_num_rate')
-       )
+           F.col('pattern_num_rate'),
+           F.col('st_word_num'),
+           F.col('st_bsr_cate_1_id_new'),
+           F.col('st_bsr_cate_current_id_new'),
+           F.col('rank'),
+           F.col('rank_change_rate'),
+           F.col('rank_rate_of_change')
+       ).cache()

    def save_data(self):
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        print(f"Clearing HDFS directory: {hdfs_path_asin_info}")
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
        self.df_st_theme_agg = self.df_st_theme_agg.unionByName(self.df_st_match_topic_agg)
-       # Added logic: if a two-/three-word term itself also has matched words, the corresponding stats must be filtered out
+       # If a template word itself also has matched words, the corresponding stats must be filtered out
        df_agg_filter = self.df_st_theme_base.select(
            F.col('search_term'),
            F.col('theme_label_en').alias('theme_label_en_join'),
@@ -637,10 +644,7 @@ class DwtStThemeAgg(object):
        )
-       self.df_st_theme_agg = self.df_st_theme_agg.join(df_agg_filter, on=(self.df_st_theme_agg.pattern_st == df_agg_filter.search_term) & (self.df_st_theme_agg.theme_label_en == df_agg_filter.theme_label_en_join), how='left')
-       # join_flag = 1 means the row matched the two-/three-word term's own labels, so it is filtered out
-       self.df_st_theme_agg = self.df_st_theme_agg.filter(F.col('join_flag').isNull())
+       self.df_st_theme_agg = self.df_st_theme_agg.join(
+           df_agg_filter,
+           on=(self.df_st_theme_agg.pattern_st == df_agg_filter.search_term) &
+              (self.df_st_theme_agg.theme_label_en == df_agg_filter.theme_label_en_join),
+           how='left'
+       ).filter(F.col('join_flag').isNull())
        self.df_st_theme_agg = self.df_st_theme_agg.select(
            F.col('pattern_st'),
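The join-then-isNull pattern above is effectively a left anti join keyed on (pattern_st, theme_label_en). A sketch showing the two forms agree, with toy frames; join_flag stands in for any guaranteed-non-null column from the right side:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("anti_join_demo").getOrCreate()

left = spark.createDataFrame([("dog toy", "color"), ("dog toy", "size"), ("yoga mat", "color")],
                             ["pattern_st", "theme_label_en"])
right = spark.createDataFrame([("dog toy", "color", 1)],
                              ["search_term", "theme_label_en_join", "join_flag"])

cond = (left.pattern_st == right.search_term) & (left.theme_label_en == right.theme_label_en_join)

# Left join, then keep the rows that found no match (join_flag is null) ...
kept = left.join(right, on=cond, how="left").filter(F.col("join_flag").isNull())
kept.select("pattern_st", "theme_label_en").show()

# ... which is equivalent to the built-in left_anti join.
left.join(right, on=cond, how="left_anti").show()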
@@ -656,6 +660,12 @@ class DwtStThemeAgg(object):
            F.col('pattern_num_rate'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('updated_time'),
+           F.col('st_word_num').alias('pattern_word_num'),
+           F.col('st_bsr_cate_1_id_new').alias('pattern_category_id'),
+           F.col('st_bsr_cate_current_id_new').alias('pattern_category_current_id'),
+           F.col('rank').alias('pattern_rank'),
+           F.col('rank_change_rate').alias('pattern_rank_change_rate'),
+           F.col('rank_rate_of_change').alias('pattern_rank_rate_of_change'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
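One pre-existing quirk worth flagging in the timestamp columns above (not introduced by this commit): in Spark's date_format patterns, 'SS' is fraction-of-second, while second-of-minute is lowercase 'ss', so 'yyyy-MM-dd HH:mm:SS' stamps a fractional-second field where seconds were presumably intended. A quick check:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("fmt_demo").getOrCreate()

df = spark.range(1).select(
    F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:SS").alias("with_SS"),  # SS = fraction-of-second
    F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss").alias("with_ss"),  # ss = second-of-minute
)
df.show(truncate=False)  # the trailing field of with_SS is fractional, not seconds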
@@ -664,8 +674,7 @@ class DwtStThemeAgg(object):
        self.df_st_theme_agg = self.df_st_theme_agg.repartition(20)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"Saving to table {self.hive_tb}, partitioned by {partition_by}")
-       self.df_st_theme_agg.write.saveAsTable(name=self.hive_tb, format='hive', mode='append',
-                                              partitionBy=partition_by)
+       self.df_st_theme_agg.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")
Pyspark_job/sqoop_export/dwt_st_theme_agg.py
import os
import sys
import random
import string

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
-from utils.common_util import CommonUtil, DateTypes
+from utils.common_util import CommonUtil

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Get the last argument
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"Run arguments: {sys.argv}")
@@ -22,8 +20,6 @@ if __name__ == '__main__':
        db_type = 'postgresql_test'
        print("Exporting to the test database")
    else:
-       # db_type = "postgresql"
-       # print("导出到PG库中")
        db_type = "postgresql_cluster"
        print("Exporting to the PG cluster database")
@@ -40,8 +36,7 @@ if __name__ == '__main__':
    # For idempotency, first drop the target table's data for the same period
    sql = f"""
        drop table if exists {export_tb};
-       create table if not exists {export_tb}
-       (
+       create table if not exists {export_tb} (
            like {export_master_tb} including ALL
        );
    """
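The drop-and-recreate DDL above is what keeps the export idempotent: re-running a period first clears the staging table, and "like ... including ALL" copies the master table's full definition in PostgreSQL. A minimal sketch of executing such DDL, assuming psycopg2 and placeholder connection settings (the repo itself goes through its DBUtil helpers, whose exact API is not shown here):

import psycopg2  # assumption: the repo's DBUtil wraps a driver like this

export_tb = "dwt_st_theme_agg_copy"    # hypothetical staging table name
export_master_tb = "dwt_st_theme_agg"  # hypothetical master table name

ddl = f"""
drop table if exists {export_tb};
create table if not exists {export_tb} (
    like {export_master_tb} including ALL
);
"""

with psycopg2.connect(host="localhost", dbname="selection", user="user", password="pwd") as conn:
    with conn.cursor() as cur:
        cur.execute(ddl)  # both statements run in one transaction; committed on context exit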
@@ -67,7 +62,13 @@ if __name__ == '__main__':
            "pattern_num_rate",
            "date_info",
            "created_time",
-           "updated_time"
+           "updated_time",
+           "pattern_word_num",
+           "pattern_category_id",
+           "pattern_category_current_id",
+           "pattern_rank",
+           "pattern_rank_change_rate",
+           "pattern_rank_rate_of_change"
        ],
        partition_dict={
            "site_name": site_name,