Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
b6374899
Commit
b6374899
authored
Jun 16, 2026
by
hejiangming
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
35d1e0b5
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
53 additions
and
12 deletions
+53
-12
dwt_aba_last365.py
Pyspark_job/dwt/dwt_aba_last365.py
+31
-2
dwt_aba_st_analytics.py
Pyspark_job/dwt/dwt_aba_st_analytics.py
+0
-0
dwt_aba_last365.py
Pyspark_job/sqoop_export/dwt_aba_last365.py
+10
-1
dwt_aba_st_analytics.py
Pyspark_job/sqoop_export/dwt_aba_st_analytics.py
+12
-9
No files found.
Pyspark_job/dwt/dwt_aba_last365.py
View file @
b6374899
...
@@ -331,9 +331,32 @@ class DwtAbaLast365(object):
...
@@ -331,9 +331,32 @@ class DwtAbaLast365(object):
),
),
''
''
)
)
"""
)
.
alias
(
'st_attribute_label'
)
"""
)
.
alias
(
'st_attribute_label'
),
# 峰值月中间值:最小 rank(排名最好)+ (rank,月份) 明细数组(下面算 peak_month 用)
F
.
min
(
"rank"
)
.
alias
(
"min_rank"
),
F
.
collect_list
(
F
.
struct
(
"rank"
,
"date_info"
))
.
alias
(
"rank_arr"
),
# 常年可卖:近 12 个月都出现在 ABA 月度搜索词中(去重月份数=12)→ 1,否则 0
F
.
when
(
F
.
size
(
F
.
collect_set
(
"date_info"
))
>=
12
,
F
.
lit
(
1
))
.
otherwise
(
F
.
lit
(
0
))
.
alias
(
"all_year_text_flag"
)
)
)
# 峰值月 peak_month:从 rank_arr 里 filter 出 rank=min_rank 的项 → 取 date_info → 排序 → 拼成 'YYYY-MM' 逗号串
# 并列峰值全保留;与月表 dwt_aba_st_analytics 同口径同格式
# nullif 兜底:极端 rank 全 null 时 filter 为空 → 空串 → 转 null → 由 handle_save 的 na.fill 转空串 → PG string_to_array 转空数组 {}
df_agg
=
df_agg
.
withColumn
(
"peak_month"
,
F
.
expr
(
"""
nullif(
concat_ws(',',
array_sort(transform(
filter(rank_arr, x -> x.rank = min_rank),
x -> x.date_info
))
),
''
)
"""
)
)
.
drop
(
"min_rank"
,
"rank_arr"
)
# 行转列的字段
# 行转列的字段
agg_col_arr
=
[
'st_num'
,
'bsr_orders'
,
'orders'
,
'market_cycle_type'
,
'search_volume'
]
agg_col_arr
=
[
'st_num'
,
'bsr_orders'
,
'orders'
,
'market_cycle_type'
,
'search_volume'
]
self
.
df_base
=
self
.
pivot_df
(
self
.
df_base
=
self
.
pivot_df
(
...
@@ -614,6 +637,10 @@ class DwtAbaLast365(object):
...
@@ -614,6 +637,10 @@ class DwtAbaLast365(object):
F
.
col
(
"appear_month_lastest"
),
F
.
col
(
"appear_month_lastest"
),
#搜索词属性标签(12 个月合并去重,逗号分隔字符串,sqoop 导出 PG 后转 VARCHAR[])
#搜索词属性标签(12 个月合并去重,逗号分隔字符串,sqoop 导出 PG 后转 VARCHAR[])
F
.
col
(
"st_attribute_label"
),
F
.
col
(
"st_attribute_label"
),
# 峰值月:近12个月 rank 最小的月份,并列保留,'YYYY-MM' 逗号分隔(与月表同口径)
F
.
col
(
"peak_month"
),
# 常年可卖:近12个月每月都出现在 ABA 月度搜索词中=1,否则=0
F
.
col
(
"all_year_text_flag"
),
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
...
@@ -655,7 +682,9 @@ class DwtAbaLast365(object):
...
@@ -655,7 +682,9 @@ class DwtAbaLast365(object):
"top_rank"
:
0
,
"top_rank"
:
0
,
# handle_agg 里 12 月全 -1 时输出空串 → nullif 转 null → 这里 fillna '-1'
# handle_agg 里 12 月全 -1 时输出空串 → nullif 转 null → 这里 fillna '-1'
# 占位 "-1" 与 dwt_aba_st_analytics 端规则一致,Java 转 null 返前端
# 占位 "-1" 与 dwt_aba_st_analytics 端规则一致,Java 转 null 返前端
"st_attribute_label"
:
"-1"
"st_attribute_label"
:
"-1"
,
# 峰值月极端兜底(rank 全 null)→ nullif 转 null → 这里 fillna 空串,PG string_to_array 转空数组 {}(与月表同规则)
"peak_month"
:
""
})
.
cache
()
})
.
cache
()
def
save_data
(
self
):
def
save_data
(
self
):
...
...
Pyspark_job/dwt/dwt_aba_st_analytics.py
View file @
b6374899
This diff is collapsed.
Click to expand it.
Pyspark_job/sqoop_export/dwt_aba_last365.py
View file @
b6374899
...
@@ -46,6 +46,7 @@ if __name__ == '__main__':
...
@@ -46,6 +46,7 @@ if __name__ == '__main__':
-- st_attribute_label 在正式表是 VARCHAR[],Sqoop 无法直接写入数组类型
-- st_attribute_label 在正式表是 VARCHAR[],Sqoop 无法直接写入数组类型
-- 先临时改成 VARCHAR(200) 让 Sqoop 写字符串,交换完成后再 ALTER 回 VARCHAR[]
-- 先临时改成 VARCHAR(200) 让 Sqoop 写字符串,交换完成后再 ALTER 回 VARCHAR[]
ALTER TABLE {export_tb_rel} ALTER COLUMN st_attribute_label TYPE VARCHAR(200);
ALTER TABLE {export_tb_rel} ALTER COLUMN st_attribute_label TYPE VARCHAR(200);
ALTER TABLE {export_tb_rel} ALTER COLUMN peak_month TYPE VARCHAR(200);
"""
"""
print
(
"================================执行sql================================"
)
print
(
"================================执行sql================================"
)
print
(
sql
)
print
(
sql
)
...
@@ -192,7 +193,11 @@ if __name__ == '__main__':
...
@@ -192,7 +193,11 @@ if __name__ == '__main__':
"rank_change_rate_lastest"
,
"rank_change_rate_lastest"
,
"rank_rate_of_change_lastest"
,
"rank_rate_of_change_lastest"
,
# 搜索词属性标签(12 个月合并去重,逗号分隔字符串,sqoop 导出 PG 后转 VARCHAR[])
# 搜索词属性标签(12 个月合并去重,逗号分隔字符串,sqoop 导出 PG 后转 VARCHAR[])
"st_attribute_label"
"st_attribute_label"
,
# 峰值月(近12月rank最小的月份,逗号分隔字符串,sqoop 导出 PG 后转 VARCHAR[])
"peak_month"
,
# 常年可卖标记(标量 int,sqoop 直写)
"all_year_text_flag"
],
],
partition_dict
=
{
partition_dict
=
{
"site_name"
:
site_name
,
"site_name"
:
site_name
,
...
@@ -227,6 +232,10 @@ if __name__ == '__main__':
...
@@ -227,6 +232,10 @@ if __name__ == '__main__':
ALTER TABLE {export_tb_before}
ALTER TABLE {export_tb_before}
ALTER COLUMN st_attribute_label TYPE VARCHAR[]
ALTER COLUMN st_attribute_label TYPE VARCHAR[]
USING string_to_array(st_attribute_label, ',')::varchar[];
USING string_to_array(st_attribute_label, ',')::varchar[];
ALTER TABLE {export_tb_before}
ALTER COLUMN peak_month TYPE VARCHAR[]
USING string_to_array(coalesce(peak_month, ''), ',')::varchar[];
alter table {export_tb_before} drop if exists keyword_tsv;
alter table {export_tb_before} drop if exists keyword_tsv;
alter table {export_tb_before} add column keyword_tsv tsvector generated always as (to_tsvector('english_amazonword', search_term)) STORED;
alter table {export_tb_before} add column keyword_tsv tsvector generated always as (to_tsvector('english_amazonword', search_term)) STORED;
...
...
Pyspark_job/sqoop_export/dwt_aba_st_analytics.py
View file @
b6374899
...
@@ -186,16 +186,18 @@ if __name__ == '__main__':
...
@@ -186,16 +186,18 @@ if __name__ == '__main__':
tb_cols
=
[
tb_cols
=
[
"is_new_market_segment"
,
"color_proportion"
,
"supply_demand"
,
"market_cycle_type"
,
"is_high_return_text"
,
"is_new_market_segment"
,
"color_proportion"
,
"supply_demand"
,
"market_cycle_type"
,
"is_high_return_text"
,
"st_zr_counts"
,
"st_sp_counts"
,
"st_self_asin_counts"
,
"st_self_asin_proportion"
,
"st_zr_counts"
,
"st_sp_counts"
,
"st_self_asin_counts"
,
"st_self_asin_proportion"
,
# 需求2 + 需求3:月度专属字段(仅 month 流程才有意义)
# is_first_ever_text 依赖累加表 dim_st_detail_history(仅 month 数据)
# brand_asin_proportion / seller_asin_proportion 服务月搜索词筛选页面
"is_first_ever_text"
,
"is_first_ever_text"
,
"brand_asin_proportion"
,
"brand_asin_proportion"
,
"seller_asin_proportion"
,
"seller_asin_proportion"
,
# 搜索词属性标签(材质/颜色/细分人群等),仅 month 计算
# 搜索词属性标签(材质/颜色/细分人群等),仅 month 计算
# Hive 端是逗号分隔 STRING(如 "材质,颜色"),sqoop 写入 PG copy 表需先 ALTER 成 VARCHAR
# Hive 端是逗号分隔 STRING(如 "材质,颜色"),sqoop 写入 PG copy 表需先 ALTER 成 VARCHAR
# 交换前再 ALTER 回 VARCHAR[](用 string_to_array 转换),与 dwt_aba_last365 处理 st_movie_brand_label 同款思路
# 交换前再 ALTER 回 VARCHAR[](用 string_to_array 转换),与 dwt_aba_last365 处理 st_movie_brand_label 同款思路
"st_attribute_label"
"st_attribute_label"
,
# 峰值月:Hive 端是逗号分隔 STRING(如 "2025-11,2026-04"),PG 端是 VARCHAR[]
# 同 st_attribute_label:copy 表先 ALTER 成 VARCHAR 让 sqoop 写字符串,交换前再 string_to_array 转回 VARCHAR[]
"peak_month"
,
# 常年可卖标记:标量 int,sqoop 直写,无需中转
"all_year_text_flag"
]
]
# 处理导出表
# 处理导出表
export_master_tb
=
f
"{export_base_tb}_{date_type}_{year_str}"
export_master_tb
=
f
"{export_base_tb}_{date_type}_{year_str}"
...
@@ -237,9 +239,9 @@ if __name__ == '__main__':
...
@@ -237,9 +239,9 @@ if __name__ == '__main__':
# copy 表继承自正式分区表(含 st_attribute_label VARCHAR[]),
# copy 表继承自正式分区表(含 st_attribute_label VARCHAR[]),
# 但 Sqoop 不支持直接写入 PG 数组类型,必须先把 copy 表的该列临时改成 VARCHAR
# 但 Sqoop 不支持直接写入 PG 数组类型,必须先把 copy 表的该列临时改成 VARCHAR
# 等 Sqoop 完成后、分区交换之前,再 ALTER 回 VARCHAR[](见下方 exchange_pg_part_tb 前的处理)
# 等 Sqoop 完成后、分区交换之前,再 ALTER 回 VARCHAR[](见下方 exchange_pg_part_tb 前的处理)
# 这是与 sqoop_export/dwt_aba_last365.py 中 st_movie_brand_label 同款的"VARCHAR 中转"模式
sql_alter_to_varchar
=
f
"""
sql_alter_to_varchar
=
f
"""
ALTER TABLE {export_tb_copy} ALTER COLUMN st_attribute_label TYPE VARCHAR(200);
ALTER TABLE {export_tb_copy} ALTER COLUMN st_attribute_label TYPE VARCHAR(200);
ALTER TABLE {export_tb_copy} ALTER COLUMN peak_month TYPE VARCHAR(200);
"""
"""
DBUtil
.
engine_exec_sql
(
engine
,
sql_alter_to_varchar
)
DBUtil
.
engine_exec_sql
(
engine
,
sql_alter_to_varchar
)
...
@@ -250,8 +252,6 @@ if __name__ == '__main__':
...
@@ -250,8 +252,6 @@ if __name__ == '__main__':
# "column keyword_tsv in child table must be a generated column"
# "column keyword_tsv in child table must be a generated column"
# 解决:显式 drop 后以生成列方式重建。CASCADE 是为了同时把 LIKE 时附带继承的 keyword_tsv 索引一并清掉,
# 解决:显式 drop 后以生成列方式重建。CASCADE 是为了同时把 LIKE 时附带继承的 keyword_tsv 索引一并清掉,
# 后面 ATTACH 时 master 的分区索引会自动给新分区补建对应索引。
# 后面 ATTACH 时 master 的分区索引会自动给新分区补建对应索引。
# 重建在 Sqoop 之前做:生成列由 search_term 自动计算(STORED),Sqoop 不写也不会报错。
# 参考 sqoop_export/dwt_aba_last365.py 中对 keyword_tsv 的同款处理。
sql_fix_keyword_tsv
=
f
"""
sql_fix_keyword_tsv
=
f
"""
ALTER TABLE {export_tb_copy} DROP COLUMN IF EXISTS keyword_tsv CASCADE;
ALTER TABLE {export_tb_copy} DROP COLUMN IF EXISTS keyword_tsv CASCADE;
ALTER TABLE {export_tb_copy} ADD COLUMN keyword_tsv tsvector
ALTER TABLE {export_tb_copy} ADD COLUMN keyword_tsv tsvector
...
@@ -311,12 +311,15 @@ if __name__ == '__main__':
...
@@ -311,12 +311,15 @@ if __name__ == '__main__':
elif
date_type
==
DateTypes
.
month
.
name
:
elif
date_type
==
DateTypes
.
month
.
name
:
# 分区交换前必须把 copy 表的 st_attribute_label 从 VARCHAR 转回 VARCHAR[]
# 分区交换前必须把 copy 表的 st_attribute_label 从 VARCHAR 转回 VARCHAR[]
# 否则与 master 分区表 schema 不一致,exchange_pg_part_tb 会失败
# 否则与 master 分区表 schema 不一致,exchange_pg_part_tb 会失败
# USING string_to_array(...) 把 Sqoop 写入的逗号串(如 "材质,颜色")拆成数组(如 {材质,颜色})
# USING string_to_array(...) 把 Sqoop 写入的逗号串(如 "材质,颜色")拆成数组(如 {材质,颜色})
# 词典无匹配的词,PySpark 已 fillna 为 "-1",转换后是 {-1},与 Java 占位约定一致
sql_alter_back
=
f
"""
sql_alter_back
=
f
"""
ALTER TABLE {export_tb_copy}
ALTER TABLE {export_tb_copy}
ALTER COLUMN st_attribute_label TYPE VARCHAR[]
ALTER COLUMN st_attribute_label TYPE VARCHAR[]
USING string_to_array(st_attribute_label, ',')::varchar[];
USING string_to_array(st_attribute_label, ',')::varchar[];
ALTER TABLE {export_tb_copy}
ALTER COLUMN peak_month TYPE VARCHAR[]
USING string_to_array(coalesce(peak_month, ''), ',')::varchar[];
"""
"""
DBUtil
.
engine_exec_sql
(
engine
,
sql_alter_back
)
DBUtil
.
engine_exec_sql
(
engine
,
sql_alter_back
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment