import os
import sys


sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.dataframe import DataFrame
from yswg_utils.common_udf import udf_detect_phrase_reg


class DwtAbaLast365(object):
    """
    Build the ``dwt_aba_last365`` Hive table: a roll-up of the monthly ABA search-term
    analytics (``dwt_aba_st_analytics``) over the 12 months ending at ``date_info``.

    Monthly rows are aggregated per search-term ``id``. The output carries 12-month
    aggregates, one column per calendar month for selected metrics (the suffix 1..12 is the
    month number, e.g. ``st_num7`` for July), label/flag columns, and a snapshot of the
    latest month. It is written to the ``site_name/date_type/date_info`` partition of
    ``dwt_aba_last365``.

    Typical use: ``DwtAbaLast365(site_name, date_type, date_info).run()``.
    """

    # Estimated orders for months before this one are read from the legacy dim_st_detail
    # table (date_type 'month_old'); this month and later come from dwt_aba_st_analytics.
    ORDERS_SOURCE_SWITCH_MONTH = '2022-10'

    def __init__(self, site_name, date_type, date_info):
        """
        :param site_name: value used for every ``site_name = ...`` partition filter.
        :param date_type: output partition date type; must be ``DateTypes.month.name`` or
            ``DateTypes.year.name``. Most inputs are read with the monthly date type.
        :param date_info: last month of the 12-month window, formatted ``YYYY-MM``.
        :raises ValueError: if ``date_type`` is not one of the accepted values.
        """
        # Explicit check rather than ``assert`` so validation still happens under ``python -O``.
        if date_type not in (DateTypes.month.name, DateTypes.year.name):
            raise ValueError("date_type 输入有误!!")
        self.site_name = site_name
        self.date_info = date_info
        self.date_type = date_type
        # Date type used when reading the monthly input tables.
        self.date_type_original = DateTypes.month.name
        app_name = f"{self.__class__.__name__}:{self.site_name}:{self.date_type}:{self.date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_aba_last365"
        # Working DataFrames; populated by read_data() and the handle_*() steps.
        self.df_base = None
        self.df_orders = None
        self.df_st_sv_rank = None
        self.df_history = None
        self.df_last_year = None
        self.df_sv_change_rate = None
        self.df_label = None
        self.df_base_lastest = None
        self.df_change_rate_lastest = None
        # The 12 months of the window, newest first: offsets 0, -1, ..., -11 from date_info.
        self.last_12_month = [CommonUtil.get_month_offset(self.date_info, -i) for i in range(12)]
        # The month 12 months before date_info.
        self.last_year_month = CommonUtil.get_month_offset(self.date_info, -12)

    @staticmethod
    def _split_by_month(months, boundary):
        """
        Split ``YYYY-MM`` strings into ``(before, on_or_after)`` relative to ``boundary``.

        Input order is preserved inside each list.
        """
        before = [m for m in months if m < boundary]
        on_or_after = [m for m in months if m >= boundary]
        return before, on_or_after

    @staticmethod
    def _month_priority(month_num):
        """
        Calendar month numbers ordered newest-first for a window ending at ``month_num``.

        E.g. ``3`` -> ``[3, 2, 1, 12, 11, ..., 4]``.
        """
        return list(range(month_num, 0, -1)) + list(range(12, month_num, -1))

    @staticmethod
    def _quarter_sum_sql(prefix, quarter):
        """
        SQL that sums ``<prefix><m>`` over the three calendar months of ``quarter`` (1..4).

        Nulls are treated as 0, e.g.
        ``coalesce(orders1,0) + coalesce(orders2,0) + coalesce(orders3,0)``.
        """
        first_month = 3 * quarter - 2
        return " + ".join(f"coalesce({prefix}{m},0)" for m in range(first_month, first_month + 3))

    def pivot_df(self, last_12_month: list, df_all: 'DataFrame', df_agg: 'DataFrame', group_col: str,
                 pivot_col: str, agg_col_arr: list):
        """
        Pivot per-month rows into one column per (metric, calendar month) and join them onto ``df_agg``.

        :param last_12_month: pivot values (``YYYY-MM`` strings) that become columns.
        :param df_all: long-format data containing ``group_col``, ``pivot_col`` and the metrics.
        :param df_agg: already-aggregated frame the pivoted columns are joined onto.
        :param group_col: grouping and join key (e.g. ``id``).
        :param pivot_col: column holding the month value (e.g. ``date_info``).
        :param agg_col_arr: metric columns to pivot; the first value per group/month is kept.
        :return: ``df_agg`` inner-joined with columns named ``<metric><month number>``,
            e.g. ``st_num7`` for ``2024-07``.
        """
        # Spark names the pivoted columns "<pivot value>_<alias>", e.g. 2024-07_st_num.
        df_tmp = df_all.groupBy(group_col).pivot(pivot_col, last_12_month).agg(
            *[F.first(col).alias(col) for col in agg_col_arr]
        ).cache()

        # Rename to "<metric><month number>", e.g. 2024-07_st_num -> st_num7.
        for prefix in last_12_month:
            month = int(prefix.split('-')[-1])
            for col in agg_col_arr:
                df_tmp = df_tmp.withColumnRenamed(f"{prefix}_{col}", f"{col}{month}")
        return df_agg.join(df_tmp, group_col, "inner")

    def run(self):
        """Run the full job: read inputs, transform, write the partition."""
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        """
        Load all inputs into cached DataFrames.

        - ``df_base``: monthly rows of ``dwt_aba_st_analytics`` for the 12-month window.
        - ``df_change_rate_lastest``: rank change rates of the latest month.
        - ``df_orders``: estimated orders per search term and month.
        - ``df_st_sv_rank``: search-volume rank for ``date_info`` and 12 months earlier.
        - ``df_history``: terms that appeared on or before ``last_year_month``.
        - ``df_last_year``: terms in last year's ``dwt_aba_last365`` partition.
        - ``df_sv_change_rate``: monthly search-volume change rates for the window.
        - ``df_label``: per-month movie/brand inputs for the label computation.
        """
        # Monthly ABA analytics for the window
        sql1 = f"""
            select 
                search_term,
                id,
                st_bsr_cate_1_id_new as category_id,
                bsr_orders,
                search_volume,
                st_ao_avg,
                st_ao_val_rate,
                price_avg,
                weight_avg,
                volume_avg,
                rating_avg,
                total_comments_avg,
                total_asin_num,
                aadd_proportion,
                sp_proportion,
                fbm_proportion,
                cn_proportion,
                amzon_proportion,
                top3_seller_orders,
                top3_seller_bsr_orders,
                top3_brand_orders,
                top3_brand_bsr_orders,
                page3_brand_num,
                page3_seller_num,
                new_bsr_orders_proportion,
                new_asin_proportion,
                supply_demand,
                max_num,
                most_proportion,
                gross_profit_fee_sea,
                gross_profit_fee_air,
                st_bsr_cate_current_id_new as category_current_id,
                color_proportion,
                max_num_asin,
                is_self_max_num_asin,
                multi_color_proportion,
                multi_size_proportion,
                is_new_market_segment,
                market_cycle_type,
                brand_monopoly,
                seller_monopoly,
                is_ascending_text,
                is_search_text,
                st_word_num,
                date_info,
                st_num,
                rank
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info in ({CommonUtil.list_to_insql(self.last_12_month)})
        """
        self.df_base = self.spark.sql(sql1).repartition(80, "search_term", "date_info").cache()

        # Latest-month rank change rates (year-over-year and month-over-month)
        sql2 = f"""
            select 
                search_term, 
                rank_change_rate as rank_change_rate_lastest, 
                rank_rate_of_change as rank_rate_of_change_lastest 
            from dwt_aba_last_change_rate
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info = '{self.date_info}';
        """
        self.df_change_rate_lastest = self.spark.sql(sql2).repartition(80, 'search_term').cache()

        # Estimated orders per search term and month. Older months live in the legacy table,
        # newer ones in dwt_aba_st_analytics. Only query the sources that actually have months
        # in the window, so an empty IN-list is never generated.
        old_months, new_months = self._split_by_month(self.last_12_month, self.ORDERS_SOURCE_SWITCH_MONTH)
        order_queries = []
        if old_months:
            order_queries.append(f"""
                select 
                    search_term,
                    st_search_sum as orders,
                    date_info
                from dim_st_detail
                where site_name = '{self.site_name}'
                  and date_type = 'month_old'
                  and date_info in ({CommonUtil.list_to_insql(old_months)})
            """)
        if new_months:
            order_queries.append(f"""
                select 
                    search_term,
                    orders as orders,
                    date_info
                from dwt_aba_st_analytics
                where site_name = '{self.site_name}'
                  and date_type = 'month'
                  and date_info in ({CommonUtil.list_to_insql(new_months)})
            """)
        sql3 = "\n                union\n".join(order_queries)
        self.df_orders = self.spark.sql(sql3).repartition(80, "search_term", "date_info").cache()

        # Search-volume rank: this period's rank and the rank 12 months earlier (0 if missing)
        sql = f"""
            select 
                search_term_id,
                collect_set(rank)[0]      as rank,
                collect_set(last_rank)[0] as last_rank
            from (
                select 
                    search_term_id,
                    case date_info when '{self.date_info}' then sv_rank end as rank,
                    case date_info when '{self.last_year_month}' then sv_rank end as last_rank
                from dwt_st_sv_last365
                where site_name = '{self.site_name}'
                  and date_info in ('{self.date_info}', '{self.last_year_month}')
            ) tmp
            group by search_term_id;
        """
        self.df_st_sv_rank = self.spark.sql(sql).na.fill({"last_rank": 0}).cache()

        # History-new detection: every term seen in a month on or before last_year_month,
        # with the first month it ever appeared
        sql = f"""
            select 
                search_term, 
                0 as is_history_first_text, 
                min(date_info) as history_first_appear_month 
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info <= '{self.last_year_month}'
            group by search_term;
        """
        self.df_history = self.spark.sql(sql).repartition(80, 'search_term').cache()

        # New-term detection: terms present in last year's partition of this table
        sql = f"""
            select 
                search_term, 
                0 as is_first_text 
            from dwt_aba_last365
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.last_year_month}';
        """
        self.df_last_year = self.spark.sql(sql).repartition(80, 'search_term').cache()

        # Monthly search-volume change rates, used for the sustained rise/decline flag
        sql = f"""
            select 
                search_term, 
                search_volume_change_rate 
            from dwt_aba_last_change_rate
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info in ({CommonUtil.list_to_insql(self.last_12_month)});
        """
        self.df_sv_change_rate = self.spark.sql(sql).repartition(80, 'search_term').cache()

        # Movie + brand label inputs
        sql = f"""
            select
                search_term, 
                asin_movie_type_count, 
                total_asin_num, 
                st_brand_label 
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info in ({CommonUtil.list_to_insql(self.last_12_month)});
        """
        self.df_label = self.spark.sql(sql).repartition(80, 'search_term').fillna({
            'asin_movie_type_count': 0,
            'total_asin_num': 0
        }).cache()

    def handle_data(self):
        """Apply all transformation steps to ``self.df_base`` in order."""
        # Latest-month snapshot
        self.handle_month_lastest()
        # 12-month aggregates and per-month pivot
        self.handle_agg()
        # Labels and flags
        self.handle_label()
        # Language detection
        self.handle_calc_lang()
        # Final column selection before writing
        self.handle_save()

    def handle_month_lastest(self):
        """
        Keep the rows of the latest month (``date_info``) as ``self.df_base_lastest``.

        Some columns are renamed to their output names, e.g. ``rank`` -> ``rank_lastest``
        and ``date_info`` -> ``appear_month_lastest``.
        """
        df_latest = self.df_base.filter(f"date_info = '{self.date_info}'").select(
            "search_term", "rank", "color_proportion", "multi_color_proportion", "multi_size_proportion", "st_ao_avg",
            "st_ao_val_rate", "supply_demand", "total_asin_num", "new_asin_proportion", "bsr_orders",
            "new_bsr_orders_proportion", "price_avg", "weight_avg", "volume_avg", "page3_brand_num", "brand_monopoly",
            "page3_seller_num", "seller_monopoly", "aadd_proportion", "sp_proportion", "fbm_proportion", "cn_proportion",
            "amzon_proportion", "most_proportion", "max_num", "max_num_asin", "is_self_max_num_asin", "rating_avg",
            "total_comments_avg", "date_info"
        )
        renames = {
            'rank': 'rank_lastest',
            'multi_color_proportion': 'multi_color_avg_proportion',
            'multi_size_proportion': 'multi_size_avg_proportion',
            'new_asin_proportion': 'new_asin_num_avg_monopoly',
            'new_bsr_orders_proportion': 'new_asin_bsr_orders_avg_monopoly',
            'most_proportion': 'most_avg_proportion',
            'date_info': 'appear_month_lastest',
        }
        for old_name, new_name in renames.items():
            df_latest = df_latest.withColumnRenamed(old_name, new_name)
        self.df_base_lastest = df_latest.repartition(80, 'search_term').cache()

    def handle_agg(self):
        """
        Aggregate the monthly rows per ``id`` and pivot per-month metrics into columns.

        Replaces ``self.df_base`` with a frame grouped by ``id``. The frame also carries
        ``<metric><month number>`` columns for st_num, bsr_orders, orders,
        market_cycle_type and search_volume.
        """
        self.df_base = self.df_base.join(
            other=self.df_orders, on=["search_term", "date_info"], how="left"
        )
        # NOTE(review): no Spark action has run since df_orders was cached in read_data, so this
        # drops the cache before it is ever populated; df_orders is recomputed from its source.
        self.df_orders.unpersist()

        df_agg = self.df_base.groupBy("id").agg(
            F.first("search_term").alias("search_term"),
            F.first("category_id").alias("category_id"),
            F.first("category_current_id").alias("category_current_id"),
            # Monthly averages over a fixed denominator of 12 (months without data contribute 0)
            F.expr("round(sum(top3_seller_orders)/12,4)").alias("top3_seller_orders"),
            F.expr("round(sum(top3_seller_bsr_orders)/12,4)").alias("top3_seller_bsr_orders"),
            F.expr("round(sum(top3_brand_orders)/12,4)").alias("top3_brand_orders"),
            F.expr("round(sum(top3_brand_bsr_orders)/12,4)").alias("top3_brand_bsr_orders"),
            F.expr("round(sum(gross_profit_fee_sea)/12,4)").alias("gross_profit_fee_sea"),
            F.expr("round(sum(gross_profit_fee_air)/12,4)").alias("gross_profit_fee_air"),
            F.sum(F.col("orders")).alias("orders"),
            F.sum(F.col("st_num")).alias("total_st_num"),
            F.max("st_word_num").alias("st_word_num"),
            # Month with the highest bsr_orders (struct max compares bsr_orders first, then date_info)
            F.max(F.struct("bsr_orders", "date_info")).alias("tmp_row_1"),
            # Month with the highest estimated orders
            F.max(F.struct("orders", "date_info")).alias("tmp_row_2"),
            # First month the term appears inside the window
            F.min("date_info").alias("first_appear_month"),
            # All month numbers the term appears in, sorted and comma-joined, e.g. "1,2,7"
            F.concat_ws(
                ",", F.sort_array(F.collect_set(F.split("date_info", "-")[1].cast(IntegerType())))
            ).alias("total_appear_month"),
            # New market segment: average of the monthly flag truncated to int, i.e. 1 only when
            # every month present is flagged (assuming 0/1 flags).
            # NOTE(review): the original comment described "sum = 12 -> 1"; averaging over the
            # months present also yields 1 for a term seen in fewer than 12 months, all flagged.
            F.avg("is_new_market_segment").cast(IntegerType()).alias("is_new_market_segment"),
            # Hot search term: flagged in more than 80% of the 12 months. 80% of 12 = 9.6, so
            # sum / 9.6 truncated to int is 1 when sum >= 10 (assuming 0/1 monthly flags).
            F.expr("sum(is_search_text) / 9.6").cast(IntegerType()).alias("is_search_text")
        )

        # Columns pivoted into one column per month
        agg_col_arr = ['st_num', 'bsr_orders', 'orders', 'market_cycle_type', 'search_volume']
        self.df_base = self.pivot_df(
            self.last_12_month, self.df_base, df_agg, "id", "date_info", agg_col_arr
        ).repartition(80, "search_term").cache()

    def handle_label(self):
        """
        Join the label/flag columns onto ``self.df_base`` and release the cached label inputs.

        The columns added are:
        - ``st_movie_brand_label``
        - ``is_ascending_text``
        - ``is_first_text``
        - ``is_history_first_text`` and ``history_first_appear_month``
        - ``market_cycle_type``
        - ``sv_rising_rate``, ``sv_decline_rate`` and ``sv_change_rate_flag``
        """
        # Movie/brand label. Locals are used (not self.df_label) so the cached raw input can
        # still be unpersisted below.
        df_movie_brand = self.df_label.groupBy('search_term').agg(
            # Share of movie-type ASINs in percent. nullif avoids a divide-by-zero error under
            # ANSI mode; a zero denominator yields null either way, which maps to label 0 below.
            F.expr(
                "round(sum(asin_movie_type_count) / nullif(sum(total_asin_num), 0) * 100, 2)"
            ).alias('movie_prop'),
            F.max('st_brand_label').alias('st_brand_label')
        ).withColumn(
            # Movie label: 0 = not movie; 1 = 0 < prop <= 20; 2 = 20 < prop <= 50; 3 = prop > 50
            'st_movie_label',
            F.when((F.col("movie_prop") > 0) & (F.col("movie_prop") <= 20), F.lit(1))
            .when((F.col("movie_prop") > 20) & (F.col("movie_prop") <= 50), F.lit(2))
            .when(F.col("movie_prop") > 50, F.lit(3))
            .otherwise(F.lit(0))
        ).withColumn(
            # Brand label: 4 when max(st_brand_label) over the months is 1, else 0
            'st_brand_label',
            F.when(F.col('st_brand_label') == 1, F.lit(4)).otherwise(F.lit(0))
        ).withColumn(
            'st_movie_brand_label',
            F.sort_array(F.expr("array_distinct(array(st_movie_label, st_brand_label))"))
        ).withColumn(
            # Drop the 0 ("no label") entry when another label is present
            'st_movie_brand_label',
            F.when(
                F.array_contains(F.col('st_movie_brand_label'), 0) & (F.size(F.col('st_movie_brand_label')) > 1),
                F.expr("filter(st_movie_brand_label, x -> x != 0)")
            ).otherwise(F.col('st_movie_brand_label'))
        ).withColumn(
            # Comma-joined labels, e.g. "0", "3", "1,4"
            'st_movie_brand_label',
            F.concat_ws(",", F.col('st_movie_brand_label'))
        ).select(
            'search_term', 'st_movie_brand_label'
        )
        self.df_base = self.df_base.join(df_movie_brand, on='search_term', how='left')

        # Rising term: current sv rank is at most half of the rank 12 months ago. last_rank 0
        # means "no rank last year"; nullif turns that into null instead of a divide-by-zero.
        self.df_base = self.df_base.join(
            self.df_st_sv_rank, self.df_base['id'].eqNullSafe(self.df_st_sv_rank['search_term_id']), "left"
        ).withColumn(
            "is_ascending_text",
            F.expr("rank / nullif(last_rank, 0) <= 0.5").cast(IntegerType())
        )

        # New term: 1 when the term is absent from last year's partition
        self.df_base = self.df_base.join(
            self.df_last_year, on='search_term', how='left'
        ).fillna({'is_first_text': 1})

        # History-new term: 1 when the term never appeared on or before last_year_month
        self.df_base = self.df_base.join(
            self.df_history, on='search_term', how='left'
        ).fillna({
            'is_history_first_text': 1
        }).withColumn(
            # First month ever seen; falls back to the first month inside the window
            'history_first_appear_month',
            F.coalesce(F.col('history_first_appear_month'), F.col('first_appear_month'))
        )

        # Market cycle type: the most recent non-null monthly value (date_info's month first,
        # then walking backwards through the window)
        month_num = int(self.date_info.split('-')[-1])
        self.df_base = self.df_base.withColumn(
            'market_cycle_type',
            F.coalesce(*[F.col(f'market_cycle_type{m}') for m in self._month_priority(month_num)])
        )

        # Sustained rise/decline: share of months with positive / negative sv change rate
        df_sv_rate = self.df_sv_change_rate.withColumn(
            'sv_rising_flag', F.when(F.col('search_volume_change_rate') > 0, 1).otherwise(0)
        ).withColumn(
            'sv_decline_flag', F.when(F.col('search_volume_change_rate') < 0, 1).otherwise(0)
        ).groupBy('search_term').agg(
            F.round(F.sum('sv_rising_flag') / F.count('search_term'), 4).alias('sv_rising_rate'),
            F.round(F.sum('sv_decline_flag') / F.count('search_term'), 4).alias('sv_decline_rate')
        )
        self.df_base = self.df_base.join(
            df_sv_rate, on='search_term', how='left'
        ).withColumn(
            # 1 = rising in more than 70% of months, 2 = declining in more than 70%, 0 = neither
            'sv_change_rate_flag',
            F.when(F.col('sv_rising_rate') > 0.7, 1).when(F.col('sv_decline_rate') > 0.7, 2).otherwise(0)
        ).cache()

        # Release the cached inputs read in read_data (these still reference the cached frames).
        for df_cached in (self.df_label, self.df_st_sv_rank, self.df_last_year,
                          self.df_history, self.df_sv_change_rate):
            df_cached.unpersist()

    def handle_calc_lang(self):
        """
        Add a ``lang`` column detected from ``search_term``, defaulting to ``"other"``.

        The word -> languages lookup table is collected to the driver and passed to
        ``udf_detect_phrase_reg``.
        """
        sql = """
            select word, langs from big_data_selection.tmp_lang_word_frequency;
        """
        lang_word_list = self.spark.sql(sql).collect()
        lang_word_map = {row['word']: row['langs'] for row in lang_word_list}
        self.df_base = self.df_base.withColumn(
            "lang",
            F.coalesce(udf_detect_phrase_reg(lang_word_map)(F.col("search_term")).getField("lang"), F.lit("other"))
        ).cache()

    def handle_save(self):
        """
        Join the latest-month data and shape ``self.df_base`` into the output columns.

        Also adds the quarterly bsr/estimated-order sums and ``top_rank``. ``rank`` and
        ``top_rank`` default to 0.
        """
        # Attach the latest-month snapshot and change rates
        self.df_base = self.df_base.join(
            other=self.df_base_lastest, on="search_term", how="left"
        ).join(
            other=self.df_change_rate_lastest, on="search_term", how="left"
        ).cache()
        self.df_base_lastest.unpersist()
        self.df_change_rate_lastest.unpersist()

        # Per-month columns; the suffix is the calendar month number, e.g. 2024-07 -> st_num7,
        # 2023-12 -> st_num12. Order: st_num1..12, orders1..12, bsr_orders1..12, ...
        monthly_cols = [
            F.col(f"{name}{month}").cast(IntegerType())
            for name in ("st_num", "orders", "bsr_orders", "market_cycle_type", "search_volume")
            for month in range(1, 13)
        ]

        self.df_base = self.df_base.select(
            F.col("id"),
            F.col("search_term"),
            F.col("category_id"),
            F.col("category_current_id"),
            F.col('rank').cast(IntegerType()),
            F.col("total_st_num"),

            *monthly_cols,

            F.col("st_ao_avg"),
            F.expr("round(st_ao_val_rate, 4)").alias("st_ao_val_rate"),
            F.col("price_avg"),
            F.col("weight_avg"),
            F.col("volume_avg"),
            F.col("rating_avg"),
            F.col("total_comments_avg"),

            F.col("multi_size_avg_proportion"),
            F.col("multi_color_avg_proportion"),

            F.col("brand_monopoly"),
            F.col("seller_monopoly"),
            F.col("most_avg_proportion"),
            F.col("supply_demand"),
            F.col("aadd_proportion"),
            F.col("sp_proportion"),
            F.col("fbm_proportion"),
            F.col("cn_proportion"),
            F.col("amzon_proportion"),
            F.col("top3_seller_orders"),
            F.col("top3_seller_bsr_orders"),
            F.col("top3_brand_orders"),
            F.col("top3_brand_bsr_orders"),
            F.col("page3_brand_num").cast(IntegerType()),
            F.col("page3_seller_num").cast(IntegerType()),
            F.col("new_asin_num_avg_monopoly"),
            F.col("new_asin_bsr_orders_avg_monopoly"),
            F.col("orders").cast(IntegerType()),
            # Latest month's bsr_orders (from the snapshot)
            F.col("bsr_orders"),

            F.col("gross_profit_fee_sea"),
            F.col("gross_profit_fee_air"),
            F.col("color_proportion"),

            F.col("total_asin_num"),

            F.col("max_num"),
            F.col("max_num_asin"),
            F.col("is_self_max_num_asin"),

            F.col("tmp_row_1").getField("date_info").alias("max_bsr_orders_month"),
            F.col("tmp_row_2").getField("date_info").alias("max_orders_month"),

            F.col("is_new_market_segment"),
            F.col("is_ascending_text"),
            F.col("is_search_text"),
            F.col("st_word_num"),

            F.current_date().cast(StringType()).alias("updated_time"),
            F.current_date().cast(StringType()).alias("created_time"),

            F.lit(None).alias("usr_mask_type"),
            F.lit(None).alias("usr_mask_progress"),
            F.col("lang"),

            # History-new flag, compared against all earlier months
            F.col("is_history_first_text"),
            # First month the term ever appeared
            F.col("history_first_appear_month"),
            # New-term flag, compared against last year's partition
            F.col("is_first_text"),
            # First month the term appears inside the window
            F.col("first_appear_month"),
            # Share of months with rising search volume
            F.col("sv_rising_rate"),
            # Share of months with declining search volume
            F.col("sv_decline_rate"),
            # Sustained rise/decline flag: 1 rising, 2 declining, 0 neither
            F.col("sv_change_rate_flag"),
            # Movie/brand label
            F.col("st_movie_brand_label"),
            # All month numbers the term appears in inside the window
            F.col("total_appear_month"),
            # Market cycle type
            F.col("market_cycle_type"),
            # Rank in the latest month
            F.col("rank_lastest"),
            # Latest month's year-over-year rank change
            F.col("rank_change_rate_lastest"),
            # Latest month's month-over-month rank change
            F.col("rank_rate_of_change_lastest"),
            # Latest month the term appears in
            F.col("appear_month_lastest"),

            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )

        # Quarterly sums (calendar quarters by month number): q1_bsr_orders..q4_bsr_orders,
        # then q1_orders..q4_orders
        for prefix in ("bsr_orders", "orders"):
            for quarter in range(1, 5):
                self.df_base = self.df_base.withColumn(
                    f"q{quarter}_{prefix}", F.expr(self._quarter_sum_sql(prefix, quarter))
                )

        # top_rank is kept for backward compatibility and mirrors rank
        self.df_base = self.df_base.withColumn(
            "top_rank", F.col("rank")
        ).na.fill({
            "rank": 0,
            "top_rank": 0
        }).cache()

    def save_data(self):
        """
        Write ``self.df_base`` to the target partition.

        The partition's HDFS directory is cleared via ``HdfsUtils.delete_file_in_folder``,
        then the data is appended to ``dwt_aba_last365``, partitioned by
        site_name/date_type/date_info.
        """
        self.df_base = self.df_base.repartition(20)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}")
        hdfs_path = CommonUtil.build_hdfs_path(
            self.hive_tb,
            partition_dict={
                "site_name": self.site_name,
                "date_type": self.date_type,
                "date_info": self.date_info,
            }
        )
        print(f"清除hdfs目录中:{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        self.df_base.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    # Positional command-line arguments, in order: site_name, date_type, date_info.
    site_name, date_type, date_info = [CommonUtil.get_sys_arg(position, None) for position in range(1, 4)]
    job_kwargs = dict(site_name=site_name, date_type=date_type, date_info=date_info)
    obj = DwtAbaLast365(**job_kwargs)
    obj.run()