"""
# author: 方星钧(ffman)
# description: 基于ods_bs_category,ods_bs_category_asin_detail表,计算出dim_asin_bs_category维度的asin的bs分类情况
# table_read_name: ods_bs_category,ods_bs_category_asin_detail
# table_save_name: dim_asin_bs_category
# table_save_level: dim
# version: 1.0
# created_date: 2022-11-03
# updated_date: 2022-11-03
"""

import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql.window import Window


class DimBsAsinInfo(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        """Set up the job state, the Spark session and the two category UDFs.

        :param site_name: site code stored on the instance and used in the
            Spark app name (the UDFs in this class recognise
            us/uk/de/es/fr/it).
        :param date_type: date-partition type stored on the instance.
        :param date_info: date-partition value stored on the instance.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Target table name; it is also the prefix of the Spark app name.
        self.db_save = 'dim_asin_bs_category'
        spark_app_name = f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}"
        # create_spark_object is not defined in this file (presumably provided
        # by Templates).
        self.spark = self.create_spark_object(app_name=spark_app_name)

        # Placeholder DataFrames; read_data()/handle_data() overwrite the ones
        # this class actually uses.
        placeholder_sql = "select 1+1;"
        self.df_save = self.spark.sql(placeholder_sql)
        self.df_save_list = []
        self.df_cate = self.spark.sql(placeholder_sql)
        self.df_cate_detail = self.spark.sql(placeholder_sql)
        self.df_cate_asin_detail = self.spark.sql(placeholder_sql)

        self.pattern_1 = ''  # regex for the first-level category
        self.pattern_current = ''  # regex for the current category
        self.node_1_list = []  # list of first-level category names
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # reset_partitions is not defined in this file (presumably provided by
        # Templates).
        self.reset_partitions(partitions_num=20)

        # Register the static helpers as Spark UDFs returning strings; the
        # returned callables are used in the withColumn() calls of this class.
        register_udf = self.spark.udf.register
        self.u_cate_1_en_name = register_udf(
            "u_cate_1_en_name", self.udf_cate_1_en_name, StringType())
        self.u_cate_current_en_name = register_udf(
            "u_cate_current_en_name", self.udf_cate_current_en_name, StringType())

    @staticmethod
    def udf_cate_1_en_name(best_sellers_rank, site_name, cate_1_pattern):
        if site_name in ['us', 'uk']:
            pattern_1 = f"See Top 100 in ({cate_1_pattern})"
        elif site_name in ['de']:
            pattern_1 = f"Siehe Top 100 in ({cate_1_pattern})"
        elif site_name in ['es']:
            pattern_1 = f"Ver el Top 100 en ({cate_1_pattern})"
        elif site_name in ['fr']:
            pattern_1 = f"Voir les 100 premiers en ({cate_1_pattern})"
        elif site_name in ['it']:
            pattern_1 = f"Visualizza i Top 100 nella categoria ({cate_1_pattern})"
        else:
            pattern_1 = ''
        result_pattern = re.findall(pattern_1, best_sellers_rank)[-1] if re.findall(pattern_1,
                                                                                    best_sellers_rank) else ''
        if result_pattern == '':
            result_pattern = re.findall(cate_1_pattern, best_sellers_rank)[-1] if re.findall(cate_1_pattern,
                                                                                             best_sellers_rank) else ''
        return result_pattern

    @staticmethod
    def udf_cate_current_en_name(best_sellers_rank, site_name):
        if site_name in ['us', 'uk', 'de', 'it']:
            pattern_current = f"(\d+ in )"
        elif site_name in ['es', 'fr']:
            # pattern_current = f"\d+[.,]{0,}\d{0,} en "
            pattern_current = f"(\d+ en )"
        else:
            pattern_current = ''
        result_pattern = re.findall(pattern_current, best_sellers_rank)[-1] if re.findall(pattern_current,
                                                                                          best_sellers_rank) else ''
        r_id = best_sellers_rank.rfind(result_pattern)
        r_id_2 = best_sellers_rank.rfind(" (")
        if r_id != -1:
            if r_id_2 != -1 and r_id_2 > r_id:
                return best_sellers_rank[(r_id + len(result_pattern)):r_id_2]
            else:
                if ")" in best_sellers_rank[(r_id + len(result_pattern)):]:
                    return ''
                else:
                    return best_sellers_rank[r_id + len(result_pattern):]
        else:
            return ''

    def read_data(self):
        """Load the category rows and the per-ASIN rank rows for this run.

        Fills ``self.df_cate`` from ods_bs_category (filtered by site) and
        ``self.df_cate_asin_detail`` from ods_bs_category_asin_detail
        (filtered by site, date_type and date_info). Both DataFrames are
        cached and their first 10 rows are shown.
        """
        cate_sql = f"select id as bs_id, en_name, nodes_num, one_category_id, and_en_name from ods_bs_category where site_name='{self.site_name}';"
        cate_rows = self.spark.sql(sqlQuery=cate_sql).cache()
        self.df_cate = cate_rows
        cate_rows.show(10, truncate=False)

        asin_sql = (
            f"select asin, best_sellers_rank, site_name from ods_bs_category_asin_detail "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        )
        print("sql:", asin_sql)
        asin_rows = self.spark.sql(sqlQuery=asin_sql).cache()
        self.df_cate_asin_detail = asin_rows
        asin_rows.show(10, truncate=False)

    def handle_data(self):
        """Run the three matching steps and build ``self.df_save``.

        After the steps, ``self.df_save`` is ``self.df_cate_asin_detail`` with
        the site_name/date_type/date_info columns set to this run's values.
        """
        matching_steps = (
            self.handle_cate_1,
            self.handle_cate_current,
            self.handle_cate_by_en_name,
        )
        for run_step in matching_steps:
            run_step()

        result = self.df_cate_asin_detail
        # Literal columns named like the entries of self.partitions_by.
        partition_values = (
            ("site_name", self.site_name),
            ("date_type", self.date_type),
            ("date_info", self.date_info),
        )
        for column_name, column_value in partition_values:
            result = result.withColumn(column_name, F.lit(column_value))
        self.df_save = result
        # self.df_save.show(10, truncate=False)

    def handle_cate_1(self):
        """Add the ``bs_cate_1_en_name`` column (first-level category name).

        The names of all ``nodes_num==2`` rows of ``self.df_cate`` are joined
        into one regex alternation, which the ``u_cate_1_en_name`` UDF applies
        to every ``best_sellers_rank`` text. Finally the number of rows with
        an empty result and the total row count are printed.
        """
        level_1_rows = self.df_cate.filter("nodes_num==2").select("en_name").collect()
        # Skip NULL/empty names: None would make str.join raise, and an empty
        # alternative would match everywhere.
        names = {row["en_name"] for row in level_1_rows if row["en_name"]}
        # Regex alternation is ordered, so longer names go first; otherwise a
        # name that is a prefix of another one would shadow it.
        ordered_names = sorted(names, key=lambda name: (-len(name), name))
        # Escape each name so characters such as "(", "+" or "." in a category
        # name are matched literally instead of being read as regex syntax.
        pattern = "|".join(re.escape(name) for name in ordered_names)

        # The pattern is passed to the UDF through a temporary literal column.
        detail = self.df_cate_asin_detail.withColumn("bs_cate_1_pattern", F.lit(pattern))
        detail = detail.withColumn(
            "bs_cate_1_en_name",
            self.u_cate_1_en_name("best_sellers_rank", "site_name", "bs_cate_1_pattern")
        )
        self.df_cate_asin_detail = detail.drop("bs_cate_1_pattern")
        print(self.df_cate_asin_detail.filter("bs_cate_1_en_name =''").count(), self.df_cate_asin_detail.count())

    def handle_cate_current(self):
        """Add the ``bs_cate_current_en_name`` column (current category name).

        The value is computed per row by the ``u_cate_current_en_name`` UDF
        from ``best_sellers_rank`` and ``site_name``. Afterwards the number of
        rows with an empty result and the total row count are printed.
        """
        current_name = self.u_cate_current_en_name('best_sellers_rank', 'site_name')
        detail = self.df_cate_asin_detail.withColumn("bs_cate_current_en_name", current_name)
        self.df_cate_asin_detail = detail

        unmatched_rows = detail.filter("bs_cate_current_en_name =''").count()
        total_rows = detail.count()
        print(unmatched_rows, total_rows)

    def handle_cate_by_en_name(self):
        """Resolve category ids from the extracted names and deduplicate.

        Steps, all applied to ``self.df_cate_asin_detail``:

        1. left-join ``bs_cate_1_id`` by ``bs_cate_1_en_name`` (category rows
           with nodes_num=2) and ``bs_cate_current_id`` by
           ``bs_cate_current_en_name`` (category rows with nodes_num>2);
        2. for rows still lacking ``bs_cate_current_id``, retry the lookup
           against the categories' ``and_en_name`` column;
        3. keep one row per (asin, bs_cate_1_id), preferring the smallest
           non-null ``bs_cate_current_id``;
        4. rename the columns to the ``asin_bs_*`` names.
        """
        print("1. 通过en_name匹配bs_cate_1_id和bs_cate_current_id")
        level_1_lookup = (
            self.df_cate.filter("nodes_num=2")
            .select("bs_id", "en_name")
            .withColumnRenamed("en_name", "bs_cate_1_en_name")
            .withColumnRenamed("bs_id", "bs_cate_1_id")
        )
        current_lookup = (
            self.df_cate.filter("nodes_num>2")
            .select("bs_id", "en_name")
            .withColumnRenamed("en_name", "bs_cate_current_en_name")
            .withColumnRenamed("bs_id", "bs_cate_current_id")
        )
        with_ids = (
            self.df_cate_asin_detail
            .join(level_1_lookup, on=['bs_cate_1_en_name'], how='left')
            .join(current_lookup, on=['bs_cate_current_en_name'], how='left')
        )

        print("2. 当bs_cate_current_id是null的时候, 通过当前分类的别名去匹配bs_cate_current_id")
        resolved = with_ids.filter("bs_cate_current_id is not null")
        # Drop the null id column so the alias join can supply it again.
        unresolved = with_ids.filter("bs_cate_current_id is null").drop("bs_cate_current_id")
        alias_lookup = (
            self.df_cate.filter("nodes_num>2")
            .select("and_en_name", "bs_id")
            .withColumnRenamed("and_en_name", "bs_cate_current_en_name")
            .withColumnRenamed("bs_id", "bs_cate_current_id")
        )
        retried = unresolved.join(alias_lookup, on="bs_cate_current_en_name", how='left')
        combined = resolved.unionByName(retried, allowMissingColumns=True)

        # 3. Deduplicate; rows whose current id is null sort last.
        print("3.1 根据asin, bs_cate_1_id去重, null值在后")
        dedupe_window = Window.partitionBy(['asin', 'bs_cate_1_id']).orderBy(
            combined.bs_cate_current_id.asc_nulls_last(),
        )
        deduped = (
            combined
            .withColumn("asin_top", F.row_number().over(window=dedupe_window))
            .filter("asin_top=1")
            .drop("asin_top")
        )

        # NOTE: deriving a missing bs_cate_1_id from bs_cate_current_id (via
        # the categories' one_category_id) is temporarily disabled; rows
        # without a first-level category are left unmatched.

        print("4. 重命名字段名称, 和hive表的字段名称保持一致")
        hive_column_names = (
            ("best_sellers_rank", "asin_bs_sellers_rank"),
            ("bs_cate_1_id", "asin_bs_cate_1_id"),
            ("bs_cate_current_id", "asin_bs_cate_current_id"),
            ("bs_cate_1_en_name", "asin_bs_cate_1_en_name"),
            ("bs_cate_current_en_name", "asin_bs_cate_current_en_name"),
        )
        for old_name, new_name in hive_column_names:
            deduped = deduped.withColumnRenamed(old_name, new_name)
        self.df_cate_asin_detail = deduped


if __name__ == '__main__':
    # Positional command-line arguments:
    #   1: site
    #   2: date type: week/4_week/month/quarter
    #   3: year-week / year-month / year-quarter, e.g. 2022-1
    cli_args = sys.argv
    job = DimBsAsinInfo(
        site_name=cli_args[1],
        date_type=cli_args[2],
        date_info=cli_args[3],
    )
    # run() is not defined in this file (presumably inherited from Templates).
    job.run()