Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
ae9f5874
Commit
ae9f5874
authored
Apr 28, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
asin最新利润率表-Doris
parent
afffe7d3
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
19 additions
and
71 deletions
+19
-71
es_asin_profit_rate.py
Pyspark_job/export_es/es_asin_profit_rate.py
+19
-71
No files found.
Pyspark_job/export_es/es_asin_profit_rate.py
View file @
ae9f5874
...
...
@@ -50,72 +50,16 @@ class EsAsinProfitRate(object):
# ES相关配置
self
.
es_client
=
EsUtils
.
get_es_client
()
self
.
es_profit_rate_index
=
f
"{self.site_name}_profit_rate_extra_v2"
self
.
es_profit_rate_body
=
self
.
get_es_profit_rate_body
()
self
.
es_profit_rate_options
=
self
.
get_es_profit_rate_options
(
self
.
es_profit_rate_index
)
self
.
df_asin_profit_rate
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_keepa_add
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_cate_flag
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
@staticmethod
def
get_es_profit_rate_body
():
return
{
"settings"
:
{
"number_of_shards"
:
"3"
,
"number_of_replicas"
:
"1"
},
"mappings"
:
{
"properties"
:
{
"profit_key"
:
{
"type"
:
"keyword"
},
"asin"
:
{
"type"
:
"keyword"
},
"price"
:
{
"type"
:
"float"
},
"ocean_profit"
:
{
"type"
:
"float"
},
"air_profit"
:
{
"type"
:
"float"
},
"update_time"
:
{
"type"
:
"date"
,
"format"
:
"yyyy-MM-dd"
},
"asin_crawl_date"
:
{
"type"
:
"date"
,
"format"
:
"yyyy-MM-dd"
}
}
}
}
@staticmethod
def
get_es_profit_rate_options
(
index_name
):
return
{
"es.nodes"
:
EsUtils
.
__es_ip__
,
"es.port"
:
EsUtils
.
__es_port__
,
"es.net.http.auth.user"
:
EsUtils
.
__es_user__
,
"es.net.http.auth.pass"
:
EsUtils
.
__es_passwd__
,
"es.mapping.id"
:
"profit_key"
,
"es.resource"
:
f
"{index_name}/_doc"
,
"es.batch.write.refresh"
:
"false"
,
"es.batch.write.retry.wait"
:
"60s"
,
"es.batch.size.entries"
:
"5000"
,
"es.nodes.wan.only"
:
"false"
,
"es.batch.write.concurrency"
:
"10"
,
"es.write.operation"
:
"index"
}
def
run
(
self
):
self
.
read_profit_rate_add
()
self
.
read_keepa_add
()
# 利润率主索引
self
.
save_profit_rate_to_
e
s
()
# 利润率主索引
写入Doris(已从ES迁移至Doris selection.{site_name}_asin_latest_profit_rate)
self
.
save_profit_rate_to_
dori
s
()
# st_detail_month 近3个月
for
index_name
in
self
.
get_recent_indexes
(
"st_detail_month"
):
date_info
=
self
.
get_date_info_from_index
(
index_name
)
...
...
@@ -210,22 +154,26 @@ class EsAsinProfitRate(object):
self
.
df_cate_flag
.
show
(
10
,
False
)
# ------------------------------------------------------------------ #
# 利润率主索引
# 利润率主索引
写入Doris
# ------------------------------------------------------------------ #
def
save_profit_rate_to_es
(
self
):
def
save_profit_rate_to_doris
(
self
):
doris_table
=
f
"{self.site_name}_asin_latest_profit_rate"
print
(
f
"
\n
{'='*60}"
)
print
(
f
"开始更新
利润率索引:{self.es_profit_rate_index
}"
)
print
(
f
"开始更新
Doris利润率表:selection.{doris_table
}"
)
print
(
f
"{'='*60}"
)
EsUtils
.
create_index
(
self
.
es_profit_rate_index
,
self
.
es_client
,
self
.
es_profit_rate_body
)
try
:
self
.
df_asin_profit_rate
.
repartition
(
10
)
.
write
.
format
(
"org.elasticsearch.spark.sql"
)
\
.
options
(
**
self
.
es_profit_rate_options
)
\
.
mode
(
"append"
)
\
.
save
()
print
(
f
"ES {self.es_profit_rate_index} 索引更新完毕!"
)
except
Exception
as
e
:
print
(
"An error occurred while writing to Elasticsearch:"
,
str
(
e
))
CommonUtil
.
send_wx_msg
([
'chenyuanjie'
],
'
\u26A0
ES数据更新失败'
,
f
'失败索引:{self.es_profit_rate_index}'
)
table_columns
=
'asin,price,ocean_profit,air_profit,asin_crawl_date,update_time'
DorisHelper
.
spark_export_with_columns
(
df_save
=
self
.
df_asin_profit_rate
.
select
(
'asin'
,
'price'
,
'ocean_profit'
,
'air_profit'
,
F
.
to_date
(
F
.
col
(
'asin_crawl_date'
),
'yyyy-MM-dd'
)
.
alias
(
'asin_crawl_date'
),
F
.
to_date
(
F
.
col
(
'update_time'
),
'yyyy-MM-dd'
)
.
alias
(
'update_time'
)
),
db_name
=
'selection'
,
table_name
=
doris_table
,
table_columns
=
table_columns
,
use_type
=
'selection'
,
)
print
(
f
"Doris selection.{doris_table} 更新完毕!"
)
# ------------------------------------------------------------------ #
# 工具方法
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment