Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
83cc4eba
Commit
83cc4eba
authored
Apr 24, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
流量选品-增加详情页Amazon标签
parent
dfd6eab7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
25 additions
and
12 deletions
+25
-12
dim_asin_detail.py
Pyspark_job/dim/dim_asin_detail.py
+3
-2
dwt_flow_asin.py
Pyspark_job/dwt/dwt_flow_asin.py
+2
-2
es_flow_asin.py
Pyspark_job/export_es/es_flow_asin.py
+2
-1
kafka_asin_detail.py
Pyspark_job/my_kafka/kafka_asin_detail.py
+4
-2
kafka_flow_asin_detail.py
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
+5
-3
ods_asin_detail.py
Pyspark_job/sqoop_import/ods_asin_detail.py
+3
-2
es_util.py
Pyspark_job/utils/es_util.py
+6
-0
No files found.
Pyspark_job/dim/dim_asin_detail.py
View file @
83cc4eba
...
...
@@ -161,7 +161,8 @@ class DimAsinDetail(object):
updated_at as dt, variat_num as variation_num, fbm_delivery_price as asin_fbm_price,
get_json_object(product_json, '$.Color') as product_json_color,
get_json_object(product_json, '$.Number of Items') as product_json_number_of_items,
current_asin
current_asin,
get_json_object(amazon_label, '$.badge_type') as amazon_label
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
print
(
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
...
...
@@ -559,7 +560,7 @@ class DimAsinDetail(object):
"asin_bought_month"
,
"asin_length"
,
"asin_width"
,
"asin_height"
,
"asin_is_self"
,
"customer_reviews_json"
,
"img_list"
,
"variat_list"
,
F
.
round
(
"asin_fbm_price"
,
2
)
.
alias
(
"asin_fbm_price"
),
"current_asin"
,
"current_asin"
,
"amazon_label"
,
F
.
lit
(
self
.
site_name
)
.
alias
(
'site_name'
),
F
.
lit
(
self
.
date_type
)
.
alias
(
'date_type'
),
F
.
lit
(
self
.
date_info
)
.
alias
(
'date_info'
))
.
persist
(
StorageLevel
.
MEMORY_ONLY
)
...
...
Pyspark_job/dwt/dwt_flow_asin.py
View file @
83cc4eba
...
...
@@ -210,7 +210,7 @@ class DwtFlowAsin(Templates):
date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date, asin_bought_month, asin_image_view,
case when product_description is not null then 1 else 0 end as is_with_product_description, asin_describe,
category_id as top_category_id, category_first_id as top_category_first_id, customer_reviews_json, img_list as img_info,
asin_follow_sellers as follow_sellers_count, asin_fbm_price, current_asin
asin_follow_sellers as follow_sellers_count, asin_fbm_price, current_asin
, amazon_label
from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
print
(
"sql:"
+
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
...
...
@@ -767,7 +767,7 @@ class DwtFlowAsin(Templates):
"matrix_ao_val"
,
"follow_sellers_count"
,
"seller_json"
,
"asin_describe"
,
"asin_fbm_price"
,
"asin_bought_mom"
,
"asin_bought_yoy"
,
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"asin_source_flag"
,
"bsr_last_seen_at"
,
"bsr_seen_count_30d"
,
"nsr_last_seen_at"
,
"nsr_seen_count_30d"
,
"multi_color_flag"
,
"multi_color_str"
,
"multi_color_flag"
,
"multi_color_str"
,
"amazon_label"
,
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
F
.
lit
(
self
.
date_info
)
.
alias
(
"date_info"
))
self
.
df_save
=
self
.
df_save
.
na
.
fill
(
...
...
Pyspark_job/export_es/es_flow_asin.py
View file @
83cc4eba
...
...
@@ -102,7 +102,8 @@ class EsStDetail(TemplatesMysql):
describe_len, asin_bought_mom as bought_month_mom, asin_bought_yoy as bought_month_yoy, tracking_since, tracking_since_type,
asin_rank_yoy as rank_yoy, asin_ao_yoy as ao_yoy, asin_price_yoy as price_yoy, asin_rating_yoy as rating_yoy,
asin_comments_yoy as comments_yoy, asin_bsr_orders_yoy as bsr_orders_yoy, asin_sales_yoy as sales_yoy, asin_variation_yoy as variation_yoy,
asin_source_flag, bsr_last_seen_at, bsr_seen_count_30d, nsr_last_seen_at, nsr_seen_count_30d, multi_color_flag, multi_color_str
asin_source_flag, bsr_last_seen_at, bsr_seen_count_30d, nsr_last_seen_at, nsr_seen_count_30d, multi_color_flag, multi_color_str,
amazon_label
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print
(
"sql:"
,
sql
)
...
...
Pyspark_job/my_kafka/kafka_asin_detail.py
View file @
83cc4eba
...
...
@@ -213,6 +213,7 @@ class DimStAsinInfo(Templates):
StructField
(
"follow_sellers"
,
StringType
(),
True
),
StructField
(
"buy_sales"
,
StringType
(),
True
),
StructField
(
"seller_json"
,
StringType
(),
True
),
StructField
(
"amazon_label"
,
StringType
(),
True
),
])
return
schema
...
...
@@ -376,7 +377,8 @@ class DimStAsinInfo(Templates):
df
=
df
.
select
(
"asin"
,
"parentAsin"
,
"title"
,
"img_url"
,
"variat_num"
,
"best_sellers_rank"
,
"best_sellers_herf"
,
"price"
,
"rating"
,
"brand"
,
"account_id"
,
"account_name"
,
"account_url"
,
"buy_box_seller_type"
,
"volume"
,
"weight"
,
"weight_str"
,
"launch_time"
,
"total_comments"
,
"page_inventory"
,
"asinUpdateTime"
,
"site_name"
,
"node_id"
,
"buy_sales"
,
'asin_amazon_orders'
,
'asin_ao_val'
,
'matrix_ao_val'
,
"asin_zr_flow_proportion"
,
'matrix_flow_proportion'
,
'describe'
)
"buy_sales"
,
'asin_amazon_orders'
,
'asin_ao_val'
,
'matrix_ao_val'
,
"asin_zr_flow_proportion"
,
'matrix_flow_proportion'
,
'describe'
,
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
)
.
alias
(
"amazon_label"
))
df
=
df
.
withColumn
(
"price"
,
F
.
round
(
F
.
col
(
"price"
),
2
))
return
df
...
...
@@ -417,7 +419,7 @@ class DimStAsinInfo(Templates):
df
=
df
.
select
(
'asin'
,
'parentAsin'
,
'title'
,
"imgUrl"
,
'asinVarNum'
,
'oneCategoryRank'
,
'bestSellersRank'
,
'lastHerf'
,
'aoVal'
,
'matrixAoVal'
,
'price'
,
'rating'
,
'bsrOrders'
,
'bsrOrdersSale'
,
'brandName'
,
'accountId'
,
'accountName'
,
'accountUrl'
,
'buyBoxSellerType'
,
'volume'
,
'weight'
,
'launchTime'
,
'totalComments'
,
'pageInventory'
,
'asinUpdateTime'
,
'asinBoughtMonth'
,
"asinAmazonOrders"
,
"fdCountryName"
,
"key_outer"
,
"key_inner"
,
"volumeFormat"
,
"weightFormat"
,
"isSelfAsin"
,
"auctionsNum"
,
"skusNumCreat"
,
"asinZrFlowProportion"
,
"asinZrFlowProportionMatrix"
,
"asinDescribe"
)
"fdCountryName"
,
"key_outer"
,
"key_inner"
,
"volumeFormat"
,
"weightFormat"
,
"isSelfAsin"
,
"auctionsNum"
,
"skusNumCreat"
,
"asinZrFlowProportion"
,
"asinZrFlowProportionMatrix"
,
"asinDescribe"
,
"amazon_label"
)
return
df
...
...
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
View file @
83cc4eba
...
...
@@ -194,7 +194,8 @@ class KafkaFlowAsinDetail(Templates):
StructField
(
"img_list"
,
StringType
(),
True
),
StructField
(
"follow_sellers"
,
IntegerType
(),
True
),
StructField
(
"fbm_delivery_price"
,
FloatType
(),
True
),
StructField
(
"product_json"
,
StringType
(),
True
)
StructField
(
"product_json"
,
StringType
(),
True
),
StructField
(
"amazon_label"
,
StringType
(),
True
)
])
return
schema
...
...
@@ -807,7 +808,8 @@ class KafkaFlowAsinDetail(Templates):
.
withColumn
(
"collapse_asin"
,
F
.
coalesce
(
F
.
col
(
"parent_asin"
),
F
.
col
(
"asin"
)))
\
.
withColumn
(
"bsr_best_orders_type"
,
F
.
lit
(
-
1
))
\
.
withColumn
(
"img_type_arr"
,
F
.
split
(
F
.
col
(
"img_type"
),
","
))
\
.
withColumn
(
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
))
.
withColumn
(
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
))
\
.
withColumn
(
"amazon_label"
,
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
))
df_save
=
df
.
select
(
"asin"
,
"ao_val"
,
"zr_counts"
,
"sp_counts"
,
"sb_counts"
,
"vi_counts"
,
"bs_counts"
,
"ac_counts"
,
"tr_counts"
,
"er_counts"
,
"bsr_orders"
,
"bsr_orders_sale"
,
"title"
,
"title_len"
,
"price"
,
"rating"
,
"total_comments"
,
"buy_box_seller_type"
,
"page_inventory"
,
"volume"
,
"weight"
,
"color"
,
...
...
@@ -829,7 +831,7 @@ class KafkaFlowAsinDetail(Templates):
F
.
col
(
"describe"
)
.
alias
(
"asin_describe"
),
F
.
round
(
"fbm_delivery_price"
,
2
)
.
alias
(
"fbm_price"
),
"asin_source_flag"
,
"bsr_last_seen_at"
,
"bsr_seen_count_30d"
,
"nsr_last_seen_at"
,
"nsr_seen_count_30d"
,
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"profit_key"
,
"profit_rate_extra"
,
"img_type_arr"
,
"multi_color_flag"
,
"multi_color_str"
)
"multi_color_flag"
,
"multi_color_str"
,
"amazon_label"
)
df_save
=
df_save
.
na
.
fill
(
{
"zr_counts"
:
0
,
"sp_counts"
:
0
,
"sb_counts"
:
0
,
"vi_counts"
:
0
,
"bs_counts"
:
0
,
"ac_counts"
:
0
,
"tr_counts"
:
0
,
"er_counts"
:
0
,
"title_len"
:
0
,
"total_comments"
:
0
,
"variation_num"
:
0
,
"img_num"
:
0
,
...
...
Pyspark_job/sqoop_import/ods_asin_detail.py
View file @
83cc4eba
...
...
@@ -37,7 +37,7 @@ if __name__ == '__main__':
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, "
\
"together_asin_json, min_match_asin_json, variat_num, current_asin, img_list, variat_list, parent_asin, "
\
"bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json, review_json_list, "
\
"fbm_delivery_price"
"fbm_delivery_price
, amazon_label
"
engine
=
get_remote_engine
(
site_name
=
site_name
,
...
...
@@ -116,7 +116,8 @@ if __name__ == '__main__':
REPLACE(REPLACE(REPLACE(result_list_json, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS result_list_json,
REPLACE(REPLACE(REPLACE(bundle_asin_component_json, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS bundle_asin_component_json,
REPLACE(REPLACE(REPLACE(review_json_list, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS review_json_list,
fbm_delivery_price
fbm_delivery_price,
REPLACE(REPLACE(REPLACE(amazon_label, E'
\n
',' '), E'
\r
',' '), E'
\t
',' ') AS amazon_label
FROM {import_table}
WHERE 1=1 AND
\
$CONDITIONS
"""
...
...
Pyspark_job/utils/es_util.py
View file @
83cc4eba
...
...
@@ -547,6 +547,9 @@ class EsUtils(object):
},
"multi_color_str"
:
{
"type"
:
"keyword"
},
"amazon_label"
:
{
"type"
:
"keyword"
}
}
}
...
...
@@ -1059,6 +1062,9 @@ class EsUtils(object):
},
"multi_color_str"
:
{
"type"
:
"keyword"
},
"amazon_label"
:
{
"type"
:
"keyword"
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment