"""
   @Author      : HuangJian
   @Description : 关键词与Asin详情维表
   @SourceTable :
                  ①ods_asin_keep_date
                  ②ods_asin_variat
                  ③ods_asin_detail
                  ④dwd_bs_category_asin
   @SinkTable   : dim_asin_detail
   @CreateTime  : 2022/11/14 15:56
   @UpdateTime  : 2022/11/14 15:56
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录


from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import *
from yswg_utils.common_udf import udf_handle_string_null_value as myUDF
from yswg_utils.common_df import get_node_first_id_df, get_first_id_from_category_desc_df
from utils.common_util import CommonUtil, DateTypes
from yswg_utils.common_udf import udf_new_asin_flag
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
from datetime import datetime
from pyspark.storagelevel import StorageLevel
from utils.DorisHelper import DorisHelper
from yswg_utils.common_udf import udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json, udf_parse_amazon_orders


class DimAsinDetail(object):

    def __init__(self, site_name: str, date_type: str, date_info: str):
        """Initialize the dim_asin_detail build job.

        :param site_name: Amazon site code (e.g. 'us', 'uk', 'de').
        :param date_type: partition granularity ('week', '4_week', 'month', 'month_week').
        :param date_info: partition value matching date_type (e.g. '2024-05' or '2024-21').
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = f'dim_asin_detail'
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Target HDFS path of the hive partition (used for path validation before write).
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Create the SparkSession; app name embeds class/site/date for monitoring.
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # Resolve which weekly ods partitions this run covers, and the matching SQL filter.
        self.complete_date_info_tuple = self.get_complete_week_tuple()
        self.date_sql = self.date_sql_padding()
        # Sanity bounds for launch dates: [1995-01-01, Jan 1 of next year].
        self.launch_time_upper_limit = self.get_effective_launch_time()
        self.launch_time_lower_limit = "1995-01-01"
        self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.partition_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80)
        # Doris connection targets (parent-asin latest variation export).
        self.doris_db = "selection"
        self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail"
        # Placeholder dataframes; all are populated in read_data().
        self.df_asin_keep_date = self.spark.sql(f"select 1+1;")
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_asin_label = self.spark.sql("select 1+1;")
        self.df_asin_weight_and_volume = self.spark.sql("select 1+1;")
        self.df_asin_new_cate = self.spark.sql("select 1+1;")
        self.df_user_package_num = self.spark.sql(f"select 1+1;")
        self.df_self_asin = self.spark.sql(f"select 1+1;")
        self.df_asin_category = self.spark.sql(f"select 1+1;")
        self.df_asin_variat = self.spark.sql(f"select 1+1;")
        # Register shared UDFs (from yswg_utils.common_udf).
        self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType())
        self.handle_string_num_value = F.udf(myUDF, StringType())
        # Struct returned by the seller_json parser UDF.
        seller_schema = StructType([
            StructField("buy_box_seller_type", IntegerType(), True),
            StructField("account_name", StringType(), True),
            StructField("account_id", StringType(), True)
        ])
        self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema)
        # Struct returned by the package-quantity parser UDF.
        package_schema = StructType([
            StructField("parse_package_quantity", IntegerType(), True),
            StructField("is_package_quantity_abnormal", IntegerType(), True),
        ])
        self.u_judge_package_quantity = self.spark.udf.register('u_judge_package_quantity', udf_get_package_quantity, package_schema)
        self.u_parse_amazon_orders = F.udf(udf_parse_amazon_orders, IntegerType())

    def get_complete_week_tuple(self):
        complete_date_info_tuple = None
        df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;")
        df = df_date.toPandas()
        if self.date_type == 'week':
            complete_date_info_tuple = f"('{self.date_info}')"
        elif self.date_type == '4_week':
            print(self.date_info)
            df_loc = df.loc[(df.year_week == f"{self.date_info}") & (df.week_day == 1)]
            cur_id = list(df_loc.id)[0]
            df_loc = df.loc[df.id == int(cur_id)]
            week1 = list(df_loc.year_week)[0]
            df_loc = df.loc[df.id == int(cur_id) - 7]
            week2 = list(df_loc.year_week)[0]
            df_loc = df.loc[df.id == int(cur_id) - 14]
            week3 = list(df_loc.year_week)[0]
            df_loc = df.loc[df.id == int(cur_id) - 21]
            week4 = list(df_loc.year_week)[0]
            complete_date_info_tuple = (week1, week2, week3, week4)
        elif self.date_type in ['month', 'month_week']:
            df_loc = df.loc[(df.year_month == f"{self.date_info}") & (df.week_day == 1)]
            complete_date_info_tuple = tuple(df_loc.year_week)
        print("self.complete_date_info_tuple:", complete_date_info_tuple)
        return complete_date_info_tuple

    def date_sql_padding(self):
        if 'us' == self.site_name:
            if self.date_type == DateTypes.month_week.name:
                date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
            elif self.date_type == DateTypes.month.name and self.date_info >= '2023-10':
                date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
            else:
                date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}"
        elif self.site_name in ['uk', 'de']:
            if self.date_type in [DateTypes.month.name, DateTypes.month_week.name] and self.date_info >= '2024-05':
                date_sql = f" and date_type='{self.date_type}' and date_info='{self.date_info}'"
            else:
                date_sql = f" and date_type='week' and date_info in {self.complete_date_info_tuple}"
        print(date_sql)
        return date_sql

    def get_effective_launch_time(self):
        year_upper_limit = datetime.now().year + 1
        launch_time_upper_limit = str(year_upper_limit) + "-01-01"
        return launch_time_upper_limit

    # 读取数据
    def read_data(self):
        """Load every upstream dataframe needed by the build.

        Populates the self.df_* attributes from hive (dim/ods layers), a
        postgres edit-log table, and shared helper dataframes. Large frames
        are repartitioned and persisted to disk so downstream handle_* steps
        can reuse them; small lookup frames are broadcast.
        """
        # Backfilled launch dates from the dim layer (dim_asin_launchtime_info).
        print("1. 获取dim层的dim_asin_launchtime_info填充的上架日期")
        sql = f"""select asin, asin_launch_time as populated_asin_launch_time from dim_asin_launchtime_info where site_name='{self.site_name}'  and asin_launch_time is not null"""
        print(sql)
        self.df_asin_keep_date = self.spark.sql(sqlQuery=sql)
        self.df_asin_keep_date = self.df_asin_keep_date.repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_keep_date.show(10, truncate=False)
        # Raw asin detail rows for the partitions selected by self.date_sql.
        print("2. 获取ods_asin_detail")
        sql = f"""
            select asin, img_url as asin_img_url, lower(title) as asin_title, title_len as asin_title_len, 
            price as asin_price, rating as asin_rating, total_comments as asin_total_comments, 
            page_inventory as asin_page_inventory, category as asin_category_desc, 
            launch_time as crawl_asin_launch_time, img_num as asin_img_num, img_type as asin_img_type, 
            category_state as asin_category_state, material as asin_material, lower(brand) as asin_brand_name, 
            activity_type as asin_activity_type, one_two_val as act_one_two_val, three_four_val as act_three_four_val, 
            five_six_val as act_five_six_val, eight_val as act_eight_val, one_star, two_star, three_star, four_star, 
            five_star, low_star, together_asin, ac_name, node_id, data_type as asin_data_type, variat_list, 
            `describe` as asin_describe, follow_sellers as asin_follow_sellers, product_description, 
            image_view as asin_image_view, spider_int as asin_spider_num, buy_sales, lob_asin_json as asin_lob_info, 
            REGEXP_REPLACE(seller_json, chr(10), '') as seller_json, buy_box_seller_type as asin_buy_box_seller_type, customer_reviews_json, parent_asin, img_list, 
            created_at as created_time, updated_at as updated_time, updated_at as dt, variat_num as variation_num 
            from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
        print(sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        self.df_asin_detail = self.df_asin_detail.repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_detail.show(10, truncate=False)
        # Stable weight/volume attributes per asin.
        print("3. 读取asin体积重量相关信息")
        sql = f"""
                SELECT asin, asin_volume, asin_volume_type, asin_length_sorted as asin_length, 
                asin_width_sorted as asin_width, asin_height_sorted as asin_height, 
                asin_weight, asin_weight_str, asin_weight_type FROM dim_asin_stable_info WHERE site_name='{self.site_name}'"""
        print(sql)
        self.df_asin_weight_and_volume = self.spark.sql(sqlQuery=sql)
        self.df_asin_weight_and_volume = self.df_asin_weight_and_volume.repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_weight_and_volume.show(10, truncate=False)
        # Manual package-quantity corrections from the postgres edit log;
        # only the latest edit per asin (rn = 1) is kept.
        print("4. 获取用户修改打包数量信息")
        pg_con_info = DBUtil.get_connection_info("postgresql", "us")
        sql = f"""
               WITH ranked_edit_logs AS (SELECT edit_key_id, lower(val_related_info) AS val_related_info, val_after, 
               ROW_NUMBER() OVER (PARTITION BY edit_key_id ORDER BY create_time DESC) AS rn FROM sys_edit_log 
               WHERE module = '流量选品' AND filed = 'package_quantity' and site_name='{self.site_name}')
               SELECT edit_key_id AS asin, val_related_info AS asin_title, cast(val_after as int) AS user_package_num, 
               0 AS user_is_package_quantity_abnormal FROM ranked_edit_logs WHERE rn = 1"""
        print(sql)
        # NOTE(review): silently skipped when the connection info is missing —
        # self.df_user_package_num then keeps its placeholder value from __init__.
        if pg_con_info is not None:
            df_user_package_num = SparkUtil.read_jdbc_query(session=self.spark, url=pg_con_info['url'], pwd=pg_con_info['pwd'], username=pg_con_info['username'], query=sql)
            self.df_user_package_num = F.broadcast(df_user_package_num)
            self.df_user_package_num.show(10, truncate=False)
        # Company-internal asins (flag asin_is_self = 1).
        print("5.读取ods_self_asin,获得公司内部asin信息")
        sql = f"""select asin, 1 as asin_is_self from ods_self_asin where site_name='{self.site_name}' group by asin"""
        print("sql:" + sql)
        df_self_asin = self.spark.sql(sqlQuery=sql)
        self.df_self_asin = F.broadcast(df_self_asin)
        self.df_self_asin.show(10, truncate=False)
        # node_id -> top-level category id mapping (shared helper dataframe).
        print("6. node_id对应的头部分类信息")
        self.df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark)
        self.df_asin_new_cate = self.df_asin_new_cate.filter('node_id is not null').persist(StorageLevel.DISK_ONLY)
        self.df_asin_new_cate.show(10, truncate=False)
        # Label / movie flags computed for the same partition.
        print("7. 获取asin的标签信息")
        sql = f"""
            select asin, asin_label_list, asin_label_type as asin_is_movie from dim_asin_label 
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
        """
        print("sql:" + sql)
        self.df_asin_label = self.spark.sql(sqlQuery=sql)
        self.df_asin_label = self.df_asin_label.repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_label.show(10, truncate=False)
        # category name -> first-level category id mapping (lower-cased for joins).
        print("8. 获取分类id和分类名称的对应关系")
        self.df_asin_category = get_first_id_from_category_desc_df(self.site_name, self.spark)
        self.df_asin_category = self.df_asin_category.withColumn(
            "category_first_name", F.lower("category_first_name")
        ).repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_category.show(10, truncate=False)
        # Historical partitions (< 2024-06) lack variat_list in ods_asin_detail;
        # read variation attributes from dim_asin_variation_info instead.
        if self.date_type in ['month', 'month_week'] and self.date_info < '2024-06':
            sql = f"""
            SELECT asin, parent_asin, color as asin_color, `size` as asin_size, style as asin_style, 
            CASE WHEN state = 1 THEN 1 WHEN state = 2 THEN 0 ElSE NULL END as asin_is_sale, updated_time 
            FROM dim_asin_variation_info where site_name='{self.site_name}' and created_date >='2024-05-21'
            """
            print("sql:" + sql)
            self.df_asin_variat = self.spark.sql(sqlQuery=sql)
            self.df_asin_variat = self.df_asin_variat.repartition(100).persist(StorageLevel.DISK_ONLY)
            self.df_asin_variat.show(10, truncate=False)
        else:
            pass

    # asin详情数据去重
    def handle_df_duplicated(self):
        """Deduplicate asin detail rows, keeping the latest crawl per asin.

        Before deduplicating, all data_type tags seen for an asin are collected
        into a comma-separated asin_data_type so we still know every scenario
        that crawled it. For the 2024-03 month run, rows are additionally
        restricted to asins present in dwd_asin_measure.
        """
        # Aggregate every crawl-scenario tag for the asin across duplicates.
        datatype_agg = (
            self.df_asin_detail
            .select("asin", "asin_data_type")
            .groupby(["asin"])
            .agg(F.concat_ws(",", F.collect_set("asin_data_type")).alias("asin_data_type"))
        )
        # Rank rows per asin by crawl date (dt) descending; rank 1 is the newest.
        latest_first = Window.partitionBy(['asin']).orderBy(self.df_asin_detail.dt.desc_nulls_last())
        deduped = (
            self.df_asin_detail
            .withColumn("dt_rank", F.row_number().over(window=latest_first))
            .filter("dt_rank=1")
            .drop("dt", "dt_rank", "asin_data_type")
            .repartition(100)
        )
        self.df_asin_detail = deduped.join(
            datatype_agg.repartition(100), on=['asin'], how='left'
        ).persist(StorageLevel.MEMORY_ONLY)
        print("asin详情去重后的数量为: ", self.df_asin_detail.count())
        if self.date_type == 'month' and self.date_info == '2024-03':
            sql = f"""
                select asin from dwd_asin_measure where site_name='{self.site_name}' and date_type='{self.date_type}'
                and date_info='{self.date_info}' group by asin"""
            df_effective_asin = self.spark.sql(sql)
            self.df_asin_detail = self.df_asin_detail.join(df_effective_asin, on=['asin'], how='inner')

    # 处理asin的亚马逊月销信息
    def handle_asin_bought_month(self):
        """Derive asin_bought_month by parsing the raw 'buy_sales' text with the amazon-orders UDF."""
        monthly_orders = self.u_parse_amazon_orders(F.col("buy_sales"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_bought_month", monthly_orders)

    # 处理asin的lob_info信息
    def handle_asin_lob_info(self):
        """Flag rows that carry lob info and flatten the lob json into a csv string.

        Sets is_contains_lob_info (1/0), then rewrites asin_lob_info from a json
        array of {lob_asin} structs into a comma-separated list of lob asins.
        """
        has_lob = F.when(F.col("asin_lob_info").isNotNull(), F.lit(1)).otherwise(F.lit(0))
        self.df_asin_detail = self.df_asin_detail.withColumn("is_contains_lob_info", has_lob)
        parsed = self.df_asin_detail.withColumn(
            "parse_asin_lob",
            F.when(F.col("is_contains_lob_info") == 1, F.from_json("asin_lob_info", "array<struct<lob_asin:string>>")))
        flattened = parsed.withColumn("asin_lob_info", F.expr("transform(parse_asin_lob, x -> x.lob_asin)"))
        # Join the extracted lob asins into one string and strip stray braces.
        self.df_asin_detail = flattened.withColumn(
            "asin_lob_info", F.regexp_replace(F.concat_ws(",", "asin_lob_info"), "[{}]", "")).drop("parse_asin_lob")

    # 处理asin的变体信息
    def handle_asin_variation_attribute(self):
        """Attach variation attributes (color/size/style/on-sale) to each asin.

        Two data paths: month/month_week partitions >= 2024-06 parse the
        variat_list json embedded in the detail row itself; older month
        partitions join against dim_asin_variation_info (loaded in read_data).
        Other date_types are left untouched.
        """
        if self.date_type in ['month', 'month_week'] and self.date_info >= '2024-06':
            print("执行新版的变体信息整合")
            # variat_list is a json array of string arrays; positions observed:
            # 0=asin, 1=color, 3=size, 4=sale state, 5=style — TODO confirm against crawler schema.
            variat_schema = ArrayType(ArrayType(StringType()))
            self.df_asin_detail = self.df_asin_detail.withColumn("variat_list_change", F.from_json(F.col("variat_list"), variat_schema))
            df_asin_with_variation = self.df_asin_detail.filter(F.size("variat_list_change") > 0).\
                select("asin", F.explode("variat_list_change").alias("variant_attribute")).\
                select("asin", F.col("variant_attribute")[0].alias("son_asin"),
                       F.col("variant_attribute")[1].alias("asin_color"),
                       F.col("variant_attribute")[3].alias("asin_size"),
                       F.col("variant_attribute")[4].alias("asin_is_sale"),
                       F.col("variant_attribute")[5].alias("asin_style"))
            # Keep only the entry describing the asin itself (not its siblings).
            df_asin_with_variation = df_asin_with_variation.filter(F.col("asin") == F.col("son_asin")).drop("son_asin", "variant_attribute")
            # Normalize sale state: 1 -> on sale, 2 -> not on sale, anything else -> null.
            df_asin_with_variation = df_asin_with_variation.withColumn(
                "asin_is_sale",
                F.when(F.col("asin_is_sale")==1, F.lit(1)).when(F.col("asin_is_sale")==2, F.lit(0)).otherwise(F.lit(None))
            )
            self.df_asin_detail = self.df_asin_detail.join(
                df_asin_with_variation, on=['asin'], how='left'
            )
        elif self.date_type in ['month', 'month_week'] and self.date_info < '2024-06':
            print("执行历史数据的变体信息整合")
            # Keep only the most recently updated variation row per asin.
            window = Window.partitionBy(self.df_asin_variat.asin).orderBy(
                self.df_asin_variat.updated_time.desc_nulls_last())
            self.df_asin_variat = self.df_asin_variat.withColumn("t_rank", F.row_number().over(window=window))
            self.df_asin_variat = self.df_asin_variat.filter("t_rank = 1").drop("updated_time", "t_rank")
            self.df_asin_variat = self.df_asin_variat.repartition(100)
            # Recompute variation_num/parent_asin from the variation table rather
            # than trusting the (older) detail-row values.
            self.df_asin_detail = self.df_asin_detail.drop("variation_num", "parent_asin")
            df_asin_variat = self.df_asin_variat.filter("parent_asin is not null").select("asin", "parent_asin")
            df_parent_asin_agg = df_asin_variat.groupby(['parent_asin']).agg(F.count("asin").alias("variation_num"))
            df_parent_asin_agg = df_parent_asin_agg.repartition(100)
            self.df_asin_variat = self.df_asin_variat.join(df_parent_asin_agg, on=['parent_asin'], how='left')
            self.df_asin_detail = self.df_asin_detail.join(self.df_asin_variat, on=['asin'], how='left').cache()
            self.df_asin_detail.show(10, truncate=False)
            self.df_asin_variat.unpersist()
        else:
            pass

    # 处理asin的配送方式信息
    def handle_asin_buy_box_seller_type(self):
        """Parse seller_json into buy-box seller type and seller-account columns.

        Parsing only applies to partitions new enough to carry seller_json
        (month/month_week >= 2024-05, 4_week >= 2024-21); older partitions get
        null account/seller columns so the output schema stays stable.
        """
        parse_enabled = (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') \
            or (self.date_type == '4_week' and self.date_info >= '2024-21')
        if not parse_enabled:
            self.df_asin_detail = self.df_asin_detail \
                .withColumn("account_id", F.lit(None)) \
                .withColumn("seller_json", F.lit(None)) \
                .withColumn("account_name", F.lit(None))
            return
        # Strip embedded newlines; payloads that are blank after trimming become ''.
        without_newlines = F.regexp_replace('seller_json', chr(10), '')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            'seller_json',
            F.when(F.length(F.trim(without_newlines)) == 0, F.lit('')).otherwise(without_newlines))
        # Replace the crawled seller type with the value parsed from seller_json.
        parsed = self.df_asin_detail \
            .drop("asin_buy_box_seller_type") \
            .withColumn("seller_json_parsed", self.u_parse_seller_info(F.col("seller_json")))
        self.df_asin_detail = parsed \
            .withColumn("asin_buy_box_seller_type", parsed.seller_json_parsed.buy_box_seller_type) \
            .withColumn("account_name", parsed.seller_json_parsed.account_name) \
            .withColumn("account_id", parsed.seller_json_parsed.account_id) \
            .drop("seller_json_parsed")

    # 处理asin体积重量信息
    def handle_asin_basic_attribute(self):
        """Join weight and volume attributes from dim_asin_stable_info onto the detail rows."""
        self.df_asin_detail = self.df_asin_detail.repartition(100)
        weight_cols = self.df_asin_weight_and_volume \
            .filter("asin_weight is not null") \
            .select("asin", "asin_weight", "asin_weight_str", "asin_weight_type")
        self.df_asin_detail = self.df_asin_detail.join(weight_cols, on=['asin'], how='left')
        # US keeps inch-based volumes, other sites keep cm-based ones:
        # exclude the unit that does not belong to this site (and 'none').
        excluded_unit = 'cm' if self.site_name == 'us' else 'inches'
        volume_cols = self.df_asin_weight_and_volume \
            .filter(f"asin_volume_type is not null and asin_volume_type not in ('{excluded_unit}', 'none')") \
            .select("asin", "asin_volume", "asin_length", "asin_width", "asin_height")
        self.df_asin_detail = self.df_asin_detail.join(volume_cols, on=['asin'], how='left').drop("asin_volume_type")
        self.df_asin_weight_and_volume.unpersist()

    # 打包数量解析
    def get_package_quantity(self):
        """Derive package_quantity and its abnormal flag for each asin.

        Parses the title and the concatenated variation attributes with the
        package-quantity UDF, prefers the title result over the variation
        result (default 1 / abnormal-code 2 when neither parses), then lets
        user edits from sys_edit_log override both — the user join matches on
        (asin, asin_title) so an edit only applies while the title is unchanged.
        """
        self.df_asin_detail = self.df_asin_detail.repartition(100)
        self.df_user_package_num = self.df_user_package_num.repartition(100)
        # Concatenate variation attributes with an unlikely separator for the UDF.
        self.df_asin_detail = self.df_asin_detail.withColumn("variat_attribute", F.concat_ws("&&&%", F.col("asin_color"), F.col("asin_style"), F.col("asin_size"), F.col("asin_material")))
        # Run the parser over both candidate sources.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "title_parse", self.u_judge_package_quantity(self.df_asin_detail.asin_title)).withColumn(
            "variat_parse", self.u_judge_package_quantity(self.df_asin_detail.variat_attribute))
        # Unpack the struct results into flat columns.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "title_package_quantity", self.df_asin_detail.title_parse.getField("parse_package_quantity")).withColumn(
            "variat_package_quantity", self.df_asin_detail.variat_parse.getField("parse_package_quantity")).withColumn(
            "title_package_quantity_is_abnormal", self.df_asin_detail.title_parse.getField("is_package_quantity_abnormal")
        ).withColumn(
            "variat_package_quantity_is_abnormal", self.df_asin_detail.variat_parse.getField("is_package_quantity_abnormal")
        ).drop("title_parse", "variat_parse", "variat_attribute")
        # Title result wins over variation result; fall back to quantity 1 / flag 2.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "package_quantity", F.expr(""" CASE
                     WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity
                     WHEN title_package_quantity is not null THEN title_package_quantity
                     ELSE 1 END""")).withColumn(
            "is_package_quantity_abnormal", F.expr("""CASE
                     WHEN title_package_quantity is null  and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal
                     WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal
                     ELSE 2 END""")).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
        # Manual user corrections take precedence over parsed values.
        self.df_asin_detail = self.df_asin_detail.join(self.df_user_package_num, on=['asin', 'asin_title'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))).withColumn(
            "is_package_quantity_abnormal", F.coalesce(F.col("user_is_package_quantity_abnormal"), F.col("is_package_quantity_abnormal"))).\
            drop("user_package_num", "user_is_package_quantity_abnormal").persist(StorageLevel.MEMORY_ONLY)
        print("打包数量解析完毕")
        self.df_asin_detail.select("asin", "asin_title", "package_quantity", "is_package_quantity_abnormal").show(10, truncate=False)
        self.df_user_package_num.unpersist()

    # 补充asin的一级分类ID(通过node_id以及分类描述补充)
    def handle_asin_top_category(self):
        """Fill category_first_id from the node_id mapping, falling back to the category path.

        Asins whose node_id did not resolve get their first-level category name
        from the head of the '›'-separated asin_category_desc path, which is then
        matched against the name->id lookup dataframe.
        """
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_new_cate, on=['node_id'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn("category_id", F.col("node_id"))
        # Fallback path: derive the top-level category name for unresolved asins.
        fallback = self.df_asin_detail \
            .filter("category_first_id is null and asin_category_desc is not null") \
            .select("asin", "asin_category_desc")
        fallback = fallback \
            .withColumn("asin_category_split", F.split(F.col("asin_category_desc"), "›")) \
            .withColumn("category_first_name", F.lower(F.col("asin_category_split").getItem(0))) \
            .drop("asin_category_split", "asin_category_desc")
        fallback = fallback.join(self.df_asin_category, on=['category_first_name'], how='inner')
        fallback = fallback.withColumnRenamed("category_first_id", "category_first_id_with_name").drop("category_first_name")
        self.df_asin_detail = self.df_asin_detail.join(fallback, on=['asin'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "category_first_id",
            F.coalesce(F.col("category_first_id"), F.col("category_first_id_with_name"))
        ).drop("category_first_id_with_name")
        self.df_asin_new_cate.unpersist()
        self.df_asin_category.unpersist()

    # 处理asin上架时间信息
    def handle_asin_launch_time(self):
        """Clean the crawled launch time and backfill gaps from dim_asin_launchtime_info."""
        def _clamp_to_valid_range(col_name):
            # Dates outside [launch_time_lower_limit, launch_time_upper_limit]
            # are treated as crawl noise and nulled out.
            within_range = (F.col(col_name) <= self.launch_time_upper_limit) & \
                           (F.col(col_name) >= self.launch_time_lower_limit)
            return F.when(within_range, F.col(col_name)).otherwise(F.lit(None))

        self.df_asin_detail = self.df_asin_detail.repartition(100)
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_keep_date, on='asin', how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "crawl_asin_launch_time", _clamp_to_valid_range("crawl_asin_launch_time"))
        # Use the backfilled date when the crawl produced nothing usable
        # (null or the literal string 'null').
        crawl_missing = F.isnull("crawl_asin_launch_time") | (F.col("crawl_asin_launch_time") == 'null')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_launch_time",
            F.when(crawl_missing, F.col("populated_asin_launch_time")).otherwise(F.col("crawl_asin_launch_time")))
        # The backfilled value can also be out of range, so clamp once more.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_launch_time", _clamp_to_valid_range("asin_launch_time"))
        self.df_asin_keep_date.unpersist()

    # 处理asin各类型信息
    def handle_asin_flag(self):
        """Derive boolean flag columns for each asin.

        Produces: asin_is_new (launch-time based), media flags from
        asin_img_type, fulfillment flags from asin_buy_box_seller_type,
        asin_is_brand, asin_quantity_variation_type, asin_is_alarm (US-only
        alarm-brand lookup from postgres) and asin_is_self (internal asins).
        """
        # New-product flag: compares asin_launch_time against the run's calendar day.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_is_new", self.udf_new_asin_flag(F.col('asin_launch_time'), F.lit(self.cal_date)))\
            .withColumn("asin_is_aadd", F.expr(f"""CASE WHEN INSTR(asin_img_type, '3') > 0 THEN 1 ELSE 0 END"""))\
            .withColumn("asin_is_video", F.expr(f"""CASE WHEN INSTR(asin_img_type, '2') > 0 THEN 1 ELSE 0 END"""))\
            .withColumn("asin_is_picture", F.expr(f"""CASE WHEN INSTR(asin_img_type, '1') > 0 THEN 1 ELSE 0 END"""))\
            .withColumn("asin_is_amazon", F.expr(f"""CASE WHEN asin_buy_box_seller_type == 1 THEN 1 ELSE 0 END"""))\
            .withColumn("asin_is_fba", F.expr(f"""CASE WHEN asin_buy_box_seller_type == 2 THEN 1 ELSE 0 END"""))\
            .withColumn("asin_is_fbm", F.expr(f"""CASE WHEN asin_buy_box_seller_type == 3 THEN 1 ELSE 0 END"""))\
            .withColumn("asin_is_other", F.expr(f"""CASE WHEN asin_buy_box_seller_type == 4 THEN 1 ELSE 0 END"""))\
            .withColumn("asin_is_brand", F.when((F.col("asin_brand_name").cast("string") != 'null') &
                                                (F.col("asin_brand_name").cast("string") != 'none'), 1).otherwise(F.lit(0)))\
            .withColumn("asin_quantity_variation_type", F.when((F.lower(F.col("asin_size").cast("string")) != 'null') &
                               (F.lower(F.col("asin_size").cast("string")) != 'none') &
                               (F.lower(F.col("asin_size").cast("string")).contains('quantity')), 1).otherwise(F.lit(0)))
        # Alarm-brand flag: only maintained for the US site, read from postgres.
        if self.site_name == 'us':
            pg_sql = f"""
            select asin_brand_name, 1 as asin_is_alarm from (select lower(trim(brand_name)) as asin_brand_name from brand_alert_erp where brand_name is not null) t group by asin_brand_name"""
            db_type = "postgresql_cluster"
            con_info = DBUtil.get_connection_info(db_type=db_type, site_name=self.site_name)
            if con_info is not None:
                df_alarm_brand = SparkUtil.read_jdbc_query(
                    session=self.spark, url=con_info['url'], pwd=con_info['pwd'], username=con_info['username'], query=pg_sql)
                df_alarm_brand = df_alarm_brand.repartition(100)
                self.df_asin_detail = self.df_asin_detail.join(df_alarm_brand, on=['asin_brand_name'], how='left')
        else:
            self.df_asin_detail = self.df_asin_detail.withColumn("asin_is_alarm", F.lit(0))
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_is_alarm": 0})
        # Internal-asin flag: 1 when the asin appears in ods_self_asin, else 0.
        self.df_asin_detail = self.df_asin_detail.join(self.df_self_asin, on=['asin'], how='left')
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_is_self": 0})
        self.df_self_asin.unpersist()

    # 处理影视标签字段
    def handle_asin_label(self):
        """Attach label/movie columns from dim_asin_label, then release the cached label frame."""
        labeled = self.df_asin_detail.join(self.df_asin_label, on=['asin'], how='left')
        self.df_asin_detail = labeled
        self.df_asin_label.unpersist()

    # 处理asin小图信息
    def handle_asin_img_info(self):
        """Re-shape the raw img_list json into a compact per-asin json array.

        Only month/month_week partitions >= 2024-06 carry img_list; each inner
        array contributes (img_url, img_order_by, data_type) from positions
        1/2/3, and the structs are re-serialized to json per asin.
        """
        if not (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-06'):
            return
        nested_string_arrays = ArrayType(ArrayType(StringType()))
        exploded = self.df_asin_detail \
            .withColumn("img_list", F.from_json(F.col("img_list"), nested_string_arrays)) \
            .filter(F.size("img_list") > 0) \
            .select("asin", F.explode("img_list").alias("img_attributes")) \
            .select("asin",
                    F.col("img_attributes")[1].alias("img_url"),
                    F.col("img_attributes")[2].alias("img_order_by"),
                    F.col("img_attributes")[3].alias("data_type"))
        per_asin_json = exploded.groupby("asin").agg(
            F.to_json(F.collect_list(F.struct(F.col("img_url"), F.col("img_order_by"), F.col("data_type")))).alias("img_list")
        )
        self.df_asin_detail = self.df_asin_detail.drop("img_list").join(per_asin_json, on=['asin'], how='left')

    # 处理parent_asin下最新变体信息
    def handle_latest_variation_info(self):
        """Export each parent_asin's latest variation snapshot to Doris.

        Only runs for month/month_week partitions >= 2024-06, and only when the
        current date_info is at least as new as the max date_info already in
        the Doris table (so older reruns never overwrite newer snapshots).
        """
        if self.date_type in ['month', 'month_week'] and self.date_info >= '2024-06':
            max_report_sql = f"""
                SELECT MAX(date_info) as table_date_info FROM {self.doris_db}.{self.parent_asin_latest_detail_table}
            """
            df_date_info = DorisHelper.spark_import_with_sql(self.spark, query=max_report_sql)
            table_date_info = df_date_info.take(1)[0]['table_date_info']
            print("doris中记录最新的日期为:", table_date_info)
            if self.date_info >= table_date_info:
                # Keep the most recently crawled row per parent_asin.
                df_asin_variat = self.df_asin_detail.filter("parent_asin is not null").select("parent_asin", "variat_list_change", "created_time")
                latest_asin_window = Window.partitionBy('parent_asin').orderBy(
                    F.desc_nulls_last("created_time")
                )
                df_asin_variat = df_asin_variat.withColumn("p_rank", F.row_number().over(window=latest_asin_window))
                df_asin_variat = df_asin_variat.filter("p_rank = 1").drop("p_rank")
                # Explode the variation array; positions 0/1/3/5 are
                # asin/color/size/style — same layout as handle_asin_variation_attribute.
                df_asin_variat =df_asin_variat.filter(F.size("variat_list_change") > 0). \
                    select("parent_asin", "created_time", F.explode("variat_list_change").alias("variant_attribute")). \
                    select("parent_asin", "created_time", F.col("variant_attribute")[0].alias("asin"),
                           F.col("variant_attribute")[1].alias("color"), F.col("variant_attribute")[3].alias("size"),
                           F.col("variant_attribute")[5].alias("style"))
                # One row per parent: csv of child asins + json of their attributes.
                df_asin_variat_agg = df_asin_variat.groupby(['parent_asin']).agg(
                    F.first("created_time").alias("asin_crawl_date"),
                    F.concat_ws(',', F.collect_list("asin")).alias("variation_info"),
                    F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info")
                )
                print("导出父ASIN最新变体信息到doris:")
                df_doris = df_asin_variat_agg.select(
                    "parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info")
                table_columns="parent_asin, date_info, asin_crawl_date, variation_info, attr_info"
                DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
            else:
                print("不用导出旧数据到doris中")
                pass
        else:
            pass

    # Normalize the final column set and persist the result to Hive.
    def df_save(self):
        """Select/standardize the dim_asin_detail columns and write them to Hive.

        Persists the projected frame in memory to avoid recomputing the lineage
        for the count + write, drops malformed ASINs (length > 10), clears the
        target HDFS partition directory, and appends into ``self.hive_tb``
        partitioned by site_name/date_type/date_info.
        """
        df_save = self.df_asin_detail \
            .select("asin",
                    self.handle_string_num_value('asin_title').alias('asin_title'),
                    "asin_title_len",
                    # Negative prices are crawl artifacts; null them out.
                    F.when(F.col('asin_price') < 0, F.lit(None)).otherwise(F.col('asin_price')).alias('asin_price'),
                    "asin_rating", "asin_total_comments", "asin_buy_box_seller_type", "asin_page_inventory",
                    self.handle_string_num_value('asin_category_desc').alias('asin_category_desc'),
                    self.handle_string_num_value('asin_volume').alias('asin_volume'),
                    "asin_weight", "asin_color", "asin_size", "asin_style", "asin_is_sale",
                    # Columns kept as NULL placeholders to preserve the table schema.
                    F.lit(None).alias("asin_rank"),
                    self.handle_string_num_value('asin_launch_time').alias('asin_launch_time'),
                    "asin_is_new", "asin_img_num",
                    self.handle_string_num_value('asin_img_type').alias('asin_img_type'),
                    "asin_category_state",
                    self.handle_string_num_value('asin_material').alias('asin_material'),
                    self.handle_string_num_value('asin_brand_name').alias('asin_brand_name'),
                    F.lit(None).alias("bsr_cate_1_id"),
                    F.lit(None).alias("bsr_cate_current_id"),
                    self.handle_string_num_value('asin_activity_type').alias('asin_activity_type'),
                    "act_one_two_val", "act_three_four_val", "act_five_six_val", "act_eight_val",
                    F.lit(None).alias("qa_num"),
                    "one_star", "two_star", "three_star", "four_star", "five_star", "low_star",
                    self.handle_string_num_value('together_asin').alias('together_asin'),
                    self.handle_string_num_value('ac_name').alias('ac_name'),
                    self.handle_string_num_value('node_id').alias('node_id'),
                    "asin_data_type",
                    F.lit(None).alias('sp_num'),
                    self.handle_string_num_value('asin_describe').alias('asin_describe'),
                    "asin_is_amazon", "asin_is_fba", "asin_is_fbm", "asin_is_other", "asin_is_picture", "asin_is_video",
                    "asin_is_aadd",
                    self.handle_string_num_value('asin_img_url').alias('asin_img_url'),
                    "account_id", "account_name", "variation_num", "asin_is_brand", "asin_is_alarm", "created_time",
                    "updated_time", "parent_asin", "asin_is_movie",
                    self.handle_string_num_value('asin_label_list').alias('asin_label_list'),
                    self.handle_string_num_value('asin_weight_type').alias('asin_weight_type'),
                    self.handle_string_num_value('asin_weight_str').alias('asin_weight_str'),
                    F.lit(None).alias('asin_package_quantity'),
                    F.lit(None).alias('asin_pattern_name'),
                    "category_id", "category_first_id",
                    F.lit(None).alias("buy_data_bought_month"),
                    F.lit(None).alias("buy_data_bought_week"),
                    F.lit(None).alias("buy_data_viewed_month"),
                    F.lit(None).alias("buy_data_viewed_week"),
                    "crawl_asin_launch_time", "populated_asin_launch_time", "asin_follow_sellers", "asin_image_view",
                    "product_description", "asin_spider_num", "asin_lob_info", "is_contains_lob_info",
                    "package_quantity", "is_package_quantity_abnormal", "asin_quantity_variation_type", "seller_json",
                    "asin_bought_month", "asin_length", "asin_width", "asin_height", "asin_is_self",
                    "customer_reviews_json", "img_list", "variat_list",
                    # Partition columns.
                    F.lit(self.site_name).alias('site_name'),
                    F.lit(self.date_type).alias('date_type'),
                    F.lit(self.date_info).alias('date_info')).persist(StorageLevel.MEMORY_ONLY)
        # NOTE: the count is taken before the asin-length filter below, so the
        # printed figure includes any malformed ASINs that get dropped.
        print("dim_asin_detail处理完毕, 最后的数据量为: ", df_save.count())
        # Amazon ASINs are 10 characters; longer values are garbage rows.
        df_save = df_save.filter(F.length(F.col("asin")) <= 10)
        df_save = df_save.repartition(100)
        df_save.show(10, truncate=False)
        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}")
        # Fix: the original referenced self.partitions_by, which is never defined
        # on the instance (would raise AttributeError); use the local list.
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        """Execute the dim_asin_detail build pipeline, step by step, in order."""
        pipeline = (
            self.read_data,                        # load source tables
            self.handle_df_duplicated,             # deduplicate asin detail rows
            self.handle_asin_bought_month,         # Amazon monthly-sales info
            self.handle_asin_lob_info,             # lob_info parsing
            self.handle_asin_variation_attribute,  # variation attributes
            self.handle_asin_buy_box_seller_type,  # fulfilment / buy-box seller type
            self.handle_asin_basic_attribute,      # volume & weight attributes
            self.get_package_quantity,             # package-quantity parsing
            self.handle_asin_top_category,         # top category from node_id
            self.handle_asin_launch_time,          # launch-time normalization
            self.handle_asin_flag,                 # misc asin flags
            self.handle_asin_label,                # movie-label field
            self.handle_asin_img_info,             # small-image info
            self.handle_latest_variation_info,     # latest variants per parent_asin
            self.df_save,                          # standardize columns & persist
        )
        for step in pipeline:
            step()


if __name__ == '__main__':
    # CLI arguments: site, date granularity (week/4_week/month/quarter/day),
    # and the matching date value (e.g. 2022-1 for year-week).
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    DimAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info).run()