import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, IntegerType

"""
计算asin某个时间纬度内是否上过nsr榜单
"""


class DwdNsrAsinRank(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        # 默认数据范围是30天
        self.date_type = "last30day"

        # 30day前
        self.last_30_day = CommonUtil.get_day_offset(date_info, -30)
        # 7 day前
        self.last_7_day = CommonUtil.get_day_offset(date_info, -7)
        self.current_day = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        #  注册本地静态方法 udf 返回新函数
        self.udf_calc_rank_reg = self.spark.udf.register("udf_calc_rank", self.udf_calc_rank, MapType(StringType(), IntegerType()))

        self.hive_tb = "dwd_nsr_asin_rank"
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    @staticmethod
    def udf_calc_rank(dayArr: list, rankArr: list):
        """
        判断 是否新上榜等
        :return:
        """
        bsr_count = len(rankArr)
        is_30_day_flag = 1
        is_1_day_flag = 0
        is_7_day_flag = 0
        last_7_day = dayArr[0]
        start_day = dayArr[1]
        end_day = dayArr[2]
        for item in rankArr:
            date_info = item[1] + ""

            if date_info >= last_7_day:
                is_7_day_flag = 1

            if date_info >= start_day:
                is_30_day_flag = 1

            if date_info >= end_day:
                is_1_day_flag = 1

        return {
            "bsr_count": bsr_count,
            "is_30_day_flag": is_30_day_flag,
            "is_1_day_flag": is_1_day_flag,
            "is_7_day_flag": is_7_day_flag,
        }

    def run(self):
        sql = f"""
            select dbarh.asin,
                   dbarh.category_id,
                   dbarh.bsr_rank,
                   dbarh.date_info,
                   tmp.asin_launch_time
            from dim_nsr_asin_rank_history dbarh
                     left join (
                select asin, asin_launch_time
                from dim_cal_asin_history_detail
                where site_name = "{self.site_name}"
            ) tmp on tmp.asin = dbarh.asin
            where dbarh.site_name = "{site_name}"
              and dbarh.date_info >= "{self.last_30_day}"
              and dbarh.date_info <= "{self.current_day}"
"""
        print("======================查询sql如下======================")
        print(sql)

        last_90_day = CommonUtil.get_day_offset(date_info, -90)

        df_all = self.spark.sql(sql)

        df_all = df_all \
            .groupBy(["category_id", "asin"]) \
            .agg(
            self.udf_calc_rank_reg(
                F.array(F.lit(self.last_7_day), F.lit(self.last_30_day), F.lit(self.current_day)),
                F.collect_set(F.array([df_all['bsr_rank'], df_all['date_info']]))
            ).alias("resultMap"),

            # 最新 bsr_rank
            F.max(F.struct("date_info", "bsr_rank")).alias("last_row"),

            # 近30天进入过BSR榜单的产品中,上架时间在90天以内的产品 asin_launch_time 为 null 表示是新上架的
            F.when(F.first("asin_launch_time") < F.lit(last_90_day), F.lit(0)).otherwise(1).alias("is_asin_new"),

        )

        df_save = df_all \
            .select(
            F.col("asin"),
            F.col("category_id"),
            F.col("last_row").getField("bsr_rank").alias("bsr_rank"),
            F.col("resultMap").getField("bsr_count").alias("bsr_count"),
            F.col("resultMap").getField("is_1_day_flag").alias("is_1_day_flag"),
            F.col("resultMap").getField("is_7_day_flag").alias("is_7_day_flag"),
            F.col("resultMap").getField("is_30_day_flag").alias("is_30_day_flag"),
            F.col("is_asin_new"),
            # 近30天进入过NSR榜单的产品中,30天内是首次上榜的产品 即30天上榜次数为1
            F.when(
                F.col("resultMap").getField("bsr_count") == 1, F.lit(1)
            ).otherwise(0).alias("is_asin_bsr_new"),
            # 最近排名上榜时间
            F.col("last_row").getField("date_info").alias("last_bsr_day"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )
        # 分区数量调整为2个
        df_save = df_save.repartition(2)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwdNsrAsinRank(site_name, date_info)
    obj.run()