Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
bc0deed5
Commit
bc0deed5
authored
Jun 22, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
每日导出asin计算利润率fix
parent
153fd5ec
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
32 additions
and
1 deletions
+32
-1
export_need_profit_rate.py
Pyspark_job/script/export_need_profit_rate.py
+32
-1
No files found.
Pyspark_job/script/export_need_profit_rate.py
View file @
bc0deed5
...
...
@@ -143,6 +143,32 @@ class ExportNeedProfitRate(object):
# ---------------- 已计算过利润率的快照(路径 B 用作 LEFT ANTI 基准) ----------------
def _read_calc_pending_asin(self):
    """Load the rows of ``{site_name}_asin_profit_rate_calc`` that must not be exported again.

    Two kinds of rows are selected from the PostgreSQL table:

    1. ``calc_flag = 0``: already exported, calculation not finished yet.
    2. ``calc_flag = 1`` and ``updated_time >= self.date_info``: calculated
       recently but, per the original author's note, not yet synced to Hive.

    Returns:
        A cached Spark DataFrame with the columns ``asin``, ``price``
        (rounded to two decimals and cast to ``decimal(20,2)``),
        ``package_length``, ``package_width``, ``package_height`` and
        ``weight``, de-duplicated on all six columns and repartitioned into
        40 partitions by ``(asin, price)``.
    """
    # Columns that identify one "already handled" record; reused for the
    # select list and for de-duplication so the two cannot drift apart.
    key_columns = [
        'asin', 'price', 'package_length', 'package_width', 'package_height', 'weight',
    ]
    select_list = ", ".join(key_columns)

    source_table = f"{self.site_name}_asin_profit_rate_calc"
    # The JDBC "dbtable" option accepts an aliased sub-query, which lets the
    # filter run inside PostgreSQL instead of pulling the whole table.
    pending_subquery = (
        f"(select {select_list} "
        f"from {source_table} "
        f"where calc_flag = 0 "
        f"   or (calc_flag = 1 and updated_time >= '{self.date_info}')) t"
    )

    connection = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=self.site_name)

    jdbc_reader = self.spark.read.format("jdbc")
    jdbc_reader = jdbc_reader.option("url", connection["url"])
    jdbc_reader = jdbc_reader.option("dbtable", pending_subquery)
    jdbc_reader = jdbc_reader.option("user", connection["username"])
    jdbc_reader = jdbc_reader.option("password", connection["pwd"])
    raw_rows = jdbc_reader.load()

    # Normalise price to decimal(20,2) before de-duplicating on the key columns.
    normalised_price = F.round(F.col('price'), 2).cast('decimal(20,2)')
    pending_rows = (
        raw_rows
        .withColumn('price', normalised_price)
        .dropDuplicates(key_columns)
        .repartition(40, 'asin', 'price')
        .cache()
    )

    # count() is an action, so it also materialises the cache.
    excluded_total = pending_rows.count()
    print(f"_asin_profit_rate_calc 排除数据量(未计算+今日已算未同步): {excluded_total:,}")
    return pending_rows
def
_read_profit_rate_latest_snapshot
(
self
):
"""读 hive dim_asin_profit_rate_info(全站点过滤),按 (asin, price) 去重,作为"已计算过"基准"""
sql_profit
=
f
"""
...
...
@@ -198,7 +224,12 @@ class ExportNeedProfitRate(object):
'asin'
,
'price'
,
'category'
,
'package_length'
,
'package_width'
,
'package_height'
,
'weight'
,
'part_key'
,
'source_month'
,
'asin_crawl_date'
,
)
.
cache
()
)
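# Exclude rows already tracked in _asin_profit_rate_calc so they are not exported again: calc_flag=0 (exported, calculation not finished) or calc_flag=1 with updated_time >= date_info (just calculated, not yet synced to Hive); matched on asin, price, package_length, package_width, package_height and weight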
df_calc_pending
=
self
.
_read_calc_pending_asin
()
df_result
=
df_result
.
join
(
df_calc_pending
,
on
=
[
'asin'
,
'price'
,
'package_length'
,
'package_width'
,
'package_height'
,
'weight'
],
how
=
'left_anti'
)
.
cache
()
df_calc_pending
.
unpersist
()
count
=
df_result
.
count
()
print
(
f
"待计算利润率数据量: {count:,}"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment