Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
71bbd954
Commit
71bbd954
authored
Jun 18, 2026
by
hejiangming
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
分类词频增加字段 调整边界
parent
1a27c704
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
33 additions
and
12 deletions
+33
-12
dws_aba_word_freq_cate.py
Pyspark_job/dws/dws_aba_word_freq_cate.py
+20
-8
dws_aba_word_freq_cate.py
Pyspark_job/sqoop_export/dws_aba_word_freq_cate.py
+13
-4
No files found.
Pyspark_job/dws/dws_aba_word_freq_cate.py
View file @
71bbd954
...
...
@@ -664,16 +664,24 @@ class DwsAbaWordFreqCate(Templates):
relate
=
dedup
.
groupBy
(
*
KEYS
)
.
agg
(
F
.
count
(
'*'
)
.
alias
(
'relate_st_num'
))
others
=
dedup
.
groupBy
(
*
KEYS
)
.
agg
(
F
.
min
(
'rank'
)
.
alias
(
'min_rank'
),
# avg_rank = 分类内平均排名:该分类·base 关联的【去重后】搜索词,其 ABA 排名的平均值。
# 为什么先 dedup 再 avg:rank 是 search_term 维度的 ABA 排名(每个搜索词一个值,与分类无关),
# 上面 dedup 已按 (分类,base,search_term) 去重,所以叠词('shoe shoe rack')不会把同一搜索词算两次。
# 与 min_rank(分类内最佳排名)同范围:都只看本分类、本 scope 白名单过滤后的搜索词。
# 例:base='towel' 在某分类关联 3 个去重搜索词 rank=[5, 50, 200] → avg_rank=85.0
# 口径与临时报表 spark_hjm/月词频计算/word_head.py 的 avg_rank 一致(那边是全站、不分类)。
F
.
round
(
F
.
avg
(
'rank'
),
2
)
.
alias
(
'avg_rank'
),
F
.
sum
(
F
.
when
(
F
.
col
(
'date_info_first'
)
>=
self
.
year_ago
,
1
)
.
otherwise
(
0
))
.
alias
(
'new_st_num'
),
F
.
expr
(
'min_by(search_term, rank)'
)
.
alias
(
'top_aba_example'
))
# 同比/环比上升占比:该分类·base 去重搜索词里 rank 变化率<0=上升 的占比
# (口径锁定 dwt_aba_last_change_rate:负数=排名上升,固定不变)
# 同比/环比上升占比:该分类·base 去重搜索词里 rank 变化率<=0=上升 的占比
# 口径对齐后端页面查询:rank 变化率 <= 0 算"上升"(负数=排名改善、0=持平也算上升),
# 含新进榜占位 -1000;与 dwt_aba_last_change_rate 一致。注意是 <= 0(含持平),不是 < 0。
ratio
=
dedup
.
select
(
*
KEYS
,
'search_term'
)
.
join
(
self
.
df_rate
,
'search_term'
,
'left'
)
\
.
groupBy
(
*
KEYS
)
.
agg
(
F
.
count
(
'*'
)
.
alias
(
'st_total'
),
F
.
sum
(
F
.
when
(
F
.
col
(
'rank_change_rate'
)
<
0
,
1
)
.
otherwise
(
0
))
.
alias
(
'yoy_up'
),
F
.
sum
(
F
.
when
(
F
.
col
(
'rank_rate_of_change'
)
<
0
,
1
)
.
otherwise
(
0
))
.
alias
(
'mom_up'
),
F
.
sum
(
F
.
when
(
F
.
col
(
'rank_change_rate'
)
<
=
0
,
1
)
.
otherwise
(
0
))
.
alias
(
'yoy_up'
),
F
.
sum
(
F
.
when
(
F
.
col
(
'rank_rate_of_change'
)
<
=
0
,
1
)
.
otherwise
(
0
))
.
alias
(
'mom_up'
),
)
.
withColumn
(
'yoy_up_ratio'
,
F
.
round
(
F
.
col
(
'yoy_up'
)
/
F
.
col
(
'st_total'
),
4
))
\
.
withColumn
(
'mom_up_ratio'
,
F
.
round
(
F
.
col
(
'mom_up'
)
/
F
.
col
(
'st_total'
),
4
))
\
.
select
(
*
KEYS
,
'yoy_up_ratio'
,
'mom_up_ratio'
)
...
...
@@ -686,7 +694,10 @@ class DwsAbaWordFreqCate(Templates):
full
=
freq
.
join
(
relate
,
KEYS
,
'left'
)
.
join
(
others
,
KEYS
,
'left'
)
\
.
join
(
ratio
,
KEYS
,
'left'
)
.
join
(
heat_sel
,
KEYS
,
'left'
)
.
join
(
trend
,
KEYS
,
'left'
)
# 当月=heat_m0;去年=m12(无→-1);上月=m1(无→-1);热度同比/环比(对比期无→新词+1000,正数=上升)
# 当月=heat_m0;去年=m12(无→-1);上月=m1(无→-1);热度同比/环比(正数=上升)
# 同比/环比分母兜底:对比期热度为 null(该口径 12 月前/上月无数据)→ 占位 1000,
# 归为"无可比基准"=新词。注:rank 永不为 0 且 max(rank) 远小于 2000 万,
# 故 heat_m12/heat_m1 不会被 round 成 0.00(非空 0 除零的情况实际出不来),只需判 null
cz
=
lambda
n
:
F
.
coalesce
(
F
.
col
(
n
),
F
.
lit
(
0.0
))
full
=
full
\
.
withColumn
(
'word_heat'
,
F
.
col
(
'heat_m0'
))
\
...
...
@@ -718,9 +729,9 @@ class DwsAbaWordFreqCate(Templates):
union
=
union
.
join
(
self
.
hist_base_df
,
'base_form'
,
'left'
)
\
.
withColumn
(
'new_word_flag'
,
F
.
when
(
F
.
col
(
'hist_seen'
)
.
isNull
(),
F
.
lit
(
1
))
.
otherwise
(
F
.
lit
(
0
)))
\
.
drop
(
'hist_seen'
)
# 属性维度:无匹配填
'-1'(导出转 {-1
})
# 属性维度:无匹配填
空串 ''(导出时 '' → 空数组 {
})
union
=
union
.
join
(
self
.
attr_df
,
'base_form'
,
'left'
)
\
.
withColumn
(
'attr_dim'
,
F
.
coalesce
(
F
.
col
(
'attr_dim_src'
),
F
.
lit
(
'
-1
'
)))
.
drop
(
'attr_dim_src'
)
.
withColumn
(
'attr_dim'
,
F
.
coalesce
(
F
.
col
(
'attr_dim_src'
),
F
.
lit
(
''
)))
.
drop
(
'attr_dim_src'
)
# ---- 最终列(对齐 Hive DDL;时间格式与兄弟表一致)----
now
=
F
.
date_format
(
F
.
current_timestamp
(),
'yyyy-MM-dd HH:mm:SS'
)
...
...
@@ -734,6 +745,7 @@ class DwsAbaWordFreqCate(Templates):
F
.
col
(
'word_heat'
),
F
.
col
(
'relate_st_num'
)
.
cast
(
'int'
)
.
alias
(
'relate_st_num'
),
F
.
col
(
'min_rank'
)
.
cast
(
'int'
)
.
alias
(
'min_rank'
),
F
.
col
(
'avg_rank'
),
# 分类内平均排名(保留 2 位小数,不取整)
F
.
col
(
'new_st_num'
)
.
cast
(
'int'
)
.
alias
(
'new_st_num'
),
F
.
col
(
'word_heat_last_year'
),
F
.
col
(
'word_heat_change_rate'
),
...
...
@@ -778,7 +790,7 @@ class DwsAbaWordFreqCate(Templates):
print
(
"---- 占位/空值体检(word_heat 不应为 null)----"
)
df
.
selectExpr
(
"sum(case when word_heat is null then 1 else 0 end) as word_heat_null"
,
"sum(case when attr_dim='
-1
' then 1 else 0 end) as attr_dim_empty"
,
"sum(case when attr_dim='' then 1 else 0 end) as attr_dim_empty"
,
"sum(case when word_heat_change_rate=1000 then 1 else 0 end) as yoy_newword"
,
"sum(brand_word_flag) as brand_cnt"
,
"sum(new_word_flag) as newword_cnt"
,
...
...
Pyspark_job/sqoop_export/dws_aba_word_freq_cate.py
View file @
71bbd954
...
...
@@ -51,15 +51,22 @@ if __name__ == '__main__':
# copy 表:us_aba_word_freq_cate_2026_04_copy
suffix
=
str
(
date_info
)
.
replace
(
"-"
,
"_"
)
year_str
=
CommonUtil
.
safeIndex
(
date_info
.
split
(
"-"
),
0
,
None
)
year_before
=
str
(
int
(
year_str
)
-
1
)
# 去年,跨年时复制其结构建当年 master
next_val
=
CommonUtil
.
get_next_val
(
date_type
,
date_info
)
export_base_tb
=
f
"{site_name}_aba_word_freq_cate"
export_master_tb
=
f
"{export_base_tb}_{year_str}"
export_master_tb
=
f
"{export_base_tb}_{year_str}"
# 当年主表,如 us_aba_word_freq_cate_2027
export_tb_before
=
f
"{export_base_tb}_{year_before}"
# 去年主表,如 ..._2026,复制结构用
export_table
=
f
"{export_base_tb}_{suffix}"
export_tb_copy
=
f
"{export_table}_copy"
# 3) 在 master 表上建当月子分区(首次跑当月才真正创建,重跑幂等)
# 3) 跨年自动建当年 master:用 LIKE 复制去年主表结构
# 再在当年 master 上建当月子分区(if not exists,重跑幂等)。
sql_create_partition
=
f
"""
create table if not exists {export_master_tb}
( like {export_tb_before} including all )
partition by range (date_info);
create table if not exists {export_table} partition of {export_master_tb}
for values from ('{date_info}') to ('{next_val}');
"""
...
...
@@ -96,6 +103,7 @@ if __name__ == '__main__':
"word_heat"
,
"relate_st_num"
,
"min_rank"
,
"avg_rank"
,
"new_st_num"
,
"word_heat_last_year"
,
"word_heat_change_rate"
,
...
...
@@ -139,11 +147,12 @@ if __name__ == '__main__':
# 7) Sqoop 写完后,ALTER copy 表的 attr_dim 回 VARCHAR[]
# USING string_to_array(...) 把逗号串 "材质,颜色" 拆成数组 {材质,颜色}
# 词典无匹配的词 PySpark 已填 "-1",转换后是 {-1},与 Java 占位约定一致
# 词典无匹配的词 PySpark 已填空串 '':nullif 把 '' 归 NULL → string_to_array 得 NULL → coalesce 兜成空数组 {}
# (兼顾 Sqoop 可能把空串写成 NULL 的情况,'' 和 NULL 最终都落成 {})
sql_alter_back
=
f
"""
ALTER TABLE {export_tb_copy}
ALTER COLUMN attr_dim TYPE VARCHAR[]
USING
string_to_array(attr_dim, ',
')::varchar[];
USING
coalesce(string_to_array(nullif(attr_dim, ''), ','), '{{}}
')::varchar[];
"""
DBUtil
.
engine_exec_sql
(
engine
,
sql_alter_back
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment