Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
51908cbf
Commit
51908cbf
authored
Jun 09, 2026
by
hejiangming
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
914bde21
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
26 additions
and
5 deletions
+26
-5
dws_st_theme.py
Pyspark_job/sqoop_export/dws_st_theme.py
+13
-4
dwt_st_theme_agg.py
Pyspark_job/sqoop_export/dwt_st_theme_agg.py
+13
-1
No files found.
Pyspark_job/sqoop_export/dws_st_theme.py
View file @
51908cbf
...
@@ -6,7 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
...
@@ -6,7 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
from
utils.db_util
import
DBUtil
from
utils.db_util
import
DBUtil
from
utils.ssh_util
import
SSHUtil
from
utils.ssh_util
import
SSHUtil
from
utils.common_util
import
CommonUtil
,
DateTypes
from
utils.common_util
import
CommonUtil
,
DateTypes
from
utils.hdfs_utils
import
HdfsUtils
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
site_name
=
CommonUtil
.
get_sys_arg
(
1
,
None
)
site_name
=
CommonUtil
.
get_sys_arg
(
1
,
None
)
date_type
=
CommonUtil
.
get_sys_arg
(
2
,
None
)
date_type
=
CommonUtil
.
get_sys_arg
(
2
,
None
)
...
@@ -21,16 +21,25 @@ if __name__ == '__main__':
...
@@ -21,16 +21,25 @@ if __name__ == '__main__':
db_type
=
'postgresql_test'
db_type
=
'postgresql_test'
print
(
"导出到测试库中"
)
print
(
"导出到测试库中"
)
else
:
else
:
CommonUtil
.
judge_is_work_hours
(
site_name
=
site_name
,
date_type
=
date_type
,
date_info
=
date_info
,
principal
=
'hejiangming'
,
priority
=
2
,
export_tools_type
=
1
,
belonging_to_process
=
f
'主题标签_{date_type}'
)
db_type
=
"postgresql_cluster"
db_type
=
"postgresql_cluster"
print
(
"导出到PG集群库库中"
)
print
(
"导出到PG集群库库中"
)
# 获取数据库连接
engine
=
DBUtil
.
get_db_engine
(
db_type
,
site_name
)
# 导出前校验 Hive 分区是否有数据,避免空分区触发交换导致 PG 数据被清空
hive_partition_path
=
f
"/home/big_data_selection/dwt/dwt_aba_last_change_rate/site_name={site_name}/date_type={date_type}/date_info={date_info}"
hive_files
=
HdfsUtils
.
read_list
(
hive_partition_path
)
if
not
hive_files
:
print
(
f
"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出,请先检查 DWT 计算任务是否正常写入!"
)
engine
.
dispose
()
sys
.
exit
(
1
)
print
(
f
"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出"
)
year_str
=
CommonUtil
.
safeIndex
(
date_info
.
split
(
"-"
),
0
,
None
)
year_str
=
CommonUtil
.
safeIndex
(
date_info
.
split
(
"-"
),
0
,
None
)
suffix
=
str
(
date_info
)
.
replace
(
"-"
,
"_"
)
suffix
=
str
(
date_info
)
.
replace
(
"-"
,
"_"
)
base_tb
=
f
"{site_name}_st_theme_detail"
base_tb
=
f
"{site_name}_st_theme_detail"
# 获取数据库连接
engine
=
DBUtil
.
get_db_engine
(
db_type
,
site_name
)
if
date_type
in
(
DateTypes
.
last30day
.
name
,
DateTypes
.
month_week
.
name
):
if
date_type
in
(
DateTypes
.
last30day
.
name
,
DateTypes
.
month_week
.
name
):
export_tb_target
=
f
"{base_tb}_last30day"
export_tb_target
=
f
"{base_tb}_last30day"
export_tb_copy
=
f
"{export_tb_target}_copy"
export_tb_copy
=
f
"{export_tb_target}_copy"
...
...
Pyspark_job/sqoop_export/dwt_st_theme_agg.py
View file @
51908cbf
...
@@ -6,7 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
...
@@ -6,7 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
from
utils.db_util
import
DBUtil
from
utils.db_util
import
DBUtil
from
utils.ssh_util
import
SSHUtil
from
utils.ssh_util
import
SSHUtil
from
utils.common_util
import
CommonUtil
from
utils.common_util
import
CommonUtil
from
utils.hdfs_utils
import
HdfsUtils
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
site_name
=
CommonUtil
.
get_sys_arg
(
1
,
None
)
site_name
=
CommonUtil
.
get_sys_arg
(
1
,
None
)
date_type
=
CommonUtil
.
get_sys_arg
(
2
,
None
)
date_type
=
CommonUtil
.
get_sys_arg
(
2
,
None
)
...
@@ -20,8 +20,20 @@ if __name__ == '__main__':
...
@@ -20,8 +20,20 @@ if __name__ == '__main__':
db_type
=
'postgresql_test'
db_type
=
'postgresql_test'
print
(
"导出到测试库中"
)
print
(
"导出到测试库中"
)
else
:
else
:
CommonUtil
.
judge_is_work_hours
(
site_name
=
site_name
,
date_type
=
date_type
,
date_info
=
date_info
,
principal
=
'hejiangming'
,
priority
=
2
,
export_tools_type
=
1
,
belonging_to_process
=
f
'主题标签_{date_type}'
)
db_type
=
"postgresql_cluster"
db_type
=
"postgresql_cluster"
print
(
"导出到PG集群库库中"
)
print
(
"导出到PG集群库库中"
)
# 获取数据库连接
engine
=
DBUtil
.
get_db_engine
(
db_type
,
site_name
)
# 导出前校验 Hive 分区是否有数据,避免空分区触发交换导致 PG 数据被清空
hive_partition_path
=
f
"/home/big_data_selection/dwt/dwt_aba_last_change_rate/site_name={site_name}/date_type={date_type}/date_info={date_info}"
hive_files
=
HdfsUtils
.
read_list
(
hive_partition_path
)
if
not
hive_files
:
print
(
f
"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出,请先检查 DWT 计算任务是否正常写入!"
)
engine
.
dispose
()
sys
.
exit
(
1
)
print
(
f
"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出"
)
year_str
=
CommonUtil
.
safeIndex
(
date_info
.
split
(
"-"
),
0
,
None
)
year_str
=
CommonUtil
.
safeIndex
(
date_info
.
split
(
"-"
),
0
,
None
)
suffix
=
str
(
date_info
)
.
replace
(
"-"
,
"_"
)
suffix
=
str
(
date_info
)
.
replace
(
"-"
,
"_"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment