Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
06ecf4d4
Commit
06ecf4d4
authored
Mar 12, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
流量选品-月流程-增加选品模式相关字段
parent
3e4fe845
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
44 additions
and
4 deletions
+44
-4
dwt_flow_asin.py
Pyspark_job/dwt/dwt_flow_asin.py
+22
-2
es_flow_asin.py
Pyspark_job/export_es/es_flow_asin.py
+7
-2
es_util.py
Pyspark_job/utils/es_util.py
+15
-0
No files found.
Pyspark_job/dwt/dwt_flow_asin.py
View file @
06ecf4d4
...
@@ -76,6 +76,7 @@ class DwtFlowAsin(Templates):
...
@@ -76,6 +76,7 @@ class DwtFlowAsin(Templates):
self
.
df_title_matching_degree
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_title_matching_degree
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_flow_asin_last_year
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_flow_asin_last_year
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_keepa_asin
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_keepa_asin
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_asin_source_flag
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
@staticmethod
@staticmethod
def
udf_get_previous_last_30_day
(
self
):
def
udf_get_previous_last_30_day
(
self
):
...
@@ -285,6 +286,17 @@ class DwtFlowAsin(Templates):
...
@@ -285,6 +286,17 @@ class DwtFlowAsin(Templates):
self
.
df_keepa_asin
=
self
.
df_keepa_asin
.
repartition
(
60
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_keepa_asin
=
self
.
df_keepa_asin
.
repartition
(
60
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_keepa_asin
.
show
(
10
,
truncate
=
False
)
self
.
df_keepa_asin
.
show
(
10
,
truncate
=
False
)
print
(
"10.获取asin不同来源标识"
)
sql
=
f
"""
select asin, asin_cate_flag as asin_source_flag, bsr_latest_date as bsr_last_seen_at, bsr_30day_count as bsr_seen_count_30d,
nsr_latest_date as nsr_last_seen_at, nsr_30day_count as nsr_seen_count_30d from dwd_asin_cate_flag
where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print
(
"sql:"
+
sql
)
self
.
df_asin_source_flag
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
self
.
df_asin_source_flag
=
self
.
df_asin_source_flag
.
repartition
(
60
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_asin_source_flag
.
show
(
10
,
truncate
=
False
)
# 处理asin基础属性信息(体积重量相关)及bsr销售额相关信息
# 处理asin基础属性信息(体积重量相关)及bsr销售额相关信息
def
handle_asin_basic_attribute
(
self
):
def
handle_asin_basic_attribute
(
self
):
# 处理重量类型
# 处理重量类型
...
@@ -564,7 +576,12 @@ class DwtFlowAsin(Templates):
...
@@ -564,7 +576,12 @@ class DwtFlowAsin(Templates):
self
.
df_flow_asin_last
.
unpersist
()
self
.
df_flow_asin_last
.
unpersist
()
self
.
df_flow_asin_last_year
.
unpersist
()
self
.
df_flow_asin_last_year
.
unpersist
()
# 处理一些新增字段
def
handle_asin_different_source
(
self
):
self
.
df_asin_detail
=
self
.
df_asin_detail
.
join
(
self
.
df_asin_source_flag
,
on
=
[
'asin'
],
how
=
'left'
)
.
fillna
({
"asin_source_flag"
:
"0"
,
"bsr_last_seen_at"
:
"1970-01-01"
,
"nsr_last_seen_at"
:
"1970-01-01"
})
self
.
df_asin_source_flag
.
unpersist
()
def
handle_other_new_col
(
self
):
def
handle_other_new_col
(
self
):
# 处理五点描述长度
# 处理五点描述长度
self
.
df_asin_detail
=
self
.
df_asin_detail
.
withColumn
(
self
.
df_asin_detail
=
self
.
df_asin_detail
.
withColumn
(
...
@@ -626,6 +643,7 @@ class DwtFlowAsin(Templates):
...
@@ -626,6 +643,7 @@ class DwtFlowAsin(Templates):
"asin_lqs_rating_detail"
,
"title_matching_degree"
,
"zr_flow_proportion"
,
"matrix_flow_proportion"
,
"asin_lqs_rating_detail"
,
"title_matching_degree"
,
"zr_flow_proportion"
,
"matrix_flow_proportion"
,
"matrix_ao_val"
,
"follow_sellers_count"
,
"seller_json"
,
"asin_describe"
,
"asin_fbm_price"
,
"matrix_ao_val"
,
"follow_sellers_count"
,
"seller_json"
,
"asin_describe"
,
"asin_fbm_price"
,
"asin_bought_mom"
,
"asin_bought_yoy"
,
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"asin_bought_mom"
,
"asin_bought_yoy"
,
"describe_len"
,
"tracking_since"
,
"tracking_since_type"
,
"asin_source_flag"
,
"bsr_last_seen_at"
,
"bsr_seen_count_30d"
,
"nsr_last_seen_at"
,
"nsr_seen_count_30d"
,
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
F
.
lit
(
self
.
site_name
)
.
alias
(
"site_name"
),
F
.
lit
(
self
.
date_type
)
.
alias
(
"date_type"
),
F
.
lit
(
self
.
date_info
)
.
alias
(
"date_info"
))
F
.
lit
(
self
.
date_info
)
.
alias
(
"date_info"
))
self
.
df_save
=
self
.
df_save
.
na
.
fill
(
self
.
df_save
=
self
.
df_save
.
na
.
fill
(
...
@@ -637,7 +655,8 @@ class DwtFlowAsin(Templates):
...
@@ -637,7 +655,8 @@ class DwtFlowAsin(Templates):
"asin_rating_type"
:
0
,
"asin_site_name_type"
:
0
,
"asin_weight_type"
:
0
,
"asin_launch_time_type"
:
0
,
"asin_rating_type"
:
0
,
"asin_site_name_type"
:
0
,
"asin_weight_type"
:
0
,
"asin_launch_time_type"
:
0
,
"asin_ao_val_type"
:
0
,
"asin_rank_type"
:
0
,
"asin_price_type"
:
0
,
"asin_quantity_variation_type"
:
0
,
"asin_ao_val_type"
:
0
,
"asin_rank_type"
:
0
,
"asin_price_type"
:
0
,
"asin_quantity_variation_type"
:
0
,
"package_quantity"
:
1
,
"is_movie_label"
:
0
,
"is_brand_label"
:
0
,
"is_alarm_brand"
:
0
,
"package_quantity"
:
1
,
"is_movie_label"
:
0
,
"is_brand_label"
:
0
,
"is_alarm_brand"
:
0
,
"title_matching_degree"
:
0.0
,
"asin_lqs_rating"
:
0.0
,
"follow_sellers_count"
:
-
1
,
"describe_len"
:
0
})
"title_matching_degree"
:
0.0
,
"asin_lqs_rating"
:
0.0
,
"follow_sellers_count"
:
-
1
,
"describe_len"
:
0
,
"bsr_seen_count_30d"
:
0
,
"nsr_seen_count_30d"
:
0
})
self
.
df_save
=
self
.
df_save
.
repartition
(
60
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_save
=
self
.
df_save
.
repartition
(
60
)
.
persist
(
StorageLevel
.
DISK_ONLY
)
self
.
df_save
=
self
.
df_save
.
drop_duplicates
([
'asin'
])
.
filter
((
F
.
col
(
"asin"
)
.
isNotNull
())
&
(
F
.
col
(
"asin"
)
!=
""
)
&
(
F
.
length
(
F
.
col
(
"asin"
))
<=
10
))
self
.
df_save
=
self
.
df_save
.
drop_duplicates
([
'asin'
])
.
filter
((
F
.
col
(
"asin"
)
.
isNotNull
())
&
(
F
.
col
(
"asin"
)
!=
""
)
&
(
F
.
length
(
F
.
col
(
"asin"
))
<=
10
))
print
(
"数据量为:"
,
self
.
df_save
.
count
())
print
(
"数据量为:"
,
self
.
df_save
.
count
())
...
@@ -701,6 +720,7 @@ class DwtFlowAsin(Templates):
...
@@ -701,6 +720,7 @@ class DwtFlowAsin(Templates):
self
.
handle_title_matching_degree
()
self
.
handle_title_matching_degree
()
self
.
handle_change_rate
()
self
.
handle_change_rate
()
self
.
handle_other_new_col
()
self
.
handle_other_new_col
()
self
.
handle_asin_different_source
()
self
.
handle_column
()
self
.
handle_column
()
...
...
Pyspark_job/export_es/es_flow_asin.py
View file @
06ecf4d4
...
@@ -100,8 +100,9 @@ class EsStDetail(TemplatesMysql):
...
@@ -100,8 +100,9 @@ class EsStDetail(TemplatesMysql):
matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info,
matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info,
coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count, asin_describe, asin_fbm_price as fbm_price,
coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count, asin_describe, asin_fbm_price as fbm_price,
describe_len, asin_bought_mom as bought_month_mom, asin_bought_yoy as bought_month_yoy, tracking_since, tracking_since_type,
describe_len, asin_bought_mom as bought_month_mom, asin_bought_yoy as bought_month_yoy, tracking_since, tracking_since_type,
asin_rank_yoy as rank_yoy, asin_ao_yoy as ao_yoy, asin_price_yoy as price_yoy, asin_rating_yoy as rating_yoy,
asin_rank_yoy as rank_yoy, asin_ao_yoy as ao_yoy, asin_price_yoy as price_yoy, asin_rating_yoy as rating_yoy,
asin_comments_yoy as comments_yoy, asin_bsr_orders_yoy as bsr_orders_yoy, asin_sales_yoy as sales_yoy, asin_variation_yoy as variation_yoy
asin_comments_yoy as comments_yoy, asin_bsr_orders_yoy as bsr_orders_yoy, asin_sales_yoy as sales_yoy, asin_variation_yoy as variation_yoy,
asin_source_flag, bsr_last_seen_at, bsr_seen_count_30d, nsr_last_seen_at, nsr_seen_count_30d
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
"""
print
(
"sql:"
,
sql
)
print
(
"sql:"
,
sql
)
...
@@ -126,6 +127,10 @@ class EsStDetail(TemplatesMysql):
...
@@ -126,6 +127,10 @@ class EsStDetail(TemplatesMysql):
)
.
withColumn
(
)
.
withColumn
(
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
)
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
)
)
.
withColumn
(
)
.
withColumn
(
"asin_source_flag"
,
F
.
split
(
F
.
col
(
"asin_source_flag"
),
","
)
)
.
withColumn
(
"asin_source_flag"
,
F
.
expr
(
"transform(asin_source_flag, x -> cast(x as int))"
)
)
.
withColumn
(
'profit_key'
,
F
.
concat_ws
(
"_"
,
F
.
col
(
"asin"
),
F
.
col
(
"price"
))
'profit_key'
,
F
.
concat_ws
(
"_"
,
F
.
col
(
"asin"
),
F
.
col
(
"price"
))
)
.
cache
()
)
.
cache
()
...
...
Pyspark_job/utils/es_util.py
View file @
06ecf4d4
...
@@ -526,6 +526,21 @@ class EsUtils(object):
...
@@ -526,6 +526,21 @@ class EsUtils(object):
},
},
"variation_yoy"
:
{
"variation_yoy"
:
{
"type"
:
"float"
"type"
:
"float"
},
"asin_source_flag"
:
{
"type"
:
"integer"
},
"bsr_last_seen_at"
:
{
"type"
:
"date"
},
"bsr_seen_count_30d"
:
{
"type"
:
"integer"
},
"nsr_last_seen_at"
:
{
"type"
:
"date"
},
"nsr_seen_count_30d"
:
{
"type"
:
"integer"
}
}
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment