Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
9b73754f
Commit
9b73754f
authored
Apr 01, 2026
by
hejiangming
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
更新nsr bsr榜单通知人
parent
9b140e43
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
20 additions
and
17 deletions
+20
-17
dwd_bsr_nsr_asin_rank_day.py
Pyspark_job/sqoop_export/dwd_bsr_nsr_asin_rank_day.py
+5
-3
dwt_bsr_nsr_asin_detail_new.py
Pyspark_job/sqoop_export/dwt_bsr_nsr_asin_detail_new.py
+15
-14
No files found.
Pyspark_job/sqoop_export/dwd_bsr_nsr_asin_rank_day.py
View file @
9b73754f
...
...
@@ -17,7 +17,7 @@ if __name__ == '__main__':
site_name
=
site_name
,
date_type
=
"day"
,
date_info
=
date_info
,
principal
=
'
wujica
ng'
,
principal
=
'
hejiangmi
ng'
,
priority
=
1
,
export_tools_type
=
1
)
...
...
@@ -32,6 +32,7 @@ if __name__ == '__main__':
next_day
=
CommonUtil
.
get_day_offset
(
date_info
,
1
)
engine
=
DBUtil
.
get_db_engine
(
db_type
,
site_name
)
# 根据主表 复制结构 创建天的子表 day_flag 是一个自动计算列,由 is_1_day_flag + is_7_day_flag + is_30_day_flag
DBUtil
.
engine_exec_sql
(
engine
,
f
"""
create table {export_tb} ( like {export_master_tb} including defaults including constraints );
alter table {export_tb} drop if exists day_flag ;
...
...
@@ -39,7 +40,7 @@ if __name__ == '__main__':
"""
)
engine
.
dispose
()
# 导出表名
# 导出表名
Sqoop 导出 Hive → PG
sh
=
CommonUtil
.
build_export_sh
(
site_name
=
site_name
,
db_type
=
db_type
,
...
...
@@ -68,11 +69,12 @@ if __name__ == '__main__':
client
=
SSHUtil
.
get_ssh_client
()
SSHUtil
.
exec_command_async
(
client
,
sh
,
ignore_err
=
False
)
client
.
close
()
# 把今天的子表挂到主表上,查询主表时自动路由到对应日期的子表。
sql
=
f
"""alter table {export_master_tb} attach partition {export_tb} for values from ('{date_info}') to ('{next_day}' );"""
# 变成分布式表
DBUtil
.
exec_sql
(
db_type
=
db_type
,
site_name
=
site_name
,
sql
=
sql
,
dispose_flag
=
True
)
# 插入当前的记录数据到MySQL 记录导出状态
update_sql
=
f
"""
insert ignore into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type,freeze_flag)
values ('{site_name}', '{date_info}', '导出PG数据库', 14, '{site_name}_{tb_type}_asin_rank', 'day', '{str(tb_type).upper()}榜单', '是', '{str(tb_type).upper()}榜单对应的TOP100ASIN','{db_type}','enable')
...
...
Pyspark_job/sqoop_export/dwt_bsr_nsr_asin_detail_new.py
View file @
9b73754f
...
...
@@ -21,30 +21,31 @@ def export_postgresql_cluster():
site_name
=
site_name
,
date_type
=
"day"
,
date_info
=
date_info
,
principal
=
'
wujica
ng'
,
principal
=
'
hejiangmi
ng'
,
priority
=
1
,
export_tools_type
=
1
)
db_type
=
DbTypes
.
postgresql_cluster
.
name
print
(
f
"导出到{db_type}中"
)
d_month_now
=
CommonUtil
.
reformat_date
(
date_info
,
"
%
Y-
%
m-
%
d"
,
"
%
Y_
%
m"
,
)
rel_date_info
=
CommonUtil
.
reformat_date
(
date_info
,
"
%
Y-
%
m-
%
d"
,
"
%
Y-
%
m"
,
)
next_month
=
CommonUtil
.
get_month_offset
(
rel_date_info
,
1
)
# date_info是日 日期格式 如"2026-03-31"
d_month_now
=
CommonUtil
.
reformat_date
(
date_info
,
"
%
Y-
%
m-
%
d"
,
"
%
Y_
%
m"
,
)
# 获取年月 2026_03
rel_date_info
=
CommonUtil
.
reformat_date
(
date_info
,
"
%
Y-
%
m-
%
d"
,
"
%
Y-
%
m"
,
)
# 获取年月 2026-03
next_month
=
CommonUtil
.
get_month_offset
(
rel_date_info
,
1
)
# 获取下一月
# 导出表
export_master_tb
=
f
"{site_name}_{tb_type}_asin_detail"
export_tb
=
f
"{export_master_tb}_{d_month_now}"
export_tb_copy
=
f
"{export_tb}_copy"
export_tb_copy
=
f
"{export_tb}_copy"
# copy临时表如 us_nsr_asin_detail_2026_04_copy
# 存在就删除
sql
=
f
"""drop table if exists {export_tb_copy}"""
DBUtil
.
exec_sql
(
db_type
=
db_type
,
site_name
=
site_name
,
sql
=
sql
,
dispose_flag
=
True
)
# 按主表结构新建
sql
=
f
"""create table {export_tb_copy} ( like {export_master_tb} including defaults including constraints );"""
DBUtil
.
exec_sql
(
db_type
=
db_type
,
site_name
=
site_name
,
sql
=
sql
,
dispose_flag
=
True
)
# 导出表名
# Sqoop 导出需要一段时间,如果直接写正式分区表,导出期间业务查询可能读到一半的脏数据。用 copy 表先接收数据,全部就绪后再一次性切换到正式表,
# 导出表名 导出到copy表
sh
=
CommonUtil
.
build_export_sh
(
site_name
=
site_name
,
db_type
=
db_type
,
...
...
@@ -93,14 +94,14 @@ def export_postgresql_cluster():
SSHUtil
.
exec_command_async
(
client
,
sh
,
ignore_err
=
False
)
client
.
close
()
# 导出表
# 导出表
copy 表替换正式分区
engine
=
DBUtil
.
get_db_engine
(
db_type
,
site_name
)
DBUtil
.
exchange_pg_part_distributed_tb
(
engine
=
engine
,
source_tb_name
=
export_tb_copy
,
part_master_tb
=
export_master_tb
,
part_target_tb
=
export_tb
,
part_val
=
{
source_tb_name
=
export_tb_copy
,
# 数据来源:copy 表
part_master_tb
=
export_master_tb
,
# 主表
part_target_tb
=
export_tb
,
# 正式月分区子表
part_val
=
{
# part_val = {'from': ['2026-04'], 'to': ['2026-05']}, # 分区范围
"from"
:
[
rel_date_info
],
"to"
:
[
next_month
]
},
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment