# avg_volume_prive_weight.py 8.41 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType
# Window specification used for partitioned (grouped) ordering / ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class StAvg(Templates):
    """Per-search-term statistics of ASIN volume, price and weight.

    The job reads (search_term, asin) pairs together with per-ASIN volume,
    price and weight, joins them, and for every search term derives:

    * ``st_<x>_min``        - minimum of the attribute,
    * ``st_<x>_25_percent`` - the value found at the 0.25 ``percent_rank``
      boundary when the attribute is ordered descending,
    * ``st_<x>_avg``        - ``1.5 * (st_<x>_25_percent - st_<x>_min) + st_<x>_min``.

    ``create_spark_object`` and ``get_date_info_tuple`` are not defined here
    and therefore come from ``Templates``.
    NOTE(review): ``get_date_info_tuple`` presumably fills
    ``self.date_info_tuple`` (it is read in ``read_data``) - confirm.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        """Store the job parameters and create the Spark session.

        Args:
            site_name: Value used for the ``site_name`` filter in every query.
            date_type: Value used for the ``date_type`` filter of
                ``dim_asin_detail``.
            date_info: Value used for the ``date_info`` filter of
                ``dim_asin_detail``.
        """
        super(StAvg, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_st_measure_test'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.get_date_info_tuple()
        # Placeholder DataFrames; read_data() replaces them with real data.
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_asin_volume = self.spark.sql("select 1+1;")
        self.df_asin_price_weight = self.spark.sql("select 1+1;")

    def _run_cached_query(self, sql):
        """Print *sql*, execute it, cache the result and show a 10-row preview.

        Returns:
            The cached DataFrame produced by the query.
        """
        print("sql:", sql)
        df = self.spark.sql(sqlQuery=sql).cache()
        df.show(10, truncate=False)
        return df

    def read_data(self):
        """Load the three input DataFrames used by the job.

        Raises:
            ValueError: If ``self.date_info_tuple`` is empty, because the
                ``IN (...)`` filter could not be built.
        """
        # Only the first two entries of date_info_tuple are queried.
        # NOTE(review): entries are assumed to be date strings - confirm.
        days = tuple(self.date_info_tuple[:2])
        if not days:
            raise ValueError("date_info_tuple is empty; cannot build the date_info filter")
        # Build the IN list explicitly: formatting a 1-element Python tuple
        # directly would render "('x',)", which is not valid SQL.
        date_info_in = "(" + ", ".join(f"'{day}'" for day in days) + ")"
        sql = f"select search_term, asin from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {date_info_in};"
        self.df_st_asin = self._run_cached_query(sql)

        # Volume is the product of the three stored dimensions:
        # asin_length, asin_width, asin_height.
        sql = f"select asin, asin_length * asin_width * asin_height as asin_volume from dim_asin_volume_info where site_name='{self.site_name}'"
        self.df_asin_volume = self._run_cached_query(sql)

        sql = f"select asin, asin_price, asin_weight from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        self.df_asin_price_weight = self._run_cached_query(sql)

    def handle_join_by_asin(self):
        """Deduplicate (search_term, asin) pairs and left-join the ASIN attributes.

        After this call ``self.df_st_asin`` additionally carries
        ``asin_volume``, ``asin_price`` and ``asin_weight`` (null when the
        ASIN has no match in the respective table).
        """
        self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin = self.df_st_asin.join(
            self.df_asin_volume, on='asin', how='left'
        ).join(
            self.df_asin_price_weight, on='asin', how='left'
        )

    def handle_st_attributes(self, attributes_type='asin_volume'):
        """Return, per search term, the attribute value at the 0.25 percent-rank boundary.

        Non-null values are ordered descending inside each search term; rows
        with ``percent_rank <= 0.25`` are kept and, of those, the row with
        the largest percent rank is selected.

        Args:
            attributes_type: Name of the column in ``self.df_st_asin`` to
                evaluate (e.g. ``asin_volume``, ``asin_price``,
                ``asin_weight``).

        Returns:
            DataFrame with ``search_term`` and one value column named after
            *attributes_type* with ``asin`` replaced by ``st`` and the suffix
            ``_25_percent`` (e.g. ``st_volume_25_percent``).
        """
        rank_col = f"{attributes_type}_percent_rank"
        row_col = f"{attributes_type}_row_number"

        # Window: per search term, attribute ordered descending.
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))

        # Compute the percent rank and keep the records with rank <= 0.25.
        df = self.df_st_asin.select("search_term", f"{attributes_type}") \
            .filter(f'{attributes_type} is not null') \
            .withColumn(rank_col, F.percent_rank().over(window)) \
            .filter(f'{rank_col} <= 0.25')

        # Use row_number() to pick, per search term, the record with the
        # largest remaining percent rank.
        window = Window.partitionBy(['search_term']).orderBy(F.desc(rank_col))
        df = df.withColumn(row_col, F.row_number().over(window)) \
            .filter(f'{row_col} = 1')

        # Drop the helper columns, rename the value column and preview.
        df = df.drop(rank_col, row_col)
        df = df.withColumnRenamed(f"{attributes_type}", f"{attributes_type.replace('asin', 'st')}_25_percent")
        df.show(10, truncate=False)

        return df

    def handle_data(self):
        """Compute min / 25-percent / avg columns per search term and preview them.

        The process is terminated at the end of this method, so nothing that
        would run after ``handle_data()`` is executed.
        """
        self.handle_join_by_asin()
        df_st_volume = self.handle_st_attributes(attributes_type='asin_volume')
        df_st_price = self.handle_st_attributes(attributes_type='asin_price')
        df_st_weight = self.handle_st_attributes(attributes_type='asin_weight')
        df_st_min = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('st_volume_min'),
            F.min("asin_price").alias('st_price_min'),
            F.min("asin_weight").alias('st_weight_min')
        )
        df_st_min = df_st_min.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        # st_<x>_avg = 1.5 * (st_<x>_25_percent - st_<x>_min) + st_<x>_min
        for measure in ("volume", "price", "weight"):
            minimum = F.col(f"st_{measure}_min")
            boundary = F.col(f"st_{measure}_25_percent")
            df_st_min = df_st_min.withColumn(
                f"st_{measure}_avg", 1.5 * (boundary - minimum) + minimum
            )
        df_st_min.show(10, truncate=False)
        # Stop here on purpose (same effect as the interactive-only quit()).
        sys.exit()

    def handle_st_attributes_old(self, attributes_type='asin_volume'):
        """Earlier variant of ``handle_st_attributes`` (value column not renamed).

        Args:
            attributes_type: Name of the column in ``self.df_st_asin`` to
                evaluate.

        Returns:
            DataFrame with ``search_term``, the attribute column and the
            ``<attributes_type>_row_number`` helper column (always 1).
        """
        rank_col = f"{attributes_type}_percent_rank"
        row_col = f"{attributes_type}_row_number"
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))
        df = self.df_st_asin.filter(f'{attributes_type} is not null').withColumn(rank_col, F.percent_rank().over(window))
        df = df.filter(f'{rank_col}<=0.25')
        # The percent-rank column must still exist while the second window is
        # evaluated, so the projection happens only after row_number().
        window = Window.partitionBy(['search_term']).orderBy(F.desc(rank_col))
        df = df.withColumn(row_col, F.row_number().over(window))
        df = df.filter(f'{row_col}=1')
        df = df.select("search_term", f"{attributes_type}", row_col)
        df.show(10, truncate=False)
        return df

    def handle_data_old(self):
        """Earlier variant of ``handle_data``; previews its result and exits.

        Differs from ``handle_data`` in that it keeps rows with
        ``percent_rank >= 0.25`` and picks one arbitrary row per search term
        via ``drop_duplicates``.
        """
        # The join step of this class is handle_join_by_asin(); no
        # handle_data_join() is defined here.
        self.handle_join_by_asin()
        # self.df_st_asin.filter('search_term="exrated stevie j cole"').show(100, truncate=False)
        self.df_st_asin.filter('search_term="almanack of naval ravikant"').show(100, truncate=False)

        window_volume = Window.partitionBy(['search_term']).orderBy(F.desc("asin_volume"))
        window_price = Window.partitionBy(['search_term']).orderBy(F.desc("asin_price"))
        window_weight = Window.partitionBy(['search_term']).orderBy(F.desc("asin_weight"))
        df_st_volume = self.df_st_asin.filter('asin_volume is not null').withColumn("volume_percent_rank", F.percent_rank().over(window_volume))
        df_st_price = self.df_st_asin.filter('asin_price is not null').withColumn("price_percent_rank", F.percent_rank().over(window_price))
        df_st_weight = self.df_st_asin.filter('asin_weight is not null').withColumn("weight_percent_rank", F.percent_rank().over(window_weight))
        df_st_min_value = self.df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('min_st_volume'),
            F.min("asin_price").alias('min_st_price'),
            F.min("asin_weight").alias('min_st_weight')
        )
        df_st_min_value.show(10, truncate=False)
        # groupby() already yields one row per search_term; kept as a safeguard.
        df_st_min_value = df_st_min_value.drop_duplicates(['search_term'])
        df_st_volume = df_st_volume.filter('volume_percent_rank>=0.25').select("search_term", "asin_volume").drop_duplicates(['search_term'])
        df_st_price = df_st_price.filter('price_percent_rank>=0.25').select("search_term", "asin_price").drop_duplicates(['search_term'])
        df_st_weight = df_st_weight.filter('weight_percent_rank>=0.25').select("search_term", "asin_weight").drop_duplicates(['search_term'])
        df_st_min_value = df_st_min_value.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        df_st_min_value.show(10, truncate=False)
        df_st_min_value = df_st_min_value.withColumn(
            "st_volume_avg", 1.5 * (df_st_min_value.asin_volume - df_st_min_value.min_st_volume) + df_st_min_value.min_st_volume
        ).withColumn(
            "st_price_avg", 1.5 * (df_st_min_value.asin_price - df_st_min_value.min_st_price) + df_st_min_value.min_st_price
        ).withColumn(
            "st_weight_avg", 1.5 * (df_st_min_value.asin_weight - df_st_min_value.min_st_weight) + df_st_min_value.min_st_weight
        )
        df_st_min_value.show(10, truncate=False)
        # Stop here on purpose (same effect as the interactive-only quit()).
        sys.exit()


if __name__ == '__main__':
    # Three positional arguments are required; fail with a usage hint
    # instead of a bare IndexError when any of them is missing.
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
    site_name = sys.argv[1]  # argument 1: site
    date_type = sys.argv[2]  # argument 2: type: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # argument 3: year-month-day / year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = StAvg(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()