abel_cjy / Amazon-Selection-Data · Commits

Commit 6c0b5a67, authored Jun 17, 2025 by chenyuanjie

    Related traffic: export the ASINs that still need to be re-crawled

Parent: ac41b65d

Showing 1 changed file with 120 additions and 0 deletions:

Pyspark_job/script/update_syn_pg14.py (new file, mode 100644)  +120 −0
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory so the utils package can be imported
from utils.templates import Templates
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil


class UpdateSynPG14(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # placeholder DataFrames, filled in by read_data() / handle_data()
        self.df_existing_asin = self.spark.sql(f"select 1+1;")
        self.df_related_asin = self.spark.sql(f"select 1+1;")
        self.df_asin_variation = self.spark.sql(f"select 1+1;")
        self.df_asin_stable = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")

    def read_data(self):
        print("Reading ods_asin_detail to get all asins already crawled")
        sql = f"""
        select asin from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        self.df_existing_asin = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("Asins already crawled this month:")
        self.df_existing_asin.show(10, True)

        print("Reading all related asins from dwt_asin_related_traffic")
        sql = f"""
        select related_asin from dwt_asin_related_traffic where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        self.df_related_asin = self.spark.sql(sqlQuery=sql).cache()
        print("Related asin data:")
        self.df_related_asin.show(10, True)

        print("Reading dim_asin_variation_info")
        sql = f"""
        select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name='{self.site_name}';
        """
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("asin_variation data:")
        self.df_asin_variation.show(10, True)

        print("Reading dim_asin_stable_info")
        sql = f"""
        select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name='{self.site_name}';
        """
        self.df_asin_stable = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("asin volume/weight data:")
        self.df_asin_stable.show(10, True)

    def handle_data(self):
        # explode the comma-separated related_asin field into one asin per row
        self.df_related_asin = self.df_related_asin \
            .withColumn('asin', F.explode(F.split(F.col('related_asin'), ','))) \
            .select('asin') \
            .drop_duplicates(['asin'])
        # anti-join against the already-crawled asins to find the ones that still need crawling,
        # then attach variation and volume/weight details
        self.df_save = self.df_related_asin \
            .join(self.df_existing_asin, on='asin', how='anti') \
            .join(self.df_asin_variation, on='asin', how='left') \
            .join(self.df_asin_stable, on='asin', how='left') \
            .fillna({'asin_is_variation': 0})
        # pre-save cleanup: keep only valid 10-character asins and add constant columns
        self.df_save = self.df_save \
            .filter(F.length(F.col('asin')) == 10) \
            .withColumn('state', F.lit(1)) \
            .withColumn('data_type', F.lit(2)) \
            .withColumn('date_info', F.lit(self.date_info)) \
            .select('asin', 'state', 'asin_is_variation', 'date_info', 'data_type', 'volume', 'weight_str') \
            .cache()
        # print("Final result:")
        # self.df_save.show(10, True)
        # print(f"Number of asins that need re-crawling: {self.df_save.count()}")

    def save_data(self):
        # connection info for the crawler's PostgreSQL 14 database
        con_info = DBUtil.get_connection_info('postgresql_14', self.site_name)
        year_month = str(self.date_info).replace("-", "_")
        table_name = f'{self.site_name}_all_syn_st_month_{year_month}'
        self.df_save.write.format("jdbc") \
            .option("url", con_info["url"]) \
            .option("dbtable", table_name) \
            .option("user", con_info["username"]) \
            .option("password", con_info["pwd"]) \
            .mode("append") \
            .save()
        users = ["chenyuanjie", "pengyanbing"]
        title = f"Related traffic: {self.site_name}, {self.date_info}"
        content = f"Related-traffic asins that need re-crawling have been exported to the syn table; count: {self.df_save.count()}"
        CommonUtil.send_wx_msg(users=users, title=title, content=content)


if __name__ == "__main__":
    site_name = sys.argv[1]    # arg 1: site
    date_type = sys.argv[2]    # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]    # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = UpdateSynPG14(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
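
For reference, the script is driven by the three positional arguments read from sys.argv in the block above. A minimal invocation sketch, assuming a standard spark-submit launcher and illustrative argument values ("us", "month", "2025-06" are placeholders; the real site and date values depend on the deployment):

    spark-submit Pyspark_job/script/update_syn_pg14.py us month 2025-06

The run() entry point is inherited from the Templates base class, which presumably orchestrates read_data, handle_data and save_data in sequence; the key step is the anti join in handle_data, which keeps only related ASINs that are absent from the already-crawled set.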