Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
d913491e
Commit
d913491e
authored
Apr 17, 2026
by
hejiangming
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'developer' of
http://47.106.101.75/abel_cjy/Amazon-Selection-Data
into developer
parents
c932294b
0dcdc879
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
53 additions
and
35 deletions
+53
-35
dwt_asin_sync.py
Pyspark_job/dwt/dwt_asin_sync.py
+31
-13
export_dwt_asin_sync.py
Pyspark_job/sqoop_export/export_dwt_asin_sync.py
+1
-1
es_util.py
Pyspark_job/utils/es_util.py
+6
-0
common_udf.py
Pyspark_job/yswg_utils/common_udf.py
+15
-21
No files found.
Pyspark_job/dwt/dwt_asin_sync.py
View file @
d913491e
...
...
@@ -132,18 +132,32 @@ class DwtAsinSync(Templates):
sql_asin_stable
=
f
"""select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name="{self.site_name}";"""
self
.
df_asin_stable
=
self
.
read_data_common
(
sql
=
sql_asin_stable
,
content
=
"2.2 读取dim_asin_variation_info表的asin重量体积属性"
)
# 读取syn爬虫表
table_syn
=
f
"us_all_syn_st_day_{self.date_info.replace('-', '_')}"
if
self
.
date_type
==
'day'
else
f
"us_all_syn_st_month_{self.date_info.replace('-', '_')}"
sql_asin_syn
=
f
"select asin from {table_syn};"
pdf_asin
=
self
.
engine_pg14
.
read_sql
(
sql_asin_syn
)
print
((
f
"pdf_asin: {pdf_asin.shape}, sql_asin_syn: {sql_asin_syn}"
))
if
pdf_asin
.
shape
[
0
]:
schema
=
StructType
([
StructField
(
'asin'
,
StringType
(),
True
),
])
self
.
df_asin_syn
=
self
.
spark
.
createDataFrame
(
pdf_asin
,
schema
=
schema
)
self
.
df_asin_syn
=
self
.
df_asin_syn
.
drop_duplicates
([
"asin"
])
.
cache
()
print
(
f
"self.df_asin_syn: {self.df_asin_syn.count()}"
)
self
.
df_asin_syn
.
show
(
10
,
truncate
=
False
)
while
True
:
try
:
table_syn
=
f
"{self.site_name}_all_syn_st_day_{self.date_info.replace('-', '_')}"
if
self
.
date_type
==
'day'
else
f
"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}"
sql_asin_syn
=
f
"select asin from {table_syn};"
pdf_asin
=
self
.
engine_pg14
.
read_sql
(
sql_asin_syn
)
print
((
f
"pdf_asin: {pdf_asin.shape}, sql_asin_syn: {sql_asin_syn}"
))
if
pdf_asin
.
shape
[
0
]:
schema
=
StructType
([
StructField
(
'asin'
,
StringType
(),
True
),
])
self
.
df_asin_syn
=
self
.
spark
.
createDataFrame
(
pdf_asin
,
schema
=
schema
)
self
.
df_asin_syn
=
self
.
df_asin_syn
.
drop_duplicates
([
"asin"
])
.
cache
()
print
(
f
"self.df_asin_syn: {self.df_asin_syn.count()}"
)
self
.
df_asin_syn
.
show
(
10
,
truncate
=
False
)
break
except
Exception
as
e
:
time
.
sleep
(
100
)
self
.
engine_pg14
=
get_remote_engine
(
site_name
=
self
.
site_name
,
db_type
=
'postgresql_14'
)
self
.
engine_mysql
=
get_remote_engine
(
site_name
=
self
.
site_name
,
db_type
=
'mysql'
)
continue
def
handle_data
(
self
):
if
self
.
date_type
in
[
'month'
]:
...
...
@@ -182,13 +196,17 @@ class DwtAsinSync(Templates):
# 处理同步逻辑
print
(
"==="
*
20
)
print
(
f
"{type(self.df_asin_syn)}: {self.df_asin_syn.count()}"
)
if
self
.
date_type
!=
'day'
and
self
.
df_asin_syn
.
count
()
>
0
:
if
self
.
date_type
!=
'day'
and
self
.
df_asin_syn
.
count
()
>
1
:
self
.
df_save
=
self
.
df_save
.
join
(
self
.
df_asin_syn
,
on
=
[
'asin'
],
how
=
"left_anti"
)
self
.
df_save
=
self
.
df_save
.
withColumn
(
'site_name'
,
F
.
lit
(
self
.
site_name
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'date_type'
,
F
.
lit
(
self
.
date_type
))
self
.
df_save
=
self
.
df_save
.
withColumn
(
'date_info'
,
F
.
lit
(
self
.
date_info
))
self
.
df_save
.
show
(
10
,
truncate
=
False
)
print
(
f
"self.df_save: {self.df_save.count()}"
)
users
=
[
"fangxingjun"
,
"chenyuanjie"
,
"pengyanbing"
]
title
=
f
"dwd_asin_to_pg: {self.site_name}, {self.date_type}, {self.date_info}"
content
=
f
"整合asin完成--等待导出到pg提供爬虫使用--self.df_save.count: {self.df_save.count()}"
CommonUtil
()
.
send_wx_msg
(
users
=
users
,
title
=
title
,
content
=
content
)
# def save_data(self):
# pass
...
...
Pyspark_job/sqoop_export/export_dwt_asin_sync.py
View file @
d913491e
...
...
@@ -6,7 +6,7 @@ from utils.secure_db_client import get_remote_engine
def
export_data
(
site_name
,
date_type
,
date_info
):
engine
=
get_remote_engine
(
site_name
=
"us"
,
# -> database "selection"
site_name
=
site_name
,
# -> database "selection"
db_type
=
"postgresql_14"
,
# -> 服务端 alias "mysql"
# user="fangxingjun", # -> 服务端 alias "mysql"
# user_token="5f1b2e9c3a4d7f60" # 可不传,走默认
...
...
Pyspark_job/utils/es_util.py
View file @
d913491e
...
...
@@ -1191,6 +1191,12 @@ class EsUtils(object):
"type"
:
"float"
}
}
},
"tracking_since"
:
{
"type"
:
"date"
},
"tracking_since_type"
:
{
"type"
:
"short"
}
}
}
...
...
Pyspark_job/yswg_utils/common_udf.py
View file @
d913491e
...
...
@@ -846,28 +846,22 @@ def udf_parse_seller_json(seller_json):
sold_by
=
seller_info_parsed
.
get
(
"sold_by"
,
None
)
fulfilled_by
=
seller_info_parsed
.
get
(
"fulfilled_by"
,
None
)
seller_id
=
seller_info_parsed
.
get
(
"seller_id"
,
None
)
# 如果sold_by是amazon,优先归为1类
if
sold_by
and
sold_by
.
lower
()
.
strip
()
.
startswith
(
"amazon"
):
return
1
,
sold_by
,
seller_id
# Amazon
# 判断是否为Amazon相关(FBA情况)
if
(
ship_from
and
ship_from
.
lower
()
.
strip
()
.
startswith
(
"amazon"
))
or
(
fulfilled_by
and
'amazon'
in
fulfilled_by
.
lower
()):
return
2
,
sold_by
,
seller_id
# FBA
# FBM情况:有发货地或配送方信息,且有销售方
if
(
ship_from
or
fulfilled_by
)
and
sold_by
:
return
3
,
sold_by
,
seller_id
# FBM
# 其他情况
return
4
,
sold_by
,
seller_id
# Other
# if (ship_from and ship_from.lower().strip().startswith("amazon")) or (
# fulfilled_by and 'amazon' in fulfilled_by.lower()):
# if sold_by and not sold_by.lower().strip().startswith("amazon"):
# return 2, sold_by, seller_id # FBA
# elif sold_by and sold_by.lower().strip().startswith("amazon"):
# return 1, sold_by, seller_id # Amazon
# elif (ship_from or fulfilled_by) and sold_by:
# return 3, sold_by, seller_id # FBM
# return 4, sold_by, seller_id # Other
# ship_from 与 fulfilled_by 同义,任意一个为 amazon 即视为 amazon 发货
from_is_amazon
=
bool
(
(
ship_from
and
ship_from
.
lower
()
.
strip
()
.
startswith
(
"amazon"
))
or
(
fulfilled_by
and
fulfilled_by
.
lower
()
.
strip
()
.
startswith
(
"amazon"
))
)
from_field
=
ship_from
or
fulfilled_by
# 仅用于判断发货字段是否存在
sold_is_amazon
=
bool
(
sold_by
and
sold_by
.
lower
()
.
strip
()
.
startswith
(
"amazon"
))
if
from_is_amazon
and
sold_is_amazon
:
return
1
,
sold_by
,
seller_id
# 亚马逊自营:发货方和销售方均为 amazon
if
from_is_amazon
and
sold_by
:
return
2
,
sold_by
,
seller_id
# FBA:amazon 发货,第三方销售
if
from_field
and
sold_by
:
return
3
,
sold_by
,
seller_id
# FBM:第三方发货且第三方销售
return
4
,
sold_by
,
seller_id
# Other
def
udf_parse_amazon_orders
(
asin_amazon_orders_str
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment