Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
e7745fcc
Commit
e7745fcc
authored
May 06, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
amazon标签存储格式调整
parent
9cd4cad0
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
9 additions
and
5 deletions
+9
-5
dim_asin_detail.py
Pyspark_job/dim/dim_asin_detail.py
+1
-1
kafka_flow_asin_detail.py
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
+4
-2
kafka_rank_asin_detail.py
Pyspark_job/my_kafka/kafka_rank_asin_detail.py
+4
-2
No files found.
Pyspark_job/dim/dim_asin_detail.py
View file @
e7745fcc
...
@@ -162,7 +162,7 @@ class DimAsinDetail(object):
...
@@ -162,7 +162,7 @@ class DimAsinDetail(object):
get_json_object(product_json, '$.Color') as product_json_color,
get_json_object(product_json, '$.Color') as product_json_color,
get_json_object(product_json, '$.Number of Items') as product_json_number_of_items,
get_json_object(product_json, '$.Number of Items') as product_json_number_of_items,
current_asin,
current_asin,
nullif(
get_json_object(amazon_label, '$.badge_type'
), 'unknown') as amazon_label
nullif(
coalesce(get_json_object(amazon_label, '$[0].badge_type'), get_json_object(amazon_label, '$.badge_type')
), 'unknown') as amazon_label
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
print
(
sql
)
print
(
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
self
.
df_asin_detail
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
...
...
Pyspark_job/my_kafka/kafka_flow_asin_detail.py
View file @
e7745fcc
...
@@ -810,8 +810,10 @@ class KafkaFlowAsinDetail(Templates):
...
@@ -810,8 +810,10 @@ class KafkaFlowAsinDetail(Templates):
.
withColumn
(
"img_type_arr"
,
F
.
split
(
F
.
col
(
"img_type"
),
","
))
\
.
withColumn
(
"img_type_arr"
,
F
.
split
(
F
.
col
(
"img_type"
),
","
))
\
.
withColumn
(
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
))
\
.
withColumn
(
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
))
\
.
withColumn
(
"amazon_label"
,
F
.
when
(
.
withColumn
(
"amazon_label"
,
F
.
when
(
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
)
!=
"unknown"
,
F
.
coalesce
(
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$[0].badge_type"
),
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
)
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
))
!=
"unknown"
,
F
.
coalesce
(
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$[0].badge_type"
),
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
))
))
))
df_save
=
df
.
select
(
"asin"
,
"ao_val"
,
"zr_counts"
,
"sp_counts"
,
"sb_counts"
,
"vi_counts"
,
"bs_counts"
,
"ac_counts"
,
df_save
=
df
.
select
(
"asin"
,
"ao_val"
,
"zr_counts"
,
"sp_counts"
,
"sb_counts"
,
"vi_counts"
,
"bs_counts"
,
"ac_counts"
,
"tr_counts"
,
"er_counts"
,
"bsr_orders"
,
"bsr_orders_sale"
,
"title"
,
"title_len"
,
"price"
,
"tr_counts"
,
"er_counts"
,
"bsr_orders"
,
"bsr_orders_sale"
,
"title"
,
"title_len"
,
"price"
,
...
...
Pyspark_job/my_kafka/kafka_rank_asin_detail.py
View file @
e7745fcc
...
@@ -809,8 +809,10 @@ class KafkaRankAsinDetail(Templates):
...
@@ -809,8 +809,10 @@ class KafkaRankAsinDetail(Templates):
.
withColumn
(
"img_type_arr"
,
F
.
split
(
F
.
col
(
"img_type"
),
","
))
\
.
withColumn
(
"img_type_arr"
,
F
.
split
(
F
.
col
(
"img_type"
),
","
))
\
.
withColumn
(
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
))
\
.
withColumn
(
"img_type_arr"
,
F
.
expr
(
"transform(img_type_arr, x -> cast(x as int))"
))
\
.
withColumn
(
"amazon_label"
,
F
.
when
(
.
withColumn
(
"amazon_label"
,
F
.
when
(
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
)
!=
"unknown"
,
F
.
coalesce
(
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$[0].badge_type"
),
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
)
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
))
!=
"unknown"
,
F
.
coalesce
(
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$[0].badge_type"
),
F
.
get_json_object
(
F
.
col
(
"amazon_label"
),
"$.badge_type"
))
))
))
df_save
=
df
.
select
(
"asin"
,
"ao_val"
,
"zr_counts"
,
"sp_counts"
,
"sb_counts"
,
"vi_counts"
,
"bs_counts"
,
"ac_counts"
,
df_save
=
df
.
select
(
"asin"
,
"ao_val"
,
"zr_counts"
,
"sp_counts"
,
"sb_counts"
,
"vi_counts"
,
"bs_counts"
,
"ac_counts"
,
"tr_counts"
,
"er_counts"
,
"bsr_orders"
,
"bsr_orders_sale"
,
"title"
,
"title_len"
,
"price"
,
"tr_counts"
,
"er_counts"
,
"bsr_orders"
,
"bsr_orders_sale"
,
"title"
,
"title_len"
,
"price"
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment