# avg_volume_prive_weight.py
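"""Estimate per-search-term average volume, price and weight.

For each search term, this job joins its ASINs with volume, price and
weight attributes, then estimates an "average" per attribute as
min + 1.5 * (p - min), where p is the attribute value sitting at
percent rank 0.25 when ASINs are ordered descending by that attribute
(roughly the 75th percentile of the distribution).
"""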
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory so `utils` is importable

from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
# Window/functions are used for per-search-term ranking below
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class StAvg(Templates):
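    """PySpark job: search-term level volume/price/weight estimates from member ASINs."""
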
    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super(StAvg, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_st_measure_test'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.get_date_info_tuple()
        # placeholder DataFrames, populated in read_data()
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_asin_volume = self.spark.sql("select 1+1;")
        self.df_asin_price_weight = self.spark.sql("select 1+1;")

    def read_data(self):
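        """Load search-term/ASIN pairs plus ASIN volume, price and weight.

        Note: the `[:2]` slice scans only the first two entries of
        date_info_tuple, presumably a test restriction.
        """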
sql = f"select search_term, asin from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple[:2]};"
print("sql:", sql)
self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
self.df_st_asin.show(10, truncate=False)
# asin_length, asin_width, asin_height,
sql = f"select asin, asin_length * asin_width * asin_height as asin_volume from dim_asin_volume_info where site_name='{self.site_name}'"
print("sql:", sql)
self.df_asin_volume = self.spark.sql(sqlQuery=sql).cache()
self.df_asin_volume.show(10, truncate=False)
sql = f"select asin, asin_price, asin_weight from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
print("sql:", sql)
self.df_asin_price_weight = self.spark.sql(sqlQuery=sql).cache()
self.df_asin_price_weight.show(10, truncate=False)

    def handle_join_by_asin(self):
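        """Dedupe (search_term, asin) pairs and left-join the ASIN attributes."""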
        self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin = self.df_st_asin.join(
            self.df_asin_volume, on='asin', how='left'
        ).join(
            self.df_asin_price_weight, on='asin', how='left'
        )

    def handle_st_attributes(self, attributes_type='asin_volume'):
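        """Per search_term, return the attribute value at descending percent
        rank 0.25 (the top-quartile boundary), renamed to st_<attr>_25_percent.
        """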
        # window over each search_term, largest attribute value first
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))
        # compute percent_rank and keep records with rank <= 0.25
        df = self.df_st_asin.select("search_term", f"{attributes_type}").filter(f'{attributes_type} is not null') \
            .withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window)) \
            .filter(f'{attributes_type}_percent_rank <= 0.25')
        # use row_number() to keep, per search_term, the record with the largest percent rank
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window)) \
            .filter(f'{attributes_type}_row_number = 1')
        # drop the helper columns and rename the result
        df = df.drop(f"{attributes_type}_percent_rank", f"{attributes_type}_row_number")
        df = df.withColumnRenamed(f"{attributes_type}", f"{attributes_type.replace('asin', 'st')}_25_percent")
        df.show(10, truncate=False)
        return df

    def handle_data(self):
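        """Join the per-attribute quartile boundaries with per-search_term
        minimums and derive st_<attr>_avg = min + 1.5 * (p25 - min).
        """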
        self.handle_join_by_asin()
        df_st_volume = self.handle_st_attributes(attributes_type='asin_volume')
        df_st_price = self.handle_st_attributes(attributes_type='asin_price')
        df_st_weight = self.handle_st_attributes(attributes_type='asin_weight')
        df_st_min = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('st_volume_min'),
            F.min("asin_price").alias('st_price_min'),
            F.min("asin_weight").alias('st_weight_min')
        )
        df_st_min = df_st_min.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        df_st_min = df_st_min.withColumn(
            "st_volume_avg",
            1.5 * (df_st_min.st_volume_25_percent - df_st_min.st_volume_min) + df_st_min.st_volume_min
        ).withColumn(
            "st_price_avg",
            1.5 * (df_st_min.st_price_25_percent - df_st_min.st_price_min) + df_st_min.st_price_min
        ).withColumn(
            "st_weight_avg",
            1.5 * (df_st_min.st_weight_25_percent - df_st_min.st_weight_min) + df_st_min.st_weight_min
        )
        df_st_min.show(10, truncate=False)
        quit()  # debug exit: results are only shown, not persisted yet

    def handle_st_attributes_old(self, attributes_type='asin_volume'):
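        """Earlier version of handle_st_attributes, kept for reference."""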
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))
        df = self.df_st_asin.filter(f'{attributes_type} is not null').withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window))
        # keep the percent_rank column: the second window below still orders by it
        df = df.filter(f'{attributes_type}_percent_rank<=0.25').select("search_term", f"{attributes_type}", f"{attributes_type}_percent_rank")
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window))
        df = df.filter(f'{attributes_type}_row_number=1')
        df.show(10, truncate=False)
        return df

    def handle_data_old(self):
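        """Earlier version of handle_data, kept for reference."""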
        self.handle_join_by_asin()
        # self.df_st_asin.filter('search_term="exrated stevie j cole"').show(100, truncate=False)
        self.df_st_asin.filter('search_term="almanack of naval ravikant"').show(100, truncate=False)
        # quit()
        window_volume = Window.partitionBy(['search_term']).orderBy(F.desc("asin_volume"))
        window_price = Window.partitionBy(['search_term']).orderBy(F.desc("asin_price"))
        window_weight = Window.partitionBy(['search_term']).orderBy(F.desc("asin_weight"))
        df_st_volume = self.df_st_asin.filter('asin_volume is not null').withColumn("volume_percent_rank", F.percent_rank().over(window_volume))
        df_st_price = self.df_st_asin.filter('asin_price is not null').withColumn("price_percent_rank", F.percent_rank().over(window_price))
        df_st_weight = self.df_st_asin.filter('asin_weight is not null').withColumn("weight_percent_rank", F.percent_rank().over(window_weight))
        df_st_min_value = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('min_st_volume'),
            F.min("asin_price").alias('min_st_price'),
            F.min("asin_weight").alias('min_st_weight')
        )
        df_st_min_value.show(10, truncate=False)
        df_st_min_value = df_st_min_value.drop_duplicates(['search_term'])
        df_st_volume = df_st_volume.filter('volume_percent_rank>=0.25').select("search_term", "asin_volume").drop_duplicates(['search_term'])
        df_st_price = df_st_price.filter('price_percent_rank>=0.25').select("search_term", "asin_price").drop_duplicates(['search_term'])
        df_st_weight = df_st_weight.filter('weight_percent_rank>=0.25').select("search_term", "asin_weight").drop_duplicates(['search_term'])
        df_st_min_value = df_st_min_value.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        df_st_min_value.show(10, truncate=False)
        df_st_min_value = df_st_min_value.withColumn(
            "st_volume_avg", 1.5 * (df_st_min_value.asin_volume - df_st_min_value.min_st_volume) + df_st_min_value.min_st_volume
        ).withColumn(
            "st_price_avg", 1.5 * (df_st_min_value.asin_price - df_st_min_value.min_st_price) + df_st_min_value.min_st_price
        ).withColumn(
            "st_weight_avg", 1.5 * (df_st_min_value.asin_weight - df_st_min_value.min_st_weight) + df_st_min_value.min_st_weight
        )
        df_st_min_value.show(10, truncate=False)
        quit()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name
    date_type = sys.argv[2]  # arg 2: date granularity: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-month-day / year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = StAvg(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
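
# Example invocation (assuming a standard spark-submit deployment):
#   spark-submit avg_volume_prive_weight.py us month 2022-01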