"""
   @Author      : HuangJian
   @Description : ABA - pre-aggregated intermediate table for search term / ASIN data (ASIN dimension)
   @SourceTable :
                  ①dwd_st_asin_info
                  ②dim_st_asin_info
                  ③dim_asin_detail
                  ④dim_seller_asin_info
                  ⑤dim_date_20_to_30
   @SinkTable   : dws_aba_st_analytics_day
   @CreateTime  : 2022/11/21 15:56
   @UpdateTime  : 2022/11/21 15:56
"""

import os
import sys
from datetime import date, timedelta
import re
from functools import reduce

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType


class DwsAbaStAnalyticsDay(Templates):
    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """Store the job parameters, create the Spark session and register UDFs.

        Args:
            site_name: value matched against the ``site_name`` partition column.
            date_type: value matched against the ``date_type`` partition column.
            date_info: value matched against the ``date_info`` partition column.
        """
        super().__init__()
        self.site_name, self.date_type, self.date_info = site_name, date_type, date_info
        self.db_save = "dws_aba_st_analytics_day"
        app_name = f"{self.db_save} {self.site_name}, {self.date_info}"
        self.spark = self.create_spark_object(app_name=app_name)
        self.df_date = self.get_year_week_tuple()
        self.year_week = self.get_year_week()

        # Output DataFrame placeholder and partition settings.
        placeholder_sql = "select 1+1;"
        self.df_save = self.spark.sql(placeholder_sql)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=10)

        # Working DataFrames start as placeholders; read_data()/handle_*() replace them.
        for attr_name in (
            "df_aba_st_analytics",
            "df_st_asin_info",
            "df_asin_detail",
            "df_seller_asin_info",
            "df_st_asin_join",
            "df_st_asin_orders_info",
        ):
            setattr(self, attr_name, self.spark.sql(placeholder_sql))

        # Register the custom UDFs (usable from SQL by name and from the DataFrame API).
        register_udf = self.spark.udf.register
        self.u_year_week = register_udf('u_year_week', self.udf_year_week, StringType())
        self.u_get_volume = register_udf('u_get_volume', self.udf_get_volume, DoubleType())
        self.u_get_image_type = register_udf('u_get_image_type', self.udf_get_image_type, IntegerType())
        self.u_title_contains = register_udf('u_title_contains', self.udf_title_contains, IntegerType())

    def get_year_week(self):
        # 根据日期获取当前周
        if self.date_type == "day":
            sql = f"select year_week from dim_date_20_to_30 where `date`='{self.date_info}'"
            df = self.spark.sql(sqlQuery=sql).toPandas()
            # print(list(df.year_week)[0])
            return list(df.year_week)[0]

    @staticmethod
    def udf_year_week(dt):
        year, week = dt.split("-")[0], dt.split("-")[1]
        if int(week) < 10:
            return f"{year}-0{week}"
        else:
            return f"{year}-{week}"

    @staticmethod
    def udf_get_volume(volume):
        # print("get_volume", volume)
        volume = str(volume)
        if volume == "null":
            return 0.0
        else:
            pattern = r"\d+\.?\d*"
            volumeList = re.findall(pattern, volume)
            if len(volumeList):
                volumeList = list(map(float, volumeList))
                result = reduce((lambda x, y: x * y), volumeList)
                return result
            else:
                return 0.0

    @staticmethod
    def udf_get_image_type(type_flag, image_type):
        str_type = str(image_type)
        type_flag = str(type_flag)
        if type_flag in str_type:
            return 1
        else:
            return 0

    @staticmethod
    def udf_title_contains(search_term, title):
        if str(search_term).lower() in str(title).lower():
            return 1
        else:
            return 0

    def read_data(self):
        """Load the four input DataFrames for this run and cache them."""
        # Partition filter shared by the first three queries.
        partition_filter = (
            f" where site_name = '{self.site_name}' and date_type = '{self.date_type}'"
            f" and date_info = '{self.date_info}'"
        )

        # 1. Search-term/ASIN order data from dwd_st_asin_info.
        #    The two order columns are selected as the constants 2 and 10.
        #    NOTE(review): these look like placeholder values -- confirm.
        orders_sql = (
            "select search_term, asin, 2 as st_asin_zr_orders, 10 as st_asin_bs_orders from dwd_st_asin_info "
            f"{partition_filter} and st_asin_zr_page is not null"
        )
        self.df_st_asin_orders_info = self.spark.sql(sqlQuery=orders_sql).cache()

        # 2. Search-term/ASIN page placement from dim_st_asin_info.
        placement_sql = (
            "select search_term, asin,page,page_row,page_rank,data_type from dim_st_asin_info "
            f"{partition_filter}"
        )
        self.df_st_asin_info = self.spark.sql(sqlQuery=placement_sql).cache()

        # 3. ASIN detail base table dim_asin_detail.
        detail_sql = (
            "select asin,asin_title,asin_title_len,asin_total_comments,asin_price,asin_rating,asin_buy_box_seller_type,"
            "asin_volume,asin_weight,asin_img_type,asin_brand_name,asin_is_new,asin_is_sale,asin_rank  from dim_asin_detail "
            f"{partition_filter}"
        )
        self.df_asin_detail = self.spark.sql(sqlQuery=detail_sql).cache()

        # 4. Seller information from dim_seller_asin_info: always read from the
        #    'week' partition identified by self.year_week, one row kept per asin.
        seller_sql = (
            " select asin,account_name,country_name  from dim_seller_asin_info "
            f" where site_name = '{self.site_name}' and date_type = 'week' and date_info = '{self.year_week}' "
        )
        df_seller = self.spark.sql(sqlQuery=seller_sql)
        self.df_seller_asin_info = df_seller.drop_duplicates(['asin']).cache()

    def handle_data(self):
        """Run the transformation steps in order, then print the output columns."""
        pipeline = (
            self.handle_st_asin_join,   # join the inputs into one working DataFrame
            self.handle_asin_agg,       # aggregate ASIN-level atomic metrics into derived metrics
            self.handle_st_orders_agg,  # aggregate order / estimated-order metrics
            self.handle_column,         # shape the columns before the data is written
        )
        for step in pipeline:
            step()

        print(self.df_save.columns)

    def handle_st_asin_join(self):
        """Left-join ASIN detail, seller and order data onto the search-term/ASIN rows.

        The search-term/ASIN rows are first de-duplicated on
        ``(search_term, asin)``; the joined result is stored in
        ``self.df_st_asin_join`` and cached.
        """
        df_pairs = self.df_st_asin_info.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin_info = df_pairs

        df_joined = df_pairs.join(self.df_asin_detail, on='asin', how='left')
        df_joined = df_joined.join(self.df_seller_asin_info, on='asin', how='left')
        df_joined = df_joined.join(
            self.df_st_asin_orders_info, on=['search_term', 'asin'], how='left'
        )

        self.df_st_asin_join = df_joined
        self.df_st_asin_join.cache()

    def handle_asin_agg(self):
        """Build the per-search-term ASIN metrics and store them in ``self.df_aba_st_analytics``."""
        # Columns that take part in the calculation.
        metric_inputs = [
            "search_term", "asin", "page", "asin_title", "asin_title_len",
            "asin_total_comments", "asin_price", "asin_rating", "asin_volume",
            "asin_weight", "asin_buy_box_seller_type", "asin_img_type",
            "country_name", "asin_is_new", "asin_is_sale",
        ]
        df_inputs = self.df_st_asin_join.select(*metric_inputs)

        # Tag each row with 0/1 flags so the metrics can be computed by summing.
        df_flagged = self.handle_asin_agg_flag(df_inputs)

        # Aggregate per search term and rename the results.
        df_grouped = self.handle_asin_group_agg(df_flagged)

        # Add the distinct brand and seller counts.
        self.df_aba_st_analytics = self.handle_asin_agg_brand_seller(df_grouped)

    def handle_st_orders_agg(self):
        """Aggregate order metrics per search term and join them onto ``self.df_aba_st_analytics``.

        Columns added (all summed per ``search_term``):
            * ``bsr_orders`` / ``orders``: totals of ``st_asin_bs_orders`` /
              ``st_asin_zr_orders``.
            * ``new_asin_bsr_orders`` / ``new_asin_orders``: the same totals
              restricted to rows with ``asin_is_new = 1``.
            * ``top3_brand_bsr_orders`` / ``top3_brand_orders``: totals of the
              three brands with the highest orders.
            * ``top3_seller_bsr_orders`` / ``top3_seller_orders``: totals of the
              three sellers with the highest orders.
        """
        # Columns needed for the order calculations.
        df_orders = self.df_st_asin_join.select(
            "search_term", "asin", "asin_is_new", "asin_brand_name", "account_name",
            "st_asin_zr_orders", "st_asin_bs_orders"
        ).cache()

        # Total orders.
        df_total_orders = df_orders.groupby(["search_term"]).agg(
            F.sum("st_asin_bs_orders").alias("bsr_orders"),
            F.sum("st_asin_zr_orders").alias("orders"),
        )

        # Orders of new ASINs only.
        df_new_orders = df_orders.filter("asin_is_new=1").groupby(["search_term"]).agg(
            F.sum("st_asin_bs_orders").alias("new_asin_bsr_orders"),
            F.sum("st_asin_zr_orders").alias("new_asin_orders"),
        )

        # Rows with a usable brand / seller name. Both conditions must hold
        # ("and"): with "or" the literal string 'null' passed the filter.
        df_branded = df_orders.filter(
            "asin_brand_name is not null and asin_brand_name != 'null'")
        # Missing sellers are excluded so that all ASINs without seller data
        # are not ranked together as if they were one seller.
        df_with_seller = df_orders.filter(
            "account_name is not null and account_name != 'null'")

        top3_metrics = [
            self._top3_orders_total(
                df_branded, "asin_brand_name", "st_asin_bs_orders", "top3_brand_bsr_orders"),
            self._top3_orders_total(
                df_branded, "asin_brand_name", "st_asin_zr_orders", "top3_brand_orders"),
            self._top3_orders_total(
                df_with_seller, "account_name", "st_asin_bs_orders", "top3_seller_bsr_orders"),
            self._top3_orders_total(
                df_with_seller, "account_name", "st_asin_zr_orders", "top3_seller_orders"),
        ]

        # Attach every order metric to the per-search-term analytics table.
        df_result = self.df_aba_st_analytics
        for df_metric in [df_total_orders, df_new_orders, *top3_metrics]:
            df_result = df_result.join(df_metric, on=["search_term"], how="left")
        self.df_aba_st_analytics = df_result

    @staticmethod
    def _top3_orders_total(df_orders, entity_col, orders_col, total_alias):
        """Sum ``orders_col`` over the three best-selling ``entity_col`` values per search term.

        Args:
            df_orders: DataFrame with ``search_term``, ``entity_col`` and ``orders_col``.
            entity_col: column identifying the brand or seller.
            orders_col: order column to sum.
            total_alias: name of the resulting total column.

        Returns:
            DataFrame with the columns ``search_term`` and ``total_alias``.
        """
        df_entity_orders = df_orders.groupby(["search_term", entity_col]).agg(
            F.sum(orders_col).alias("entity_orders"))

        # Highest orders first (an ascending sort would select the three
        # lowest sellers); the entity name breaks ties so the ranking is
        # deterministic between runs.
        ranking = Window.partitionBy("search_term").orderBy(
            F.col("entity_orders").desc_nulls_last(),
            F.col(entity_col).asc(),
        )
        df_top3 = df_entity_orders.withColumn(
            "entity_rank", F.row_number().over(ranking)
        ).filter(F.col("entity_rank") <= 3)

        return df_top3.groupby(["search_term"]).agg(
            F.sum("entity_orders").alias(total_alias))

    def handle_column(self):
        """Select, null-fill and extend the columns of the output DataFrame ``self.df_save``."""
        # Columns filled with fixed values (marked as test filler in the original code).
        filler_values = (
            ("st_num", 1),
            ("quantity_being_sold", 999),
            ("search_volume", 999),
            ("st_adv_num", 999),
            ("st_zr_num", 999),
        )
        df_metrics = self.df_aba_st_analytics
        for column_name, value in filler_values:
            df_metrics = df_metrics.withColumn(column_name, F.lit(value))
        self.df_aba_st_analytics = df_metrics

        # Column selection (this is also the output column order).
        output_columns = [
            "search_term", "st_num", "orders", "bsr_orders", "search_volume", "quantity_being_sold",
            "new_asin_num", "total_asin_num", "new_asin_orders", "new_asin_bsr_orders", "st_adv_num",
            "st_zr_num", "title_st_one_num", "title_page_one_total", "asin_price_total", "having_price_num",
            "asin_comment_total", "having_comment_num", "asin_rating_total", "having_rating_num",
            "asin_weight_total", "having_weight_num", "asin_volume_total", "having_volume_num",
            "asin_title_len_total", "having_title_num", "is_A_num", "is_video_num", "is_FBM_num",
            "is_CN_num", "is_Amazon_num", "asin_brand_num", "asin_account_num", "top3_seller_orders",
            "top3_seller_bsr_orders", "top3_brand_orders", "top3_brand_bsr_orders",
        ]
        df_output = df_metrics.select(*output_columns)

        # Null handling: every selected column gets 0 as replacement value.
        # NOTE(review): this includes search_term, the grouping key -- confirm
        # that filling it with 0 is intended.
        df_output = df_output.na.fill(dict.fromkeys(output_columns, 0))

        # Reserved columns first, then the partition columns.
        appended_values = (
            ("re_double_field1", 0.0),
            ("re_double_field2", 0.0),
            ("re_double_field3", 0.0),
            ("re_int_field1", 0),
            ("re_int_field2", 0),
            ("re_int_field3", 0),
            ("site_name", self.site_name),
            ("date_type", self.date_type),
            ("date_info", self.date_info),
        )
        for column_name, value in appended_values:
            df_output = df_output.withColumn(column_name, F.lit(value))

        self.df_save = df_output

    def handle_asin_agg_flag(self, df_st_asin_agg):
        """Add the 0/1 flag columns (and the parsed volume) used by the later aggregation.

        Args:
            df_st_asin_agg: DataFrame of search-term/ASIN rows with the ASIN
                attribute columns referenced below.

        Returns:
            The input DataFrame with the flag columns and ``asin_volume_val`` appended.
        """

        def as_flag(condition):
            # 1 when the condition holds, otherwise (including null) 0.
            return F.when(condition, F.lit(1)).otherwise(F.lit(0))

        on_first_page = F.col("page") == 1
        has_title = F.col("asin_title_len") > 0.0

        derived_columns = [
            # Title contains the search term, counted for first-page rows only.
            ("title_st_one_flag",
             F.when(on_first_page,
                    self.u_title_contains(F.col("search_term"), F.col("asin_title")))
             .otherwise(F.lit(0))),
            # A+ product: image type contains '3'.
            ("is_A_flag", self.u_get_image_type(F.lit('3'), F.col("asin_img_type"))),
            # Video product: image type contains '2'.
            ("is_video_flag", self.u_get_image_type(F.lit('2'), F.col("asin_img_type"))),
            # FBM product: buy-box seller type 3.
            ("is_FBM_flag", as_flag(F.col("asin_buy_box_seller_type") == 3)),
            # Sold by Amazon itself: buy-box seller type 1.
            ("is_Amazon_flag", as_flag(F.col("asin_buy_box_seller_type") == 1)),
            # Seller located in China.
            ("is_CN_flag", as_flag(F.col("country_name") == 'CN')),
            # Volume parsed from text such as 5.12"D x 6.69"W x 1.38"H.
            ("asin_volume_val", self.u_get_volume(F.col("asin_volume"))),
            # New product.
            ("asin_is_new_flag", as_flag(F.col("asin_is_new") == 1)),
            # First-page row that has a title.
            ("title_page_one_flag", as_flag(on_first_page & has_title)),
            # Denominators for price, rating, weight, comments and volume:
            # only rows whose value is > 0 are counted, to avoid inflating the error.
            ("price_flag", as_flag(F.col("asin_price") > 0.0)),
            ("rating_flag", as_flag(F.col("asin_rating") > 0.0)),
            ("weight_flag", as_flag(F.col("asin_weight") > 0.0)),
            ("comments_flag", as_flag(F.col("asin_total_comments") > 0.0)),
            ("volume_flag", as_flag(F.col("asin_volume_val") > 0.0)),
            # Row has a title.
            ("title_flag", as_flag(has_title)),
        ]

        # Applied in order: volume_flag relies on asin_volume_val added before it.
        for column_name, expression in derived_columns:
            df_st_asin_agg = df_st_asin_agg.withColumn(column_name, expression)
        return df_st_asin_agg

    def handle_asin_group_agg(self, df_st_asin_agg):
        """Aggregate the flagged rows per ``search_term`` into totals and counts.

        Args:
            df_st_asin_agg: DataFrame carrying the value and flag columns listed below.

        Returns:
            One row per ``search_term`` with ``total_asin_num`` plus one summed
            column per entry of ``summed_columns``.
        """
        # (source column, output column) pairs, in output order.
        summed_columns = [
            ("asin_title_len", "asin_title_len_total"),
            ("asin_total_comments", "asin_comment_total"),
            ("asin_price", "asin_price_total"),
            ("asin_rating", "asin_rating_total"),
            ("asin_volume_val", "asin_volume_total"),
            ("asin_weight", "asin_weight_total"),
            ("is_A_flag", "is_A_num"),
            ("is_video_flag", "is_video_num"),
            ("is_FBM_flag", "is_FBM_num"),
            ("is_Amazon_flag", "is_Amazon_num"),
            ("is_CN_flag", "is_CN_num"),
            ("title_st_one_flag", "title_st_one_num"),
            ("asin_is_new_flag", "new_asin_num"),
            ("title_page_one_flag", "title_page_one_total"),
            ("price_flag", "having_price_num"),
            ("rating_flag", "having_rating_num"),
            ("weight_flag", "having_weight_num"),
            ("comments_flag", "having_comment_num"),
            ("volume_flag", "having_volume_num"),
            ("title_flag", "having_title_num"),
        ]
        aggregations = [F.count("asin").alias("total_asin_num")]
        aggregations.extend(
            F.sum(source).alias(target) for source, target in summed_columns
        )
        return df_st_asin_agg.groupby(['search_term']).agg(*aggregations)

    def handle_asin_agg_brand_seller(self, df_st_asin_agg):
        """Add the distinct brand and seller counts per search term.

        Args:
            df_st_asin_agg: per-``search_term`` aggregate DataFrame.

        Returns:
            ``df_st_asin_agg`` left-joined with ``asin_brand_num`` and
            ``asin_account_num``, both computed from ``self.df_st_asin_join``.
        """

        def distinct_count(column, alias):
            # Both conditions must hold ("and"): with "or" the literal string
            # 'null' passed the filter and was counted as a real name.
            return (
                self.df_st_asin_join.select("search_term", column)
                .filter(f"{column} is not null and {column} != 'null'")
                .groupby(['search_term'])
                .agg(F.count_distinct(column).alias(alias))
            )

        # Number of brands.
        df_brand_count = distinct_count("asin_brand_name", "asin_brand_num")
        # Number of sellers.
        df_account_count = distinct_count("account_name", "asin_account_num")

        # Join both counts onto the aggregate.
        return df_st_asin_agg. \
            join(df_brand_count, on=['search_term'], how='left'). \
            join(df_account_count, on=['search_term'], how='left')


if __name__ == '__main__':
    # Command-line arguments, in order:
    #   1. site name
    #   2. date type: week/4_week/month/quarter
    #   3. date info: year-week / year-month / year-quarter, e.g. 2022-1
    site_name, date_type, date_info = sys.argv[1], sys.argv[2], sys.argv[3]
    handle_obj = DwsAbaStAnalyticsDay(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    )
    handle_obj.run()