import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable

from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
from pyspark.sql.types import StringType, IntegerType
# Window functions for grouped ranking (percent_rank / row_number)
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class StAvg(Templates):
    """Derive per-search-term "average" volume/price/weight measures.

    For every search term the job joins the term's ASINs with their volume,
    price and weight attributes, takes the attribute value at the 25%%
    percent-rank boundary (descending order) plus the per-term minimum, and
    combines them as ``avg = 1.5 * (p25 - min) + min``.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        """Set up the Spark session and placeholder DataFrames.

        :param site_name: marketplace code, e.g. ``'us'``
        :param date_type: period granularity: day/week/4_week/month/quarter
        :param date_info: period value matching ``date_type``, e.g. ``'2022-01'``
        """
        super(StAvg, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_st_measure_test'  # was an f-string with no placeholder
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.get_date_info_tuple()
        # Dummy queries so the attributes always exist before read_data() runs.
        self.df_st_asin = self.spark.sql(f"select 1+1;")
        self.df_asin_volume = self.spark.sql(f"select 1+1;")
        self.df_asin_price_weight = self.spark.sql(f"select 1+1;")

    def read_data(self):
        """Load search-term/ASIN pairs and the ASIN attribute tables, cached."""
        sql = f"select search_term, asin from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple[:2]};"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin.show(10, truncate=False)
        # Volume is derived from asin_length * asin_width * asin_height.
        sql = f"select asin, asin_length * asin_width * asin_height as asin_volume from dim_asin_volume_info where site_name='{self.site_name}'"
        print("sql:", sql)
        self.df_asin_volume = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_volume.show(10, truncate=False)
        sql = f"select asin, asin_price, asin_weight from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        print("sql:", sql)
        self.df_asin_price_weight = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_price_weight.show(10, truncate=False)

    def handle_join_by_asin(self):
        """Deduplicate (search_term, asin) pairs and left-join the attribute tables."""
        self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin = self.df_st_asin.join(
            self.df_asin_volume, on='asin', how='left'
        ).join(
            self.df_asin_price_weight, on='asin', how='left'
        )

    def handle_st_attributes(self, attributes_type='asin_volume'):
        """Return, per search term, the attribute value at the 25%% percent-rank cut.

        Rows are ranked descending by ``attributes_type``; rows with
        percent_rank <= 0.25 are kept, and for each search term the row with
        the largest percent_rank (closest to the 25%% boundary) is selected.
        The value column is renamed to ``st_<attr>_25_percent``.
        """
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))
        # Compute percent rank and keep the top-quartile rows.
        # NOTE: the original had a stray trailing '\' before a comment here;
        # removed — the expression is complete at this filter.
        df = self.df_st_asin.select("search_term", f"{attributes_type}") \
            .filter(f'{attributes_type} is not null') \
            .withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window)) \
            .filter(f'{attributes_type}_percent_rank <= 0.25')
        # Keep, per search_term, the single record with the maximum percent rank.
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window)) \
            .filter(f'{attributes_type}_row_number = 1')
        df = df.drop(f"{attributes_type}_percent_rank", f"{attributes_type}_row_number")
        df = df.withColumnRenamed(f"{attributes_type}", f"{attributes_type.replace('asin', 'st')}_25_percent")
        df.show(10, truncate=False)
        return df

    def handle_data(self):
        """Join attributes, then build the avg = 1.5*(p25-min)+min measures."""
        self.handle_join_by_asin()
        df_st_volume = self.handle_st_attributes(attributes_type='asin_volume')
        df_st_price = self.handle_st_attributes(attributes_type='asin_price')
        df_st_weight = self.handle_st_attributes(attributes_type='asin_weight')
        df_st_min = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('st_volume_min'),
            F.min("asin_price").alias('st_price_min'),
            F.min("asin_weight").alias('st_weight_min')
        )
        df_st_min = df_st_min.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        df_st_min = df_st_min.withColumn(
            "st_volume_avg",
            1.5 * (df_st_min.st_volume_25_percent - df_st_min.st_volume_min) + df_st_min.st_volume_min
        ).withColumn(
            "st_price_avg",
            1.5 * (df_st_min.st_price_25_percent - df_st_min.st_price_min) + df_st_min.st_price_min
        ).withColumn(
            "st_weight_avg",
            1.5 * (df_st_min.st_weight_25_percent - df_st_min.st_weight_min) + df_st_min.st_weight_min
        )
        df_st_min.show(10, truncate=False)
        # TODO(review): debug guard left in place — this aborts the job before
        # any downstream save step in Templates.run(); remove once verified.
        quit()

    def handle_st_attributes_old(self, attributes_type='asin_volume'):
        """Deprecated variant of handle_st_attributes; kept for reference."""
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))
        df = self.df_st_asin.filter(f'{attributes_type} is not null') \
            .withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window))
        df = df.filter(f'{attributes_type}_percent_rank<=0.25').select("search_term", f"{attributes_type}")
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window))
        df = df.filter(f'{attributes_type}_row_number=1')
        df.show(10, truncate=False)
        return df

    def handle_data_old(self):
        """Deprecated variant of handle_data; kept for reference."""
        # BUGFIX: original called self.handle_data_join(), which does not
        # exist anywhere in this class — the real method is handle_join_by_asin.
        self.handle_join_by_asin()
        self.df_st_asin.filter('search_term="almanack of naval ravikant"').show(100, truncate=False)
        window_volume = Window.partitionBy(['search_term']).orderBy(F.desc("asin_volume"))
        window_price = Window.partitionBy(['search_term']).orderBy(F.desc("asin_price"))
        window_weight = Window.partitionBy(['search_term']).orderBy(F.desc("asin_weight"))
        df_st_volume = self.df_st_asin.filter('asin_volume is not null') \
            .withColumn("volume_percent_rank", F.percent_rank().over(window_volume))
        df_st_price = self.df_st_asin.filter('asin_price is not null') \
            .withColumn("price_percent_rank", F.percent_rank().over(window_price))
        df_st_weight = self.df_st_asin.filter('asin_weight is not null') \
            .withColumn("weight_percent_rank", F.percent_rank().over(window_weight))
        df_st_min_value = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('min_st_volume'),
            F.min("asin_price").alias('min_st_price'),
            F.min("asin_weight").alias('min_st_weight')
        )
        df_st_min_value.show(10, truncate=False)
        df_st_min_value = df_st_min_value.drop_duplicates(['search_term'])
        df_st_volume = df_st_volume.filter('volume_percent_rank>=0.25') \
            .select("search_term", "asin_volume").drop_duplicates(['search_term'])
        df_st_price = df_st_price.filter('price_percent_rank>=0.25') \
            .select("search_term", "asin_price").drop_duplicates(['search_term'])
        df_st_weight = df_st_weight.filter('weight_percent_rank>=0.25') \
            .select("search_term", "asin_weight").drop_duplicates(['search_term'])
        df_st_min_value = df_st_min_value.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        df_st_min_value.show(10, truncate=False)
        df_st_min_value = df_st_min_value.withColumn(
            "st_volume_avg",
            1.5 * (df_st_min_value.asin_volume - df_st_min_value.min_st_volume) + df_st_min_value.min_st_volume
        ).withColumn(
            "st_price_avg",
            1.5 * (df_st_min_value.asin_price - df_st_min_value.min_st_price) + df_st_min_value.min_st_price
        ).withColumn(
            "st_weight_avg",
            1.5 * (df_st_min_value.asin_weight - df_st_min_value.min_st_weight) + df_st_min_value.min_st_weight
        )
        df_st_min_value.show(10, truncate=False)
        # TODO(review): debug guard — see handle_data.
        quit()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. 'us'
    date_type = sys.argv[2]  # arg 2: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: period value, e.g. '2022-1'
    handle_obj = StAvg(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()