# avg_volume_prive_weight.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class StAvg(Templates):
    """Per-search-term averages for ASIN volume, price and weight.

    For every search term, joins its ASINs with their volume and
    price/weight attributes, then estimates an "average" for each
    attribute as ``min + 1.5 * (value_at_25%_percent_rank - min)``.
    The ``*_old`` methods are earlier implementations kept for reference.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        """
        :param site_name: Amazon site code, e.g. 'us'.
        :param date_type: period granularity: day/week/4_week/month/quarter.
        :param date_info: period value, e.g. '2022-01'.
        """
        super(StAvg, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Plain string: the previous f-string had no placeholders.
        self.db_save = 'dwd_st_measure_test'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.get_date_info_tuple()
        # Placeholder DataFrames; populated by read_data().
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_asin_volume = self.spark.sql("select 1+1;")
        self.df_asin_price_weight = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load search-term/ASIN pairs, ASIN volumes, and ASIN price/weight."""
        sql = f"select search_term, asin from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple[:2]};"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin.show(10, truncate=False)
        # asin_length, asin_width, asin_height,
        sql = f"select asin, asin_length * asin_width * asin_height as asin_volume from dim_asin_volume_info where site_name='{self.site_name}'"
        print("sql:", sql)
        self.df_asin_volume = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_volume.show(10, truncate=False)
        sql = f"select asin, asin_price, asin_weight from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        print("sql:", sql)
        self.df_asin_price_weight = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_price_weight.show(10, truncate=False)

    def handle_join_by_asin(self):
        """Deduplicate (search_term, asin) pairs and left-join the attribute tables."""
        self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin = self.df_st_asin.join(
            self.df_asin_volume, on='asin', how='left'
        ).join(
            self.df_asin_price_weight, on='asin', how='left'
        )

    def handle_st_attributes(self, attributes_type='asin_volume'):
        """For each search term, pick the attribute value at the 25% percent-rank.

        :param attributes_type: column to rank: asin_volume/asin_price/asin_weight.
        :return: DataFrame with columns search_term, <st_attr>_25_percent.
        """
        # Rank attribute values descending within each search term.
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))

        # Compute percent_rank and keep only rows ranked in the top 25%.
        df = self.df_st_asin.select("search_term", f"{attributes_type}") \
            .filter(f'{attributes_type} is not null') \
            .withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window)) \
            .filter(f'{attributes_type}_percent_rank <= 0.25')

        # row_number over descending percent_rank keeps, per search term, the
        # single row closest to the 0.25 boundary.
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window)) \
            .filter(f'{attributes_type}_row_number = 1')

        # Drop the helper columns and rename asin_* -> st_*_25_percent.
        df = df.drop(f"{attributes_type}_percent_rank", f"{attributes_type}_row_number")
        df = df.withColumnRenamed(f"{attributes_type}", f"{attributes_type.replace('asin', 'st')}_25_percent")
        df.show(10, truncate=False)

        return df

    def handle_data(self):
        """Build per-search-term min / 25%-point / estimated-average metrics."""
        self.handle_join_by_asin()
        df_st_volume = self.handle_st_attributes(attributes_type='asin_volume')
        df_st_price = self.handle_st_attributes(attributes_type='asin_price')
        df_st_weight = self.handle_st_attributes(attributes_type='asin_weight')
        df_st_min = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('st_volume_min'),
            F.min("asin_price").alias('st_price_min'),
            F.min("asin_weight").alias('st_weight_min')
        )
        df_st_min = df_st_min.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        # avg estimate: min + 1.5 * (25%-point - min).
        df_st_min = df_st_min.withColumn(
            "st_volume_avg",
            1.5 * (df_st_min.st_volume_25_percent - df_st_min.st_volume_min) + df_st_min.st_volume_min
        ).withColumn(
            "st_price_avg",
            1.5 * (df_st_min.st_price_25_percent - df_st_min.st_price_min) + df_st_min.st_price_min
        ).withColumn(
            "st_weight_avg",
            1.5 * (df_st_min.st_weight_25_percent - df_st_min.st_weight_min) + df_st_min.st_weight_min
        )
        df_st_min.show(10, truncate=False)
        # NOTE(review): quit() aborts the run here, before any save step the
        # Templates.run() pipeline might perform — looks like a debugging stop;
        # remove it once results are validated.
        quit()

    def handle_st_attributes_old(self, attributes_type='asin_volume'):
        """Legacy version of handle_st_attributes, kept for reference."""
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))
        df = self.df_st_asin.filter(f'{attributes_type} is not null').withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window))
        df = df.filter(f'{attributes_type}_percent_rank<=0.25')
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window))
        df = df.filter(f'{attributes_type}_row_number=1')
        # BUG FIX: the select used to run before the second window, dropping the
        # percent_rank column that window orders by (AnalysisException in Spark).
        df = df.select("search_term", f"{attributes_type}")
        df.show(10, truncate=False)
        return df

    def handle_data_old(self):
        """Legacy version of handle_data, kept for reference."""
        # BUG FIX: was self.handle_data_join(), a method that does not exist
        # (AttributeError); the join step is handle_join_by_asin().
        self.handle_join_by_asin()
        # self.df_st_asin.filter('search_term="exrated stevie j cole"').show(100, truncate=False)
        self.df_st_asin.filter('search_term="almanack of naval ravikant"').show(100, truncate=False)
        # quit()

        window_volume = Window.partitionBy(['search_term']).orderBy(F.desc("asin_volume"))
        window_price = Window.partitionBy(['search_term']).orderBy(F.desc("asin_price"))
        window_weight = Window.partitionBy(['search_term']).orderBy(F.desc("asin_weight"))
        df_st_volume = self.df_st_asin.filter('asin_volume is not null').withColumn("volume_percent_rank", F.percent_rank().over(window_volume))
        df_st_price = self.df_st_asin.filter('asin_price is not null').withColumn("price_percent_rank", F.percent_rank().over(window_price))
        df_st_weight = self.df_st_asin.filter('asin_weight is not null').withColumn("weight_percent_rank", F.percent_rank().over(window_weight))
        df_st_min_value = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('min_st_volume'),
            F.min("asin_price").alias('min_st_price'),
            F.min("asin_weight").alias('min_st_weight')
        )
        df_st_min_value.show(10, truncate=False)
        df_st_min_value = df_st_min_value.drop_duplicates(['search_term'])
        df_st_volume = df_st_volume.filter('volume_percent_rank>=0.25').select("search_term", "asin_volume").drop_duplicates(['search_term'])
        df_st_price = df_st_price.filter('price_percent_rank>=0.25').select("search_term", "asin_price").drop_duplicates(['search_term'])
        df_st_weight = df_st_weight.filter('weight_percent_rank>=0.25').select("search_term", "asin_weight").drop_duplicates(['search_term'])
        df_st_min_value = df_st_min_value.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        df_st_min_value.show(10, truncate=False)
        df_st_min_value = df_st_min_value.withColumn(
            "st_volume_avg", 1.5 * (df_st_min_value.asin_volume - df_st_min_value.min_st_volume) + df_st_min_value.min_st_volume
        ).withColumn(
            "st_price_avg", 1.5 * (df_st_min_value.asin_price - df_st_min_value.min_st_price) + df_st_min_value.min_st_price
        ).withColumn(
            "st_weight_avg", 1.5 * (df_st_min_value.asin_weight - df_st_min_value.min_st_weight) + df_st_min_value.min_st_weight
        )
        df_st_min_value.show(10, truncate=False)
        quit()


if __name__ == '__main__':
    # Command-line args: 1) site code, 2) period type
    # (day/week/4_week/month/quarter), 3) period value
    # (year-month-day / year-week / year-month / year-quarter, e.g. 2022-1).
    StAvg(
        site_name=sys.argv[1],
        date_type=sys.argv[2],
        date_info=sys.argv[3],
    ).run()