Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
f7a08785
Commit
f7a08785
authored
Mar 18, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ASIN信息库修正索引结构
parent
c4a58441
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
173 additions
and
8 deletions
+173
-8
es_ai_asin_add.py
Pyspark_job/export_es/es_ai_asin_add.py
+41
-8
es_util.py
Pyspark_job/utils/es_util.py
+132
-0
No files found.
Pyspark_job/export_es/es_ai_asin_add.py
View file @
f7a08785
...
@@ -29,6 +29,7 @@ class EsAiAsinAdd(object):
...
@@ -29,6 +29,7 @@ class EsAiAsinAdd(object):
self
.
df_ai_asin_detail
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_ai_asin_detail
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_ai_asin_analyze
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_ai_asin_analyze
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_profit_rate
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_save_pg
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_save_pg
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_save_es
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_save_es
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
...
@@ -126,7 +127,10 @@ class EsAiAsinAdd(object):
...
@@ -126,7 +127,10 @@ class EsAiAsinAdd(object):
title_word_content,
title_word_content,
array_to_string(package_quantity_arr, ',') as package_quantity_arr,
array_to_string(package_quantity_arr, ',') as package_quantity_arr,
package_quantity_flag,
package_quantity_flag,
label_content
label_content,
festival,
multi_color_flag,
multi_color_content
from {self.site_name}_ai_asin_analyze_detail
from {self.site_name}_ai_asin_analyze_detail
"""
"""
self
.
df_ai_asin_analyze
=
SparkUtil
.
read_jdbc_query
(
self
.
df_ai_asin_analyze
=
SparkUtil
.
read_jdbc_query
(
...
@@ -143,6 +147,16 @@ class EsAiAsinAdd(object):
...
@@ -143,6 +147,16 @@ class EsAiAsinAdd(object):
print
(
"AI分析数据如下:"
)
print
(
"AI分析数据如下:"
)
self
.
df_ai_asin_analyze
.
show
(
10
,
True
)
self
.
df_ai_asin_analyze
.
show
(
10
,
True
)
# 读取利润率数据
sql3
=
f
"""
select asin, price, ocean_profit, air_profit
from dim_asin_profit_rate_info
where site_name = '{self.site_name}'
"""
self
.
df_profit_rate
=
self
.
spark
.
sql
(
sql3
)
.
repartition
(
40
,
'asin'
)
.
cache
()
print
(
"利润率数据如下:"
)
self
.
df_profit_rate
.
show
(
10
,
True
)
def
handle_data
(
self
):
def
handle_data
(
self
):
self
.
df_save_pg
=
self
.
df_ai_asin_detail
.
join
(
self
.
df_save_pg
=
self
.
df_ai_asin_detail
.
join
(
self
.
df_ai_asin_analyze
,
'asin'
,
'left_anti'
self
.
df_ai_asin_analyze
,
'asin'
,
'left_anti'
...
@@ -155,17 +169,35 @@ class EsAiAsinAdd(object):
...
@@ -155,17 +169,35 @@ class EsAiAsinAdd(object):
'is_ascending_flag'
'is_ascending_flag'
)
)
df_profit
=
self
.
df_profit_rate
.
withColumn
(
"profit_rate_extra"
,
F
.
when
(
F
.
col
(
"ocean_profit"
)
.
isNull
()
&
F
.
col
(
"air_profit"
)
.
isNull
(),
F
.
lit
(
None
)
)
.
otherwise
(
F
.
struct
(
F
.
col
(
"ocean_profit"
)
.
alias
(
"ocean_profit"
),
F
.
col
(
"air_profit"
)
.
alias
(
"air_profit"
)
)
)
)
.
drop
(
"ocean_profit"
,
"air_profit"
)
self
.
df_save_es
=
self
.
df_ai_asin_detail
.
join
(
self
.
df_save_es
=
self
.
df_ai_asin_detail
.
join
(
self
.
df_ai_asin_analyze
,
'asin'
,
'inner'
self
.
df_ai_asin_analyze
,
'asin'
,
'inner'
)
.
withColumn
(
'profit_key'
,
F
.
concat_ws
(
"_"
,
F
.
col
(
"asin"
),
F
.
col
(
"price"
))
)
.
join
(
df_profit
,
on
=
[
"asin"
,
"price"
],
how
=
"left"
)
.
select
(
)
.
select
(
'account_addr'
,
'account_name'
,
'analyze_id'
,
'ao_val'
,
'appearance'
,
'asin'
,
'bought_month'
,
'account_addr'
,
'account_name'
,
'analyze_id'
,
'ao_val'
,
'appearance'
,
'asin'
,
'bought_month'
,
'bought_month_mom'
,
'bought_month_yoy'
,
'brand'
,
'bsr_rank'
,
'buy_box_seller_type'
,
'category'
,
'bought_month_mom'
,
'bought_month_yoy'
,
'brand'
,
'bsr_rank'
,
'buy_box_seller_type'
,
'category'
,
'category_current_id'
,
'category_id'
,
'color'
,
'crowd'
,
'fb_country_name'
,
'function'
,
'img'
,
'category_current_id'
,
'category_id'
,
'color'
,
'crowd'
,
'fb_country_name'
,
'festival'
,
'function'
,
'img_num'
,
'is_ascending_flag'
,
'is_new_flag'
,
'label_content'
,
'launch_time'
,
'launch_time_type'
,
'img'
,
'img_num'
,
'is_ascending_flag'
,
'is_new_flag'
,
'label_content'
,
'launch_time'
,
'launch_time_type'
,
'material'
,
'package_quantity'
,
'package_quantity_arr'
,
'package_quantity_flag'
,
'parent_asin'
,
'material'
,
'multi_color_content'
,
'multi_color_flag'
,
'package_quantity'
,
'package_quantity_arr'
,
'price'
,
'rating'
,
'scene_comment'
,
'scene_title'
,
'seller_id'
,
'shape'
,
'short_desc'
,
'site_name'
,
'package_quantity_flag'
,
'parent_asin'
,
'price'
,
'profit_key'
,
'profit_rate_extra'
,
'rating'
,
'size'
,
'theme'
,
'title'
,
'title_pic_content'
,
'title_pic_flag'
,
'title_word_content'
,
'scene_comment'
,
'scene_title'
,
'seller_id'
,
'shape'
,
'short_desc'
,
'site_name'
,
'size'
,
'theme'
,
'title_word_flag'
,
'total_comments'
,
'uses'
,
'variation_flag'
,
'variation_num'
,
'weight'
'title'
,
'title_pic_content'
,
'title_pic_flag'
,
'title_word_content'
,
'title_word_flag'
,
'total_comments'
,
'uses'
,
'variation_flag'
,
'variation_num'
,
'weight'
)
)
def
save_data
(
self
):
def
save_data
(
self
):
...
@@ -180,11 +212,12 @@ class EsAiAsinAdd(object):
...
@@ -180,11 +212,12 @@ class EsAiAsinAdd(object):
.
save
()
.
save
()
CommonUtil
.
send_wx_msg
([
'wujicang'
,
'chenyuanjie'
],
'ASIN信息库增量数据导出'
,
f
'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}'
)
CommonUtil
.
send_wx_msg
([
'wujicang'
,
'chenyuanjie'
],
'ASIN信息库增量数据导出'
,
f
'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}'
)
except
Exception
as
e
:
except
Exception
as
e
:
print
(
"An error occurred while writing to
Elasticsearch
:"
,
str
(
e
))
print
(
"An error occurred while writing to
PostgreSQL
:"
,
str
(
e
))
CommonUtil
.
send_wx_msg
([
'chenyuanjie'
],
'
\u26A0
ASIN信息库增量数据导出失败'
,
f
'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}'
)
CommonUtil
.
send_wx_msg
([
'chenyuanjie'
],
'
\u26A0
ASIN信息库增量数据导出失败'
,
f
'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}'
)
# 将增量asin导出到es
# 将增量asin导出到es
try
:
try
:
EsUtils
.
create_index
(
self
.
es_index
,
self
.
es_client
,
EsUtils
.
get_es_ai_body
())
self
.
df_save_es
.
write
.
format
(
"org.elasticsearch.spark.sql"
)
\
self
.
df_save_es
.
write
.
format
(
"org.elasticsearch.spark.sql"
)
\
.
options
(
**
self
.
es_options
)
\
.
options
(
**
self
.
es_options
)
\
.
mode
(
"append"
)
\
.
mode
(
"append"
)
\
...
...
Pyspark_job/utils/es_util.py
View file @
f7a08785
...
@@ -1052,6 +1052,138 @@ class EsUtils(object):
...
@@ -1052,6 +1052,138 @@ class EsUtils(object):
}
}
}
}
@staticmethod
def get_es_ai_body():
    """Build the index-creation body (settings and mappings) for the AI ASIN index.

    The body is assembled from groups of field names that share the same
    mapping, so adding a field only means appending its name to the right
    group.

    Returns:
        dict: a new dictionary on every call with two top-level keys:
        ``"settings"`` (shard/replica counts plus the custom analysis
        chain) and ``"mappings"`` (one ``properties`` entry per field).
    """
    # Exact-match fields stored as plain keywords.
    keyword_fields = (
        "asin", "parent_asin", "site_name", "seller_id", "img",
        "launch_time", "bsr_rank_str", "buy_box_seller_type",
        "category", "category_id", "category_current_id",
        "festival", "fb_country_name", "profit_key",
        "multi_color_flag", "package_quantity_flag",
        "title_pic_flag", "title_word_flag",
    )
    # Keyword fields that additionally go through "lowercase_normalizer".
    normalized_keyword_fields = (
        "account_addr", "account_name", "appearance", "brand", "color",
        "crowd", "function", "material", "multi_color_content",
        "package_quantity", "scene_comment", "scene_title", "shape",
        "short_desc", "size", "theme", "title_pic_content",
        "title_word_content", "uses", "weight",
    )
    # NOTE(review): "package_quantity_arr" is mapped as integer; the export
    # job appears to build it as a comma-joined string - confirm the writer
    # sends numeric values, otherwise indexing these documents may fail.
    integer_fields = (
        "analyze_id", "img_num", "launch_time_type", "total_comments",
        "bought_month", "bsr_rank", "variation_flag", "variation_num",
        "is_ascending_flag", "is_new_flag", "package_quantity_arr",
    )
    # Decimal values stored as scaled_float with a scaling factor of 100.
    scaled_float_fields = (
        "price", "rating", "bought_month_mom", "bought_month_yoy", "ao_val",
    )

    # Each comprehension creates a separate dict per field so that no two
    # fields (and no two calls) share a mutable mapping object.
    properties = {}
    properties.update(
        {name: {"type": "keyword"} for name in keyword_fields}
    )
    properties.update(
        {
            name: {"type": "keyword", "normalizer": "lowercase_normalizer"}
            for name in normalized_keyword_fields
        }
    )
    properties.update(
        {name: {"type": "integer"} for name in integer_fields}
    )
    properties.update(
        {
            name: {"type": "scaled_float", "scaling_factor": 100}
            for name in scaled_float_fields
        }
    )

    # Full-text title: separate index-time and search-time analyzers.
    properties["title"] = {
        "type": "text",
        "analyzer": "en_analyzer",
        "search_analyzer": "en_search_analyzer",
    }
    # Full-text label content with a normalized keyword sub-field.
    properties["label_content"] = {
        "type": "text",
        "fields": {
            "keyword": {
                "type": "keyword",
                "normalizer": "lowercase_normalizer",
            },
        },
    }
    # Nested profit object carrying the ocean and air profit values.
    properties["profit_rate_extra"] = {
        "type": "object",
        "properties": {
            "ocean_profit": {"type": "float"},
            "air_profit": {"type": "float"},
        },
    }

    return {
        "settings": {
            "number_of_shards": "3",
            "number_of_replicas": "1",
            "analysis": {
                "filter": {
                    "en_snowball": {
                        "type": "snowball",
                        "language": "English",
                    },
                    # Only referenced by the search-time analyzer below.
                    "en_synonym": {
                        "type": "synonym_graph",
                        "synonyms_path": "analysis/synonyms_en.txt",
                        "updateable": "true",
                    },
                },
                "analyzer": {
                    "en_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "en_snowball"],
                    },
                    "en_search_analyzer": {
                        "tokenizer": "standard",
                        "filter": ["lowercase", "en_synonym", "en_snowball"],
                    },
                },
                "normalizer": {
                    "lowercase_normalizer": {
                        "type": "custom",
                        "char_filter": [],
                        "filter": ["lowercase"],
                    },
                },
            },
        },
        "mappings": {
            "properties": properties,
        },
    }
#创建索引
#创建索引
@staticmethod
@staticmethod
def
create_index
(
index_name
,
client
,
request_body
):
def
create_index
(
index_name
,
client
,
request_body
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment