import os
import re
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from textblob import Word


class DwdStMeasure(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save_st_asin = f'dwd_st_asin_measure'
        self.db_save_st = f'dwd_st_measure'
        self.db_save_asin = f'dwd_asin_measure'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save_st_asin}, {self.db_save_st}, {self.db_save_asin}: {self.site_name}, {self.date_type}, {self.date_info}")
        # self.df_date = self.get_year_week_tuple()  # pandas的df对象
        self.get_date_info_tuple()
        self.get_year_week_tuple()
        self.get_year_month_days_dict(year=int(self.year))
        self.orders_transform_rate = self.get_orders_transform_rate()  # 获取月销-->日销,周销
        self.df_st_asin = self.spark.sql(f"select 1+1;")
        self.df_st_asin_flow = self.spark.sql(f"select 1+1;")
        self.df_st = self.spark.sql(f"select 1+1;")
        self.df_brand_analytics = self.spark.sql(f"select 1+1;")
        self.df_st_rate = self.spark.sql(f"select 1+1;")
        self.df_st_quantity = self.spark.sql(f"select 1+1;")
        self.df_asin_bs = self.spark.sql(f"select 1+1;")
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_bs_report = self.spark.sql(f"select 1+1;")
        self.df_st_asin_duplicated = self.spark.sql(f"select 1+1;")
        self.df_save_st_asin = self.spark.sql(f"select 1+1;")
        self.df_save_asin = self.spark.sql(f"select 1+1;")
        self.df_save_st = self.spark.sql(f"select 1+1;")
        self.df_asin_stable = self.spark.sql(f"select 1+1;")
        self.df_asin_price_weight = self.spark.sql(f"select 1+1;")
        self.df_st_templates = self.spark.sql("select st_zr_counts, st_sp_counts, st_sb1_counts,st_sb2_counts,st_sb3_counts,st_ac_counts,st_bs_counts,st_er_counts,st_tr_counts from dwd_st_measure limit 0")
        self.df_asin_templates = self.spark.sql("select asin_zr_counts, asin_sp_counts, asin_sb1_counts,asin_sb2_counts,asin_sb3_counts,asin_ac_counts,asin_bs_counts,asin_er_counts,asin_tr_counts from dwd_asin_measure limit 0")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.u_is_title_appear = self.spark.udf.register("u_is_title_appear", self.udf_is_title_appear, IntegerType())

    def get_orders_transform_rate(self):
        month_days = self.year_month_days_dict[int(self.month)]
        if self.date_type in ['day', 'week']:
            if self.date_type == 'day':
                return 1 / month_days
            if self.date_type == 'week':
                return 7 / month_days
        else:
            return 1

    @staticmethod
    def udf_is_title_appear(search_term, title):
        english_prepositions = ["aboard", "about", "above", "across", "after", "against", "along", "amid", "among",
                                "around", "as", "at", "before", "behind", "below", "beneath", "beside", "between",
                                "beyond", "but", "by", "concerning", "considering", "despite", "down", "during",
                                "except", "for", "from", "in", "inside", "into", "like", "near", "of", "off", "on",
                                "onto", "out", "outside", "over", "past", "regarding", "round", "since", "through",
                                "to", "toward", "under", "underneath", "until", "unto", "up", "upon", "with", "within",
                                "without"]

        symbol_list = [',', '。', '?', '!', ':', '?', '!', '-', '%', '|', ';', '·', '…', '~', '&', '@', '#', '、', '…', '~', '&', '@', '#', '“', '”', '‘', '’', '〝', '〞', '"', "'", '"', ''', '´', ''', '(', ')', '【', '】', '《', '》', '<', '>', '﹝', '﹞', '<', '>', '«', '»', '‹', '›', '〔', '〕', '〈', '〉', '{', '}', '[', ']', '「', '」', '{', '}', '〖', '〗', '『', '』', '︵', '︷', '︹', '︿', '︽', '﹁', '﹃', '︻', '︗', '/', '\\', '︶', '︸', '︺', '﹀', '︾', '﹂', '﹄', '︼', '︘', '/', '|', '\', '_', '_', '﹏', '﹍', '﹎', '``', '¦', '¡', '^', '\xad', '¨', 'ˊ', '¯', ' ̄', '﹋', '﹉', '﹊', 'ˋ', '︴', '¿', 'ˇ']

        # 小写
        search_term = str(search_term).lower()
        title = f" {str(title).lower()} "
        # 1. 去掉特殊符号
        # search_term = re.sub(r'[,:()]', '', search_term)  # 去掉特殊符号
        # title = re.sub(r'[,:()]', '', title)  # 去掉特殊符号
        # for symbol in symbol_list:
        #     if symbol in title:
        #         title = title.replace(symbol, "")
        #     if symbol in search_term:
        #         search_term = search_term.replace(symbol, "")

        # 改成正则去掉特殊符号
        symbols = "".join(symbol_list)  # 将列表中的所有字符连接成一个字符串
        search_term = re.sub('[' + symbols + ']', '', search_term)  # 去掉特殊符号
        title = re.sub('[' + symbols + ']', '', title)  # 去掉特殊符号

        # 2. 去掉介词(关键词去掉就行)
        st_list = [f" {st} " for st in search_term.split(" ") if st not in english_prepositions]  # 去掉介词

        # 3. 复数一起匹配
        for st in st_list:
            # if st in symbol_list:
            #     st = st.replace(symbol, "")
            if st not in title:
                if Word(st) not in title:
                    return 0
        return 1

        # 旧版
        # if str(search_term).lower() in str(title).lower():
        #     return 1
        # else:
        #     return 0

    def read_data(self):
        print("self.year, self.month:", self.year, self.month)

        print("1 读取st+asin两个维度: dim_st_asin_info表和ods_rank_flow表")
        print("1.1 读取dim_st_asin_info表")
        if self.date_type == 'month_old':
            # self.get_year_week_tuple()
            if int(self.month) <= 9 and int(self.year) <= 2022:
                sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='month' and date_info ='{self.date_info}'"
            else:
                sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"
        else:
            sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"

        # else:
        #     if (int(self.year) == 2022 and int(self.month) < 10) or int(self.year) <= 2021:
        #         sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        #     else:
        #         sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin.show(10, truncate=False)
        # self.df_st_asin.filter("search_term='abiie high chair'").show(100, truncate=False)
        # quit()
        # self.df_st_asin = self.df_st_asin.drop_duplicates(["search_term", "asin", "data_type", "date_info"]).cache()
        # self.df_st_asin_duplicated = self.df_st_asin.drop_duplicates(['search_term', 'asin']).cache()
        # print("self.df_st_asin:", self.df_st_asin.count())
        # print("self.df_st_asin_duplicated:", self.df_st_asin_duplicated.count())
        # self.df_st_asin.show(10, truncate=False)
        # self.df_asin = self.df_st_asin.select("asin").drop_duplicates(["asin"])
        # self.df_st = self.df_st_asin.select("search_term").drop_duplicates(["search_term"])
        print("1.2 读取ods_rank_flow表")
        sql = f"select rank as page_rank, flow from ods_rank_flow " \
              f"where site_name='{self.site_name}'"
        self.df_st_asin_flow = self.spark.sql(sql).cache()
        # self.df_st_asin_flow.persist(StorageLevel.MEMORY_ONLY)
        self.df_st_asin_flow.show(10, truncate=False)
        print("2 读取st维度: dim_st_detail表和ods_brand_analytics表")
        print("self.year, self.month:", self.year, self.month)
        print("2.1 读取dim_st_detail和ods_brand_analytics表")
        sql = f"select search_term, st_rank, st_search_sum from dim_st_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info ='{self.date_info}';"
        print("sql:", sql)
        self.df_st = self.spark.sql(sqlQuery=sql)
        self.df_st.persist(StorageLevel.MEMORY_ONLY)
        self.df_st.show(10, truncate=False)
        # 统计词频
        print("2.2 读取ods_brand_analytics表")
        # sql = f"select search_term, date_info from ods_brand_analytics where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}"
        sql = f"select search_term, date_info from ods_brand_analytics where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"
        print("sql:", sql)
        self.df_brand_analytics = self.spark.sql(sqlQuery=sql)
        self.df_brand_analytics.persist(StorageLevel.MEMORY_ONLY)
        self.df_brand_analytics.show(10, truncate=False)
        print("3 读取asin维度: dim_asin_bs_info+dim_asin_detail表")
        print("3.1 读取dim_asin_bs_info表")

        sql = f"select asin, asin_bs_cate_1_rank, asin_bs_cate_1_id " \
              f"from dim_asin_bs_info where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}';"
        self.df_asin_bs = self.spark.sql(sql).cache()
        self.df_asin_bs.show(10)

        sql = f"select asin, asin_title, asin_price " \
              f"from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}';"
        self.df_asin_detail = self.spark.sql(sql).cache()

        self.df_asin_detail.show(10)
        print("4 读取bsr维度: ods_one_category_report表")
        print("4.1 读取ods_one_category_report表")

        if int(self.year) == 2022 and int(self.month) < 3:
            sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, ceil(orders*{self.orders_transform_rate}) as asin_bsr_orders from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';"
        else:
            sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, ceil(orders*{self.orders_transform_rate}) as asin_bsr_orders from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';"
        print("sql:", sql)
        self.df_bs_report = self.spark.sql(sqlQuery=sql)
        self.df_bs_report.persist(StorageLevel.MEMORY_ONLY)
        self.df_bs_report.show(10, truncate=False)
        print("5 读取asin维度-体积信息: dim_asin_volume_info表")
        sql = f"select asin, asin_length * asin_width * asin_height as asin_volume, asin_weight from dim_asin_stable_info where site_name='{self.site_name}'"
        print("sql:", sql)
        self.df_asin_stable = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_stable.show(10, truncate=False)

    def save_data(self):
        self.reset_partitions(partitions_num=50)
        self.save_data_common(
            df_save=self.df_save_st_asin,
            db_save=self.db_save_st_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=5)
        self.save_data_common(
            df_save=self.df_save_st,
            db_save=self.db_save_st,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=10)
        self.save_data_common(
            df_save=self.df_save_asin,
            db_save=self.db_save_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )

    def handle_data(self):
        self.handle_join()
        self.df_save_asin = self.handle_st_asin_counts(cal_type="asin", df_templates=self.df_asin_templates)
        self.df_save_st = self.handle_st_asin_counts(cal_type="st", df_templates=self.df_st_templates)
        self.handle_st_zr_page1_title_rate()
        self.handle_st_asin_orders()  # 预估销量和bsr销量
        self.handle_st_asin_ao()
        self.handle_st_num()
        self.handle_st_weight_price_volume()
        # self.df_save_st.filter("search_term='abiie high chair'").show(10, truncate=False)
        # self.df_save_st_asin.filter("search_term='abiie high chair'").show(100, truncate=False)
        # self.df_save_st_asin.show(10, truncate=False)
        # self.df_save_st.show(10, truncate=False)
        del self.df_st_asin_duplicated
        del self.df_st_asin
        # self.df_save_asin.show(10, truncate=False)
        # quit()

    def handle_st_attributes(self, df, attributes_type='asin_volume'):
        # 定义窗口函数
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))

        # 计算百分比排名并筛选 <= 0.25 的记录
        df = df.select("search_term", f"{attributes_type}").filter(f'{attributes_type} is not null') \
            .withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window)) \
            .filter(f'{attributes_type}_percent_rank <= 0.25') \

        # 使用 row_number() 方法获取每个 search_term 的最大百分比排名记录
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window)) \
            .filter(f'{attributes_type}_row_number = 1')

        # 显示结果
        df = df.drop(f"{attributes_type}_percent_rank", f"{attributes_type}_row_number")
        df = df.withColumnRenamed(f"{attributes_type}", f"{attributes_type.replace('asin', 'st')}_25_percent")
        df.show(10, truncate=False)

        return df

    def handle_st_weight_price_volume(self):
        # self.df_st_asin_duplicated = self.df_st_asin_duplicated.drop_duplicates(['search_term', 'asin']).cache()
        df_st_asin = self.df_st_asin_duplicated.select('search_term', 'asin').cache()
        df_asin_label = self.df_asin_detail.select("asin", "asin_price", "asin_weight", "asin_volume").cache()

        df_st_asin = df_st_asin.join(
            df_asin_label, on='asin', how='inner'
        )
        df_st_volume = self.handle_st_attributes(df=df_st_asin, attributes_type='asin_volume')
        df_st_price = self.handle_st_attributes(df=df_st_asin, attributes_type='asin_price')
        df_st_weight = self.handle_st_attributes(df=df_st_asin, attributes_type='asin_weight')
        df_st_min = df_st_asin.groupby(['search_term']).agg(
            F.min("asin_volume").alias('st_volume_min'),
            F.min("asin_price").alias('st_price_min'),
            F.min("asin_weight").alias('st_weight_min')
        )
        df_st_min = df_st_min.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        df_st_min = df_st_min.withColumn(
            "st_volume_avg",
            1.5 * (df_st_min.st_volume_25_percent - df_st_min.st_volume_min) + df_st_min.st_volume_min
        ).withColumn(
            "st_price_avg",
            1.5 * (df_st_min.st_price_25_percent - df_st_min.st_price_min) + df_st_min.st_price_min
        ).withColumn(
            "st_weight_avg",
            1.5 * (df_st_min.st_weight_25_percent - df_st_min.st_weight_min) + df_st_min.st_weight_min
        )
        df_st_min.show(10, truncate=False)
        self.df_save_st = self.df_save_st.join(
            df_st_min, on='search_term', how='left'
        )

    def handle_join(self):
        # st+asin
        self.df_st_asin = self.df_st_asin.join(
            self.df_st_asin_flow, on=['page_rank'], how='left'
        )
        # st -- dim_st_detail已经有
        # asin
        self.df_asin_bs = self.df_asin_bs.join(
            self.df_bs_report, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left'
        )
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_asin_bs, on='asin', how='left'
        ).join(
            self.df_asin_stable, on='asin', how='left'
        )
        # 合并
        self.df_st_asin = self.df_st_asin.join(
            self.df_st, on=['search_term'], how='left'
        ).join(
            self.df_asin_detail, on=['asin'], how='left'
        )
        # self.df_st_asin.show(10, truncate=False)
        # self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type'])
        self.df_st_asin = self.df_st_asin.cache()
        self.df_st_asin_duplicated = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type']).cache()
        # self.df_st_asin.persist(StorageLevel.MEMORY_ONLY)

    def handle_st_asin_counts(self, cal_type="asin", df_templates=None):
        print(f"计算{cal_type}_counts")
        cal_type_complete = "search_term" if cal_type == "st" else cal_type
        self.df_st_asin_duplicated = self.df_st_asin_duplicated.withColumn(
            f"{cal_type}_data_type",
            F.concat(F.lit(f"{cal_type}_"), self.df_st_asin_duplicated.data_type, F.lit(f"_counts"))
        )
        df = self.df_st_asin_duplicated.groupby([f'{cal_type_complete}']). \
            pivot(f"{cal_type}_data_type").count()

        df = df_templates.unionByName(df, allowMissingColumns=True)  # 防止爬虫数据没有导致程序运行出错
        df = df.fillna(0)
        # df.show(10, truncate=False)
        df = df.withColumn(
            f"{cal_type}_sb_counts",
            df[f"{cal_type}_sb1_counts"] + df[f"{cal_type}_sb2_counts"] + df[f"{cal_type}_sb3_counts"]
        )
        df = df.withColumn(
            f"{cal_type}_adv_counts",
            df[f"{cal_type}_sb_counts"] + df[f"{cal_type}_sp_counts"]
        )
        df = df.withColumn(f"site_name", F.lit(self.site_name))
        df = df.withColumn(f"date_type", F.lit(self.date_type))
        df = df.withColumn(f"date_info", F.lit(self.date_info))
        # df.show(10, truncate=False)
        return df

    def handle_st_zr_page1_title_rate(self):
        print("计算关键词的zr类型page=1的去重asin的标题密度")
        df_zr_page1 = self.df_st_asin.filter(
            "data_type='zr' and page=1"
        )
        df_zr_page1 = df_zr_page1.select("search_term", "asin", "asin_title").drop_duplicates(["search_term", "asin"])
        df_zr_page1 = df_zr_page1.withColumn(
            "st_asin_in_title_flag",
            self.u_is_title_appear(df_zr_page1.search_term, df_zr_page1.asin_title)
        )
        # df_zr_page1.show(10, truncate=False)
        df_zr_page1 = df_zr_page1.groupby(['search_term']).agg(
            {
                "search_term": "count",
                "st_asin_in_title_flag": "sum",
            }
        )
        df_zr_page1 = df_zr_page1.withColumnRenamed(
            "sum(st_asin_in_title_flag)", "st_zr_page1_title_appear_counts"
        ).withColumnRenamed(
            "count(search_term)", "st_zr_page1_title_counts"
        )
        df_zr_page1 = df_zr_page1.withColumn(
            "st_zr_page1_title_appear_rate", df_zr_page1.st_zr_page1_title_appear_counts / df_zr_page1.st_zr_page1_title_counts
        )
        self.df_save_st = self.df_save_st.join(
            df_zr_page1, on=['search_term'], how='left'
        )
        # df_zr_page1.show(10, truncate=False)
        # quit()
        del df_zr_page1

    def handle_st_asin_orders(self):
        # 预估销量+bsr销量
        print("1. 预估销量:zr, sp的销量")
        # 1.1 st+asin
        self.df_st_asin = self.df_st_asin.withColumn(
            "st_asin_orders",
            F.ceil(self.df_st_asin.flow * self.df_st_asin.st_search_sum * self.orders_transform_rate)
        )
        self.df_save_st_asin = self.df_st_asin.withColumn(
            "st_asin_orders_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_orders"))
        )
        self.df_save_st_asin = self.df_save_st_asin.groupby(["search_term", "asin"]). \
            pivot("st_asin_orders_data_type").agg(F.mean(f"st_asin_orders"))
        self.df_save_st_asin = self.df_save_st_asin.select(
            "search_term", "asin", "st_asin_zr_orders", "st_asin_sp_orders"
        )
        # self.df_save_st_asin = self.df_save_st_asin.cache()
        self.df_save_st_asin.persist(StorageLevel.MEMORY_ONLY)
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"site_name", F.lit(self.site_name))
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_type", F.lit(self.date_type))
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_info", F.lit(self.date_info))
        # self.df_save_st_asin.show(10, truncate=False)
        # 1.2 st维度的zr和sp预估销量
        df_st_orders = self.df_save_st_asin.groupby(['search_term']).agg(
            F.sum('st_asin_zr_orders').alias("st_zr_orders"),
            F.sum('st_asin_sp_orders').alias("st_sp_orders"),
            # F.sum('st_asin_zr_orders').alias("st_zr_orders"),
            # F.sum('st_asin_sp_orders').alias("st_zr_orders"),
        )
        # df_st_orders = df_st_orders.withColumnRenamed(
        #     "sum(st_asin_zr_orders)", "st_zr_orders"
        # ).withColumnRenamed(
        #     "sum(st_asin_sp_orders)", "st_sp_orders"
        # )
        self.df_save_st = self.df_save_st.join(
            df_st_orders, on=['search_term'], how='left'
        )
        # 1.3 asin维度的zr和sp预估销量
        df_asin_orders = self.df_save_st_asin.groupby(['asin']).agg(
            F.mean('st_asin_zr_orders').alias("asin_zr_orders"),
            F.mean('st_asin_sp_orders').alias("asin_sp_orders"),
            F.sum('st_asin_zr_orders').alias("asin_zr_orders_sum"),
            F.sum('st_asin_sp_orders').alias("asin_sp_orders_sum"),
        )

        # df_asin_orders = df_asin_orders.withColumnRenamed(
        #     "avg(st_asin_zr_orders)", "asin_zr_orders"
        # ).withColumnRenamed(
        #     "avg(st_asin_sp_orders)", "asin_sp_orders"
        # ).withColumnRenamed(
        #     "sum(st_asin_zr_orders)", "asin_zr_orders_sum"
        # ).withColumnRenamed(
        #     "sum(st_asin_sp_orders)", "asin_sp_orders_sum"
        # )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_orders, on=['asin'], how='left'
        )
        # 向上取整
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_zr_orders", F.ceil(self.df_save_asin.asin_zr_orders)
        ).withColumn(
            "asin_sp_orders", F.ceil(self.df_save_asin.asin_sp_orders)
        ).withColumn(
            "asin_zr_orders_sum", F.ceil(self.df_save_asin.asin_zr_orders_sum)
        ).withColumn(
            "asin_sp_orders_sum", F.ceil(self.df_save_asin.asin_sp_orders_sum)
        )

        print("2. bsr销量")
        # 2.1 st_bsr_orders
        df_st_bsr_orders = self.df_st_asin.select("search_term", "asin", "asin_bsr_orders").drop_duplicates(["search_term", "asin"])
        df_st_bsr_orders = df_st_bsr_orders.groupby(['search_term']).agg({"asin_bsr_orders": "sum"})
        df_st_bsr_orders = df_st_bsr_orders.withColumnRenamed(
            "sum(asin_bsr_orders)", "st_bsr_orders"
        )
        # 2.2 asin_bsr_orders
        df_asin_bsr_orders = self.df_st_asin.select("asin", "asin_bsr_orders").drop_duplicates(['asin'])
        # df_st_bsr_orders.show(10, truncate=False)
        # df_asin_bsr_orders.show(10, truncate=False)
        self.df_save_st = self.df_save_st.join(
            df_st_bsr_orders, on='search_term', how='left'
        )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_bsr_orders, on='asin', how='left'
        )

    def handle_st_asin_ao(self):
        print("计算st和asin各自维度的ao")
        # asin_ao_val和asin_ao_val_rate
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_ao_val", self.df_save_asin.asin_adv_counts / self.df_save_asin.asin_zr_counts
        )
        # self.df_save_asin = self.df_save_asin.fillna({"asin_ao_val": 0})  # 不要把null置为0, null值产生原因是zr类型没有搜到对应的搜索词
        window = Window.orderBy(self.df_save_asin.asin_ao_val.asc_nulls_last())
        self.df_save_asin = self.df_save_asin.withColumn("asin_ao_val_rate", F.percent_rank().over(window=window))
        # st_ao_val和st_ao_val_rate
        df_asin_ao = self.df_save_asin.select("asin", "asin_ao_val")
        df_st_ao = self.df_st_asin_duplicated.filter("data_type='zr'").select("search_term", "asin").join(
            df_asin_ao, on=['asin'], how='left'
        )
        # 新增关键词对应asin的ao升序排序,前4——20的均值
        window = Window.partitionBy(['search_term']).orderBy(df_st_ao.asin_ao_val.asc_nulls_last())
        # df_st_ao = df_st_ao.withColumn("asin_ao_val_rank", F.dense_rank().over(window=window))
        df_st_ao = df_st_ao.withColumn("asin_ao_val_rank", F.row_number().over(window=window))
        df_st_ao_4_20 = df_st_ao.filter("asin_ao_val_rank between 4 and 20").select("search_term", "asin_ao_val")
        df_st_ao_4_20 = df_st_ao_4_20.groupby(["search_term"]).agg(F.mean(df_st_ao_4_20.asin_ao_val).alias("st_4_20_ao_avg"))
        # df_st_ao.filter("search_term='donna reed'").show(100, truncate=False)
        # df_st_ao_4_20.filter("search_term='donna reed'").show(100, truncate=False)
        # quit()

        df_st_ao = df_st_ao.groupby(["search_term"]).agg({"asin_ao_val": "mean"})
        df_st_ao = df_st_ao.withColumnRenamed("avg(asin_ao_val)", "st_ao_val")

        # 新增关键词对应zr排名在4-20asin的ao均值
        # df_st_ao_4_20 = self.df_save_asin.filter("asin_ao_val_rank between 4 and 20").select("search_term", "asin_ao_val")
        # df_st_ao_4_20 = df_st_ao_4_20.groupby(["search_term"]).agg(F.mean(df_st_ao_4_20.asin_ao_val).alias("st_4_20_ao_avg"))
        # df_st_ao_4_20 = self.df_st_asin_duplicated.filter("data_type='zr' and page_rank<=20").select("search_term", "asin").join(
        #     df_asin_ao, on=['asin'], how='left'
        # )
        # df_st_ao_4_20 = df_st_ao_4_20.groupby(["search_term"]).agg(F.mean(df_st_ao_4_20.asin_ao_val).alias("st_4_20_ao_avg"))
        self.df_save_st = self.df_save_st.join(
            df_st_ao, on=['search_term'], how='left'
        ).join(
            df_st_ao_4_20, on=['search_term'], how='left'
        )
        window = Window.orderBy(self.df_save_st.st_ao_val.asc())
        self.df_save_st = self.df_save_st.withColumn("st_ao_val_rate", F.percent_rank().over(window=window))

        self.df_save_asin = self.df_save_asin.drop("asin_ao_val_rank")

    def handle_st_num(self):
        df_num = self.df_brand_analytics.drop_duplicates(['search_term', 'date_info'])
        df_num = df_num.groupby(['search_term']).count()
        df_num = df_num.withColumnRenamed("count", "st_num")
        # self.df_save_st = self.df_save_st.withColumn("st_num", F.lit(1))
        self.df_save_st = self.df_save_st.join(
            df_num, on=['search_term'], how='left'
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # 参数1:站点
    date_type = sys.argv[2]  # 参数2:类型:day/week/4_week/month/quarter
    date_info = sys.argv[3]  # 参数3:年-月-日/年-周/年-月/年-季, 比如: 2022-1
    handle_obj = DwdStMeasure(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()