Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
92434c42
Commit
92434c42
authored
Jun 06, 2025
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
关联流量-增加内部asin流量数据+异常asin过滤+新增广告类型
parent
85b27911
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
25 additions
and
5 deletions
+25
-5
dim_asin_related_traffic.py
Pyspark_job/dim/dim_asin_related_traffic.py
+25
-5
No files found.
Pyspark_job/dim/dim_asin_related_traffic.py
View file @
92434c42
...
...
@@ -31,6 +31,7 @@ class DimAsinRelatedTraffic(object):
self
.
partitions_by
=
[
'site_name'
,
'date_type'
,
'date_info'
]
self
.
df_asin_detail
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_self_asin_detail
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_result_list_json
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_together_asin
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
self
.
df_sp_initial_seen_asins_json
=
self
.
spark
.
sql
(
f
"select 1+1;"
)
...
...
@@ -57,7 +58,7 @@ class DimAsinRelatedTraffic(object):
key_lower
=
key
.
lower
()
if
key_lower
==
'4 stars and above'
:
return
"four_star_above"
elif
key_lower
in
(
'brands you might like'
,
'more to consider from our brands'
,
elif
key_lower
in
(
'brands you might like'
,
'more to consider from our brands'
,
'similar brands on amazon'
,
'exclusive items from our brands'
,
'more from frequently bought brands'
):
return
"brand_recommendation"
elif
key_lower
in
(
'products related to this item'
,
'based on your recent views'
,
'customer also bought'
,
...
...
@@ -132,9 +133,28 @@ class DimAsinRelatedTraffic(object):
from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
"""
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
# 去重
print
(
"读取ods_self_asin_related_traffic数据"
)
sql
=
f
"""
select
asin,
together_asin,
sp_initial_seen_asins_json,
sp_4stars_initial_seen_asins_json,
sp_delivery_initial_seen_asins_json,
compare_similar_asin_json,
result_list_json,
null as bundles_this_asins_json,
updated_at
from ods_self_asin_related_traffic where site_name='{self.site_name}';
"""
self
.
df_self_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
# 合并去重
window
=
Window
.
partitionBy
([
'asin'
])
.
orderBy
(
self
.
df_asin_detail
.
updated_at
.
desc_nulls_last
())
self
.
df_asin_detail
=
self
.
df_asin_detail
.
withColumn
(
self
.
df_asin_detail
=
self
.
df_asin_detail
.
unionByName
(
self
.
df_self_asin_detail
,
allowMissingColumns
=
False
)
.
withColumn
(
"dt_rank"
,
F
.
row_number
()
.
over
(
window
=
window
)
)
.
filter
(
"dt_rank=1"
)
.
drop
(
"updated_at"
,
"dt_rank"
)
.
cache
()
print
(
"详情数据如下:"
)
...
...
@@ -158,9 +178,9 @@ class DimAsinRelatedTraffic(object):
)
.
withColumn
(
"distinct_values"
,
F
.
array_distinct
(
"value_array"
)
)
.
withColumn
(
# 过滤掉
数据中的脏数据asin
# 过滤掉
asin长度不为10的脏数据
"filtered_values"
,
F
.
expr
(
"filter(distinct_values, x -> x != '' AND length(x) = 10)"
)
F
.
filter
(
"distinct_values"
,
lambda
x
:
(
x
.
isNotNull
()
&
(
x
!=
""
)
&
(
F
.
length
(
F
.
trim
(
x
))
==
10
))
)
)
.
withColumn
(
"values"
,
F
.
concat_ws
(
","
,
"filtered_values"
)
)
.
filter
(
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment