import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F


class CateAsinDetailToDoris(object):

    def __init__(self):
        # Create the Spark session
        self.spark = SparkUtil.get_spark_session(app_name="cate_asin_detail_to_doris")
        # Placeholder DataFrames, filled in by read_data()/handle_data()
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_asin_last5m_comments = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        # Earliest month of the five-month look-back window before 2025-01
        self.last_5m = CommonUtil.get_month_offset('2025-01', -5)

    def read_data(self):
        # Read the current month's ASIN rows from the traffic product-selection table
        sql1 = f"""
        select category_id, asin, asin_total_comments, asin_comments_change, asin_launch_time, asin_launch_time_type,
               asin_bsr_orders_change, asin_price, asin_size_type, is_alarm_brand
        from dwt_flow_asin
        where site_name = 'us' and date_type = 'month' and date_info = '2025-01' and category_id is not null;
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql1).repartition(80, 'category_id').cache()
        print("Base data from the current month's traffic product-selection table:")
        self.df_asin_detail.show(10, True)
        # Read each ASIN's total comment counts for the previous five months
        sql2 = f"""
        select asin, asin_total_comments
        from dwt_flow_asin
        where site_name = 'us' and date_type = 'month' and date_info >= '{self.last_5m}'
          and date_info < '2025-01';
        """
        self.df_asin_last5m_comments = self.spark.sql(sqlQuery=sql2).cache()
        print("ASIN comment counts for the previous five months:")
        self.df_asin_last5m_comments.show(10, True)

    def handle_data(self):
        # Average comment count per category (rounded to the nearest integer)
        df_comments_avg = self.df_asin_detail.groupby(['category_id']).agg(
            F.round(F.avg("asin_total_comments")).cast("int").alias("comments_avg")
        )
        print("Average comment count per category:")
        df_comments_avg.show(10, True)
        # Total comment count per ASIN over the previous five months
        df_comments_last_5m = self.df_asin_last5m_comments.groupby(['asin']).agg(
            F.sum("asin_total_comments").alias("comments_last_five_month")
        )
        print("Sum of each ASIN's comments over the previous five months:")
        df_comments_last_5m.show(10, True)
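        # Join both aggregates onto the detail rows and rename columns for export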
        self.df_asin_detail = self.df_asin_detail \
            .join(df_comments_avg, 'category_id', 'left') \
            .join(df_comments_last_5m, 'asin', 'left') \
            .withColumnRenamed('asin_total_comments', 'comments') \
            .withColumnRenamed('asin_comments_change', 'comments_change') \
            .withColumnRenamed('asin_launch_time', 'launch_time') \
            .withColumnRenamed('asin_launch_time_type', 'launch_time_type') \
            .withColumnRenamed('asin_bsr_orders_change', 'bsr_orders_change') \
            .withColumnRenamed('asin_price', 'price') \
            .withColumnRenamed('asin_size_type', 'size_type') \
            .fillna({'comments_last_five_month': 0})
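        # Derive the bucketed flag columns from the fixed business-rule thresholds below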
        self.df_asin_detail = self.df_asin_detail \
            .withColumn('comments_avg_type',
                        F.when(F.col('comments_avg') < 400, 1)
                        .when((F.col('comments_avg') >= 400) & (F.col('comments_avg') < 700), 2)
                        .when((F.col('comments_avg') >= 700) & (F.col('comments_avg') < 1000), 3)
                        .when((F.col('comments_avg') >= 1000) & (F.col('comments_avg') < 1400), 4)
                        .when((F.col('comments_avg') >= 1400) & (F.col('comments_avg') <= 1800), 5)
                        .when(F.col('comments_avg') > 1800, 6)
                        .otherwise(0)) \
            .withColumn('one_year_and_rise_type',
                        F.when((F.col('launch_time_type') <= 4) & (F.col('launch_time_type') > 0) & (F.col('comments_change') >= 1) & (F.col('bsr_orders_change') >= 0.2) & (F.col('bsr_orders_change') <= 0.4), 1)
                        .when((F.col('launch_time_type') <= 4) & (F.col('launch_time_type') > 0) & (F.col('comments_change') >= 1) & (F.col('bsr_orders_change') > 0.4) & (F.col('bsr_orders_change') <= 0.6), 2)
                        .when((F.col('launch_time_type') <= 4) & (F.col('launch_time_type') > 0) & (F.col('comments_change') >= 1) & (F.col('bsr_orders_change') > 0.6), 3)
                        .otherwise(0)) \
            .withColumn('half_year_and_rise_type',
                        F.when((F.col('launch_time_type') <= 3) & (F.col('launch_time_type') > 0) & (F.col('comments') > F.col('comments_last_five_month')) & (F.col('bsr_orders_change') >= 0.3) & (F.col('bsr_orders_change') <= 0.5), 1)
                        .when((F.col('launch_time_type') <= 3) & (F.col('launch_time_type') > 0) & (F.col('comments') > F.col('comments_last_five_month')) & (F.col('bsr_orders_change') > 0.5) & (F.col('bsr_orders_change') <= 0.7), 2)
                        .when((F.col('launch_time_type') <= 3) & (F.col('launch_time_type') > 0) & (F.col('comments') > F.col('comments_last_five_month')) & (F.col('bsr_orders_change') > 0.7), 3)
                        .otherwise(0)) \
            .withColumn('price_right_type',
                        F.when((F.col('price') >= 15) & (F.col('price') < 20), 1)
                        .when((F.col('price') >= 20) & (F.col('price') < 30), 2)
                        .when((F.col('price') >= 30) & (F.col('price') < 40), 3)
                        .when((F.col('price') >= 40) & (F.col('price') <= 50), 4)
                        .otherwise(0))
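        # Keep only the columns that are exported to Doris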
        self.df_save = self.df_asin_detail.select(
            'category_id', 'asin', 'comments', 'comments_last_five_month', 'comments_avg', 'comments_change',
            'launch_time', 'launch_time_type', 'bsr_orders_change', 'price', 'size_type', 'is_alarm_brand',
            'comments_avg_type', 'one_year_and_rise_type', 'half_year_and_rise_type', 'price_right_type'
        )

    def save_data(self):
        columns = self.df_save.columns
        columns_str = ",".join(columns)
        DorisHelper.spark_export_with_columns(self.df_save, 'test', 'us_cate_asin_flag', columns_str)
        print('Export to Doris finished')

    def run(self):
        self.read_data()
        self.handle_data()
        self.save_data()


if __name__ == '__main__':
    obj = CateAsinDetailToDoris()
    obj.run()
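
# Usage sketch (assumption: the actual cluster options and script name depend on the deployment):
#   spark-submit --master yarn --deploy-mode cluster cate_asin_detail_to_doris.py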