Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
bafbae51
Commit
bafbae51
authored
Jan 16, 2026
by
fangxingjun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
f5804396
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
8 additions
and
1 deletions
+8
-1
dim_st_asin_info.py
Pyspark_job/dim/dim_st_asin_info.py
+2
-0
kafka_asin_detail.py
Pyspark_job/my_kafka/kafka_asin_detail.py
+6
-1
No files found.
Pyspark_job/dim/dim_st_asin_info.py
View file @
bafbae51
...
...
@@ -94,8 +94,10 @@ class DimStAsinInfo(Templates):
f
"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' {params};"
print
(
"sql:"
,
sql
)
df
=
self
.
spark
.
sql
(
sqlQuery
=
sql
)
print
(
f
"data_type--{data_type}, sql:"
,
sql
,
df
.
count
())
# print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
self
.
df_save
=
self
.
df_save
.
unionByName
(
df
,
allowMissingColumns
=
True
)
print
(
self
.
df_save
.
count
())
# self.df_save.show(n=10, truncate=False)
# print("self.df_save.count():", self.df_save.count())
...
...
Pyspark_job/my_kafka/kafka_asin_detail.py
View file @
bafbae51
...
...
@@ -377,6 +377,7 @@ class DimStAsinInfo(Templates):
"brand"
,
"account_id"
,
"account_name"
,
"account_url"
,
"buy_box_seller_type"
,
"volume"
,
"weight"
,
"weight_str"
,
"launch_time"
,
"total_comments"
,
"page_inventory"
,
"asinUpdateTime"
,
"site_name"
,
"node_id"
,
"buy_sales"
,
'asin_amazon_orders'
,
'asin_ao_val'
,
'matrix_ao_val'
,
"asin_zr_flow_proportion"
,
'matrix_flow_proportion'
,
'describe'
)
df
=
df
.
withColumn
(
"price"
,
F
.
round
(
F
.
col
(
"price"
),
2
))
return
df
def
rename_cols
(
self
,
df
):
...
...
@@ -417,6 +418,7 @@ class DimStAsinInfo(Templates):
'bsrOrders'
,
'bsrOrdersSale'
,
'brandName'
,
'accountId'
,
'accountName'
,
'accountUrl'
,
'buyBoxSellerType'
,
'volume'
,
'weight'
,
'launchTime'
,
'totalComments'
,
'pageInventory'
,
'asinUpdateTime'
,
'asinBoughtMonth'
,
"asinAmazonOrders"
,
"fdCountryName"
,
"key_outer"
,
"key_inner"
,
"volumeFormat"
,
"weightFormat"
,
"isSelfAsin"
,
"auctionsNum"
,
"skusNumCreat"
,
"asinZrFlowProportion"
,
"asinZrFlowProportionMatrix"
,
"asinDescribe"
)
return
df
def
get_topic_name
(
self
):
...
...
@@ -467,7 +469,10 @@ class DimStAsinInfo(Templates):
df_save
=
df_save
.
withColumn
(
"weightFormat"
,
F
.
when
(
F
.
col
(
"weight_str"
)
.
isNotNull
(),
self
.
u_extract_weight
(
"weight_str"
)))
df_save
=
df_save
.
withColumn
(
"asin_bsr_orders_sale"
,
df_save
.
price
*
df_save
.
asin_bsr_orders
)
# df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders)
# df_save = df_save.withColumn("asin_bsr_orders_sale", F.round(df_save.price * df_save.asin_bsr_orders)) # 四舍五入
# df_save = df_save.withColumn("asin_bsr_orders_sale", F.floor(df_save.price * df_save.asin_bsr_orders)) # 向下取整
df_save
=
df_save
.
withColumn
(
"asin_bsr_orders_sale"
,
F
.
ceil
(
df_save
.
price
*
df_save
.
asin_bsr_orders
))
# 向上取整
df_save
=
self
.
rename_cols
(
df
=
df_save
)
df_save
=
df_save
.
fillna
({
"isSelfAsin"
:
0
})
self
.
save_to_redis
(
df
=
df_save
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment