Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
41c62ac6
Commit
41c62ac6
authored
Apr 16, 2026
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
4c3ab8b0
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
28 additions
and
14 deletions
+28
-14
dwt_asin_sync.py
Pyspark_job/dwt/dwt_asin_sync.py
+27
-13
export_dwt_asin_sync.py
Pyspark_job/sqoop_export/export_dwt_asin_sync.py
+1
-1
No files found.
Pyspark_job/dwt/dwt_asin_sync.py
View file @
41c62ac6
...
...
@@ -132,18 +132,32 @@ class DwtAsinSync(Templates):
sql_asin_stable
=
f
"""select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name="{self.site_name}";"""
self
.
df_asin_stable
=
self
.
read_data_common
(
sql
=
sql_asin_stable
,
content
=
"2.2 读取dim_asin_variation_info表的asin重量体积属性"
)
# 读取syn爬虫表
table_syn
=
f
"us_all_syn_st_day_{self.date_info.replace('-', '_')}"
if
self
.
date_type
==
'day'
else
f
"us_all_syn_st_month_{self.date_info.replace('-', '_')}"
sql_asin_syn
=
f
"select asin from {table_syn};"
pdf_asin
=
self
.
engine_pg14
.
read_sql
(
sql_asin_syn
)
print
((
f
"pdf_asin: {pdf_asin.shape}, sql_asin_syn: {sql_asin_syn}"
))
if
pdf_asin
.
shape
[
0
]:
schema
=
StructType
([
StructField
(
'asin'
,
StringType
(),
True
),
])
self
.
df_asin_syn
=
self
.
spark
.
createDataFrame
(
pdf_asin
,
schema
=
schema
)
self
.
df_asin_syn
=
self
.
df_asin_syn
.
drop_duplicates
([
"asin"
])
.
cache
()
print
(
f
"self.df_asin_syn: {self.df_asin_syn.count()}"
)
self
.
df_asin_syn
.
show
(
10
,
truncate
=
False
)
while
True
:
try
:
table_syn
=
f
"{self.site_name}_all_syn_st_day_{self.date_info.replace('-', '_')}"
if
self
.
date_type
==
'day'
else
f
"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}"
sql_asin_syn
=
f
"select asin from {table_syn};"
pdf_asin
=
self
.
engine_pg14
.
read_sql
(
sql_asin_syn
)
print
((
f
"pdf_asin: {pdf_asin.shape}, sql_asin_syn: {sql_asin_syn}"
))
if
pdf_asin
.
shape
[
0
]:
schema
=
StructType
([
StructField
(
'asin'
,
StringType
(),
True
),
])
self
.
df_asin_syn
=
self
.
spark
.
createDataFrame
(
pdf_asin
,
schema
=
schema
)
self
.
df_asin_syn
=
self
.
df_asin_syn
.
drop_duplicates
([
"asin"
])
.
cache
()
print
(
f
"self.df_asin_syn: {self.df_asin_syn.count()}"
)
self
.
df_asin_syn
.
show
(
10
,
truncate
=
False
)
break
except
Exception
as
e
:
time
.
sleep
(
100
)
self
.
engine_pg14
=
get_remote_engine
(
site_name
=
self
.
site_name
,
db_type
=
'postgresql_14'
)
self
.
engine_mysql
=
get_remote_engine
(
site_name
=
self
.
site_name
,
db_type
=
'mysql'
)
continue
def
handle_data
(
self
):
if
self
.
date_type
in
[
'month'
]:
...
...
@@ -182,7 +196,7 @@ class DwtAsinSync(Templates):
# 处理同步逻辑
print
(
"==="
*
20
)
print
(
f
"{type(self.df_asin_syn)}: {self.df_asin_syn.count()}"
)
if
self
.
date_type
!=
'day'
and
self
.
df_asin_syn
.
count
()
>
0
:
if
self
.
date_type
!=
'day'
and
self
.
df_asin_syn
.
count
()
>
1
:
self
.
df_save
=
self
.
df_save
.
join
(
self
.
df_asin_syn
,
on
=
[
'asin'
],
how
=
"left_anti"
)
self
.
df_save
=
self
.
df_save
.
withColumn
(
'site_name'
,
F
.
lit
(
self
.
site_name
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'date_type'
,
F
.
lit
(
self
.
date_type
))
...
...
Pyspark_job/sqoop_export/export_dwt_asin_sync.py
View file @
41c62ac6
...
...
@@ -6,7 +6,7 @@ from utils.secure_db_client import get_remote_engine
def
export_data
(
site_name
,
date_type
,
date_info
):
engine
=
get_remote_engine
(
site_name
=
"us"
,
# -> database "selection"
site_name
=
site_name
,
# -> database "selection"
db_type
=
"postgresql_14"
,
# -> 服务端 alias "mysql"
# user="fangxingjun", # -> 服务端 alias "mysql"
# user_token="5f1b2e9c3a4d7f60" # 可不传,走默认
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment