import os
import sys

from pyspark.sql.types import DoubleType, StringType

sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil, DateTypes
# 上级目录
from pyspark.sql.window import Window
from pyspark.sql import functions as F

from yswg_utils.common_udf import parse_weight_str
from yswg_utils.common_udf import udf_handle_string_null_value
from yswg_utils.common_df import get_node_first_id_df
from utils.redis_utils import RedisUtils


class DimCalAsinDetail(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info

        # 初始化参数
        self.partitions_by = ['site_name']
        self.partitions_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80)
        app_name = f"{self.__class__.__name__}:{self.site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_table = f'dim_cal_asin_history_detail'
        self.partition_dict = {
            "site_name": site_name
        }
        self.udf_parse_weight_str_reg = self.spark.udf.register("udf_parse_weight_str_reg", self.udf_parse_weight_str, DoubleType())
        self.udf_handle_null_value = self.spark.udf.register("udf_handle_null_value", udf_handle_string_null_value, StringType())

    @staticmethod
    def udf_parse_weight_str(weight_str: str, site_name: str):
        """
        解析重量
        :param weight_str:
        :param site_name:
        :return:
        """
        if weight_str is None:
            return None
        weight_val, unit = parse_weight_str(weight_str, site_name)
        if weight_val != 'none' and weight_val is not None:
            return float(weight_val)

    def run(self):

        print(f"读取数据中.....")
        if self.date_type == 'all':
            # 读取dim_asin_detail
            sql = f"""select 
                              asin, 
                              asin_img_url, 
                              asin_title,
                              asin_title_len,
                              asin_category_desc,
                              asin_rank,
                              asin_volume,
                              asin_weight,
                              asin_color,
                              asin_size,
                              asin_style,
                              asin_price,
                              asin_rating,
                              asin_total_comments,
                              asin_material,
                              asin_brand_name,
                              asin_page_inventory,
                              asin_buy_box_seller_type,
                              asin_launch_time,
                              asin_img_num,
                              asin_img_type,
                              asin_is_sale,
                              bsr_cate_1_id,
                              bsr_cate_current_id,
                              asin_is_amazon,
                              asin_is_FBA,
                              asin_is_FBM,
                              asin_is_other,
                              udf_handle_null_value(node_id) as node_id,
                              asin_is_picture,
                              asin_is_video,
                              asin_is_aadd, 
                              date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date
                               from dim_asin_detail 
                              where site_name='{self.site_name}' 
                              and date_type='month' ;
                                """
            self.date_type = 'day_all'
        elif self.date_type in (DateTypes.week.name, DateTypes.month.name, DateTypes.month_week.name):
            sql = f"""select 
                              asin, 
                              asin_img_url, 
                              asin_title,
                              asin_title_len,
                              asin_category_desc,
                              asin_rank,
                              asin_volume,
                              asin_weight,
                              asin_color,
                              asin_size,
                              asin_style,
                              asin_price,
                              asin_rating,
                              asin_total_comments,asin_material,
                              asin_brand_name,
                              asin_page_inventory,
                              asin_buy_box_seller_type,
                              asin_launch_time,
                              asin_img_num,
                              asin_img_type,
                              asin_is_sale,
                              bsr_cate_1_id,
                              bsr_cate_current_id,
                              asin_is_amazon,
                              asin_is_FBA,
                              asin_is_FBM,
                              asin_is_other,
                              udf_handle_null_value(node_id) as node_id,
                              asin_is_picture,
                              asin_is_video,
                              asin_is_aadd, 
                              date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date
                               from dim_asin_detail 
                              where site_name='{self.site_name}' 
                              and date_type='{self.date_type}' 
                              and date_info = '{self.date_info}';
                              """
        else:
            sql = f"""
                        select 
                              asin, 
                              asin_img_url, 
                              asin_title,
                              asin_title_len,
                              asin_category_desc,
                              asin_rank,
                              asin_volume,
                              asin_weight,
                              asin_color,
                              asin_size,
                              asin_style,
                              asin_price,
                              asin_rating,
                              asin_total_comments,asin_material,
                              asin_brand_name,
                              asin_page_inventory,
                              asin_buy_box_seller_type,
                              asin_launch_time,
                              asin_img_num,
                              asin_img_type,
                              asin_is_sale,
                              bsr_cate_1_id,
                              bsr_cate_current_id,
                              asin_is_amazon,
                              asin_is_FBA,
                              asin_is_FBM,
                              asin_is_other,
                              node_id,
                              asin_is_picture,
                              asin_is_video,
                              asin_is_aadd, 
                              date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date
                               from dim_asin_detail 
                              where site_name='{self.site_name}' 
                              limit 0
            """

        print("======================整合搜索词 day asin 中... sql如下======================")
        print(sql)
        df_asin_detail = self.spark.sql(sqlQuery=sql)
        self_asin_sql = None
        if self.date_type == DateTypes.day.name:
            self_asin_sql = f"""
select asin                                  as asin,
	   img_url                               as asin_img_url,
	   title                                 as asin_title,
	   title_len                             as asin_title_len,
	   category                              as asin_category_desc,
	   rank                                  as asin_rank,
	   volume                                as asin_volume,
	   udf_parse_weight_str_reg(weight_str,'{self.site_name}')  as asin_weight,
	   null                                  as asin_color,
	   null                                  as asin_size,
	   null                                  as asin_style,
	   price                                 as asin_price,
	   rating                                as asin_rating,
	   total_comments                        as asin_total_comments,
	   material                              as asin_material,
	   brand                                 as asin_brand_name,
	   page_inventory                        as asin_page_inventory,
	   buy_box_seller_type                   as asin_buy_box_seller_type,
	   launch_time                           as asin_launch_time,
	   img_num                               as asin_img_num,
	   img_type                              as asin_img_type,
	   null                                  as asin_is_sale,
	   null                                  as bsr_cate_1_id,
	   null                                  as bsr_cate_current_id,
	   if(buy_box_seller_type == 1, 1, 0)    as asin_is_amazon,
	   if(buy_box_seller_type == 2, 1, 0)    as asin_is_FBA,
	   if(buy_box_seller_type == 3, 1, 0)    as asin_is_FBM,
	   if(buy_box_seller_type == 4, 1, 0)    as asin_is_other,
	   node_id,
	   if(locate(1, img_type) > 0, 1, 0)     as asin_is_picture,
	   if(locate(2, img_type) > 0, 1, 0)     as asin_is_video,
	   if(locate(3, img_type) > 0, 1, 0)     as asin_is_aadd,
	   date_format(created_at, '{CommonUtil._date_time_format}') as asin_crawl_date
from ods_self_asin_detail
where site_name = '{self.site_name}'
  and date_type = '{self.date_type}'
  and date_info = '{self.date_info}';
            """
        elif self.date_type == "day_all":
            self_asin_sql = f"""
            select asin                                         as asin,
	   img_url                                                  as asin_img_url,
	   title                                                    as asin_title,
	   title_len                                                as asin_title_len,
	   category                                                 as asin_category_desc,
	   rank                                                     as asin_rank,
	   volume                                                   as asin_volume,
	   udf_parse_weight_str_reg(weight_str, '{self.site_name}') as asin_weight,
	   null                                                     as asin_color,
	   null                                                     as asin_size,
	   null                                                     as asin_style,
	   price                                                    as asin_price,
	   rating                                                   as asin_rating,
	   total_comments                                           as asin_total_comments,
	   material                                                 as asin_material,
	   brand                                                    as asin_brand_name,
	   page_inventory                                           as asin_page_inventory,
	   buy_box_seller_type                                      as asin_buy_box_seller_type,
	   launch_time                                              as asin_launch_time,
	   img_num                                                  as asin_img_num,
	   img_type                                                 as asin_img_type,
	   null                                                     as asin_is_sale,
	   null                                                     as bsr_cate_1_id,
	   null                                                     as bsr_cate_current_id,
	   if(buy_box_seller_type == 1, 1, 0)                       as asin_is_amazon,
	   if(buy_box_seller_type == 2, 1, 0)                       as asin_is_FBA,
	   if(buy_box_seller_type == 3, 1, 0)                       as asin_is_FBM,
	   if(buy_box_seller_type == 4, 1, 0)                       as asin_is_other,
	   if(locate(1, img_type) > 0, 1, 0)                        as asin_is_picture,
	   node_id,
	   if(locate(2, img_type) > 0, 1, 0)                        as asin_is_video,
	   if(locate(3, img_type) > 0, 1, 0)                        as asin_is_aadd,
	   date_format(created_at, '{CommonUtil._date_time_format}')           as asin_crawl_date
from (
		 select *,
				row_number() over (partition by asin order by date_info desc) as row_number
		 from ods_self_asin_detail
		 where site_name = '{self.site_name}'
		   and date_type = '{DateTypes.day.name}'
	 )
where row_number = 1;
            """

        if self_asin_sql is not None:
            print("======================整合day asin 中sql如下======================")
            print(self_asin_sql)
            df_self_asin_detail = self.spark.sql(sqlQuery=self_asin_sql)
            # 合并要更新的数据
            df_asin_detail = df_asin_detail.unionByName(df_self_asin_detail, allowMissingColumns=False)
            pass

        # 判断是否有数据需要整合
        if df_asin_detail.first() == None:
            print("============================无数据跳过===================================")
            return

        print("======================获取dim_bsr_category_tree first_id======================")
        df_node_cate = get_node_first_id_df(self.site_name, self.spark)
        df_asin_detail = df_asin_detail.join(
            df_node_cate, on=['node_id'], how='left'
        )

        # 读取dim_cal_asin_history_detail
        sql = f"""select 
              asin, 
              asin_img_url, 
              asin_title,
              asin_title_len,
              asin_category_desc,
              asin_rank,
              asin_volume,
              asin_weight,
              asin_color,
              asin_size,
              asin_style,
              asin_price,
              asin_rating,
              asin_total_comments,
              asin_material,
              asin_brand_name,
              asin_page_inventory,
              asin_buy_box_seller_type,
              asin_launch_time,
              asin_img_num,
              asin_img_type,
              asin_is_sale,
              bsr_cate_1_id,
              bsr_cate_current_id,
              asin_is_amazon,
              asin_is_FBA,
              asin_is_FBM,
              asin_is_other,
              asin_is_picture,
              asin_is_video,
              asin_is_aadd, 
              asin_crawl_date,
              node_id,
              category_first_id
               from dim_cal_asin_history_detail 
               where site_name='{self.site_name}' ;
              """
        df_asin_cal_detail = self.spark.sql(sqlQuery=sql)
        print("======================查询sql如下======================")
        print(sql)
        df_asin_detail = self.handle_df_duplicated(df_asin_detail, df_asin_cal_detail)
        df_save = self.handle_column(df_asin_detail).repartition(self.partitions_num)
        # print("self.df_save", df_save.show(10, truncate=False))
        # quit()
        CommonUtil.save_or_update_table(spark_session=self.spark, hive_tb_name=self.hive_table,
                                        partition_dict=self.partition_dict, df_save=df_save, drop_exist_tmp_flag=False)
        print("success")

    # 根据asin去重,取dt最大的asin保留
    def handle_df_duplicated(self, df_asin_detail, df_asin_cal_detail):
        print("针对asin进行数据去重...")
        # 将新老数据进行合并
        # df_asin_detail = df_asin_detail.union(df_asin_cal_detail)
        df_asin_detail = df_asin_detail.unionByName(df_asin_cal_detail, allowMissingColumns=True)

        # asin窗口内排序,按照dt降序
        window = Window.partitionBy(['asin']).orderBy(
            df_asin_detail.asin_crawl_date.desc_nulls_last()
        )
        df_asin_detail = df_asin_detail.withColumn("sort_top", F.row_number().over(window=window))
        # 取按asin分组的组内第一条,就是去重后的最新asin_detail
        df_asin_detail = df_asin_detail.filter("sort_top=1")

        # 去除掉排序字段
        df_asin_detail = df_asin_detail.drop("asin_dt_top", "dt")

        return df_asin_detail

    def handle_column(self, df_asin_detail):

        df_save = df_asin_detail.select("asin",
                                        "asin_title",
                                        F.col("asin_title_len").cast('int').alias('asin_title_len'),
                                        "asin_category_desc",
                                        F.col("asin_rank").cast('int').alias('asin_rank'),
                                        self.udf_handle_null_value("asin_volume").alias("asin_volume"),
                                        F.col("asin_weight").cast('double').alias('asin_weight'),
                                        self.udf_handle_null_value("asin_color").alias("asin_color"),
                                        self.udf_handle_null_value("asin_size").alias("asin_size"),
                                        self.udf_handle_null_value("asin_style").alias("asin_style"),
                                        "asin_price",
                                        "asin_rating",
                                        F.col("asin_total_comments").cast('int').alias('asin_total_comments'),
                                        self.udf_handle_null_value("asin_material").alias("asin_material"),
                                        self.udf_handle_null_value("asin_brand_name").alias("asin_brand_name"),
                                        "bsr_cate_1_id",
                                        "bsr_cate_current_id",
                                        F.col("asin_page_inventory").cast('int').alias('asin_page_inventory'),
                                        F.col("asin_buy_box_seller_type").cast('int').alias('asin_buy_box_seller_type'),
                                        "asin_is_amazon",
                                        "asin_is_fba",
                                        "asin_is_fbm",
                                        "asin_is_other",
                                        F.col("asin_is_sale").cast('int').alias('asin_is_sale'),
                                        "asin_launch_time",
                                        F.col("asin_img_num").cast('int').alias('asin_img_num'),
                                        "asin_img_type",
                                        "asin_is_picture",
                                        F.col("asin_is_video").cast('int').alias('asin_is_video'),
                                        F.col("asin_is_aadd").cast('int').alias('asin_is_aadd'),
                                        self.udf_handle_null_value("asin_img_url").alias("asin_img_url"),
                                        "asin_crawl_date",
                                        "node_id",
                                        "category_first_id"
                                        )

        # 预留字段补全
        df_save = df_save.withColumn("re_string_field1", F.lit(None))
        df_save = df_save.withColumn("re_string_field2", F.lit(None))
        df_save = df_save.withColumn("re_string_field3", F.lit(None))
        df_save = df_save.withColumn("re_int_field1", F.lit(None))
        df_save = df_save.withColumn("re_int_field2", F.lit(None))
        df_save = df_save.withColumn("re_int_field3", F.lit(None))

        # 分区字段补全
        df_save = df_save.withColumn("site_name", F.lit(self.site_name))
        return df_save


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)  # 参数1:站点
    date_type = CommonUtil.get_sys_arg(2, None)  # 参数2:类型:week/4_week/month/quarter/day
    date_info = CommonUtil.get_sys_arg(3, None)  # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
    lock_name = "dim_cal_asin_history_detail"
    if date_type == "all":
        # 如果执行数据为all的情况,非自然解锁情况,则需锁设定该表90分钟
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=90 * 60, retry_flag=True, retry_time=10 * 60)
    else:
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=30 * 60, retry_flag=True, retry_time=10 * 60)
    if lock_flag:
        try:
            obj = DimCalAsinDetail(site_name, date_type, date_info)
            obj.run()
        finally:
            # 执行完成后释放锁
            RedisUtils.release_redis_lock(lock_name)