import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from yswg_utils.common_udf import udf_detect_phrase_reg


class DwtAbaStAnalytics(Templates):

    def __init__(self, site_name="us", date_type="week", date_info="2022-40"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f"dwt_aba_st_analytics"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")

        # 写入、分区初始化
        self.df_save = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=10)

        # 初始化列表
        self.sp_symbols = []

        # 初始化全局df
        self.df_st_measure = self.spark.sql(f"select 1+1;")
        self.df_asin_measure = self.spark.sql(f"select 1+1;")
        self.df_st_asin_measure = self.spark.sql(f"select 1+1;")
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_seller_asin_info = self.spark.sql(f"select 1+1;")
        self.df_st_asin_cal = self.spark.sql(f"select 1+1;")
        self.df_st_asin_join = self.spark.sql(f"select 1+1;")
        self.df_seller_asin_country = self.spark.sql(f"select 1+1;")
        self.df_st_brand_cal = self.spark.sql(f"select 1+1;")
        self.df_top3_st_brand_cal = self.spark.sql(f"select 1+1;")
        self.df_st_seller_cal = self.spark.sql(f"select 1+1;")
        self.df_top3_st_seller_cal = self.spark.sql(f"select 1+1;")
        self.df_st_num_stats = self.spark.sql(f"select 1+1;")
        self.df_st_detail = self.spark.sql(f"select 1+1;")
        self.df_st_key = self.spark.sql(f"select 1+1;")
        self.df_st_market = self.spark.sql(f"select 1+1;")
        self.df_st_volume_fba = self.spark.sql(f"select 1+1;")
        self.df_st_brand = self.spark.sql(f"select 1+1;")
        self.df_asin_label = self.spark.sql(f"select 1+1;")
        self.df_is_hidden_cate = self.spark.sql(f"select 1+1;")

        # 自定义udf函数注册
        self.u_contains = self.spark.udf.register('u_contains', self.udf_contains, IntegerType())
        self.u_judge_color = self.spark.udf.register('u_judge_color', self.udf_judge_color, IntegerType())
        self.u_judge_title_color = self.spark.udf.register(
            'u_judge_title_color', self.udf_judge_title_color, IntegerType()
        )
        self.u_judge_multi_size = self.spark.udf.register(
            'u_judge_multi_size', self.udf_judge_multi_size, IntegerType()
        )

    # 解析aba搜索词的拆词个数
    def st_word_count(self, sp_symbols):
        def udf_st_word_count(name):
            # 特殊字符基准列表---迁移到数据库维护 -已处理
            # sp_symbols = ['?', '!', '-', '%', '&', '|']
            split_list = name.split(" ")
            # 取切割list中包含的特殊字符
            sp_list = list(set(split_list).intersection(set(sp_symbols)))
            # 排除掉特殊list中的特殊字符
            word_list = list(filter(lambda x: x not in sp_list, split_list))
            word_count = len(word_list)
            # 存在多个特殊字符都设定为 1
            if len(sp_list) > 0:
                symbol_count = 1
            else:
                symbol_count = 0
            return word_count + symbol_count

        return F.udf(udf_st_word_count, IntegerType())

    @staticmethod
    def udf_contains(sub, text):
        if text is None:
            return None
        if str(sub).lower() in str(text).lower():
            return 1
        else:
            return 0

    # @staticmethod
    # def udf_get_volume(volume):
    #     # print("get_volume", volume)
    #     volume = str(volume)
    #     if volume == "null":
    #         return None
    #     else:
    #         pattern = r"\d+\.?\d*"
    #         volumeList = re.findall(pattern, volume)
    #         if len(volumeList):
    #             volumeList = list(map(float, volumeList))
    #             result = reduce((lambda x, y: x * y), volumeList)
    #             return result
    #         else:
    #             return None

    @staticmethod
    def udf_judge_color(color):
        if color is None:
            return None
        color = str(color).lower()
        color_len = len(color)
        if color not in ['null', 'none'] and color_len > 1:
            return 1
        else:
            return 0

    @staticmethod
    def udf_judge_title_color(asin_title):
        if asin_title is None:
            return None
        title = str(asin_title).lower()
        modeTypes = ['colorful', 'assorted color', 'multi color']
        for color in modeTypes:
            if color in title:
                return 1
        return 0

    @staticmethod
    def udf_judge_multi_size(size, style):
        size = str(size).lower()
        style = str(style).lower()
        # 变体表中即有size又有style时,取size进行计数。如果无size,则判断是否有style进行计数
        if size not in ['none', 'null']:
            return 1
        else:
            if style not in ['none', 'null']:
                return 1
        return 0

    def read_data(self):
        # 一些不涵盖month_old的分区,重定义成month,其他正常
        spe_date_type = 'month' if 'month_old' == self.date_type else self.date_type

        # 获取ods_st_key, st唯一主键
        sql = f"""
        select 
            search_term,
            cast(st_key as int) as id 
        from ods_st_key 
        where site_name = '{self.site_name}' 
        """
        self.df_st_key = self.spark.sql(sqlQuery=sql)
        self.df_st_key = self.df_st_key.repartition(80, 'search_term').cache()
        print("self.df_st_key:")
        self.df_st_key.show(10, truncate=True)

        # 获取dwd_st_measure 事实表
        sql = f"""
        select 
            search_term,
            st_zr_orders,
            st_bsr_orders,
            st_ao_val                     as st_ao_avg,
            st_ao_val_rate,
            st_zr_page1_title_appear_rate as page1_title_proportion,
            null                          as st_4_20_ao_avg,
            null                          as st_4_20_ao_rate,
            st_volume_avg                 as volume_avg,
            st_weight_avg                 as weight_avg,
            st_price_avg                  as price_avg,
            st_zr_page123_title_appear_rate,
            st_sp_page123_title_appear_rate,
            st_zr_flow_proportion,
            st_ao_val_matrix,
            st_flow_proportion_matrix,
            st_zr_counts,
            st_sp_counts,
            st_self_asin_counts,
            st_self_asin_proportion
        from dwd_st_measure
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}' 
        """
        self.df_st_measure = self.spark.sql(sqlQuery=sql)
        self.df_st_measure = self.df_st_measure.repartition(80, 'search_term').cache()
        print("self.df_st_measure:")
        self.df_st_measure.show(10, truncate=True)

        # 获取dwd_st_asin_measure 事实表
        sql = f"""
        select 
            search_term,
            asin 
        from dwd_st_asin_measure 
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}'
        """
        self.df_st_asin_measure = self.spark.sql(sqlQuery=sql)
        self.df_st_asin_measure = self.df_st_asin_measure.repartition(80, 'asin').cache()
        print("self.df_st_asin_measure:")
        self.df_st_asin_measure.show(10, truncate=True)

        # 获取dwd_asin_measure 事实表
        sql = f"""
        select 
            asin,
            asin_bsr_orders,
            asin_zr_orders,
            asin_amazon_orders 
        from dwd_asin_measure 
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}' 
        """
        self.df_asin_measure = self.spark.sql(sqlQuery=sql)
        self.df_asin_measure = self.df_asin_measure.repartition(80, 'asin').cache()
        print("self.df_asin_measure:")
        self.df_asin_measure.show(10, truncate=True)

        # 获取dim_asin_detail表
        sql = f"""
        select 
            asin, 
            asin_title, 
            asin_title_len, 
            asin_category_desc,
            asin_rank, 
            asin_color, 
            asin_size, 
            asin_style, 
            asin_price, 
            asin_rating,
            asin_total_comments, 
            asin_material, 
            asin_brand_name,
            bsr_cate_1_id, 
            asin_buy_box_seller_type, 
            asin_is_amazon,
            asin_is_fba, 
            asin_is_fbm, 
            asin_is_other, 
            asin_is_sale,
            asin_launch_time, 
            asin_is_new, 
            asin_img_num, 
            asin_img_type,
            asin_is_picture, 
            asin_is_video, 
            asin_is_aadd 
        from dim_asin_detail  
        where site_name = '{self.site_name}' 
        and date_type = '{spe_date_type}' 
        and date_info = '{self.date_info}' 
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        self.df_asin_detail = self.df_asin_detail.repartition(80, 'asin').cache()
        print("self.df_asin_detail:")
        self.df_asin_detail.show(10, truncate=True)

        # 仅获取 asin和country_name,对country_name进行了聚合处理
        sql = f"""
        select 
            asin,
            concat_ws(\",\",collect_list(cast(fd_country_name as string))) as country_name 
        from dim_fd_asin_info  
        where site_name = '{self.site_name}'  
        group by asin;
        """
        self.df_seller_asin_country = self.spark.sql(sqlQuery=sql)
        self.df_seller_asin_country = self.df_seller_asin_country.repartition(80, 'asin').cache()
        print("self.df_seller_asin_country:")
        self.df_seller_asin_country.show(10, truncate=True)

        # 获取 dim_fd_asin_info 表
        sql = f"""
        select 
            asin,
            fd_unique as account_id,
            fd_country_name as country_name 
        from dim_fd_asin_info 
        where site_name = '{self.site_name}' 
        """
        self.df_seller_asin_info = self.spark.sql(sqlQuery=sql)
        self.df_seller_asin_info = self.df_seller_asin_info.drop_duplicates(['asin']).repartition(80, 'asin').cache()
        print("self.df_seller_asin_info:")
        self.df_seller_asin_info.show(10, truncate=True)

        # 获取 dim_st_detail asin1-3共享点击信息表
        sql = f"""
        select 
            search_term,
            st_rank                                                          as rank,
            st_asin1                                                         as asin1,
            st_asin2                                                         as asin2,
            st_asin3                                                         as asin3,
            st_click_share1                                                  as click_share1,
            st_click_share2                                                  as click_share2,
            st_click_share3                                                  as click_share3,
            st_click_share_sum                                               as total_click_share,
            st_is_new_market_segment                                         as is_new_market_segment,
            st_conversion_share1                                             as conversion_share1,
            st_conversion_share2                                             as conversion_share2,
            st_conversion_share3                                             as conversion_share3,
            st_conversion_share_sum                                          as total_conversion_share,
            st_quantity_being_sold                                           as quantity_being_sold,
            cast(st_bsr_cate_1_id as int)                                    as category_id,
            st_search_num                                                    as search_volume,
            st_is_first_text                                                 as is_first_text,
            st_is_ascending_text                                             as is_ascending_text,
            st_is_search_text                                                as is_search_text,
            cast(st_bsr_cate_current_id as int)                              as category_current_id,
            st_appear_history_counts                                         as st_num,
            cast((st_quantity_being_sold / st_search_num) as decimal(10, 3)) as supply_demand,
            st_brand1,
            st_category1,
            st_brand2,
            st_category2,
            st_brand3,
            st_category3,
            st_bsr_cate_1_id_new,
            st_bsr_cate_current_id_new,
            if(st_appear_history_counts>=4 and (st_click_share_sum > st_conversion_share_sum),1,0) as is_high_return_text,
            date_format(st_updated_time, 'yyyy-MM-dd HH:mm:ss')              as st_crawl_date,
            st_competition_level
        from dim_st_detail
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}'
        """
        self.df_st_detail = self.spark.sql(sqlQuery=sql)
        self.df_st_detail = self.df_st_detail.repartition(80, 'search_term').cache()
        print("self.df_st_detail:")
        self.df_st_detail.show(10, truncate=True)

        # 获取dws_st_num_stats表 取max_num、most_proportion
        sql = f""" 
        select 
            search_term,
            cast(max_num as double) as max_num,
            most_proportion,
            max_num_asin,
            is_self_max_num_asin 
        from dws_st_num_stats 
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}'
        and max_num_asin is not null 
        """
        self.df_st_num_stats = self.spark.sql(sqlQuery=sql)
        self.df_st_num_stats = self.df_st_num_stats.repartition(80, 'search_term').cache()
        print("self.df_st_num_stats:")
        self.df_st_num_stats.show(10, truncate=True)

        # 获取dwt_st_market表 取market_cycle_type
        sql = f"""
        select 
            search_term,
            cast(market_cycle_type as int ) as market_cycle_type 
        from dwt_st_market 
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}'
        """
        self.df_st_market = self.spark.sql(sqlQuery=sql)
        self.df_st_market = self.df_st_market.repartition(80, 'search_term').cache()
        print("self.df_st_market:")
        self.df_st_market.show(10, truncate=True)

        # 获取dwd_st_volume_fba 取gross_profit_fee_air 和 gross_profit_fee_sea
        sql = f"""
        select 
            search_term,
            gross_profit_fee_air,
            gross_profit_fee_sea 
        from dwd_st_volume_fba 
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}' 
        """
        self.df_st_volume_fba = self.spark.sql(sqlQuery=sql)
        self.df_st_volume_fba = self.df_st_volume_fba.repartition(80, 'search_term').cache()
        print("self.df_st_volume_fba:")
        self.df_st_volume_fba.show(10, truncate=True)

        # 获取影视标签dim_asin_label 取 asin_label_type
        sql = f"""
        select 
            asin, 
            asin_label_type 
        from dim_asin_label 
        where site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}' 
        """
        self.df_asin_label = self.spark.sql(sqlQuery=sql)
        self.df_asin_label = self.df_asin_label.repartition(80, 'asin').cache()
        print("self.df_asin_label:")
        self.df_asin_label.show(10, truncate=True)

        # 获取品牌词库
        sql = f"""
        select 
            search_term,
            st_brand_label 
        from dws_st_brand_info
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}'
        and st_brand_label = 1
        """
        self.df_st_brand = self.spark.sql(sqlQuery=sql)
        self.df_st_brand = self.df_st_brand.repartition(80, 'search_term').cache()
        print("self.df_st_brand:")
        self.df_st_brand.show(10, truncate=True)

        # 从pgsql获取特殊字符匹配字典表:match_character_dict
        pg_sql = f"""
        select 
            character_name 
        from match_character_dict 
        where match_type = '特殊字符'
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        chart_dict_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=pg_sql
        )
        # 将数据转换成pandas df
        dict_df = chart_dict_df.toPandas()
        # 提取特殊字符list
        self.sp_symbols = dict_df["character_name"].values.tolist()

        # 隐藏分类
        # Apps & Games、Audible Books & Originals、Books、CDs & Vinyl、Digital Music、Kindle Store、Movies & TV、Software
        sql = f"""
        select 
            category_id as st_bsr_cate_1_id_new, 
            1 as is_hidden_cate 
        from dim_bsr_category_tree
        where site_name = '{self.site_name}'
          and en_name in ("Apps & Games", "Audible Books & Originals", "Books", "CDs & Vinyl", "Digital Music", "Kindle Store", "Movies & TV", "Software")
          and category_parent_id = 0;
        """
        self.df_is_hidden_cate = self.spark.sql(sqlQuery=sql)
        self.df_is_hidden_cate = self.df_is_hidden_cate.repartition(80).cache()
        print("self.df_is_hidden_cate:")
        self.df_is_hidden_cate.show(10, truncate=True)

    def handle_data(self):
        # 对基础计算表进行关联
        self.handle_base_join()

        # 对st_asin按st进行指标聚合
        self.handle_st_agg()

        # 对品牌和卖家按st进行指标聚合
        self.handle_brand_seller_agg()

        # 计算最终指标
        self.handle_st_cal()

        # 语种处理
        self.handle_calc_lang()

        # 处理输出字段
        self.handle_column()

    def handle_base_join(self):
        self.df_st_asin_join = self.df_st_asin_measure.join(
            self.df_asin_measure, on=['asin'], how='left'
        ).join(
            self.df_asin_detail, on=['asin'], how='left'
        ).join(
            self.df_asin_label, on=['asin'], how='left'
        ).cache()
        self.df_st_asin_measure.unpersist()
        self.df_asin_measure.unpersist()
        self.df_asin_detail.unpersist()
        self.df_asin_label.unpersist()

        self.df_st_asin_cal = self.df_st_asin_join.join(
            self.df_seller_asin_country, on=['asin'], how='left'
        )

        # 计算品牌相关指标df
        self.df_st_brand_cal = self.df_st_asin_join

        # 计算卖家相关指标df
        self.df_st_seller_cal = self.df_st_asin_join.join(
            self.df_seller_asin_info, on=['asin'], how='left'
        )

    def handle_st_agg(self):
        self.df_st_asin_cal = self.df_st_asin_cal.withColumn(
            # 打上是否中国卖家标签
            "asin_is_cn",
            self.u_contains(F.lit('CN'), F.col("country_name"))
        ).withColumn(
            # 新品bsr销量
            "asin_is_new_bsr_orders",
            F.when(F.col("asin_is_new") == 1, F.col("asin_bsr_orders"))
        ).withColumn(
            # 新品zr销量
            "asin_is_new_zr_orders",
            F.when(F.col("asin_is_new") == 1, F.col("asin_zr_orders"))
        ).withColumn(
            # 标记是否有颜色标签
            "asin_is_color_flag",
            self.u_judge_color(F.col("asin_color"))
        ).withColumn(
            # 标记标题中是否出现多色关键词
            "asin_is_multi_color",
            self.u_judge_title_color(F.col("asin_title"))
        ).na.fill({
            # 判断是否多尺寸时,先对多尺寸判断字段进行空值处理
            "asin_size": "None",
            "asin_style": "None"
        }).withColumn(
            # 判断是否多尺寸
            "asin_is_multi_size",
            self.u_judge_multi_size(F.col("asin_size"), F.col("asin_style"))
        ).withColumn(
            # 增加判断是否是影视产品标签
            "asin_is_movie_flag",
            F.when(F.col("asin_label_type") == 1, F.lit(1))
        )

        # group by 按search_term 聚合
        self.df_st_asin_cal = self.df_st_asin_cal.groupby(['search_term']).agg(
            F.count("asin").alias("asin_count"),
            F.sum("asin_is_new").alias("asin_is_new_total"),
            F.sum("asin_is_aadd").alias("asin_aadd_count"),
            F.sum("asin_is_video").alias("asin_video_count"),
            F.sum("asin_is_fbm").alias("asin_fbm_count"),
            F.sum("asin_is_amazon").alias("asin_amazon_count"),
            F.sum("asin_is_cn").alias("asin_cn_count"),
            F.sum("asin_is_new_bsr_orders").alias("new_asin_bsr_orders"),
            F.sum("asin_is_new_zr_orders").alias("new_asin_orders"),
            F.sum("asin_is_color_flag").alias("asin_color_count"),
            F.sum("asin_is_multi_color").alias("asin_multi_color_count"),
            F.sum("asin_is_multi_size").alias("asin_multi_size_count"),
            F.sum("asin_is_movie_flag").alias("asin_movie_type_count"),
            F.sum("asin_amazon_orders").alias("amazon_monthly_sales"),
            F.avg("asin_title_len").alias("title_length_avg"),
            F.avg("asin_rating").alias("rating_avg"),
            F.avg("asin_total_comments").alias("total_comments_avg")
        ).repartition(80, 'search_term').cache()

    def handle_brand_seller_agg(self):
        # 计算品牌top3销量和总销量
        self.df_st_brand_cal = self.df_st_brand_cal.filter("asin_brand_name is not null")
        self.df_st_brand_cal = self.df_st_brand_cal.filter("asin_brand_name not in('null','None')")
        self.df_st_brand_cal = self.df_st_brand_cal.groupby(['search_term', 'asin_brand_name']).agg(
            F.sum("asin_bsr_orders").alias("asin_brand_bsr_orders_total"),
            F.sum("asin_zr_orders").alias("asin_brand_zr_orders_total")
        )

        self.df_top3_st_brand_cal = self.df_st_brand_cal
        # top3品牌bsr销量
        brand_bsr_window = Window.partitionBy(["search_term"]).orderBy(
            self.df_top3_st_brand_cal.asin_brand_bsr_orders_total.desc_nulls_last()
        )
        df_st_brand_top3_bsr_orders = self.df_top3_st_brand_cal.withColumn(
            "brand_rank",
            F.row_number().over(window=brand_bsr_window)
        )
        df_st_brand_top3_bsr_orders = df_st_brand_top3_bsr_orders.filter("brand_rank<=3")
        df_st_brand_top3_bsr_orders = df_st_brand_top3_bsr_orders.groupby(["search_term"]).agg(
            F.sum("asin_brand_bsr_orders_total").alias("top3_brand_bsr_orders")
        )

        # top3品牌zr销量
        brand_zr_window = Window.partitionBy(["search_term"]).orderBy(
            self.df_top3_st_brand_cal.asin_brand_zr_orders_total.desc_nulls_last()
        )
        df_st_brand_top3_zr_orders = self.df_top3_st_brand_cal.withColumn(
            "brand_rank",
            F.row_number().over(window=brand_zr_window)
        )
        df_st_brand_top3_zr_orders = df_st_brand_top3_zr_orders.filter("brand_rank<=3")
        df_st_brand_top3_zr_orders = df_st_brand_top3_zr_orders.groupby(["search_term"]).agg(
            F.sum("asin_brand_zr_orders_total").alias("top3_brand_orders")
        )

        # 品牌总销量
        self.df_st_brand_cal = self.df_st_brand_cal.groupby(['search_term']).agg(
            F.count_distinct("asin_brand_name").alias("page3_brand_num")
        ).repartition(80, 'search_term')

        # 聚合得到st_brand
        self.df_st_brand_cal = self.df_st_brand_cal.join(
            df_st_brand_top3_bsr_orders, on=['search_term'], how='left'
        ).join(
            df_st_brand_top3_zr_orders, on=['search_term'], how='left'
        )

        self.df_st_brand_cal = self.df_st_brand_cal.select(
            "search_term", "page3_brand_num", "top3_brand_bsr_orders", "top3_brand_orders"
        ).cache()

        # 计算卖家top3销量和总销量
        self.df_st_seller_cal = self.df_st_seller_cal.filter("account_id is not null")
        self.df_st_seller_cal = self.df_st_seller_cal.groupby(['search_term', 'account_id']).agg(
            F.sum("asin_bsr_orders").alias("asin_seller_bsr_orders_total"),
            F.sum("asin_zr_orders").alias("asin_seller_zr_orders_total")
        )

        self.df_top3_st_seller_cal = self.df_st_seller_cal

        # 计算top3卖家bsr销量
        seller_bsr_window = Window.partitionBy(["search_term"]).orderBy(
            self.df_top3_st_seller_cal.asin_seller_bsr_orders_total.desc_nulls_last()
        )
        df_st_seller_top3_bsr_orders = self.df_top3_st_seller_cal.withColumn(
            "seller_rank",
            F.row_number().over(window=seller_bsr_window)
        )
        df_st_seller_top3_bsr_orders = df_st_seller_top3_bsr_orders.filter("seller_rank<=3")
        df_st_seller_top3_bsr_orders = df_st_seller_top3_bsr_orders.groupby(["search_term"]).agg(
            F.sum("asin_seller_bsr_orders_total").alias("top3_seller_bsr_orders")
        )

        # 计算top3卖家的zr销量
        seller_zr_window = Window.partitionBy(["search_term"]).orderBy(
            self.df_st_seller_cal.asin_seller_zr_orders_total.desc_nulls_last()
        )
        df_st_seller_top3_zr_orders = self.df_st_seller_cal.withColumn(
            "seller_rank",
            F.row_number().over(window=seller_zr_window)
        )
        df_st_seller_top3_zr_orders = df_st_seller_top3_zr_orders.filter("seller_rank<=3")
        df_st_seller_top3_zr_orders = df_st_seller_top3_zr_orders.groupby(["search_term"]).agg(
            F.sum("asin_seller_zr_orders_total").alias("top3_seller_orders")
        )

        # 卖家总数量
        self.df_st_seller_cal = self.df_st_seller_cal.groupby(['search_term']).agg(
            F.countDistinct("account_id").alias("page3_seller_num")
        ).repartition(80, 'search_term')

        # 聚合得到st_seller
        self.df_st_seller_cal = self.df_st_seller_cal.join(
            df_st_seller_top3_bsr_orders, on=['search_term'], how='left'
        ).join(
            df_st_seller_top3_zr_orders, on=['search_term'], how='left'
        )

        self.df_st_seller_cal = self.df_st_seller_cal.select(
            "search_term", "page3_seller_num", "top3_seller_bsr_orders", "top3_seller_orders"
        ).cache()

    # 计算最终指标
    def handle_st_cal(self):
        # 将st,st_asin,st_seller,st_brand按search_term聚合
        df_st_agg = self.df_st_measure.join(
            self.df_st_asin_cal, on=['search_term'], how='left'
        ).join(
            self.df_st_brand_cal, on=['search_term'], how='left'
        ).join(
            self.df_st_seller_cal, on=['search_term'], how='left'
        )
        self.df_st_measure.unpersist()
        self.df_st_asin_cal.unpersist()
        self.df_st_brand_cal.unpersist()
        self.df_st_seller_cal.unpersist()

        # 求值
        df_st_agg = df_st_agg.withColumn(
            # 新品产品数量/前三页产品总数
            "new_asin_proportion",
            F.round(F.col("asin_is_new_total") / F.col("asin_count"), 3)
        ).withColumn(
            # 当日A+商品占比
            "aadd_proportion",
            F.round(F.col("asin_aadd_count") / F.col("asin_count"), 3)
        ).withColumn(
            # 当日视频商品占比
            "sp_proportion",
            F.round(F.col("asin_video_count") / F.col("asin_count"), 3)
        ).withColumn(
            # 当日FBM商品占比
            "fbm_proportion",
            F.round(F.col("asin_fbm_count") / F.col("asin_count"), 3)
        ).withColumn(
            # 中国卖家占比
            "cn_proportion",
            F.round(F.col("asin_cn_count") / F.col("asin_count"), 3)
        ).withColumn(
            # Amazon自营占比
            "amzon_proportion",
            F.round(F.col("asin_amazon_count") / F.col("asin_count"), 3)
        ).withColumn(
            # 多颜色占比 = 关键字有颜色的asin数/关键字的asin数
            "color_proportion",
            F.round(F.col("asin_color_count") / F.col("asin_count"), 3)
        ).withColumn(
            # 多色比例 = 关键词前三页产品标题中出现colorful/assorted color/multi color的词产品个数/前三页产品数量
            "multi_color_proportion",
            F.round(F.col("asin_multi_color_count") / F.col("asin_count"), 3)
        ).withColumn(
            # 多尺寸占比
            "multi_size_proportion",
            F.round(F.col("asin_multi_size_count") / F.col("asin_count"), 3)
        ).withColumnRenamed(
            # 新品总数
            "asin_is_new_total",
            "new_asin_num"
        ).withColumnRenamed(
            # 产品总数
            "asin_count",
            "total_asin_num"
        ).withColumnRenamed(
            # 总bsr销量
            "st_bsr_orders",
            "bsr_orders"
        ).withColumnRenamed(
            # 总预估销量
            "st_zr_orders",
            "orders"
        ).withColumn(
            # 销量占比 新品销量占比
            "new_bsr_orders_proportion",
            F.round(F.col("new_asin_bsr_orders") / F.col("bsr_orders"), 3)
        ).withColumn(
            # 品牌垄断系数
            "brand_monopoly",
            F.ceil((F.col("top3_brand_bsr_orders") / F.col("bsr_orders")) * 1000) / 1000
        ).withColumn(
            # 卖家垄断系数
            "seller_monopoly",
            F.ceil((F.col("top3_seller_bsr_orders") / F.col("bsr_orders")) * 1000) / 1000
        ).withColumn(
            # ABA搜索词拆分的单词个数
            "st_word_num",
            self.st_word_count(self.sp_symbols)(F.col("search_term"))
        ).withColumn(
            # aba搜索词影视比例
            "movie_prop",
            F.round((F.col("asin_movie_type_count") / F.col("total_asin_num"))*100, 2)
        ).withColumn(
            # 影视标记类型 0:非影视; 1:0< 比例 <= 20%; 2: 20% < 比例 <= 50%; 3:50% < 比例
            "st_movie_label",
            F.when(
                (F.col("movie_prop") > 0) & (F.col("movie_prop") <= 20), F.lit(1)
            ).when(
                (F.col("movie_prop") > 20) & (F.col("movie_prop") <= 50), F.lit(2)
            ).when(
                (F.col("movie_prop") > 50), F.lit(3)
            ).otherwise(F.lit(0))
        )

        self.df_save = df_st_agg.join(
            self.df_st_detail, on=['search_term'], how='inner'
        ).join(
            self.df_st_key, on=['search_term'], how='inner'
        ).join(
            self.df_st_num_stats, on=['search_term'], how='left'
        ).join(
            self.df_st_market, on=['search_term'], how='left'
        ).join(
            self.df_st_volume_fba, on=['search_term'], how='left'
        ).join(
            self.df_st_brand, on=['search_term'], how='left'
        ).join(
            self.df_is_hidden_cate, on=['st_bsr_cate_1_id_new'], how='left'
        )
        self.df_st_detail.unpersist()
        self.df_st_key.unpersist()
        self.df_st_num_stats.unpersist()
        self.df_st_market.unpersist()
        self.df_st_volume_fba.unpersist()
        self.df_st_brand.unpersist()
        self.df_is_hidden_cate.unpersist()

    # 语种处理
    def handle_calc_lang(self):
        sql = """
            select 
                word, 
                langs 
            from big_data_selection.tmp_lang_word_frequency;
        """
        lang_word_list = self.spark.sql(sql).collect()
        # 转为map
        lang_word_map = {row['word']: row['langs'] for row in lang_word_list}

        self.df_save = self.df_save.withColumn(
            "lang",
            F.coalesce(udf_detect_phrase_reg(lang_word_map)(F.col("search_term")).getField("lang"), F.lit("other"))
        )

    def handle_column(self):
        # 入库前字段处理
        self.df_save = self.df_save.select(
            "id",
            "search_term",
            "rank",
            "category_id",
            "orders",
            "bsr_orders",
            "search_volume",
            "quantity_being_sold",
            F.round("st_ao_avg", 3).alias("st_ao_avg"),
            "st_ao_val_rate",
            "new_bsr_orders_proportion",
            "new_asin_proportion",
            F.round("page1_title_proportion", 3).alias("page1_title_proportion"),
            F.round("price_avg", 3).alias("price_avg"),
            F.round("total_comments_avg", 0).alias("total_comments_avg"),
            F.round("rating_avg", 3).alias("rating_avg"),
            F.round("weight_avg", 3).alias("weight_avg"),
            F.round("volume_avg", 3).alias("volume_avg"),
            F.round("title_length_avg", 0).alias("title_length_avg"),
            "st_num",
            "aadd_proportion",
            "sp_proportion",
            "fbm_proportion",
            "cn_proportion",
            "amzon_proportion",
            "most_proportion",
            "max_num",
            "asin1",
            "asin2",
            "asin3",
            F.round("click_share1", 3).alias("click_share1"),
            F.round("click_share2", 3).alias("click_share2"),
            F.round("click_share3", 3).alias("click_share3"),
            F.round("total_click_share", 3).alias("total_click_share"),
            F.round("conversion_share1", 3).alias("conversion_share1"),
            F.round("conversion_share2", 3).alias("conversion_share2"),
            F.round("conversion_share3", 3).alias("conversion_share3"),
            F.round("total_conversion_share", 3).alias("total_conversion_share"),
            "new_asin_num",
            "total_asin_num",
            "new_asin_orders",
            "new_asin_bsr_orders",
            "is_first_text",
            "is_ascending_text",
            "is_search_text",
            "top3_seller_orders",
            "top3_seller_bsr_orders",
            "top3_brand_orders",
            "top3_brand_bsr_orders",
            "page3_brand_num",
            "page3_seller_num",
            "brand_monopoly",
            "seller_monopoly",
            "max_num_asin",
            "is_self_max_num_asin",
            "is_new_market_segment",
            F.when(F.col('category_current_id').isNull(), F.col('category_id'))
                .otherwise(F.col('category_current_id')).alias('category_current_id'),
            "supply_demand",
            "market_cycle_type",
            "color_proportion",
            "gross_profit_fee_air",
            "gross_profit_fee_sea",
            "multi_color_proportion",
            "multi_size_proportion",
            "st_4_20_ao_avg",
            "st_4_20_ao_rate",
            "asin_aadd_count",
            "asin_video_count",
            "asin_fbm_count",
            "asin_cn_count",
            "asin_amazon_count",
            "asin_color_count",
            "asin_multi_color_count",
            "asin_multi_size_count",
            "st_word_num",
            "st_movie_label",
            "st_brand_label",
            "st_brand1",
            "st_category1",
            "st_brand2",
            "st_category2",
            "st_brand3",
            "st_category3",
            "st_bsr_cate_1_id_new",
            "st_bsr_cate_current_id_new",
            "st_crawl_date",
            "is_high_return_text",
            F.round("st_zr_page123_title_appear_rate", 3).alias("st_zr_page123_title_appear_rate"),
            F.round("st_sp_page123_title_appear_rate", 3).alias("st_sp_page123_title_appear_rate"),
            "st_competition_level",
            "amazon_monthly_sales",
            "st_zr_flow_proportion",
            "st_ao_val_matrix",
            "st_flow_proportion_matrix",
            "st_zr_counts",
            "st_sp_counts",
            "st_self_asin_counts",
            "st_self_asin_proportion",
            "lang",
            "asin_movie_type_count",
            "is_hidden_cate"
        )

        # 空值处理
        self.df_save = self.df_save.na.fill({
            "is_first_text": 0,
            "is_ascending_text": 0,
            "is_search_text": 0,
            "st_movie_label": 0,
            "st_brand_label": 0,
            "is_self_max_num_asin": 0,
            "market_cycle_type": 0,
            "is_new_market_segment": 0,
            "is_high_return_text": 0,
            "amazon_monthly_sales": 0,
            "is_hidden_cate": 0
        })

        # 日期字段补全
        self.df_save = self.df_save.withColumn(
            "created_time",
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')
        ).withColumn(
            "updated_time",
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')
        )

        # 预留字段补全
        self.df_save = self.df_save.withColumn(
            "re_string_field1", F.lit(None)
        )

        # 用户标记字段(已废弃,济苍直接填充)
        self.df_save = self.df_save.withColumn(
            "usr_mask_type", F.lit(None)
        ).withColumn(
            "usr_mask_progress", F.lit(None)
        )

        # 分区字段补全
        self.df_save = self.df_save.withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_type", F.lit(self.date_type)
        ).withColumn(
            "date_info", F.lit(self.date_info)
        )


if __name__ == '__main__':
    start_time = time.time()
    site_name = sys.argv[1]  # 参数1:站点
    date_type = sys.argv[2]  # 参数2:类型:week/4_week/month/quarter
    date_info = sys.argv[3]  # 参数3:年-周/年-月/年-季, 比如: 2022-1
    handle_obj = DwtAbaStAnalytics(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
    end_time = time.time()
    consumer_time = end_time - start_time
    print("aba数据存储完毕, 执行时长为:" + str(consumer_time))