import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F, Window
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql.types import BooleanType
from yswg_utils.common_df import get_self_asin_df
from yswg_utils.common_df import get_bsr_category_tree_df, get_asin_unlanuch_df
from yswg_utils.common_udf import category_craw_flag

"""
聚合nsr榜单Asin
"""


class DwtBsrAsinDetailAll(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_nsr_asin_detail_all"
        self.current_month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m", )
        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())
        pass

    def run(self):
        df_dwt_flow_asin_part = CommonUtil.select_partitions_df(self.spark, "dwt_flow_asin")

        dwt_flow_asin_last_month = df_dwt_flow_asin_part \
            .filter(f"date_type = '{DateTypes.month.name}' and site_name = '{self.site_name}' ") \
            .selectExpr("max(date_info)").rdd.flatMap(lambda it: it).collect()[0]

        if date_info <= '2023-08-14':
            sql = f"""
with asin_all as (
	select *,
           row_number() over ( order by site_name) as id
	from dwd_nsr_asin_rank
	where site_name = '{self.site_name}'
	  and date_type = 'last30day'
	  and date_info = '{self.date_info}'
),
	 asin_detail as (
		 select *
		 from dwt_nsr_asin_detail
		 where site_name = '{self.site_name}'
		   and date_info = '{self.current_month}'
	 )
select asin_all.asin,
	   category_id,
	   bsr_rank,
	   is_1_day_flag,
	   is_7_day_flag,
	   is_30_day_flag,
	   bsr_count,
	   is_asin_new,
	   is_asin_bsr_new,
	   last_bsr_day,
	   title,
	   img_url,
	   img_type,
	   ao_val,
	   rating,
	   total_comments,
	   bsr_orders,
	   bsr_orders_change,
	   price,
	   weight,
	   launch_time,
	   brand_name,
	   buy_box_seller_type,
	   account_name,
	   volume,
	   last_update_time,
	   asin_air_freight_gross_margin,
	   asin_ocean_freight_gross_margin
from asin_all
		 left join asin_detail on asin_all.asin = asin_detail.asin;
"""
        else:
            sql = f"""
    with asin_all as (
        select asin,
               category_id,
               bsr_rank,
               is_1_day_flag,
               is_7_day_flag,
               is_30_day_flag,
               bsr_count,
               is_asin_new,
               is_asin_bsr_new,
               last_bsr_day,
               row_number() over ( order by site_name) as id
        from dwd_nsr_asin_rank
        where site_name = '{self.site_name}'
          and date_type = 'last30day'
          and date_info = '{self.date_info}'
    ),
         account_name_tb as (
             select asin,
                    first(fd_account_name) as account_name
             from dim_fd_asin_info
             where site_name = '{self.site_name}'
             group by asin
         ),
         asin_his as (
             select asin,
                    asin_title               as title,
                    asin_img_url             as img_url,
                    asin_img_type            as img_type,
                    asin_rating              as rating,
                    asin_total_comments      as total_comments,
                    asin_price               as price,
                    asin_weight              as weight,
                    asin_launch_time         as launch_time,
                    asin_volume              as volume,
                    asin_brand_name          as brand_name,
                    asin_buy_box_seller_type as buy_box_seller_type,
                    asin_crawl_date          as last_update_time
             from dim_cal_asin_history_detail
             where site_name = '{self.site_name}'
         ),
         flow_asin as (
             select asin,
                    asin_ao_val                     as ao_val,
                    bsr_orders                      as bsr_orders,
                    asin_bsr_orders_change          as bsr_orders_change,
                    asin_air_freight_gross_margin   as asin_air_freight_gross_margin,
                    asin_ocean_freight_gross_margin as asin_ocean_freight_gross_margin
             from dwt_flow_asin
             where site_name = '{self.site_name}'
               and date_type = '{DateTypes.month.name}'
               and date_info = '{dwt_flow_asin_last_month}'
         )
    
    select asin_all.asin,
           category_id,
           bsr_rank,
           is_1_day_flag,
           is_7_day_flag,
           is_30_day_flag,
           bsr_count,
           is_asin_new,
           is_asin_bsr_new,
           last_bsr_day,
           title,
           img_url,
           img_type,
           ao_val,
           rating,
           total_comments,
           bsr_orders,
           bsr_orders_change,
           price,
           weight,
           launch_time,
           brand_name,
           buy_box_seller_type,
           account_name,
           volume,
           last_update_time,
           asin_air_freight_gross_margin,
           asin_ocean_freight_gross_margin
    from asin_all
             left join account_name_tb on asin_all.asin = account_name_tb.asin
             left join flow_asin on asin_all.asin = flow_asin.asin
             left join asin_his on asin_all.asin = asin_his.asin
                """
        print("======================查询sql如下======================")
        print(sql)

        df_all = self.spark.sql(sql)

        df_self_asin = get_self_asin_df(self.site_name, self.spark).select(
            F.col("asin"),
            F.lit(1).alias("asin_type"),
        )
        category_df = get_bsr_category_tree_df(self.site_name, self.spark).select(
            F.col("category_id"),
            F.col("category_first_id"),
        )
        df_unlanuch = get_asin_unlanuch_df(self.site_name, self.spark)

        df_all = df_all \
            .join(df_self_asin, on=["asin"], how='left') \
            .join(df_unlanuch, on=["asin"], how='left') \
            .join(category_df, on=["category_id"], how='left')

        df_all = df_all.withColumn("crawl_flag", self.udf_category_craw_flag(F.col("category_first_id"), F.col("asin")))
        # 生成id
        df_all = df_all.withColumn("id", F.row_number().over(window=Window.orderBy(F.lit(1))))

        df_all = df_all.select(
            F.col("id"),
            F.col('asin'),
            F.col('category_id'),
            F.col('bsr_rank'),
            F.col('is_1_day_flag'),
            F.col('is_7_day_flag'),
            F.col('is_30_day_flag'),
            F.col('bsr_count'),
            F.col('is_asin_new'),
            F.col('is_asin_bsr_new'),
            F.col('last_bsr_day'),
            F.col('title'),
            F.col('img_url'),
            F.col('img_type'),
            F.col('ao_val'),
            F.col('rating'),
            F.col('total_comments'),
            F.col('bsr_orders'),
            F.col('bsr_orders_change'),
            F.col('price'),
            F.col('weight'),
            F.col('launch_time'),
            F.trim(F.col('brand_name')).alias("brand_name"),
            F.col('buy_box_seller_type'),
            F.col('account_name'),
            F.col('volume'),
            F.col('last_update_time'),
            F.col('asin_air_freight_gross_margin'),
            F.col('asin_ocean_freight_gross_margin'),
            F.col("category_first_id"),

            F.when(F.col("asin_type").isNotNull(), F.lit(1))
                .when(F.col("crawl_flag") == False, F.lit(2))
                .otherwise(0).alias("asin_type"),
            #  asin 下架时间
            F.col("asin_unlaunch_time"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info")
        )

        df_all = df_all.repartition(15)
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info
        }
        partition_by = list(partition_dict.keys())
        # 自动对齐
        df_all = CommonUtil.format_df_with_template(self.spark, df_all, self.hive_tb, roundDouble=True)
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        print(f"清除hdfs目录中:{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        hive_tb = self.hive_tb
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtBsrAsinDetailAll(site_name=site_name, date_info=date_info)
    obj.run()