Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
c84a984f
Commit
c84a984f
authored
Nov 19, 2025
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
流量选品-新增字段-FBM运费
parent
31c1321b
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
22 additions
and
13 deletions
+22
-13
dim_asin_detail.py
Pyspark_job/dim/dim_asin_detail.py
+4
-2
dwt_flow_asin.py
Pyspark_job/dwt/dwt_flow_asin.py
+4
-4
es_flow_asin.py
Pyspark_job/export_es/es_flow_asin.py
+1
-1
kafka_flow_asin_detail.py
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
+6
-4
ods_asin_detail.py
Pyspark_job/sqoop_import/ods_asin_detail.py
+4
-2
es_util.py
Pyspark_job/utils/es_util.py
+3
-0
No files found.
Pyspark_job/dim/dim_asin_detail.py
View file @
c84a984f
...
@@ -154,8 +154,9 @@ class DimAsinDetail(object):
...
@@ -154,8 +154,9 @@ class DimAsinDetail(object):
five_star, low_star, together_asin, ac_name, node_id, data_type as asin_data_type, variat_list,
five_star, low_star, together_asin, ac_name, node_id, data_type as asin_data_type, variat_list,
`describe` as asin_describe, follow_sellers as asin_follow_sellers, product_description,
`describe` as asin_describe, follow_sellers as asin_follow_sellers, product_description,
image_view as asin_image_view, spider_int as asin_spider_num, buy_sales, lob_asin_json as asin_lob_info,
image_view as asin_image_view, spider_int as asin_spider_num, buy_sales, lob_asin_json as asin_lob_info,
REGEXP_REPLACE(seller_json, chr(10), '') as seller_json, buy_box_seller_type as asin_buy_box_seller_type, customer_reviews_json, parent_asin, img_list,
REGEXP_REPLACE(seller_json, chr(10), '') as seller_json, buy_box_seller_type as asin_buy_box_seller_type,
created_at as created_time, updated_at as updated_time, updated_at as dt, variat_num as variation_num
customer_reviews_json, parent_asin, img_list, created_at as created_time, updated_at as updated_time,
updated_at as dt, variat_num as variation_num, fbm_delivery_price as asin_fbm_price
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
print
(
sql
)
print
(
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
...
@@ -541,6 +542,7 @@ class DimAsinDetail(object):
...
@@ -541,6 +542,7 @@ class DimAsinDetail(object):
"package_quantity"
,
"is_package_quantity_abnormal"
,
"asin_quantity_variation_type"
,
"seller_json"
,
"package_quantity"
,
"is_package_quantity_abnormal"
,
"asin_quantity_variation_type"
,
"seller_json"
,
"asin_bought_month"
,
"asin_length"
,
"asin_width"
,
"asin_height"
,
"asin_is_self"
,
"asin_bought_month"
,
"asin_length"
,
"asin_width"
,
"asin_height"
,
"asin_is_self"
,
"customer_reviews_json"
,
"img_list"
,
"variat_list"
,
"customer_reviews_json"
,
"img_list"
,
"variat_list"
,
F
.
round
(
"asin_fbm_price"
,
2
)
.
alias
(
"asin_fbm_price"
),
F
.
lit
(
self
.
site_name
)
.
alias
(
'site_name'
),
F
.
lit
(
self
.
site_name
)
.
alias
(
'site_name'
),
F
.
lit
(
self
.
date_type
)
.
alias
(
'date_type'
),
F
.
lit
(
self
.
date_type
)
.
alias
(
'date_type'
),
F
.
lit
(
self
.
date_info
)
.
alias
(
'date_info'
))
.
persist
(
StorageLevel
.
MEMORY_ONLY
)
F
.
lit
(
self
.
date_info
)
.
alias
(
'date_info'
))
.
persist
(
StorageLevel
.
MEMORY_ONLY
)
...
...
Pyspark_job/dwt/dwt_flow_asin.py
View file @
c84a984f
...
@@ -167,7 +167,7 @@ class DwtFlowAsin(Templates):
...
@@ -167,7 +167,7 @@ class DwtFlowAsin(Templates):
date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date, asin_bought_month, asin_image_view,
date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date, asin_bought_month, asin_image_view,
case when product_description is not null then 1 else 0 end as is_with_product_description, asin_describe,
case when product_description is not null then 1 else 0 end as is_with_product_description, asin_describe,
category_id as top_category_id, category_first_id as top_category_first_id, customer_reviews_json, img_list as img_info,
category_id as top_category_id, category_first_id as top_category_first_id, customer_reviews_json, img_list as img_info,
asin_follow_sellers as follow_sellers_count
asin_follow_sellers as follow_sellers_count
, asin_fbm_price
from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
print
(
"sql:"
+
sql
)
print
(
"sql:"
+
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
...
@@ -503,7 +503,7 @@ class DwtFlowAsin(Templates):
...
@@ -503,7 +503,7 @@ class DwtFlowAsin(Templates):
F
.
lit
(
None
)
.
alias
(
"buy_data_viewed_month"
),
F
.
lit
(
None
)
.
alias
(
"buy_data_viewed_week"
),
F
.
lit
(
None
)
.
alias
(
"buy_data_viewed_month"
),
F
.
lit
(
None
)
.
alias
(
"buy_data_viewed_week"
),
F
.
lit
(
None
)
.
alias
(
"theme_en"
),
F
.
lit
(
None
)
.
alias
(
"theme_label_en"
),
"asin_lqs_rating"
,
F
.
lit
(
None
)
.
alias
(
"theme_en"
),
F
.
lit
(
None
)
.
alias
(
"theme_label_en"
),
"asin_lqs_rating"
,
"asin_lqs_rating_detail"
,
"title_matching_degree"
,
"zr_flow_proportion"
,
"matrix_flow_proportion"
,
"asin_lqs_rating_detail"
,
"title_matching_degree"
,
"zr_flow_proportion"
,
"matrix_flow_proportion"
,
"matrix_ao_val"
,
"follow_sellers_count"
,
"seller_json"
,
"asin_describe"
,
"matrix_ao_val"
,
"follow_sellers_count"
,
"seller_json"
,
"asin_describe"
,
"asin_fbm_price"
,
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
F
.
lit
(
self
.
date_info
)
.
alias
(
"date_info"
))
F
.
lit
(
self
.
date_info
)
.
alias
(
"date_info"
))
self
.
df_save
=
self
.
df_save
.
na
.
fill
(
self
.
df_save
=
self
.
df_save
.
na
.
fill
(
...
@@ -553,14 +553,14 @@ class DwtFlowAsin(Templates):
...
@@ -553,14 +553,14 @@ class DwtFlowAsin(Templates):
F
.
col
(
"current_category_rank"
)
.
alias
(
"category_current_rank"
),
"asin_type"
,
F
.
col
(
"current_category_rank"
)
.
alias
(
"category_current_rank"
),
"asin_type"
,
"bsr_orders"
,
F
.
col
(
"sales"
)
.
alias
(
"bsr_orders_sale"
),
"bsr_orders"
,
F
.
col
(
"sales"
)
.
alias
(
"bsr_orders_sale"
),
F
.
col
(
"asin_page_inventory"
)
.
alias
(
"page_inventory"
),
"asin_bought_month"
,
"seller_json"
,
F
.
col
(
"asin_page_inventory"
)
.
alias
(
"page_inventory"
),
"asin_bought_month"
,
"seller_json"
,
F
.
col
(
"asin_buy_box_seller_type"
)
.
alias
(
"buy_box_seller_type"
),
"asin_describe"
F
.
col
(
"asin_buy_box_seller_type"
)
.
alias
(
"buy_box_seller_type"
),
"asin_describe"
,
"asin_fbm_price"
)
)
table_columns
=
"""asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
table_columns
=
"""asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star,
asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star,
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe"""
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe
, asin_fbm_price
"""
DorisHelper
.
spark_export_with_columns
(
df_save
=
df_doris
,
db_name
=
self
.
doris_db
,
table_name
=
self
.
asin_latest_detail_table
,
table_columns
=
table_columns
)
DorisHelper
.
spark_export_with_columns
(
df_save
=
df_doris
,
db_name
=
self
.
doris_db
,
table_name
=
self
.
asin_latest_detail_table
,
table_columns
=
table_columns
)
print
(
"save asin_latest_detail success"
)
print
(
"save asin_latest_detail success"
)
else
:
else
:
...
...
Pyspark_job/export_es/es_flow_asin.py
View file @
c84a984f
...
@@ -97,7 +97,7 @@ class EsStDetail(TemplatesMysql):
...
@@ -97,7 +97,7 @@ class EsStDetail(TemplatesMysql):
current_category_rank, asin_weight_ratio, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail,
current_category_rank, asin_weight_ratio, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail,
title_matching_degree, asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion,
title_matching_degree, asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion,
matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info,
matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info,
coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count, asin_describe
coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count, asin_describe
, asin_fbm_price as fbm_price
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
"""
print
(
"sql:"
,
sql
)
print
(
"sql:"
,
sql
)
...
...
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
View file @
c84a984f
...
@@ -183,7 +183,8 @@ class KafkaFlowAsinDetail(Templates):
...
@@ -183,7 +183,8 @@ class KafkaFlowAsinDetail(Templates):
StructField
(
"seller_json"
,
StringType
(),
True
),
StructField
(
"seller_json"
,
StringType
(),
True
),
StructField
(
"customer_reviews_json"
,
StringType
(),
True
),
StructField
(
"customer_reviews_json"
,
StringType
(),
True
),
StructField
(
"img_list"
,
StringType
(),
True
),
StructField
(
"img_list"
,
StringType
(),
True
),
StructField
(
"follow_sellers"
,
IntegerType
(),
True
)
StructField
(
"follow_sellers"
,
IntegerType
(),
True
),
StructField
(
"fbm_delivery_price"
,
FloatType
(),
True
)
])
])
return
schema
return
schema
...
@@ -614,7 +615,8 @@ class KafkaFlowAsinDetail(Templates):
...
@@ -614,7 +615,8 @@ class KafkaFlowAsinDetail(Templates):
"site_name"
,
"asin_bought_month"
,
"asin_lqs_rating"
,
"asin_lqs_rating_detail"
,
"site_name"
,
"asin_bought_month"
,
"asin_lqs_rating"
,
"asin_lqs_rating_detail"
,
"asin_lob_info"
,
"is_contains_lob_info"
,
"is_package_quantity_abnormal"
,
"category"
,
"asin_lob_info"
,
"is_contains_lob_info"
,
"is_package_quantity_abnormal"
,
"category"
,
"zr_flow_proportion"
,
"matrix_flow_proportion"
,
"matrix_ao_val"
,
"product_features"
,
"img_info"
,
"zr_flow_proportion"
,
"matrix_flow_proportion"
,
"matrix_ao_val"
,
"product_features"
,
"img_info"
,
"collapse_asin"
,
F
.
col
(
"follow_sellers"
)
.
alias
(
"follow_sellers_count"
),
"seller_json"
,
F
.
col
(
"describe"
)
.
alias
(
"asin_describe"
))
"collapse_asin"
,
F
.
col
(
"follow_sellers"
)
.
alias
(
"follow_sellers_count"
),
"seller_json"
,
F
.
col
(
"describe"
)
.
alias
(
"asin_describe"
),
F
.
round
(
"fbm_delivery_price"
,
2
)
.
alias
(
"fbm_price"
))
df_save
=
df_save
.
na
.
fill
(
df_save
=
df_save
.
na
.
fill
(
{
"zr_counts"
:
0
,
"sp_counts"
:
0
,
"sb_counts"
:
0
,
"vi_counts"
:
0
,
"bs_counts"
:
0
,
"ac_counts"
:
0
,
{
"zr_counts"
:
0
,
"sp_counts"
:
0
,
"sb_counts"
:
0
,
"vi_counts"
:
0
,
"bs_counts"
:
0
,
"ac_counts"
:
0
,
"tr_counts"
:
0
,
"er_counts"
:
0
,
"title_len"
:
0
,
"total_comments"
:
0
,
"variation_num"
:
0
,
"img_num"
:
0
,
"tr_counts"
:
0
,
"er_counts"
:
0
,
"title_len"
:
0
,
"total_comments"
:
0
,
"variation_num"
:
0
,
"img_num"
:
0
,
...
@@ -865,7 +867,7 @@ class KafkaFlowAsinDetail(Templates):
...
@@ -865,7 +867,7 @@ class KafkaFlowAsinDetail(Templates):
F
.
col
(
"first_category_rank"
)
.
alias
(
"category_first_rank"
),
F
.
col
(
"first_category_rank"
)
.
alias
(
"category_first_rank"
),
F
.
col
(
"current_category_rank"
)
.
alias
(
"category_current_rank"
),
"asin_type"
,
F
.
col
(
"current_category_rank"
)
.
alias
(
"category_current_rank"
),
"asin_type"
,
"bsr_orders"
,
"bsr_orders_sale"
,
"page_inventory"
,
"asin_bought_month"
,
"seller_json"
,
"bsr_orders"
,
"bsr_orders_sale"
,
"page_inventory"
,
"asin_bought_month"
,
"seller_json"
,
"buy_box_seller_type"
,
"asin_describe"
)
"buy_box_seller_type"
,
"asin_describe"
,
F
.
col
(
"fbm_price"
)
.
alias
(
"asin_fbm_price"
)
)
df
=
df
.
drop
(
"category"
,
"seller_json"
)
df
=
df
.
drop
(
"category"
,
"seller_json"
)
df
.
write
.
format
(
"org.elasticsearch.spark.sql"
)
.
options
(
**
self
.
es_options
)
.
mode
(
"append"
)
.
save
()
df
.
write
.
format
(
"org.elasticsearch.spark.sql"
)
.
options
(
**
self
.
es_options
)
.
mode
(
"append"
)
.
save
()
end_time
=
time
.
time
()
end_time
=
time
.
time
()
...
@@ -879,7 +881,7 @@ class KafkaFlowAsinDetail(Templates):
...
@@ -879,7 +881,7 @@ class KafkaFlowAsinDetail(Templates):
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe"""
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe
, asin_fbm_price
"""
DorisHelper
.
spark_export_with_columns
(
df_save
=
df_asin_latest_detail
,
db_name
=
self
.
doris_db
,
table_name
=
self
.
asin_latest_detail_table
,
table_columns
=
table_columns
)
DorisHelper
.
spark_export_with_columns
(
df_save
=
df_asin_latest_detail
,
db_name
=
self
.
doris_db
,
table_name
=
self
.
asin_latest_detail_table
,
table_columns
=
table_columns
)
df_asin_latest_detail
.
unpersist
()
df_asin_latest_detail
.
unpersist
()
...
...
Pyspark_job/sqoop_import/ods_asin_detail.py
View file @
c84a984f
...
@@ -34,7 +34,8 @@ if __name__ == '__main__':
...
@@ -34,7 +34,8 @@ if __name__ == '__main__':
"product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, "
\
"product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, "
\
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, "
\
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, "
\
"together_asin_json, min_match_asin_json, variat_num, current_asin, img_list, variat_list, parent_asin, "
\
"together_asin_json, min_match_asin_json, variat_num, current_asin, img_list, variat_list, parent_asin, "
\
"bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json, review_json_list"
"bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json, review_json_list, "
\
"fbm_delivery_price"
engine
=
get_remote_engine
(
engine
=
get_remote_engine
(
site_name
=
site_name
,
site_name
=
site_name
,
...
@@ -112,7 +113,8 @@ if __name__ == '__main__':
...
@@ -112,7 +113,8 @@ if __name__ == '__main__':
REPLACE(REPLACE(REPLACE(video_m3u8_url, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS video_m3u8_url,
REPLACE(REPLACE(REPLACE(video_m3u8_url, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS video_m3u8_url,
REPLACE(REPLACE(REPLACE(result_list_json, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS result_list_json,
REPLACE(REPLACE(REPLACE(result_list_json, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS result_list_json,
REPLACE(REPLACE(REPLACE(bundle_asin_component_json, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS bundle_asin_component_json,
REPLACE(REPLACE(REPLACE(bundle_asin_component_json, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS bundle_asin_component_json,
REPLACE(REPLACE(REPLACE(review_json_list, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS review_json_list
REPLACE(REPLACE(REPLACE(review_json_list, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS review_json_list,
fbm_delivery_price
FROM {import_table}
FROM {import_table}
WHERE 1=1 AND
\
$CONDITIONS
WHERE 1=1 AND
\
$CONDITIONS
"""
"""
...
...
Pyspark_job/utils/es_util.py
View file @
c84a984f
...
@@ -437,6 +437,9 @@ class EsUtils(object):
...
@@ -437,6 +437,9 @@ class EsUtils(object):
},
},
"asin_describe"
:
{
"asin_describe"
:
{
"type"
:
"text"
"type"
:
"text"
},
"fbm_price"
:
{
"type"
:
"float"
}
}
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment