"""
ABA搜索词统计报表
"""
"""
author: 汪瑞
description: 基于dwd层等表,计算出search_term维度的报表信息
table_read_name: dim_cal_asin_history_detail系列表,dwd_st_measure系列表,dws_top100_asin_info系列表,ods_st_key系列表, dwd_asin_measure系列表, dwd_st_asin_measure系列表
table_save_name: dwt_aba_st_analytics_report
table_save_level: dwt
version: 1.0
created_date: 2022-11-17
updated_date: 2022-11-17
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, MapType
from datetime import datetime, timedelta
from yswg_utils.common_udf import udf_get_package_quantity
import math


class DwtAbaStAnalyticsReport(Templates):

    def __init__(self, site_name="us", date_type="day", date_info="2022-11-02"):
        """Set up Spark, placeholder DataFrames and row-level UDFs for the job.

        :param site_name: Amazon site code (e.g. "us").
        :param date_type: partition granularity (e.g. "day").
        :param date_info: partition value (e.g. "2022-11-02").
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # dwt-level output table name; also used in the Spark app name.
        self.db_save = f"dwt_aba_st_analytics_report"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        # "select 1+1" placeholders; each DataFrame is replaced later by
        # read_data() or one of the get_* aggregation steps.
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_asin_history_detail = self.spark.sql(f"select 1+1;")
        self.df_st_asin_measure = self.spark.sql(f"select 1+1;")
        self.df_st_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_st_key = self.spark.sql(f"select 1+1;")
        self.df_top100_asin = self.spark.sql(f"select 1+1;")
        self.df_st_measure = self.spark.sql(f"select 1+1;")
        self.df_asin_measure = self.spark.sql(f"select 1+1;")
        self.df_asin_count = self.spark.sql(f"select 1+1;")
        self.df_st_buy_box = self.spark.sql(f"select 1+1;")
        self.df_st_color = self.spark.sql(f"select 1+1;")
        self.df_st_seller = self.spark.sql(f"select 1+1;")
        self.df_st_top20 = self.spark.sql(f"select 1+1;")
        self.df_st_attribute = self.spark.sql(f"select 1+1;")
        self.df_st_img_type_group = self.spark.sql(f"select 1+1;")
        self.df_st_launch_time_group = self.spark.sql(f"select 1+1;")
        self.df_st_price_group = self.spark.sql(f"select 1+1;")
        self.df_st_ao_group = self.spark.sql(f"select 1+1;")
        self.df_st_comments_group = self.spark.sql("select 1+1;")
        self.df_st_rating_group = self.spark.sql("select 1+1;")
        self.df_st_detail = self.spark.sql("select 1+1;")
        self.df_seller_asin = self.spark.sql("select 1+1;")
        self.df_st_size = self.spark.sql("select 1+1;")
        self.df_st_package_num = self.spark.sql("select 1+1;")
        # Hive partition columns for the output table.
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(65)

        # Register the per-row UDFs used by the aggregation steps below.
        self.u_get_buy_box_num = self.spark.udf.register("u_get_buy_box_num", self.udf_get_buy_box_num, StringType())
        self.u_get_buy_box = self.spark.udf.register("u_get_buy_box", self.udf_get_buy_box_type, StringType())
        self.u_get_img_type = self.spark.udf.register("u_get_img_type", self.udf_get_img_type, StringType())
        self.u_get_seller_num = self.spark.udf.register('u_get_seller_num', self.udf_get_seller_num, StringType())
        self.u_get_seller_bsr_orders = self.spark.udf.register('u_get_seller_bsr_orders', self.udf_get_seller_bsr_orders, StringType())
        self.u_judge_color = self.spark.udf.register('u_judge_color', self.udf_judge_color, IntegerType())
        self.u_judge_size = self.spark.udf.register('u_judge_size', self.udf_judge_multi_size, StringType())
        # Imported plain-Python UDF wrapped directly via F.udf.
        self.u_get_package_num = F.udf(udf_get_package_quantity, IntegerType())
        self.u_get_package_num_trend = self.spark.udf.register('u_get_package_num_trend', self.udf_reverse_package_num_array, StringType())
        self.u_get_package_num_trend_market_share = self.spark.udf.register('u_get_package_num_trend_market_share', self.udf_package_num_counts, StringType())
        self.u_get_package_num_corresponding_asin = self.spark.udf.register('u_get_package_num_corresponding_asin', self.udf_package_num_corresponding_asin_list, StringType())
        self.u_get_price_interval = self.spark.udf.register('u_get_price_interval', self.udf_price_interval_info, MapType(StringType(), StringType(), False))


    @staticmethod
    def udf_get_img_type(img_type):
        all_img_type = str(img_type).split(",")
        if ('2' in all_img_type) & ('3' in all_img_type):
            return 'aadd_video_num'
        elif ('2' not in all_img_type) & ('3' in all_img_type):
            return 'aadd_no_video_num'
        elif ('2' not in all_img_type) & ('3' not in all_img_type):
            return 'no_aadd_no_video_num'
        elif ('2' in all_img_type) & ('3' not in all_img_type):
            return 'no_aadd_video_num'

    @staticmethod
    def udf_get_buy_box_type(buy_box):
        if "1" == str(buy_box):
            return 'Amazon'
        elif "2" == str(buy_box):
            return 'FBA'
        elif "3" == str(buy_box):
            return 'FBM'
        else:
            return 'other'

    @staticmethod
    def udf_get_seller_num(seller_type, all_seller):
        all_seller_type = str(seller_type).split(",")
        seller_num = ''
        for i in range(len(all_seller_type)):
            splits = all_seller.count(all_seller_type[i])
            if (i < len(all_seller_type) - 1):
                seller_num = seller_num + str(splits) + ','
            else:
                seller_num = seller_num + str(splits)
        return seller_num

    @staticmethod
    def udf_get_seller_bsr_orders(seller_type, all_seller, all_seller_bsr_orders):
        all_seller_type = str(seller_type).split(",")
        all_seller_list = str(all_seller).split(",")
        all_seller_bsr_orders = str(all_seller_bsr_orders).split(",")
        seller_bsr_orders = ""
        for seller in all_seller_type:
            one_seller_bsr_orders = 0
            for (i, j) in zip(all_seller_list, all_seller_bsr_orders):
                if seller == i:
                    one_seller_bsr_orders = int(one_seller_bsr_orders) + int(j)
            if (seller != all_seller_type[-1]):
                seller_bsr_orders = seller_bsr_orders + str(one_seller_bsr_orders) + ','
            else:
                seller_bsr_orders = seller_bsr_orders + str(one_seller_bsr_orders)
        return seller_bsr_orders

    @staticmethod
    def udf_get_buy_box_num(buy_box_type, buy_box_list):
        all_buy_box_type = str(buy_box_type).split(",")
        buy_box_num = ''
        for i in range(len(all_buy_box_type)):
            splits = buy_box_list.count(all_buy_box_type[i])
            if (i < len(all_buy_box_type) - 1):
                buy_box_num = buy_box_num + str(splits) + ','
            else:
                buy_box_num = buy_box_num + str(splits)
        return buy_box_num

    @staticmethod
    def udf_judge_color(color):
        color_len = len(str(color))
        if str(color).lower() not in ['none', 'null'] and color_len > 1:
            return 1
        else:
            return 0

    @staticmethod
    def udf_judge_multi_size(size, style):
        size = str(size).lower()
        style = str(style).lower()
        # 变体表中即有size又有style时,取size进行计数。如果无size,则判断是否有style进行计数
        if size not in ['none', 'null']:
            return size
        elif style not in ['none', 'null']:
            return style

    @staticmethod
    def udf_reverse_package_num_array(package_num_list):
        if str(package_num_list) != '':
            package_num_trend = str(package_num_list).strip("[").strip("]").replace("\'", '').replace(' ', '')
            return package_num_trend

    @staticmethod
    def udf_package_num_counts(package_num_list):
        if str(package_num_list) != '':
            package_num_list = str(package_num_list).strip("[").strip("]").replace(' ', '')
            package_num_trend_market_share = ''
            package_num_arr = str(package_num_list).split(",")
            package_num_int_list = list(map(int, package_num_arr))
            total_package_num = sum(package_num_int_list)
            for i in range(len(package_num_int_list)):
                if (i < len(package_num_int_list) - 1):
                    package_num_trend_market_share = package_num_trend_market_share + str(
                        round(package_num_int_list[i] / total_package_num, 3)) + ','
                else:
                    package_num_trend_market_share = package_num_trend_market_share + str(
                        round(package_num_int_list[i] / total_package_num, 3))
            return package_num_trend_market_share

    @staticmethod
    def udf_package_num_corresponding_asin_list(asin_list):
        if str(asin_list) != '':
            package_num_corresponding_asin = str(asin_list).strip("[").strip("]").replace("\'", '').replace(' ', '')
            return package_num_corresponding_asin

    @staticmethod
    def udf_price_interval_info(st_price_avg, asin_price):
        """Build price-histogram metadata for one ASIN under a search term.

        :param st_price_avg: average price of the search term.
        :param asin_price: this ASIN's price.
        :return: map with 'price_interval' (comma-separated "low-high" bucket
            labels), 'price_type' (1-based index of the bucket containing
            asin_price, clamped to [1, interval_num]) and 'interval_num'
            (bucket count as a string); None when either price is not
            positive.
        """
        def get_price_interval(interval_num, interval_span, interval_boundary=0):
            # Render interval_num consecutive "low-high" labels, each of
            # width interval_span, starting at interval_boundary.
            price_interval_list = []
            for i in range(interval_num):
                lower_bound = interval_boundary + interval_span * i
                upper_bound = interval_boundary + interval_span * (i + 1)
                price_interval_list.append(f"{lower_bound}-{upper_bound}")
            price_interval = ','.join(price_interval_list)
            return price_interval

        if st_price_avg > 0 and asin_price > 0:
            # Rows: (avg-price lower bound, upper bound, bucket width, bucket
            # count). The final (200, 0, ...) row is the open-ended ">= 200"
            # case, matched below via the price_lower > price_upper branch.
            price_range = [
                (0, 25, 5, 10),
                (25, 30, 10, 6),
                (30, 40, 10, 8),
                (40, 50, 10, 10),
                (50, 60, 10, 12),
                (60, 70, 10, 14),
                (70, 80, 10, 16),
                (80, 90, 10, 18),
                (90, 100, 10, 20),
                (100, 105, 15, 14),
                (105, 120, 15, 16),
                (120, 135, 15, 18),
                (135, 150, 15, 20),
                (150, 160, 20, 16),
                (160, 180, 20, 18),
                (180, 200, 20, 20),
                (200, 0, 20, 20)
            ]
            for price_lower, price_upper, interval_span, interval_num_item in price_range:
                if price_lower <= st_price_avg < price_upper:
                    interval_num = interval_num_item
                elif price_lower > price_upper:
                    interval_num = interval_num_item
                else:
                    continue
                # Anchor the histogram at (avg rounded up to the next multiple
                # of 10) minus 200, floored at 0.
                interval_boundary = max(int(math.ceil(float(st_price_avg) / 10.0) * 10) - 200, 0)
                price_interval = get_price_interval(interval_num, interval_span, interval_boundary)
                # Bucket index of asin_price, clamped into [1, interval_num].
                price_type = min(max(int((asin_price - interval_boundary) // interval_span) + 1, 1), interval_num)
                return {
                    'price_interval': price_interval,
                    'price_type': str(price_type),
                    'interval_num': str(interval_num)
                }

    def hadle_cols(self, col_list1, col_list2, col_list3, df):
        """For each aligned column triple, add col1 = round(col2 / col3, 4).

        :param col_list1: names of the ratio columns to create.
        :param col_list2: numerator column names, aligned with col_list1.
        :param col_list3: denominator column names, aligned with col_list1.
        :param df: DataFrame to extend.
        :return: the DataFrame with the ratio columns appended.
        """
        for target, numerator, denominator in zip(col_list1, col_list2, col_list3):
            ratio = F.round(df[numerator] / df[denominator], 4)
            df = df.withColumn(target, ratio)
        return df

    def check_cols(self, actual_col_list, schema_col_list, df):
        """Add any schema column missing from the DataFrame as a null int column.

        Preserves schema_col_list order so downstream selects stay stable
        (pivot only emits columns for values that actually occur).
        """
        missing = [col for col in schema_col_list if col not in actual_col_list]
        for col in missing:
            df = df.withColumn(col, F.lit(None).astype('int'))
        return df

    def read_data(self):
        """Load all source tables for the current site/date partition.

        Populates (each cached and pre-repartitioned on its join key):
          - df_st_asin_measure: search_term/asin pairs.
          - df_asin_history_detail: per-ASIN attributes (title, price, ...).
          - df_st_measure: per-search-term orders and price statistics.
          - df_st_key: search_term -> search_term_id mapping.
          - df_top100_asin: top-100 ASIN info per search_term_id.
          - df_asin_measure: per-ASIN BSR orders and AO value.
          - df_seller_asin: ASIN -> seller country bucket (deduplicated).
          - df_st_detail: the search terms to report on.
        """
        print("1.1 读取dwd_st_asin_measure系列表")
        sql = f"""
            select search_term, asin from dwd_st_asin_measure 
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_st_asin_measure = self.spark.sql(sql).repartition(80, 'asin').cache()

        print("1.2 读取dim_asin_detail系列表")
        sql = f"""
            select 
                asin,
                asin_title,
                asin_price as price,
                asin_rating as rating,
                asin_is_new,
                asin_img_type,
                asin_is_aadd,
                asin_launch_time,
                asin_total_comments,
                asin_buy_box_seller_type,
                lower(asin_color) as asin_color,
                asin_brand_name,
                lower(asin_size) as asin_size,
                lower(asin_style) as asin_style
            from dim_asin_detail
            where site_name='{self.site_name}' 
            and date_type='{self.date_type}' 
            and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_asin_history_detail = self.spark.sql(sql).repartition(80, 'asin').cache()
        # Defaults: missing asin_img_type becomes '3'; size/style become the
        # literal 'none' expected by udf_judge_multi_size.
        self.df_asin_history_detail = self.df_asin_history_detail.na.fill({
            "asin_img_type": "3",
             "asin_size": "none",
             "asin_style": "none"
        })

        print("1.3 读取dwd_st_measure系列表")
        sql = f"""
            select search_term, st_zr_orders as orders, st_price_std, st_price_avg from dwd_st_measure
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_st_measure = self.spark.sql(sql).repartition(80, 'search_term').cache()
        self.df_st_measure = self.df_st_measure.na.fill({
            "orders": 0
        })

        print("1.4 读取ods_st_key系列表")
        sql = f"""
            select cast(st_key as int) as search_term_id, search_term from ods_st_key where site_name='{self.site_name}';
        """
        print("sql:", sql)
        self.df_st_key = self.spark.sql(sql).repartition(80, 'search_term').cache()

        print("1.5 读取dws_top100_asin_info系列表")
        sql = f"""
            select search_term_id, top100_asin, top100_orders, top100_market_share, top100_is_new
            from dws_top100_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' 
            and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_top100_asin = self.spark.sql(sql).repartition(80, 'search_term_id').cache()

        print("1.6 读取dwd_asin_measure系列表")
        sql = f"""
            select asin, cast(asin_bsr_orders as int) as asin_bsr_orders, asin_ao_val from dwd_asin_measure
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_asin_measure = self.spark.sql(sql).repartition(80, 'asin').cache()
        self.df_asin_measure = self.df_asin_measure.na.fill({"asin_bsr_orders": 0})

        print("1.7 读取dim_seller_asin_history_info系列表")
        # Seller countries are bucketed into US/CN/HK/FR/DE/OTHER in SQL.
        sql = f"""
            select asin, 
            case when upper(fd_country_name) = 'US' then 'US' 
            when upper(fd_country_name) = 'CN' then 'CN' 
            when upper(fd_country_name) = 'HK' then 'HK' 
            when upper(fd_country_name) = 'FR' then 'FR' 
            when upper(fd_country_name) = 'DE' then 'DE' 
            else 'OTHER' end as seller_name
            from dim_fd_asin_info where site_name = '{self.site_name}';
        """
        print("sql:", sql)
        self.df_seller_asin = self.spark.sql(sql).repartition(80, 'asin').cache()
        # One row per ASIN; an arbitrary seller row survives when duplicated.
        self.df_seller_asin = self.df_seller_asin.drop_duplicates(['asin'])

        print("1.8 读取dim_st_detail系列表")
        sql = f"""
            select search_term from dim_st_detail 
            where site_name='{self.site_name}' 
            and date_type='{self.date_type}' 
            and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_st_detail = self.spark.sql(sql).repartition(80, 'search_term').cache()

    def get_st_asin_detail(self):
        """Join search-term/ASIN pairs with ASIN detail and measure data, then
        derive per-ASIN BSR-order columns restricted to A+ ASINs and to new
        ASINs (0 otherwise)."""
        self.df_st_asin_detail = self.df_st_asin_measure.join(
            self.df_asin_history_detail, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        ).repartition(80, 'search_term')
        # asin_bsr_orders counts toward aadd_asin_bsr_orders only when the
        # ASIN is flagged A+ (asin_is_aadd == 1) and toward
        # new_asin_bsr_orders_detail only when flagged new (asin_is_new == 1).
        # NOTE(review): F.lit() is applied to a Column here; presumably it
        # passes the column through unchanged — confirm against the pyspark
        # version in use.
        self.df_st_asin_detail = self.df_st_asin_detail.withColumn(
            'aadd_asin_bsr_orders',
            F.when(
                F.col("asin_is_aadd") == 1,
                F.lit(self.df_st_asin_detail.asin_bsr_orders)
            ).otherwise(F.lit(0))
        ).withColumn(
            'new_asin_bsr_orders_detail',
            F.when(
                F.col("asin_is_new") == 1,
                F.lit(self.df_st_asin_detail.asin_bsr_orders)
            ).otherwise(F.lit(0))
        )

    def get_st_attribute(self):
        """Aggregate per-search-term totals: new-ASIN count plus BSR orders
        overall, for A+ ASINs, and for new ASINs."""
        sum_specs = [
            ("asin_is_new", 'new_asin_num'),
            ("asin_bsr_orders", 'bsr_orders'),
            ("aadd_asin_bsr_orders", 'aadd_bsr_orders'),
            ("new_asin_bsr_orders_detail", "new_asin_bsr_orders"),
        ]
        aggregations = [F.sum(source).alias(target) for source, target in sum_specs]
        self.df_st_attribute = self.df_st_asin_detail.groupby(["search_term"]).agg(*aggregations)

    def get_img_type(self):
        """Label each ASIN by image type and pivot the labels into
        per-search-term count columns."""
        self.df_st_asin_detail = self.df_st_asin_detail.withColumn(
            'img_type',
            self.u_get_img_type(self.df_st_asin_detail.asin_img_type)
        )
        pivoted = self.df_st_asin_detail.groupby(["search_term"]).pivot("img_type").agg(
            F.count("search_term")
        )
        expected_cols = ['aadd_video_num', 'aadd_no_video_num', 'no_aadd_no_video_num', 'no_aadd_video_num']
        # Pivot only emits columns for labels that occur; backfill the rest.
        self.df_st_img_type_group = self.check_cols(
            actual_col_list=pivoted.columns,
            schema_col_list=expected_cols,
            df=pivoted
        )

    def get_asin_count(self):
        """Count the total number of ASINs under each search term."""
        total_asins = F.count("asin").alias('total_asin_num')
        self.df_asin_count = self.df_st_asin_detail.groupby(["search_term"]).agg(total_asins)

    def get_price_range(self):
        """Build the per-search-term price histogram.

        Output df_st_price_group columns: search_term, price_interval
        (comma-separated "low-high" bucket labels), price_interval_asin_count
        (comma-separated ASIN count per bucket) and
        price_interval_asin_market_share (bucket count / total ASIN count).
        Side effect: drops st_price_std/st_price_avg from self.df_st_measure.
        """
        df_st_asin_price_info = self.df_st_asin_detail.select("search_term", "price").filter("price > 0")
        df_st_price_agg = self.df_st_measure.select("search_term", "st_price_std", "st_price_avg")
        # The price stats are only needed here; remove them from the shared
        # measure DataFrame before downstream steps use it.
        self.df_st_measure = self.df_st_measure.drop("st_price_std", "st_price_avg")
        df_st_asin_price_agg = df_st_asin_price_info.join(
            df_st_price_agg, on=['search_term'], how='left'
        )
        # Keep only prices at or below st_price_std (outlier cut-off, presumably).
        df_st_asin_price_agg = df_st_asin_price_agg.filter("price <= st_price_std")
        # Map-valued UDF: price_interval / price_type / interval_num.
        asin_pirce_info_map = self.u_get_price_interval(
            df_st_asin_price_agg.st_price_avg,
            df_st_asin_price_agg.price
        )
        df_st_asin_price_agg = df_st_asin_price_agg.withColumn(
            "price_interval",
            asin_pirce_info_map["price_interval"]
        ).withColumn(
            "price_type",
            asin_pirce_info_map["price_type"]
        ).withColumn(
            "interval_num",
            asin_pirce_info_map['interval_num']
        )
        # All rows of one search term share the same interval metadata, so
        # F.first picks a representative value.
        df_st_price_interval = df_st_asin_price_agg.groupby('search_term').agg(
            F.first('price_interval').alias('price_interval'),
            F.first('interval_num').alias('interval_num')
        )
        # One column per bucket index ('1'..'20'), counting ASINs per bucket.
        self.df_st_price_group = df_st_asin_price_agg.groupby(['search_term']).pivot("price_type").agg(
            F.count(F.col("search_term"))
        )
        self.df_st_price_group = self.df_st_price_group.join(
            df_st_price_interval, on=['search_term'], how='left'
        )
        price_type_list = [f"{i}" for i in range(1, 21)]
        price_group_list = self.df_st_price_group.columns
        # Backfill bucket columns the pivot did not emit, then zero-fill.
        self.df_st_price_group = self.check_cols(
            actual_col_list=price_group_list,
            schema_col_list=price_type_list,
            df=self.df_st_price_group
        )
        self.df_st_price_group = self.df_st_price_group.select(
            "search_term", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13",
            "14", "15", "16", "17", "18", "19", "20", "price_interval", "interval_num"
        ).na.fill({
            "1": 0, "2": 0, "3": 0, "4": 0, "5": 0, "6": 0, "7": 0, "8": 0, "9": 0, "10": 0, "11": 0, "12": 0, "13": 0,
            "14": 0, "15": 0, "16": 0, "17": 0, "18": 0, "19": 0, "20": 0
        })
        self.df_st_price_group = self.df_st_price_group.join(
            self.df_asin_count, on=['search_term'], how='left'
        )

        # interval_num -> the bucket columns that feed the output strings
        # (matches the bucket counts produced by udf_price_interval_info).
        column_conditions = {
            20: list(range(1, 21)),
            18: list(range(1, 19)),
            16: list(range(1, 17)),
            14: list(range(1, 15)),
            12: list(range(1, 13)),
            10: list(range(1, 11)),
            8: list(range(1, 9)),
            6: list(range(1, 7))
        }
        # SQL CASE over interval_num concatenating the relevant bucket counts.
        asin_count_case_expr = "CASE " + " ".join([
            f"WHEN interval_num = '{interval}' THEN CONCAT_WS(',', {', '.join([f'`{col}`' for col in columns])})"
            for interval, columns in column_conditions.items()
        ]) + " ELSE NULL END"
        self.df_st_price_group = self.df_st_price_group.withColumn(
            "price_interval_asin_count",
            F.expr(asin_count_case_expr)
        )
        # Same CASE, but each bucket count divided by total_asin_num.
        asin_market_share_case_expr = "CASE " + " ".join([
            f"WHEN interval_num = '{interval}' THEN CONCAT_WS(',', {', '.join([f'ROUND(`{col}`/`total_asin_num`,4)' for col in columns])})"
            for interval, columns in column_conditions.items()
        ]) + " ELSE NULL END"
        self.df_st_price_group = self.df_st_price_group.withColumn(
            "price_interval_asin_market_share",
            F.expr(asin_market_share_case_expr)
        )
        self.df_st_price_group = self.df_st_price_group.select(
            'search_term', 'price_interval', 'price_interval_asin_count', 'price_interval_asin_market_share'
        )

    def get_ao_range(self):
        """Bucket ASINs into 12 AO-value ranges per search term and compute
        each bucket's count and market share (count / total ASIN count)."""
        # Buckets 1-11 are half-open [low, high); bucket 12 catches >= 1.
        bounds = [0, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
        ao_val = F.col("asin_ao_val")
        bucket_expr = None
        for idx, (low, high) in enumerate(zip(bounds, bounds[1:]), start=1):
            condition = (ao_val >= low) & (ao_val < high)
            label = f"ao_range_val{idx}"
            bucket_expr = F.when(condition, label) if bucket_expr is None else bucket_expr.when(condition, label)
        bucket_expr = bucket_expr.when(ao_val >= 1, "ao_range_val12")
        df_ao = self.df_st_asin_detail.withColumn('ao_type', bucket_expr)
        pivoted = df_ao.groupby(["search_term"]).pivot("ao_type").agg(
            F.count("search_term")
        )
        bucket_cols = [f"ao_range_val{i}" for i in range(1, 13)]
        # Pivot only emits columns for buckets that occur; backfill the rest.
        pivoted = self.check_cols(
            actual_col_list=pivoted.columns,
            schema_col_list=bucket_cols,
            df=pivoted
        )
        pivoted = pivoted.select("search_term", *bucket_cols)
        pivoted = pivoted.join(self.df_asin_count, on=['search_term'], how='left')
        share_cols = [f"ao_range_market_share{i}" for i in range(1, 13)]
        # total_asin_num is deliberately kept on the result here.
        self.df_st_ao_group = self.hadle_cols(
            col_list1=share_cols,
            col_list2=bucket_cols,
            col_list3=["total_asin_num"] * 12,
            df=pivoted
        )

    def get_st_buy_box(self):
        """Summarise delivery (buy-box) types per search term: the distinct
        type names plus a parallel per-name ASIN count."""
        labelled = self.df_st_asin_detail.withColumn(
            "buy_box_seller_type",
            self.u_get_buy_box(self.df_st_asin_detail.asin_buy_box_seller_type)
        )
        per_term = labelled.groupby(['search_term']).agg(
            F.concat_ws(",", F.collect_set(labelled.buy_box_seller_type)).alias("buy_box_name"),
            F.concat_ws(",", F.collect_list(labelled.buy_box_seller_type)).alias("buy_box_list")
        )
        joined = self.df_st_measure.join(per_term, on=['search_term'], how='left')
        joined = joined.withColumn(
            "buy_box_num",
            self.u_get_buy_box_num(joined.buy_box_name, joined.buy_box_list)
        )
        # buy_box_list only existed to derive buy_box_num; orders comes from
        # df_st_measure and is not part of this output.
        self.df_st_buy_box = joined.drop("buy_box_list", "orders")

    def get_st_color(self):
        """Per search term, list each ASIN colour alongside its ASIN count and
        its share of the term's total BSR orders."""
        coloured = self.df_st_asin_detail.withColumn(
            "asin_is_color_flag",
            self.u_judge_color(F.col("asin_color"))
        ).filter("asin_is_color_flag = 1")
        per_color = coloured.groupby(['search_term', 'asin_color']).agg(
            F.sum("asin_bsr_orders").alias("color_bsr_orders"),
            F.count("asin_color").alias("color_quantity")
        ).join(
            self.df_st_attribute, on=['search_term'], how='left'
        )
        per_color = per_color.withColumn(
            "color_bsr_orders_percent",
            F.round(F.col("color_bsr_orders") / F.col("bsr_orders"), 3)
        ).select(
            "search_term", "asin_color", "color_quantity", "color_bsr_orders_percent"
        )
        # Names joined with '&&&&' (presumably because colour names can
        # themselves contain commas); counts and shares joined with ','.
        self.df_st_color = per_color.groupby(['search_term']).agg(
            F.concat_ws("&&&&", F.collect_list(per_color.asin_color)).alias("color_name"),
            F.concat_ws(",", F.collect_list(per_color.color_quantity)).alias("color_num"),
            F.concat_ws(",", F.collect_list(per_color.color_bsr_orders_percent)).alias("color_bsr_orders_percent"),
        )

    def get_launch_time_range(self):
        """Bucket ASINs into 8 launch-time age ranges per search term and
        compute each bucket's count and market share; total_asin_num is
        dropped from the result."""
        today = datetime.now().date()
        # Cut-offs in days before today: buckets are <=30d, 30-90d, 90-180d,
        # 180-360d, 360-450d, 450-720d, 720-1080d and >1080d old.
        cutoffs = [today - timedelta(days=d) for d in (30, 90, 180, 360, 450, 720, 1080)]
        launch = F.col("asin_launch_time")
        bucket_expr = F.when(launch >= cutoffs[0], "launch_time_num1")
        for idx in range(1, len(cutoffs)):
            bucket_expr = bucket_expr.when(
                (launch >= cutoffs[idx]) & (launch < cutoffs[idx - 1]),
                f"launch_time_num{idx + 1}"
            )
        bucket_expr = bucket_expr.when(launch < cutoffs[-1], "launch_time_num8")
        df_launch = self.df_st_asin_detail.withColumn('launch_time_type', bucket_expr)
        pivoted = df_launch.groupby(["search_term"]).pivot("launch_time_type").agg(
            F.count("search_term")
        )
        bucket_cols = [f"launch_time_num{i}" for i in range(1, 9)]
        # Pivot only emits columns for buckets that occur; backfill the rest.
        pivoted = self.check_cols(
            actual_col_list=pivoted.columns,
            schema_col_list=bucket_cols,
            df=pivoted
        )
        pivoted = pivoted.select("search_term", *bucket_cols)
        pivoted = pivoted.join(self.df_asin_count, on=["search_term"], how='left')
        share_cols = [f"launch_time_market_share{i}" for i in range(1, 9)]
        pivoted = self.hadle_cols(
            col_list1=share_cols,
            col_list2=bucket_cols,
            col_list3=["total_asin_num"] * 8,
            df=pivoted
        )
        self.df_st_launch_time_group = pivoted.drop("total_asin_num")

    def get_comments_num_range(self):
        """Bucket ASINs into 13 total-comment-count ranges per search term and
        compute each bucket's count and market share; total_asin_num is
        dropped from the result."""
        # Buckets 1-12 are half-open [low, high); bucket 13 catches >= 1000.
        bounds = [0, 50, 100, 150, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
        comments = F.col("asin_total_comments")
        bucket_expr = None
        for idx, (low, high) in enumerate(zip(bounds, bounds[1:]), start=1):
            condition = (comments >= low) & (comments < high)
            label = f"comments_num{idx}"
            bucket_expr = F.when(condition, label) if bucket_expr is None else bucket_expr.when(condition, label)
        bucket_expr = bucket_expr.when(comments >= 1000, "comments_num13")
        df_comments = self.df_st_asin_detail.withColumn("comments_type", bucket_expr)
        pivoted = df_comments.groupby(["search_term"]).pivot("comments_type").agg(
            F.count("search_term")
        )
        bucket_cols = [f"comments_num{i}" for i in range(1, 14)]
        # Pivot only emits columns for buckets that occur; backfill the rest.
        pivoted = self.check_cols(
            actual_col_list=pivoted.columns,
            schema_col_list=bucket_cols,
            df=pivoted
        )
        pivoted = pivoted.select("search_term", *bucket_cols)
        pivoted = pivoted.join(self.df_asin_count, on=["search_term"], how='left')
        share_cols = [f"comments_num_market_share{i}" for i in range(1, 14)]
        pivoted = self.hadle_cols(
            col_list1=share_cols,
            col_list2=bucket_cols,
            col_list3=["total_asin_num"] * 13,
            df=pivoted
        )
        self.df_st_comments_group = pivoted.drop("total_asin_num")

    def get_rating_group(self):
        """Bucket ASINs by rating per search term and emit count/market-share CSV columns.

        Produces self.df_st_rating_group with columns:
        search_term, rating_section (comma-joined counts of the 7 rating buckets)
        and rating_market_share_section (comma-joined shares of total_asin_num).
        """
        # Rating buckets: [0,1), [1,2), [2,3), [3,4), [4,4.3), [4.3,4.5), [4.5,+inf)
        bounds = [0, 1, 2, 3, 4, 4.3, 4.5]
        rating_type_col = F.when(
            (F.col("rating") >= bounds[0]) & (F.col("rating") < bounds[1]), "rating_section1"
        )
        for i in range(1, len(bounds) - 1):
            rating_type_col = rating_type_col.when(
                (F.col("rating") >= bounds[i]) & (F.col("rating") < bounds[i + 1]),
                f"rating_section{i + 1}"
            )
        rating_type_col = rating_type_col.when(
            F.col("rating") >= bounds[-1], f"rating_section{len(bounds)}"
        )
        df_st_asin_detail_rating = self.df_st_asin_detail.withColumn("rating_type", rating_type_col)
        # Pivot: one row per search_term, one column per rating bucket with its ASIN count.
        self.df_st_rating_group = df_st_asin_detail_rating.groupby(["search_term"]).pivot("rating_type").agg(
            F.count("search_term")
        )
        rating_section_list = [f"rating_section{i}" for i in range(1, 8)]
        # pivot() only creates columns for buckets that occur; add the missing ones.
        self.df_st_rating_group = self.check_cols(
            actual_col_list=self.df_st_rating_group.columns,
            schema_col_list=rating_section_list,
            df=self.df_st_rating_group
        )
        self.df_st_rating_group = self.df_st_rating_group.select(
            "search_term", *rating_section_list
        ).na.fill({col: 0 for col in rating_section_list})
        self.df_st_rating_group = self.df_st_rating_group.join(
            self.df_asin_count, on=["search_term"], how='left'
        )
        rating_market_share_section_list = [f"rating_market_share_section{i}" for i in range(1, 8)]
        # Fix: this method has 7 rating buckets, but the denominator list was built
        # with range(1, 14) (13 entries, copied from the comments method). Match the
        # bucket count instead.
        total_asin_num_list = ["total_asin_num"] * len(rating_section_list)
        self.df_st_rating_group = self.hadle_cols(
            col_list1=rating_market_share_section_list,
            col_list2=rating_section_list,
            col_list3=total_asin_num_list,
            df=self.df_st_rating_group
        )
        self.df_st_rating_group = self.df_st_rating_group.na.fill(
            {col: 0.0 for col in rating_market_share_section_list}
        )
        # Collapse the 7 per-bucket columns into single comma-joined string columns.
        self.df_st_rating_group = self.df_st_rating_group.withColumn(
            "rating_section",
            F.concat_ws(",", *[F.col(c) for c in rating_section_list])
        ).withColumn(
            "rating_market_share_section",
            F.concat_ws(",", *[F.col(c) for c in rating_market_share_section_list])
        )
        # The per-bucket columns are intermediate only; keep just the joined strings.
        self.df_st_rating_group = self.df_st_rating_group.drop(
            "total_asin_num", *rating_section_list, *rating_market_share_section_list
        )

    def get_st_asin_seller(self):
        """Aggregate seller columns (seller_name / seller_num / seller_bsr_orders) per search term."""
        # Attach seller info and ASIN measures to each (search_term, asin) row;
        # ASINs without a known seller fall under the 'OTHER' bucket.
        df_seller_detail = self.df_st_asin_measure.join(
            self.df_seller_asin, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        ).na.fill({"seller_name": 'OTHER'})
        # Per search term: full seller list (with repeats), matching bsr-order list,
        # and the distinct seller set.
        df_seller_agg = df_seller_detail.groupBy(['search_term']).agg(
            F.concat_ws(",", F.collect_list("seller_name")).alias("seller_name_list"),
            F.concat_ws(",", F.collect_list("asin_bsr_orders")).alias("seller_bsr_orders_list"),
            F.concat_ws(",", F.collect_set("seller_name")).alias("seller_name_type")
        )
        self.df_st_seller = self.df_st_measure.join(df_seller_agg, on=['search_term'], how='left')
        # Class UDFs compute per-seller ASIN counts and per-seller bsr-order totals
        # from the joined string lists.
        self.df_st_seller = self.df_st_seller.withColumn(
            "seller_num",
            self.u_get_seller_num(F.col("seller_name_type"), F.col("seller_name_list"))
        ).withColumn(
            "seller_bsr_orders",
            self.u_get_seller_bsr_orders(
                F.col("seller_name_type"), F.col("seller_name_list"), F.col("seller_bsr_orders_list")
            )
        )
        self.df_st_seller = self.df_st_seller.drop(
            "st_zr_orders", "orders"
        ).withColumnRenamed("seller_name_type", "seller_name")

    def get_top20_asin(self):
        """Build top-20-brand metrics per search term: brand names, new-product ratio,
        bsr orders and brand market share, each as a joined-string column."""
        # Per (search_term, brand): total products, new products, bsr-order sum.
        brand_agg = self.df_st_asin_detail.groupBy(["search_term", "asin_brand_name"]).agg(
            F.count("asin").alias("brand_total_asin"),
            F.sum("asin_is_new").alias("brand_new_asin"),
            F.sum("asin_bsr_orders").alias("brand_bsr_orders")
        ).na.fill({
            "brand_new_asin": 0, "brand_bsr_orders": 0.0, "asin_brand_name": "None"
        })
        # Discard rows whose brand name is a null-like placeholder (including the
        # "None" just filled in for missing brands).
        brand_agg = brand_agg.filter("asin_brand_name not in ('null', 'None', 'none', 'NULL')")

        # Rank brands inside each search term by bsr orders, keep the top 20.
        rank_window = Window.partitionBy(["search_term"]).orderBy(
            brand_agg.brand_bsr_orders.desc_nulls_last()
        )
        df_top20 = brand_agg.withColumn(
            "brand_bsr_orders_rank",
            F.row_number().over(window=rank_window)
        ).filter("brand_bsr_orders_rank<=20")

        # '&&&&'-joined top-20 brand names per search term.
        df_top20_brand_names = df_top20.select("search_term", "asin_brand_name").groupBy(
            ["search_term"]
        ).agg(
            F.concat_ws("&&&&", F.collect_list("asin_brand_name")).alias("top20_brand")
        )

        # Attach search-term totals to derive per-brand ratios.
        df_top20_ratio = df_top20.join(self.df_st_attribute, on=["search_term"], how='left')
        df_top20_ratio = df_top20_ratio.withColumn(
            "brand_new_num_proportion",
            F.col("brand_new_asin") / F.col("brand_total_asin")
        ).withColumn(
            "brand_market_share",
            F.when(
                F.col("bsr_orders") == 0, F.lit(0)
            ).otherwise(
                F.col("brand_bsr_orders") / F.col("bsr_orders")
            )
        )
        df_top20_metrics = df_top20_ratio.groupBy(["search_term"]).agg(
            F.concat_ws(",", F.collect_list("brand_new_num_proportion")).alias("top20_brand_new_num_proportion"),
            F.concat_ws(",", F.collect_list("brand_bsr_orders")).alias("top20_brand_bsr_oders"),
            F.concat_ws(",", F.collect_list("brand_market_share")).alias("top20_brand_market_share")
        )
        self.df_st_top20 = self.df_st_measure.join(
            df_top20_metrics, on=["search_term"], how='left'
        ).join(
            df_top20_brand_names, on=['search_term'], how='left'
        )
        # top20_asin / top20_orders are deprecated; kept as null placeholders for
        # schema stability.
        self.df_st_top20 = self.df_st_top20.withColumn(
            "top20_asin", F.lit(None)
        ).withColumn(
            "top20_orders", F.lit(None)
        )
        self.df_st_top20 = self.df_st_top20.select(
            "search_term", "top20_asin", "top20_orders", "top20_brand", "top20_brand_new_num_proportion",
            "top20_brand_bsr_oders", "top20_brand_market_share"
        )

    def get_st_size(self):
        """Aggregate size-bucket names, counts and bsr-order shares per search term."""
        # Classify each ASIN's size via the class UDF; keep only classifiable rows.
        df_sized = self.df_st_asin_detail.withColumn(
            "asin_size_flag",
            self.u_judge_size(F.col("asin_size"), F.col("asin_style"))
        ).filter("asin_size_flag not in ('none','null')")
        # Per (search_term, size bucket): bsr-order sum and ASIN count.
        df_size_agg = df_sized.groupby(['search_term', 'asin_size_flag']).agg(
            F.sum("asin_bsr_orders").alias("size_bsr_orders"),
            F.count("asin_size_flag").alias("size_quantity")
        ).join(
            self.df_st_attribute, on=['search_term'], how='left'
        )
        # Bucket's share of the search term's total bsr orders, rounded to 3 places.
        df_size_agg = df_size_agg.withColumn(
            "size_bsr_orders_percent",
            F.round(F.col("size_bsr_orders") / F.col("bsr_orders"), 3)
        ).select(
            "search_term", "asin_size_flag", "size_quantity", "size_bsr_orders_percent"
        )
        # Fold the per-bucket rows into joined-string columns per search term.
        self.df_st_size = df_size_agg.groupby(['search_term']).agg(
            F.concat_ws("&&&&", F.collect_list("asin_size_flag")).alias("size_name"),
            F.concat_ws(",", F.collect_list("size_quantity")).alias("size_num"),
            F.concat_ws(",", F.collect_list("size_bsr_orders_percent")).alias("size_bsr_orders_percent")
        )

    def get_asin_package_num(self):
        """Derive package quantity per ASIN from its title and build per-search-term
        package-number trend columns."""
        # Parse package quantity from the title via the class UDF; ASINs where no
        # quantity is detected default to 1.
        self.df_st_asin_detail = self.df_st_asin_detail.withColumn(
            "package_num",
            self.u_get_package_num(F.col("asin_title"))
        ).drop("asin_title").na.fill({"package_num": 1})
        # Collect (package_num, asin) pairs per search term, sorted descending by
        # package_num, then split the sorted structs into two parallel arrays.
        self.df_st_package_num = self.df_st_asin_detail.groupby(["search_term"]).agg(
            F.sort_array(F.collect_list(F.struct(F.col("package_num"), F.col("asin"))), False).alias("value")
        ).select(
            "search_term",
            F.expr("transform(value, x -> x.asin)").alias("asin_list"),
            F.expr("transform(value, x -> x.package_num)").alias("package_num_list")
        )
        # Class UDFs condense the arrays into trend / market-share / asin columns.
        self.df_st_package_num = self.df_st_package_num.withColumn(
            "package_num_trend",
            self.u_get_package_num_trend(F.col("package_num_list"))
        ).withColumn(
            "package_num_trend_market_share",
            self.u_get_package_num_trend_market_share(F.col("package_num_list"))
        ).withColumn(
            "package_num_corresponding_asin",
            self.u_get_package_num_corresponding_asin(F.col("asin_list"))
        )
        # NOTE(review): "package_num_array" is never created here; DataFrame.drop
        # silently ignores unknown columns, so only asin_list is actually removed.
        self.df_st_package_num = self.df_st_package_num.drop("package_num_array", "asin_list")

    def handle_data_group(self):
        """Join every per-search-term sub-report onto the measure backbone, fix the
        final output schema/order, fill defaults, and stamp partition columns.

        Result lands in self.df_save (written by the Templates base class).
        """
        # Restrict to search terms present in both the key table and the detail table.
        self.df_st_measure = self.df_st_measure.join(
            self.df_st_key, on=['search_term'], how='inner'
        ).join(
            self.df_st_detail, on=['search_term'], how='inner'
        )
        # Left-join every computed dimension; missing dimensions leave nulls that are
        # filled below. Note df_top100_asin joins on search_term_id, not search_term.
        self.df_save = self.df_st_measure.join(
            self.df_st_price_group, on=['search_term'], how='left'
        ).join(
            self.df_st_ao_group, on=['search_term'], how='left'
        ).join(
            self.df_st_attribute, on=['search_term'], how='left'
        ).join(
            self.df_st_img_type_group, on=['search_term'], how='left'
        ).join(
            self.df_top100_asin, on=['search_term_id'], how='left'
        ).join(
            self.df_st_launch_time_group, on=['search_term'], how='left'
        ).join(
            self.df_st_comments_group, on=['search_term'], how='left'
        ).join(
            self.df_st_buy_box, on=['search_term'], how='left'
        ).join(
            self.df_st_seller, on=['search_term'], how='left'
        ).join(
            self.df_st_color, on=['search_term'], how='left'
        ).join(
            self.df_st_top20, on=['search_term'], how='left'
        ).join(
            self.df_st_rating_group, on=['search_term'], how='left'
        ).join(
            self.df_st_size, on=['search_term'], how='left'
        ).join(
            self.df_st_package_num, on=['search_term'], how='left'
        )

        # Deprecated price-range columns: retained in the schema, filled with -1 sentinels.
        price_range_num_cols = [f"price_range_num{i}_deprecated" for i in range(1, 19)]
        price_range_share_cols = [f"price_range_market_share{i}_deprecated" for i in range(1, 19)]
        for num_col, share_col in zip(price_range_num_cols, price_range_share_cols):
            self.df_save = self.df_save.withColumn(num_col, F.lit(-1))
            self.df_save = self.df_save.withColumn(share_col, F.lit(-1.0))
        self.df_save = self.df_save.select(
            "search_term_id", "search_term",
            *price_range_num_cols,
            *price_range_share_cols,
            "ao_range_val1", "ao_range_val2", "ao_range_val3", "ao_range_val4", "ao_range_val5", "ao_range_val6",
            "ao_range_val7", "ao_range_val8", "ao_range_val9", "ao_range_val10", "ao_range_val11", "ao_range_val12",
            "ao_range_market_share1", "ao_range_market_share2", "ao_range_market_share3", "ao_range_market_share4",
            "ao_range_market_share5", "ao_range_market_share6", "ao_range_market_share7", "ao_range_market_share8",
            "ao_range_market_share9", "ao_range_market_share10", "ao_range_market_share11", "ao_range_market_share12",
            "total_asin_num", "new_asin_num", "orders", "bsr_orders",
            "aadd_bsr_orders", "aadd_video_num", "aadd_no_video_num", "no_aadd_no_video_num", "no_aadd_video_num",
            "top100_asin", "top100_orders", "top100_market_share", "top100_is_new",
            "launch_time_num1", "launch_time_num2", "launch_time_num3", "launch_time_num4",
            "launch_time_num5", "launch_time_num6", "launch_time_num7", "launch_time_num8",
            "launch_time_market_share1", "launch_time_market_share2",
            "launch_time_market_share3", "launch_time_market_share4",
            "launch_time_market_share5", "launch_time_market_share6",
            "launch_time_market_share7", "launch_time_market_share8",
            # "top20_brand_bsr_oders" spelling is the persisted column name; do not "fix" it.
            "top20_asin", "top20_orders", "top20_brand", "top20_brand_new_num_proportion",
            "top20_brand_bsr_oders", "top20_brand_market_share",
            "comments_num1", "comments_num2", "comments_num3", "comments_num4",
            "comments_num5", "comments_num6", "comments_num7", "comments_num8",
            "comments_num9", "comments_num10", "comments_num11", "comments_num12",
            "comments_num13",
            "comments_num_market_share1", "comments_num_market_share2",
            "comments_num_market_share3", "comments_num_market_share4",
            "comments_num_market_share5", "comments_num_market_share6",
            "comments_num_market_share7", "comments_num_market_share8",
            "comments_num_market_share9", "comments_num_market_share10",
            "comments_num_market_share11", "comments_num_market_share12",
            "comments_num_market_share13",
            "buy_box_name", "buy_box_num",
            "seller_name", "seller_num", "seller_bsr_orders",
            "color_name", "color_num",
            "new_asin_bsr_orders",
            "rating_section", "rating_market_share_section",
            "size_name", "size_num",
            "package_num_trend", "package_num_trend_market_share", "package_num_corresponding_asin",
            "price_interval", "price_interval_asin_count", "price_interval_asin_market_share",
            "color_bsr_orders_percent",
            "size_bsr_orders_percent"
        )

        # Fix: 'SS' is the fraction-of-second pattern in Spark datetime formats, so
        # 'HH:mm:SS' emitted garbage in the seconds position; seconds are 'ss'.
        now_str = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = self.df_save.withColumn(
            "created_time", now_str
        ).withColumn(
            "updated_time", now_str
        )
        # Default zeros for search terms missing a given dimension after the left joins.
        self.df_save = self.df_save.na.fill({
            "ao_range_val1": 0, "ao_range_val2": 0, "ao_range_val3": 0, "ao_range_val4": 0, "ao_range_val5": 0,
            "ao_range_val6": 0, "ao_range_val7": 0, "ao_range_val8": 0, "ao_range_val9": 0,
            "ao_range_val10": 0, "ao_range_val11": 0, "ao_range_val12": 0,
            "ao_range_market_share1": 0.0, "ao_range_market_share2": 0.0, "ao_range_market_share3": 0.0,
            "ao_range_market_share4": 0.0, "ao_range_market_share5": 0.0, "ao_range_market_share6": 0.0,
            "ao_range_market_share7": 0.0, "ao_range_market_share8": 0.0, "ao_range_market_share9": 0.0,
            "ao_range_market_share10": 0.0, "ao_range_market_share11": 0.0, "ao_range_market_share12": 0.0,
            "total_asin_num": 0, "new_asin_num": 0, "orders": 0, "bsr_orders": 0, "aadd_video_num": 0,
            "aadd_no_video_num": 0, "no_aadd_no_video_num": 0, "no_aadd_video_num": 0,
            "launch_time_num1": 0, "launch_time_num2": 0, "launch_time_num3": 0, "launch_time_num4": 0,
            "launch_time_num5": 0, "launch_time_num6": 0, "launch_time_num7": 0, "launch_time_num8": 0,
            "launch_time_market_share1": 0.0, "launch_time_market_share2": 0.0,
            "launch_time_market_share3": 0.0, "launch_time_market_share4": 0.0,
            "launch_time_market_share5": 0.0, "launch_time_market_share6": 0.0,
            "launch_time_market_share7": 0.0, "launch_time_market_share8": 0.0,
            "comments_num1": 0, "comments_num2": 0, "comments_num3": 0, "comments_num4": 0, "comments_num5": 0,
            "comments_num6": 0, "comments_num7": 0, "comments_num8": 0, "comments_num9": 0,
            "comments_num10": 0, "comments_num11": 0, "comments_num12": 0, "comments_num13": 0,
            "comments_num_market_share1": 0.0, "comments_num_market_share2": 0.0,
            "comments_num_market_share3": 0.0, "comments_num_market_share4": 0.0,
            "comments_num_market_share5": 0.0, "comments_num_market_share6": 0.0,
            "comments_num_market_share7": 0.0,
            "comments_num_market_share8": 0.0, "comments_num_market_share9": 0.0,
            "comments_num_market_share10": 0.0, "comments_num_market_share11": 0.0,
            "comments_num_market_share12": 0.0, "comments_num_market_share13": 0.0
        })
        # Reserved (spare) fields plus the partition columns expected by the writer.
        self.df_save = self.df_save.withColumn(
            "re_string_field1", F.lit("null")
        ).withColumn(
            "re_string_field2", F.lit("null")
        ).withColumn(
            "re_string_field3", F.lit("null")
        ).withColumn(
            "re_int_field1", F.lit(0)
        ).withColumn(
            "re_int_field2", F.lit(0)
        ).withColumn(
            "re_int_field3", F.lit(0)
        ).withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_type", F.lit(self.date_type)
        ).withColumn(
            "date_info", F.lit(self.date_info)
        )

    def handle_data(self):
        """Run every sub-report computation in dependency order, then assemble the
        final frame into self.df_save via handle_data_group."""
        steps = (
            self.get_st_asin_detail,
            self.get_st_attribute,
            self.get_img_type,
            self.get_asin_count,
            self.get_price_range,
            self.get_ao_range,
            self.get_st_buy_box,
            self.get_st_color,
            self.get_launch_time_range,
            self.get_comments_num_range,
            self.get_rating_group,
            self.get_st_asin_seller,
            self.get_top20_asin,
            self.get_st_size,
            self.get_asin_package_num,
            self.handle_data_group,  # must run last: joins all of the above
        )
        for step in steps:
            step()


if __name__ == '__main__':
    # CLI: <site> <date_type: week/4_week/month/quarter> <date_info, e.g. 2022-1>
    arg_site = sys.argv[1]
    arg_date_type = sys.argv[2]
    arg_date_info = sys.argv[3]
    DwtAbaStAnalyticsReport(
        site_name=arg_site, date_type=arg_date_type, date_info=arg_date_info
    ).run()