"""
   @Author      : wangrui
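   @Description : Traffic-based product selection (builds the dwt_flow_asin wide table)
   @SourceTable :
                  dwd_asin_measure
                  dim_asin_detail
                  ods_bsr_end
                  dim_asin_bs_category (queried in code as dim_asin_bs_info)
                  dim_fd_asin_info
                  dim_asin_volume
                  dwd_title_matching_degree
                  dwt_flow_asin (previous period, for change rates)
                  dim_date_20_to_30
                  us_bs_category_hide (MySQL)

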
   @SinkTable   : dwt_flow_asin
   @CreateTime  : 2023/01/10 07:55
   @UpdateTime  : 2023/01/10 07:55
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *
from utils.DorisHelper import DorisHelper


class DwtFlowAsin(Templates):
    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """Store the job parameters, open the Spark session and create placeholders.

        Args:
            site_name: Value of the ``site_name`` partition to process.
            date_type: Value of the ``date_type`` partition to process.
            date_info: Value of the ``date_info`` partition to process.
        """
        super().__init__()
        # Job parameters; they are also the partition values of the sink table.
        self.site_name, self.date_type, self.date_info = site_name, date_type, date_info
        self.hive_tb = "dwt_flow_asin"
        self.partition_dict = dict(site_name=site_name, date_type=date_type, date_info=date_info)
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)

        spark_app_name = f"{self.hive_tb}: {self.site_name}, {self.date_type}, {self.date_info}"
        self.spark = self.create_spark_object(app_name=spark_app_name)

        # Both date helpers query dim_date_20_to_30 through self.spark, so they
        # must run after the session exists.
        self.previous_date = self.udf_get_previous_last_30_day(self)
        self.current_date = self.udf_get_current_time(self)

        # Doris settings. NOTE(review): test_flag is not set in this method;
        # presumably it comes from the Templates base class - confirm.
        if self.test_flag == "test":
            self.doris_db = "test"
        else:
            self.doris_db = "selection"
        self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail"

        # Output DataFrame placeholder and write partitioning.
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(60)
        self.launch_time_interval_dict = self.get_launch_time_interval_dict()

        # Placeholder DataFrames; read_data() replaces each of them.
        for placeholder in ("df_asin_detail", "df_asin_measure", "df_bsr_end",
                            "df_asin_bs_category", "df_fd_asin_info",
                            "df_flow_asin_last", "df_title_matching_degree"):
            setattr(self, placeholder, self.spark.sql("select 1+1;"))

    @staticmethod
    def udf_get_previous_last_30_day(self):
        """Return the ``year_month`` of the month preceding the current period.

        Loads ``dim_date_20_to_30`` into pandas (also stored on ``self.df_date``)
        and walks its ``id`` column, which the arithmetic below treats as one
        consecutive id per calendar day.

        * ``last30day``: go back 30 ids from ``date_info``, then take the month
          before the month that day falls in.
        * ``month`` / ``month_week``: the month before ``date_info``.
        * ``4_week``: go back 21 ids from the first day of week ``date_info``,
          then take the month before the month that day falls in.

        Returns:
            The ``year_month`` value, or ``None`` for any other ``date_type``.

        Raises:
            IndexError: if a lookup finds no matching calendar row.
        """
        self.df_date = self.spark.sql("select * from dim_date_20_to_30 ;")
        calendar = self.df_date.toPandas()

        def first_value(mask, column):
            # First value of `column` among the rows selected by `mask`.
            return list(calendar.loc[mask][column])[0]

        def month_before(year_month):
            # year_month of the day right before the 1st of `year_month`.
            first_day_id = first_value((calendar.year_month == year_month) & (calendar.day == 1), "id")
            return first_value(calendar.id == int(first_day_id) - 1, "year_month")

        period = self.date_type
        if period == 'last30day':
            end_day_id = first_value(calendar.date == f'{self.date_info}', "id")
            start_month = first_value(calendar.id == int(end_day_id) - 30, "year_month")
            return month_before(start_month)
        if period in ['month', 'month_week']:
            return month_before(f'{self.date_info}')
        if period == '4_week':
            week_start_mask = (calendar.year_week == f'{self.date_info}') & (calendar.week_day == 1)
            week_start_id = first_value(week_start_mask, "id")
            window_start_month = first_value(calendar.id == int(week_start_id) - 21, "year_month")
            return month_before(window_start_month)
        # Other date types (e.g. 'week', 'day') have no previous period here.
        return None

    @staticmethod
    def udf_get_current_time(self):
        """Return the reference date of the period being processed.

        Loads ``dim_date_20_to_30`` into pandas (also stored on ``self.df_date``).

        * ``month``: the ``date`` of day 25 of month ``date_info``.
        * ``week`` / ``4_week``: the ``date`` whose ``week_day`` is 7 in week
          ``date_info``.
        * ``last30day`` / ``day``: ``date_info`` itself.
        * ``month_week``: today's date as a string.

        Returns:
            The reference date, or ``None`` for any other ``date_type``.

        Raises:
            IndexError: if the calendar lookup finds no matching row.
        """
        self.df_date = self.spark.sql("select * from dim_date_20_to_30 ;")
        calendar = self.df_date.toPandas()

        period = self.date_type
        if period in ('last30day', 'day'):
            return self.date_info
        if period == 'month_week':
            return str(datetime.now().date())

        if period == 'month':
            anchor_mask = (calendar.year_month == f'{self.date_info}') & (calendar.day == 25)
        elif period in ('week', '4_week'):
            anchor_mask = (calendar.year_week == f'{self.date_info}') & (calendar.week_day == 7)
        else:
            return None

        # Two-step lookup kept from the original: resolve the id first, then
        # read the date of the first row carrying that id.
        anchor_id = list(calendar.loc[anchor_mask].id)[0]
        return list(calendar.loc[calendar.id == anchor_id].date)[0]

    @staticmethod
    def get_launch_time_interval_dict():
        cur_date = datetime.now().date()
        return {
            "one_month": (cur_date + timedelta(days=-30)).strftime('%Y-%m-%d'),
            "three_month": (cur_date + timedelta(days=-90)).strftime('%Y-%m-%d'),
            "six_month": (cur_date + timedelta(days=-180)).strftime('%Y-%m-%d'),
            "twelve_month": (cur_date + timedelta(days=-360)).strftime('%Y-%m-%d'),
            "twenty_four_month": (cur_date + timedelta(days=-720)).strftime('%Y-%m-%d'),
            "thirty_six_month": (cur_date + timedelta(days=-1080)).strftime('%Y-%m-%d')
        }

    @staticmethod
    def calculate_change(current_col, previous_col):
        """Build the absolute and relative change expressions for a column pair.

        Args:
            current_col: Name of the column holding the current value.
            previous_col: Name of the column holding the previous value.

        Returns:
            Tuple ``(rise_col, change_col)`` of Column expressions:
            ``rise_col`` is ``current - previous``; ``change_col`` is
            ``(current - previous) / previous`` rounded to 4 decimals, or NULL
            when the previous value is NULL or 0.
        """
        current = F.col(current_col)
        previous = F.col(previous_col)
        # A relative change is only defined against a non-null, non-zero baseline.
        has_baseline = (previous.isNotNull()) & (previous != 0)
        rise_col = current - previous
        change_col = F.when(has_baseline, F.round((current - previous) / previous, 4)).otherwise(None)
        return rise_col, change_col

    def read_data(self):
        """Load every input DataFrame for the current partition.

        Each Hive query is printed, executed, repartitioned to 60 partitions,
        persisted with ``DISK_ONLY`` and previewed with ``show(10)``. The
        ``ods_bsr_end`` lookup is instead marked for broadcast.

        Fixes over the previous version: the step-5 query is now printed like
        every other query, and the step-4 log line names the table that is
        actually queried (``dim_asin_bs_info``).
        """

        def load_cached(query):
            # Shared load pattern: log, run, repartition, persist to disk, preview.
            print("sql:" + query)
            df = self.spark.sql(sqlQuery=query).repartition(60).persist(StorageLevel.DISK_ONLY)
            df.show(10, truncate=False)
            return df

        print("1.获取dwd_asin_measure,得到各种类型的统计、ao值、自然流量占比、月销信息、bsr销量信息")
        sql = f"""
            select asin, round(asin_ao_val, 3) as asin_ao_val, round(asin_ao_val_matrix, 3) as matrix_ao_val, 
            asin_zr_counts, asin_sp_counts, (asin_sb1_counts + asin_sb2_counts) as asin_sb_counts, 
            asin_sb3_counts as asin_vi_counts, asin_bs_counts, asin_ac_counts, asin_tr_counts, asin_er_counts, asin_st_counts, 
            asin_bsr_orders as bsr_orders, asin_amazon_orders, round(asin_zr_flow_proportion, 3) as zr_flow_proportion, 
            round(asin_flow_proportion_matrix, 3) as matrix_flow_proportion 
            from dwd_asin_measure where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"""
        self.df_asin_measure = load_cached(sql)

        print("2.获取dim_asin_detail,得到asin详情")
        sql = f"""
            select asin, asin_img_url, lower(asin_title) as asin_title, asin_title_len, asin_price, asin_rating, asin_total_comments, 
            asin_buy_box_seller_type, seller_json, asin_page_inventory, asin_category_desc, asin_volume, asin_weight, asin_length, asin_width, asin_height, 
            asin_color, asin_size, asin_style, asin_is_sale, asin_launch_time, asin_is_new, asin_img_num, asin_img_type, asin_material, 
            lower(asin_brand_name) as asin_brand_name, asin_activity_type, act_one_two_val, act_three_four_val, act_five_six_val, act_eight_val, 
            one_star,two_star, three_star, four_star, five_star, low_star, together_asin, ac_name, variation_num, account_name, account_id, parent_asin, 
            asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, asin_quantity_variation_type, package_quantity, 
            asin_is_movie as is_movie_label, asin_is_brand as is_brand_label, asin_is_alarm as is_alarm_brand, asin_is_self, 
            date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date, asin_bought_month, asin_image_view,  
            case when product_description is not null then 1 else 0 end as is_with_product_description, asin_describe, 
            category_id as top_category_id, category_first_id as top_category_first_id, customer_reviews_json, img_list as img_info, 
            asin_follow_sellers as follow_sellers_count 
            from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
        self.df_asin_detail = load_cached(sql)

        print("3.获取ods_bsr_end,获取有效rank信息")
        sql = f"""select rank as limit_rank, category_id as category_first_id from ods_bsr_end where site_name='{self.site_name}'"""
        print("sql:" + sql)
        # Lookup table: hinted for broadcast instead of being persisted.
        self.df_bsr_end = F.broadcast(self.spark.sql(sqlQuery=sql))
        self.df_bsr_end.show(10, truncate=False)

        print("4.获取dim_asin_bs_info,获取分类名称")
        sql = f"""
            select asin, asin_bs_cate_1_rank as first_category_rank, asin_bs_cate_current_rank as current_category_rank, 
            asin_bs_cate_1_id as category_first_id, asin_bs_cate_current_id as category_id 
            from dim_asin_bs_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}'"""
        self.df_asin_bs_category = load_cached(sql)

        print("5.获取dim_fd_asin_info,得到卖家相关信息")
        # This cut-over condition must stay identical to the one in
        # handle_seller_country(), which picks the matching join key.
        # NOTE(review): date_info is compared as a string, so the result depends
        # on how periods are formatted (e.g. zero-padded weeks) - confirm.
        if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (self.date_type == '4_week' and self.date_info >= '2024-21'):
            # From the cut-over on: one row per (account, country), joined by account_id.
            sql = f"""
            select fd_unique as account_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info 
            where site_name='{self.site_name}' and fd_unique is not null group by fd_unique, fd_country_name"""
        else:
            # Before the cut-over: most recently updated seller row per asin, joined by asin.
            sql = f"""
            select account_id, account_name, seller_country_name, asin 
            from (select fd_unique as account_id, fd_account_name as account_name, upper(fd_country_name) as seller_country_name, asin, 
            ROW_NUMBER() OVER (PARTITION BY asin ORDER BY updated_at DESC) AS t_rank 
            from dim_fd_asin_info where site_name = '{self.site_name}' and fd_unique is not null) tmp
            where tmp.t_rank = 1
            """
        self.df_fd_asin_info = load_cached(sql)

        print("6.获取上一个最近30天的整合结果")
        sql = f"""
            select asin, round(asin_ao_val, 3) as previous_asin_ao_val, asin_price as previous_asin_price, 
            sales as pervious_sales, variation_num as previous_variation_num, asin_rating as previous_asin_rating, 
            bsr_orders as previous_bsr_orders, asin_total_comments as previous_asin_total_comments, 
            first_category_rank as previous_first_category_rank from dwt_flow_asin 
            where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.previous_date}'
            """
        self.df_flow_asin_last = load_cached(sql)

        print("7.获取asin的标题匹配度")
        sql = f"""
               select asin, contains_flag from dwd_title_matching_degree where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'
               """
        self.df_title_matching_degree = load_cached(sql)

    # 处理asin基础属性信息(体积重量相关)及bsr销售额相关信息
    def handle_asin_basic_attribute(self):
        """Derive weight type, volumetric/actual weight ratio and size type.

        Adds ``asin_weight_type``, ``asin_weight_ratio`` and ``asin_size_type``
        to ``self.df_asin_detail`` and then drops the three dimension columns.
        The size-type thresholds differ between the ``us`` site and all others.
        """
        # Weight buckets; the ranges share their bounds, so the first match wins.
        weight_type_case = """CASE 
                    WHEN asin_weight BETWEEN 0 AND 0.2 THEN 1
                    WHEN asin_weight BETWEEN 0.2 AND 0.4 THEN 2
                    WHEN asin_weight BETWEEN 0.4 AND 0.6 THEN 3
                    WHEN asin_weight BETWEEN 0.6 AND 1 THEN 4
                    WHEN asin_weight BETWEEN 1 AND 2 THEN 5
                    WHEN asin_weight >= 2 THEN 6
                    ELSE 0 END"""

        # NOTE(review): every girth term below uses asin_length twice
        # ("asin_length + asin_length + ..."); kept as-is - confirm it is intended.
        us_size_case = """
           CASE WHEN asin_weight > 0 AND asin_weight * 16 <= 16 AND asin_length > 0 AND asin_length <= 15 AND asin_width > 0 AND asin_width <= 12 AND asin_height > 0 AND asin_height <= 0.75 THEN 1 
           WHEN asin_weight > 0 AND asin_weight <= 20 AND asin_length > 0 AND asin_length <= 18 AND  asin_width > 0 AND asin_width <= 14 AND asin_height > 0 AND asin_height <= 8 THEN 2 
           WHEN asin_weight > 0 AND asin_weight <= 70 AND asin_length > 0 AND asin_length <= 60 AND asin_width > 0 AND asin_width <= 30 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130  THEN 3 
           WHEN asin_weight > 0 AND asin_weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130  THEN 4 
           WHEN asin_weight > 0 AND asin_weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 165  THEN 5 
           WHEN asin_weight > 150 AND asin_length > 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 > 165  THEN 6 ELSE 0 END"""
        other_size_case = """
           CASE
                WHEN asin_weight > 0 AND asin_weight <= 100 AND asin_length > 0 AND asin_length <= 20 AND asin_width > 0 AND asin_width <= 15 AND asin_height > 0 AND asin_height <= 1 THEN 1
                WHEN asin_weight > 0 AND asin_weight <= 500 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 2.5 THEN 2
                WHEN asin_weight > 0 AND asin_weight <= 1000 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 5 THEN 3
                WHEN asin_weight > 0 AND asin_weight <= 12000 AND asin_length > 0 AND asin_length <= 45 AND asin_width > 0 AND asin_width <= 34 AND asin_height > 0 AND asin_height <= 26 THEN 4
                WHEN asin_weight > 0 AND asin_weight <= 2000 AND asin_length > 0 AND asin_length <= 61 AND asin_width > 0 AND asin_width <= 46 AND asin_height > 0 AND asin_height <= 46 THEN 5
                WHEN asin_length > 0 AND asin_length <= 150 AND asin_length + asin_length + (asin_width + asin_height) <= 300 THEN 6
                WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7
               ELSE 0 END"""
        size_type_case = us_size_case if self.site_name == 'us' else other_size_case

        length, width, height = F.col("asin_length"), F.col("asin_width"), F.col("asin_height")
        weight = F.col("asin_weight")
        # The ratio needs all three dimensions and a positive weight; otherwise -1.
        ratio_is_computable = length.isNotNull() & (width.isNotNull()) & (height.isNotNull()) & (weight > 0)
        weight_ratio = F.when(
            ratio_is_computable,
            F.round(length * width * height * 3.2774128 / (weight * 453.59), 4)
        ).otherwise(F.lit(-1))

        detail = self.df_asin_detail.withColumn("asin_weight_type", F.expr(weight_type_case))
        detail = detail.withColumn("asin_weight_ratio", weight_ratio)
        detail = detail.withColumn("asin_size_type", F.expr(size_type_case))
        self.df_asin_detail = detail.drop("asin_length", "asin_width", "asin_height")

    # 通过ASIN页面信息处理(评分类型、上架时间类型、价格类型)
    def handle_asin_detail_all_type(self):
        """Bucket rating, launch time and price into integer type codes.

        Adds ``asin_rating_type``, ``asin_launch_time_type`` and
        ``asin_price_type`` to ``self.df_asin_detail``. Each CASE falls back to
        0 when no branch matches (for example on NULL input).
        """
        rating_case = """CASE WHEN asin_rating >= 4.5 THEN 1 WHEN asin_rating >= 4 AND asin_rating < 4.5 THEN 2 WHEN asin_rating >= 3.5 AND asin_rating < 4 THEN 3
                             WHEN asin_rating >= 3 AND asin_rating < 3.5 THEN 4 WHEN asin_rating < 3 AND asin_rating >= 0 THEN 5 ELSE 0 END"""

        # Launch-time cut-off dates prepared by get_launch_time_interval_dict().
        (one_month, three_month, six_month,
         twelve_month, twenty_four_month, thirty_six_month) = (
            self.launch_time_interval_dict[interval]
            for interval in ("one_month", "three_month", "six_month",
                             "twelve_month", "twenty_four_month", "thirty_six_month")
        )
        launch_case = f"""CASE WHEN asin_launch_time >= '{one_month}' THEN 1 
                            WHEN asin_launch_time >= '{three_month}' AND asin_launch_time < '{one_month}' THEN 2 
                            WHEN asin_launch_time >= '{six_month}' AND asin_launch_time < '{three_month}' THEN 3 
                            WHEN asin_launch_time >= '{twelve_month}' AND asin_launch_time < '{six_month}' THEN 4 
                            WHEN asin_launch_time >= '{twenty_four_month}' AND asin_launch_time < '{twelve_month}' THEN 5 
                            WHEN asin_launch_time >= '{thirty_six_month}' AND asin_launch_time < '{twenty_four_month}' THEN 6 
                            WHEN asin_launch_time < '{thirty_six_month}' THEN 7 ELSE 0 END"""

        price_case = """
            CASE WHEN asin_price IS NOT NULL AND asin_price > 0 AND asin_price < 10 THEN 1 
            WHEN asin_price >= 10 AND asin_price < 15 THEN 2 
            WHEN asin_price >= 15 AND asin_price < 20 THEN 3 
            WHEN asin_price >= 20 AND asin_price < 30 THEN 4 
            WHEN asin_price >= 30 AND asin_price < 50 THEN 5 
            WHEN asin_price >= 50 THEN 6 ELSE 0 END"""

        self.df_asin_detail = (
            self.df_asin_detail
            .withColumn("asin_rating_type", F.expr(rating_case))
            .withColumn("asin_launch_time_type", F.expr(launch_case))
            .withColumn("asin_price_type", F.expr(price_case))
        )

    # 处理asin分类、排名、排名类型字段、是否有效排名信息
    def handle_asin_category_info(self):
        """Attach category ids, ranks, rank type and the valid-rank flag.

        Steps:
            1. Left-join the best-seller category/rank data by ``asin``.
            2. Fall back to the detail-page category ids (``top_category_*``)
               when the best-seller ones are NULL, then drop the fallbacks.
            3. Bucket ``first_category_rank`` into ``asin_rank_type``.
            4. Left-join the per-category rank limit and set ``bsr_type`` to 1
               when the first-category rank is within that limit (or within
               500000 when the category has no limit), else 0.

        Fixes over the previous version:
            * ``top_category_first_id`` was misspelled in the ``drop`` call, so
              the helper column was silently kept.
            * ``bsr_type`` compared ``category_first_id`` (a category id) with
              the rank limit; it now compares ``first_category_rank``.
        """
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_bs_category, on=['asin'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "category_id", F.coalesce(F.col("category_id"), F.col("top_category_id"))
        ).withColumn(
            "category_first_id", F.coalesce(F.col("category_first_id"), F.col("top_category_first_id"))
        ).drop("top_category_id", "top_category_first_id")
        # Rank buckets; the ranges share their bounds, so the first match wins.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_rank_type", F.expr("""
            CASE WHEN first_category_rank IS NOT NULL AND first_category_rank BETWEEN 0 AND 1000 THEN 1 
            WHEN first_category_rank BETWEEN 1000 AND 5000 THEN 2 
            WHEN first_category_rank BETWEEN 5000 AND 10000 THEN 3 
            WHEN first_category_rank BETWEEN 10000 AND 20000 THEN 4 
            WHEN first_category_rank BETWEEN 20000 AND 30000 THEN 5 
            WHEN first_category_rank BETWEEN 30000 AND 50000 THEN 6 
            WHEN first_category_rank BETWEEN 50000 AND 70000 THEN 7 
            WHEN first_category_rank >= 70000 THEN 8 ELSE 0 END"""))
        self.df_asin_detail = self.df_asin_detail.join(self.df_bsr_end, on=['category_first_id'], how='left')
        # The rank limit applies to the rank, not to the category id.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "bsr_type", F.expr("""CASE WHEN limit_rank is null and first_category_rank <= 500000 THEN 1 WHEN limit_rank is not null and first_category_rank <= limit_rank THEN 1 ELSE 0 END""")).drop("limit_rank")
        self.df_asin_bs_category.unpersist()

    # 处理asin的bsr销量、亚马逊月销信息、ao值、母体ao、自然流量占比、母体自然流量占比、ao值类型
    def handle_asin_measure(self):
        """Join the measure data and derive sales, monthly bought count and AO type.

        * ``sales`` = ``bsr_orders * asin_price`` rounded to 2 decimals.
        * ``asin_bought_month`` falls back to ``asin_amazon_orders`` when NULL.
        * ``asin_ao_val_type`` buckets ``asin_ao_val`` (0 when nothing matches).

        ``asin_amazon_orders`` is dropped afterwards and the measure DataFrame
        is unpersisted.
        """
        # AO buckets; the ranges share their bounds, so the first match wins.
        ao_type_case = """CASE WHEN asin_ao_val BETWEEN 0 AND 0.1 THEN 1 
        WHEN asin_ao_val BETWEEN 0.1 AND 0.2 THEN 2 WHEN asin_ao_val BETWEEN 0.2 AND 0.4 THEN 3 
        WHEN asin_ao_val BETWEEN 0.4 AND 0.8 THEN 4 WHEN asin_ao_val BETWEEN 0.8 AND 1.2 THEN 5 
        WHEN asin_ao_val BETWEEN 1.2 AND 2 THEN 6 WHEN asin_ao_val >= 2 THEN 7 ELSE 0 END"""
        sales_amount = F.round(F.col("bsr_orders") * F.col("asin_price"), 2)
        bought_month = F.coalesce(F.col("asin_bought_month"), F.col("asin_amazon_orders"))

        enriched = self.df_asin_detail.join(self.df_asin_measure, on=['asin'], how='left')
        enriched = enriched.withColumn("sales", sales_amount)
        enriched = enriched.withColumn("asin_bought_month", bought_month)
        enriched = enriched.withColumn("asin_ao_val_type", F.expr(ao_type_case))
        self.df_asin_detail = enriched.drop("asin_amazon_orders")
        self.df_asin_measure.unpersist()

    # 处理配送方式、卖家所在地以及卖家所在地类型
    def handle_seller_country(self):
        """Join seller information and derive ``asin_site_name_type``.

        From the cut-over period on (month/month_week >= '2024-05', 4_week >=
        '2024-21') the seller data is joined by ``account_id``. Before that,
        ``account_id`` and ``account_name`` are dropped and taken from the
        seller data joined by ``asin``. The condition mirrors the one that
        selects the seller query in ``read_data``.

        ``asin_site_name_type``: 4 when ``asin_buy_box_seller_type`` is 1,
        1 when the seller country contains 'US', 2 when it contains 'CN',
        otherwise 3.
        """
        # NOTE(review): date_info is compared as a string; confirm the period
        # format makes this ordering valid.
        monthly_cutover = self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05'
        four_week_cutover = self.date_type == '4_week' and self.date_info >= '2024-21'

        if monthly_cutover or four_week_cutover:
            detail = self.df_asin_detail
            join_keys = ['account_id']
        else:
            # The asin-keyed seller data supplies its own account columns.
            detail = self.df_asin_detail.drop("account_id", "account_name")
            join_keys = ['asin']
        detail = detail.join(self.df_fd_asin_info, on=join_keys, how='left')

        site_type_case = """
        CASE WHEN asin_buy_box_seller_type = 1  THEN 4 
        WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1 
        WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2 ELSE 3 END"""
        self.df_asin_detail = detail.withColumn("asin_site_name_type", F.expr(site_type_case))
        self.df_fd_asin_info.unpersist()

    # 处理asin的lqs评分
    def handle_asin_lqs_rating(self):
        """Compute the listing quality score (LQS) and its JSON breakdown.

        Twelve partial ratings are added as temporary columns, serialised into
        ``asin_lqs_rating_detail`` (a JSON object), summed into
        ``asin_lqs_rating`` and then dropped together with the helper input
        columns ``is_with_product_description``, ``asin_describe`` and
        ``asin_image_view``.
        """

        def score_if(condition, score):
            # `score` when the condition holds, else 0 (also for NULL conditions).
            return F.when(condition, F.lit(score)).otherwise(F.lit(0))

        # Bullet points in asin_describe are separated by the literal "|-|".
        bullet_count = F.size(F.split(F.col("asin_describe"), '\\|-\\|'))
        has_bullets = F.col("asin_describe").isNotNull()
        img_num = F.col("asin_img_num")
        title_len = F.col("asin_title_len")

        partial_ratings = [
            ("category_node_rating", score_if(F.col("category_id").isNotNull(), 1)),
            ("zr_rating", score_if(F.col("asin_zr_counts") > 0, 0.5)),
            ("sp_rating", score_if(F.col("asin_sp_counts") > 0, 1)),
            ("a_add_rating", score_if(F.col("asin_img_type").contains("3"), 1)),
            ("video_rating", score_if(F.col("asin_img_type").contains("2"), 0.5)),
            ("brand_rating", score_if(F.col("is_brand_label") == 1, 0.2)),
            ("product_describe_rating", score_if(F.col("is_with_product_description") == 1, 0.2)),
            # 0.4 per bullet point, capped at 1.6 once there are more than four.
            ("highlight_rating",
             F.when(has_bullets & (bullet_count <= 4), bullet_count * 0.4)
             .when(has_bullets & (bullet_count > 4), F.lit(1.6))
             .otherwise(F.lit(0))),
            ("title_len_rating", score_if((title_len >= 50) & (title_len <= 200), 0.5)),
            # Title must start with the brand name after both sides are cleaned.
            ("title_brand_rating",
             F.expr("""CASE WHEN asin_brand_name is not null AND lower(regexp_replace(asin_title, '[^a-zA-Z0-9\\s]', '')) LIKE lower(regexp_replace(asin_brand_name, '[^a-zA-Z0-9\\s]', '')) || '%' THEN 0.5 ELSE 0 END""")),
            # 0.5 per image, capped at 2 once there are more than four.
            ("img_num_rating",
             F.when(img_num <= 4, img_num * 0.5).when(img_num > 4, F.lit(2)).otherwise(F.lit(0))),
            ("img_enlarge_rating", score_if(F.col("asin_image_view") == 1, 0.5)),
        ]
        rating_names = [name for name, _ in partial_ratings]

        scored = self.df_asin_detail
        for name, expression in partial_ratings:
            scored = scored.withColumn(name, expression)

        scored = scored.withColumn(
            "asin_lqs_rating_detail",
            F.to_json(F.struct(*[F.col(name) for name in rating_names]))
        )

        # Left-to-right sum of all partial ratings.
        total_rating = F.col(rating_names[0])
        for name in rating_names[1:]:
            total_rating = total_rating + F.col(name)
        scored = scored.withColumn("asin_lqs_rating", total_rating)

        self.df_asin_detail = scored.drop(
            "is_with_product_description", "asin_describe", "asin_image_view", *rating_names
        )

    # 处理asin是否隐藏分类信息(US/UK/DE站点通用)以及asin_type信息
    def handle_asin_is_hide(self):
        """Derive ``asin_type`` from self/unneeded/hidden-category flags.

        Reads the hidden category ids from the MySQL table
        ``us_bs_category_hide`` (always through the "us" connection) and joins
        them by ``category_id``. Three temporary flags are computed and folded
        into ``asin_type`` with this priority:
        1 = ``asin_is_self``, 2 = not needed, 3 = hidden category, 0 = none.
        The temporary flags and ``asin_is_self`` are dropped afterwards.
        """
        mysql_con = DBUtil.get_connection_info("mysql", "us")
        sql = """
            select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base
        """
        df_hide_category = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=mysql_con['url'],
            pwd=mysql_con['pwd'],
            username=mysql_con['username'],
            query=sql,
        )

        # Hidden: listed in the hide table, grocery (one category excepted), or
        # one of the fixed category ids.
        is_hide_case = """
      CASE WHEN hide_flag = 1 THEN 1 WHEN category_first_id = 'grocery' and category_id != '6492272011' THEN 1 
      WHEN category_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1
      ELSE 0 END"""
        # Flagged 1 for the listed first-level categories or when the asin does
        # not start with 'B0'.
        is_need_case = """
        CASE WHEN category_first_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 
        WHEN asin NOT LIKE 'B0%' THEN 1 
        ELSE 0 END"""
        asin_type_case = """
            CASE WHEN asin_is_self=1 THEN 1 WHEN asin_is_need=1 THEN 2 WHEN asin_is_hide=1 THEN 3 ELSE 0 END"""

        flagged = self.df_asin_detail.join(df_hide_category, on=['category_id'], how='left')
        flagged = flagged.withColumn("asin_is_hide", F.expr(is_hide_case)).drop("hide_flag")
        flagged = flagged.withColumn("asin_is_need", F.expr(is_need_case))
        flagged = flagged.withColumn("asin_type", F.expr(asin_type_case))
        self.df_asin_detail = flagged.drop("asin_is_self", "asin_is_need", "asin_is_hide")

    # 处理匹配度
    def handle_title_matching_degree(self):
        """Compute the per-ASIN title matching degree and join it onto the detail.

        ``title_matching_degree`` is the sum of ``contains_flag`` divided by the
        number of rows for the ASIN, rounded to 4 decimals.

        Changes over the previous version:
            * One ``groupBy``/``agg`` replaces two window functions followed by
              ``drop_duplicates``. It yields the same single row per ASIN
              without carrying every input row through the window.
            * The persisted input DataFrame is now the one that is unpersisted.
              Previously ``unpersist()`` was called on a derived DataFrame, so
              the cached input was never released. Releasing the input right
              after the join matches the other ``handle_*`` methods.
        """
        # Keep a handle on the persisted input before the attribute is rebound.
        cached_matches = self.df_title_matching_degree
        self.df_title_matching_degree = cached_matches.groupBy("asin").agg(
            F.round(F.sum("contains_flag") / F.count("asin"), 4).alias("title_matching_degree")
        )
        self.df_asin_detail = self.df_asin_detail.join(self.df_title_matching_degree, on=['asin'], how='left')
        cached_matches.unpersist()

    # 处理变体ASIN属性(asin维度)的变化率相关信息
    def handle_asin_attribute_change(self):
        """Add ``<suffix>_rise`` / ``<suffix>_change`` columns versus the previous period.

        The previous-period values are left-joined by ``asin``. For each
        tracked metric the absolute difference (``_rise``) and the relative
        change (``_change``, 4 decimals, NULL without a non-zero baseline) are
        added via ``calculate_change``.

        Fixes over the previous version:
            * The previous-period helper column is now really dropped; the
              result of ``drop`` used to be discarded.
            * ``asin_sales_rise`` is rounded to 2 decimals like the price. The
              old check tested the suffix ``'sales'``, which never matched, so
              the value was cast to an integer instead.
        """
        self.df_asin_detail = self.df_asin_detail.join(self.df_flow_asin_last, on=['asin'], how='left')
        # (current column, previous column, output column prefix)
        columns_to_change = [
            ("first_category_rank", "previous_first_category_rank", "asin_rank"),
            ("bsr_orders", "previous_bsr_orders", "asin_bsr_orders"),
            ("asin_rating", "previous_asin_rating", "asin_rating"),
            ("asin_total_comments", "previous_asin_total_comments", "asin_comments"),
            ("variation_num", "previous_variation_num", "asin_variation"),
            ("asin_ao_val", "previous_asin_ao_val", "asin_ao"),
            ("asin_price", "previous_asin_price", "asin_price"),
            ("sales", "pervious_sales", "asin_sales")
        ]
        # Decimal places for fractional metrics; all others are integer counts.
        rise_decimals = {"asin_ao": 3, "asin_price": 2, "asin_sales": 2, "asin_rating": 1}
        for current_col, previous_col, suffix in columns_to_change:
            rise_col, change_col = self.calculate_change(current_col, previous_col)
            decimals = rise_decimals.get(suffix)
            if decimals is not None:
                rise_col = F.round(rise_col, decimals)
            else:
                rise_col = rise_col.cast(IntegerType())
            # DataFrames are immutable, so the result of drop() must be kept.
            self.df_asin_detail = (
                self.df_asin_detail
                .withColumn(f"{suffix}_rise", rise_col)
                .withColumn(f"{suffix}_change", F.round(change_col, 4))
                .drop(previous_col)
            )
        self.df_flow_asin_last.unpersist()

    # 字段标准化
    def handle_column(self):
        """Project ``self.df_asin_detail`` onto the sink layout and store it in ``self.df_save``.

        Steps, in order:
          1. Select the output columns. Several columns are emitted as literal
             NULL placeholders (``F.lit(None)``), ``bsr_best_orders_type`` is
             emitted as the literal -1, and the three partition columns are
             filled from ``self.site_name`` / ``self.date_type`` / ``self.date_info``.
          2. Replace NULLs in the count / type / flag columns with the defaults
             listed in the fill map below.
          3. Repartition to 60 partitions, keep one row per ``asin`` and keep only
             rows whose ``asin`` is at most 10 characters long.
          4. Persist the *final* frame to disk, then print its row count and a
             10-row sample.
        """
        self.df_save = self.df_asin_detail.\
            select("asin", "asin_ao_val", "asin_zr_counts", "asin_sp_counts", "asin_sb_counts", "asin_vi_counts",
                   "asin_bs_counts", "asin_ac_counts", "asin_tr_counts", "asin_er_counts", "bsr_orders",
                   F.lit(None).alias("orders"), "sales", F.lit(None).alias("cate_current_id"),
                   F.lit(None).alias("cate_1_id"), "asin_img_url", "asin_title", "asin_title_len", "asin_price",
                   "asin_rating", "asin_total_comments", "asin_buy_box_seller_type", "asin_page_inventory",
                   "asin_category_desc", "asin_volume", "asin_weight", "asin_color", "asin_size", "asin_style",
                   "asin_is_sale", F.lit(None).alias("asin_rank"), "asin_launch_time", "asin_is_new", "asin_img_num",
                   "asin_img_type", "asin_material", "asin_brand_name", "asin_activity_type", "act_one_two_val",
                   "act_three_four_val", "act_five_six_val", "act_eight_val", F.lit(None).alias("qa_num"), "one_star",
                   "two_star", "three_star", "four_star", "five_star", "low_star", "together_asin", "ac_name",
                   "variation_num", "account_name", "account_id", "seller_country_name", "bsr_type",
                   F.lit(-1).alias("bsr_best_orders_type"), F.lit(None).alias("zr_best_orders_type"), "parent_asin", "asin_rank_rise",
                   "asin_rank_change", "asin_ao_rise", "asin_ao_change", "asin_price_rise", "asin_price_change",
                   F.lit(None).alias("asin_orders_rise"), F.lit(None).alias("asin_orders_change"), "asin_rating_rise",
                   "asin_rating_change", "asin_comments_rise", "asin_comments_change", "asin_bsr_orders_rise",
                   "asin_bsr_orders_change", "asin_sales_rise", "asin_sales_change", "asin_variation_rise",
                   "asin_variation_change", "asin_size_type", "asin_rating_type", "asin_site_name_type",
                   "asin_weight_type", "asin_launch_time_type", "asin_ao_val_type", "asin_rank_type", "asin_price_type",
                   F.lit(None).alias("created_time"), F.lit(None).alias("updated_time"), "asin_lob_info",
                   "customer_reviews_json", "img_info", "is_contains_lob_info", "is_package_quantity_abnormal",
                   "asin_st_counts", "asin_quantity_variation_type", "package_quantity",
                   F.lit(None).alias("sp_type1"), F.lit(None).alias("sp_type2"), F.lit(None).alias("sp_type3"),
                   "is_movie_label", "is_brand_label", "is_alarm_brand", "asin_type", F.lit(None).alias("asin_cost_fee"),
                   F.lit(None).alias("asin_refund_fee"), F.lit(None).alias("asin_adv_fee"),
                   F.lit(None).alias("asin_commission_fee"), F.lit(None).alias("asin_fba_fee"),
                   F.lit(None).alias("asin_freight_air_fee"), F.lit(None).alias("asin_freight_ocean_fee"),
                   F.lit(None).alias("asin_operate_fee"), F.lit(None).alias("asin_air_freight_gross_margin"),
                   F.lit(None).alias("asin_ocean_freight_gross_margin"), "asin_crawl_date",
                   F.lit(None).alias("asin_package_quantity"), F.lit(None).alias("asin_pattern_name"),
                   "category_first_id", "category_id", "first_category_rank", "current_category_rank",
                   "asin_weight_ratio", "asin_bought_month", F.lit(None).alias("buy_data_bought_week"),
                   F.lit(None).alias("buy_data_viewed_month"), F.lit(None).alias("buy_data_viewed_week"),
                   F.lit(None).alias("theme_en"), F.lit(None).alias("theme_label_en"), "asin_lqs_rating",
                   "asin_lqs_rating_detail", "title_matching_degree", "zr_flow_proportion", "matrix_flow_proportion",
                   "matrix_ao_val", "follow_sellers_count", "seller_json",
                   F.lit(self.site_name).alias("site_name"), F.lit(self.date_type).alias("date_type"),
                   F.lit(self.date_info).alias("date_info"))
        # Default values for NULLs; package_quantity defaults to 1 and
        # follow_sellers_count to -1, everything else to zero.
        self.df_save = self.df_save.na.fill(
            {"asin_zr_counts": 0, "asin_sp_counts": 0, "asin_sb_counts": 0, "asin_vi_counts": 0, "asin_bs_counts": 0,
             "asin_ac_counts": 0, "asin_tr_counts": 0, "asin_er_counts": 0, "asin_title_len": 0,
             "asin_total_comments": 0, "variation_num": 0, "asin_img_num": 0, "act_one_two_val": 0.0,
             "act_three_four_val": 0.0, "act_five_six_val": 0.0, "act_eight_val": 0.0, "one_star": 0,
             "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0, "asin_size_type": 0,
             "asin_rating_type": 0, "asin_site_name_type": 0, "asin_weight_type": 0, "asin_launch_time_type": 0,
             "asin_ao_val_type": 0, "asin_rank_type": 0, "asin_price_type": 0, "asin_quantity_variation_type": 0,
             "package_quantity": 1, "is_movie_label": 0, "is_brand_label": 0, "is_alarm_brand": 0,
             "title_matching_degree": 0.0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1})
        # Persist AFTER de-duplication and filtering so that the frame kept in
        # self.df_save is the cached one. Persisting before these steps left the
        # final frame uncached, so every later action re-ran the dedup shuffle.
        self.df_save = (
            self.df_save
            .repartition(60)
            .drop_duplicates(['asin'])
            .filter(F.length(F.col("asin")) <= 10)
            .persist(StorageLevel.DISK_ONLY)
        )
        print("数据量为:", self.df_save.count())
        self.df_save.show(10, truncate=False)

    # 保存数据
    def save_data(self):
        """Write ``self.df_save`` to Hive and, for recent monthly runs, export it to Doris.

        Hive: the files under ``self.hdfs_path`` are deleted first, then the
        frame (without ``seller_json``) is appended to ``self.hive_tb``
        partitioned by ``self.partitions_by``.

        Doris: only attempted when ``self.date_type`` is ``month`` or
        ``month_week`` and ``self.date_info >= '2024-06'``. The latest completed
        report date is read from the MySQL ``workflow_everyday`` table, and the
        export runs only if this run's ``date_info`` is newer than it, so older
        reruns do not replace newer rows in ``self.asin_latest_detail_table``.
        """
        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        # Log the partition list that is actually handed to the writer below.
        print(f"当前存储的表名为:{self.hive_tb},分区为{self.partitions_by}", )
        # seller_json is excluded from the Hive copy; the Doris export still uses it.
        df_hive = self.df_save.drop("seller_json")
        df_hive.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
        print("save asin_detail success")

        if not (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-06'):
            return

        # NOTE(review): the lookup is fixed to site 'us' / date_type 'month'
        # regardless of self.site_name and self.date_type — confirm this is intended.
        max_report_sql = f"""
               select max(report_date) as completed_date_info  from workflow_everyday where site_name='us' and date_type='month' and page='流量选品' and status_val=14
            """
        mysql_con = DBUtil.get_connection_info("mysql", "us")
        df_date_info = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con['url'], pwd=mysql_con['pwd'],
                                                 username=mysql_con['username'], query=max_report_sql)
        # max() over zero matching rows yields a single NULL row; comparing a str
        # with None would raise TypeError, so the empty case is handled explicitly.
        date_rows = df_date_info.take(1)
        completed_date_info = date_rows[0]['completed_date_info'] if date_rows else None
        # With no completed report recorded, nothing newer than this run is known,
        # so the export proceeds. NOTE(review): confirm this is the desired default.
        if completed_date_info is not None and self.date_info <= completed_date_info:
            print("不用导出旧数据到doris中")
            return

        print("往doris存储最新asin详情信息:")
        df_doris = self.df_save.\
            select("asin", "asin_ao_val", "asin_title", "asin_title_len", "asin_category_desc", "asin_volume",
                   "asin_weight", "asin_launch_time", "asin_brand_name", "one_star", "two_star", "three_star",
                   "four_star", "five_star", "low_star", "account_name", "account_id", "seller_country_name",
                   "category_first_id", "parent_asin", "variation_num", "img_info", "asin_crawl_date", "asin_price",
                   "asin_rating", "asin_total_comments", "matrix_ao_val", "zr_flow_proportion",
                   "matrix_flow_proportion", "date_info", F.col("asin_img_url").alias("img_url"),
                   F.col("category_id").alias("category_current_id"),
                   F.col("first_category_rank").alias("category_first_rank"),
                   F.col("current_category_rank").alias("category_current_rank"), "asin_type",
                   "bsr_orders", F.col("sales").alias("bsr_orders_sale"),
                   F.col("asin_page_inventory").alias("page_inventory"), "asin_bought_month", "seller_json",
                   F.col("asin_buy_box_seller_type").alias("buy_box_seller_type")
                   )
        # Column list passed to the Doris helper; it names the same columns, in
        # the same order, as the select above.
        table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume, 
                          asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star, 
                          account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info, 
                          asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion, 
                          date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale, 
                          page_inventory, asin_bought_month, seller_json, buy_box_seller_type"""
        DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns)
        print("save asin_latest_detail success")

    def handle_data(self):
        """Run every transformation stage of the job, one after another.

        The stages are invoked in the fixed order listed below, ending with
        ``handle_column``. Each stage is looked up on ``self`` right before it
        is called.
        """
        stage_names = (
            "handle_asin_basic_attribute",
            "handle_asin_detail_all_type",
            "handle_asin_category_info",
            "handle_asin_measure",
            "handle_seller_country",
            "handle_asin_lqs_rating",
            "handle_asin_is_hide",
            "handle_title_matching_degree",
            "handle_asin_attribute_change",
            "handle_column",
        )
        for stage_name in stage_names:
            getattr(self, stage_name)()


if __name__ == '__main__':
    # Positional command-line arguments, in order:
    #   1. site name
    #   2. date type: week / 4_week / month / quarter
    #   3. date info: year-week / year-month / year-quarter, e.g. 2022-1
    site_name, date_type, date_info = sys.argv[1], sys.argv[2], sys.argv[3]
    handle_obj = DwtFlowAsin(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    )
    handle_obj.run()