Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
705596d3
Commit
705596d3
authored
Mar 31, 2026
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
417b6578
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
30 additions
and
13 deletions
+30
-13
dwt_asin_sync.py
Pyspark_job/dwt/dwt_asin_sync.py
+30
-13
No files found.
Pyspark_job/dwt/dwt_asin_sync.py
View file @
705596d3
...
...
@@ -58,11 +58,11 @@ class DwtAsinSync(Templates):
)
print
(
type
(
self
.
engine_mysql
))
print
(
self
.
engine_mysql
)
self
.
date_type
=
self
.
judge_date_type
()
self
.
get_date_info_tuple
()
# self.date_info_tuple
self
.
table_syn
=
f
"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}"
if
self
.
site_name
==
'us'
else
f
"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}"
self
.
partitions_by
=
[
'site_name'
,
'date_type'
,
'date_info'
]
self
.
reset_partitions
(
partitions_num
=
5
)
self
.
date_type
=
self
.
judge_date_type
()
def
judge_date_type
(
self
):
print
(
f
"site_name: {self.site_name}, date_type: {self.date_type}, date_info: {self.date_info}"
)
...
...
@@ -74,13 +74,27 @@ class DwtAsinSync(Templates):
if
list
(
df
.
st_count
)[
0
]
>=
100
_0000
:
self
.
date_type
=
"month"
# 追加syn爬虫表
else
:
self
.
date_type
=
"month_week"
# 清空syn爬虫表
# self.date_type = "month_week" # 清空syn爬虫表
while
True
:
try
:
sql_truncate
=
f
"truncate {self.table_syn};"
print
(
f
"月搜索词没有导入进来, 需要先清空表, sql_truncate: {sql_truncate}"
)
self
.
engine_pg14
.
execute
(
sql_truncate
)
break
except
Exception
as
e
:
print
(
e
,
traceback
.
format_exc
())
time
.
sleep
(
random
.
randint
(
3
,
10
))
self
.
engine_pg14
=
get_remote_engine
(
site_name
=
self
.
site_name
,
db_type
=
'postgresql_14'
)
continue
return
self
.
date_type
def
read_data_common
(
self
,
sql
,
content
,
col_dup
=
'asin'
):
df
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
.
drop_duplicates
([
col_dup
])
.
cache
()
df
.
show
(
10
,
truncate
=
False
)
print
(
f
"sql: {sql}, content: {content}, count: {df.count()}"
)
df
.
show
(
10
,
truncate
=
False
)
return
df
def
read_data
(
self
):
...
...
@@ -106,16 +120,18 @@ class DwtAsinSync(Templates):
sql_asin_stable
=
f
"""select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name="{self.site_name}";"""
self
.
df_asin_stable
=
self
.
read_data_common
(
sql
=
sql_asin_stable
,
content
=
"2.2 读取dim_asin_variation_info表的asin重量体积属性"
)
# 读取syn爬虫表
year
,
month
=
self
.
date_info
.
split
(
"-"
)
sql_asin_syn
=
f
"select asin from
us_all_syn_st_month_{year} where date_info='{self.date_info}'
"
table_syn
=
f
"us_all_syn_st_day_{self.date_info.replace('-', '_')}"
if
self
.
date_type
==
'day'
else
f
"us_all_syn_st_month_{self.date_info.replace('-', '_')}"
sql_asin_syn
=
f
"select asin from
{table_syn};
"
pdf_asin
=
self
.
engine_pg14
.
read_sql
(
sql_asin_syn
)
schema
=
StructType
([
StructField
(
'asin'
,
StringType
(),
True
),
])
self
.
df_asin_syn
=
self
.
spark
.
createDataFrame
(
pdf_asin
,
schema
=
schema
)
self
.
df_asin_syn
=
self
.
df_asin_syn
.
drop_duplicates
([
"asin"
])
.
cache
()
print
(
f
"self.df_asin_syn: {self.df_asin_syn.count()}"
)
self
.
df_asin_syn
.
show
(
10
,
truncate
=
False
)
print
((
f
"pdf_asin: {pdf_asin.shape}, sql_asin_syn: {sql_asin_syn}"
))
if
pdf_asin
.
shape
[
0
]:
schema
=
StructType
([
StructField
(
'asin'
,
StringType
(),
True
),
])
self
.
df_asin_syn
=
self
.
spark
.
createDataFrame
(
pdf_asin
,
schema
=
schema
)
self
.
df_asin_syn
=
self
.
df_asin_syn
.
drop_duplicates
([
"asin"
])
.
cache
()
print
(
f
"self.df_asin_syn: {self.df_asin_syn.count()}"
)
self
.
df_asin_syn
.
show
(
10
,
truncate
=
False
)
def
handle_data
(
self
):
if
self
.
date_type
in
[
'month'
]:
...
...
@@ -149,7 +165,8 @@ class DwtAsinSync(Templates):
self
.
df_asin_stable
,
on
=
'asin'
,
how
=
'left'
)
# 处理同步逻辑
self
.
df_save
=
self
.
df_save
.
join
(
self
.
df_asin_syn
,
on
=
[
'asin'
],
how
=
"left_anti"
)
if
self
.
date_type
!=
'day'
:
self
.
df_save
=
self
.
df_save
.
join
(
self
.
df_asin_syn
,
on
=
[
'asin'
],
how
=
"left_anti"
)
self
.
df_save
=
self
.
df_save
.
withColumn
(
'site_name'
,
F
.
lit
(
self
.
site_name
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'date_type'
,
F
.
lit
(
self
.
date_type
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'date_info'
,
F
.
lit
(
self
.
date_info
))
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment