Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
66d45910
Commit
66d45910
authored
Apr 27, 2026
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
27e0f356
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
63 additions
and
5 deletions
+63
-5
export_dwt_asin_sync.py
Pyspark_job/sqoop_export/export_dwt_asin_sync.py
+57
-0
ods_asin_detail.py
Pyspark_job/sqoop_import/ods_asin_detail.py
+6
-5
No files found.
Pyspark_job/sqoop_export/export_dwt_asin_sync.py
View file @
66d45910
import
os
import
os
import
sys
import
sys
import
time
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
from
utils.secure_db_client
import
get_remote_engine
from
utils.secure_db_client
import
get_remote_engine
def update_workflow_manager(site_name, date_type, date_info):
    """Upsert the monthly row in ``workflow_manager`` to flag the ASIN crawl as ready.

    Nothing is done unless ``date_type`` is ``"month"``. For monthly runs a
    row is inserted (or, on duplicate key, its ``spider_is_ready`` and
    ``spider_state`` columns are overwritten) through the MySQL engine
    obtained for site ``'us'``. A failed attempt is reported and retried
    after five minutes; the function returns after the first success.

    Args:
        site_name: Site code; must be one of ``"us"``, ``"uk"``, ``"de"``
            for monthly runs, since it selects the row priority.
        date_type: Period type; only ``"month"`` triggers the update.
        date_info: Period identifier written to the ``date_info`` column.

    Raises:
        KeyError: If ``date_type`` is ``"month"`` and ``site_name`` has no
            configured priority. Raised up front instead of being retried
            forever, because the failure is deterministic.
    """
    if date_type != "month":
        return

    site_name_pri_dict = {
        "us": 2,
        "uk": 4,
        "de": 6,
    }
    # Resolve the priority outside the retry loop: an unsupported site can
    # never succeed, so it must not be swallowed by the retry handler.
    priority = site_name_pri_dict[site_name]

    # NOTE(review): values are interpolated straight into the SQL text. They
    # appear to come from command-line arguments; switch to bound parameters
    # if the remote engine's execute() supports them.
    update_sql_workflow = f"""
        INSERT INTO workflow_manager
        (
        workflow_name,
        site_name,
        date_type,
        date_info,
        priority,
        spider_name,
        spider_is_ready,
        spider_state,
        bg_name,
        bg_dol_state
        )
        VALUES
        (
        '月全流程',
        '{site_name}',
        'month',
        '{date_info}',
        {priority},
        'us_spider_asin',
        'yes',
        1,
        'us_all_cal',
        1
        )
        ON DUPLICATE KEY UPDATE
        spider_is_ready = VALUES(spider_is_ready),
        spider_state = VALUES(spider_state);
        """

    while True:
        try:
            # The engine is re-created on every attempt so that a broken
            # connection does not poison subsequent retries.
            engine_mysql = get_remote_engine(
                site_name='us',
                db_type='mysql'
            )
            with engine_mysql.begin() as conn:
                print(f"workflow_manager进度表---重置爬虫的asin抓取进度: {update_sql_workflow}")
                conn.execute(update_sql_workflow)
        except Exception as e:
            # Report the failure instead of silently sleeping, then retry.
            print(f"workflow_manager update failed, retrying in 5 minutes: {e}")
            time.sleep(300)
            continue
        # Success: leave the loop. Without this the INSERT would be re-issued
        # endlessly and the job would never terminate.
        break
def
export_data
(
site_name
,
date_type
,
date_info
):
def
export_data
(
site_name
,
date_type
,
date_info
):
engine
=
get_remote_engine
(
engine
=
get_remote_engine
(
site_name
=
site_name
,
# -> database "selection"
site_name
=
site_name
,
# -> database "selection"
...
@@ -41,3 +97,4 @@ if __name__ == '__main__':
...
@@ -41,3 +97,4 @@ if __name__ == '__main__':
date_type
=
sys
.
argv
[
2
]
# 参数2:类型:week/4_week/month/quarter/day
date_type
=
sys
.
argv
[
2
]
# 参数2:类型:week/4_week/month/quarter/day
date_info
=
sys
.
argv
[
3
]
# 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
date_info
=
sys
.
argv
[
3
]
# 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
export_data
(
site_name
,
date_type
,
date_info
)
export_data
(
site_name
,
date_type
,
date_info
)
update_workflow_manager
(
site_name
,
date_type
,
date_info
)
Pyspark_job/sqoop_import/ods_asin_detail.py
View file @
66d45910
...
@@ -22,8 +22,9 @@ if __name__ == '__main__':
...
@@ -22,8 +22,9 @@ if __name__ == '__main__':
# import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
# import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
# if date_type == 'day':
# if date_type == 'day':
# import_table = f"{site_name}_asin_detail_day_{date_info.replace('-', '_')}"
# import_table = f"{site_name}_asin_detail_day_{date_info.replace('-', '_')}"
import_table
=
f
"{site_name}_asin_detail_{date_type}_{date_info.replace('-', '_')}"
check_table
=
f
"{site_name}_all_syn_st_{date_type}_{date_info.replace('-', '_')}"
import_table
=
f
"{site_name}_asin_detail_{date_type.replace('_week', '')}_{date_info.replace('-', '_')}"
check_table
=
f
"{site_name}_all_syn_st_{date_type.replace('_week', '')}_{date_info.replace('-', '_')}"
hive_table
=
"ods_asin_detail"
hive_table
=
"ods_asin_detail"
partition_dict
=
{
partition_dict
=
{
"site_name"
:
site_name
,
"site_name"
:
site_name
,
...
@@ -54,14 +55,14 @@ if __name__ == '__main__':
...
@@ -54,14 +55,14 @@ if __name__ == '__main__':
sql_check_syn
=
f
"select * from {check_table} where state in (1, 2) limit 100"
sql_check_syn
=
f
"select * from {check_table} where state in (1, 2) limit 100"
df
=
engine
.
read_sql
(
sql_check_syn
)
df
=
engine
.
read_sql
(
sql_check_syn
)
if
df
.
shape
[
0
]
>
0
:
if
df
.
shape
[
0
]
>
0
:
print
(
f
"爬虫还未抓完, 等待5分钟继续"
)
print
(
f
"
asin详情--
爬虫还未抓完, 等待5分钟继续"
)
time
.
sleep
(
300
)
time
.
sleep
(
300
)
continue
continue
else
:
else
:
print
(
"爬虫已经全部抓取完成, 可以同步数据"
)
print
(
"
asin详情--
爬虫已经全部抓取完成, 可以同步数据"
)
break
break
except
Exception
as
e
:
except
Exception
as
e
:
print
(
f
"检查asin是否全部抓取完成报错, 报错信息: {e}, {traceback.format_exc()}"
)
print
(
f
"
asin详情--
检查asin是否全部抓取完成报错, 报错信息: {e}, {traceback.format_exc()}"
)
time
.
sleep
(
300
)
time
.
sleep
(
300
)
engine
=
get_remote_engine
(
engine
=
get_remote_engine
(
site_name
=
site_name
,
site_name
=
site_name
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment