# author : wangrui
# date : 2024/5/29 17:27


import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path

from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.storagelevel import StorageLevel
from pyspark.sql import Window
from pyspark.sql import functions as F
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
from pyspark.sql.types import *


class DwsLatestAsinGeneralAttributes(object):
    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = 'dws_latest_asin_general_attributes'
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # HDFS path of the target table partition (cleared before writing)
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # create the SparkSession
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.partition_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80)
        # initialize global dataframes with placeholder queries (filled in read_data)
        self.df_asin_detail = self.spark.sql("select 1+1")
        self.df_asin_bs_category = self.spark.sql("select 1+1")
        self.df_fd_asin_info = self.spark.sql("select 1+1")
        self.df_asin_measure = self.spark.sql("select 1+1")
        self.df_main = self.spark.sql("select 1+1")
        self.df_hide_category = self.spark.sql("select 1+1")
        self.df_bsr_end = self.spark.sql("select 1+1")

    # read source data
    def read_data(self):
        print("1. 读取dim_asin_detail获取变体信息")
        sql = f"""
            select asin, parent_asin, one_star as asin_one_star, two_star as asin_two_star, asin_brand_name, 
            asin_is_brand, asin_is_alarm, three_star as asin_three_star, four_star as asin_four_star, 
            five_star as asin_five_star, low_star as asin_low_star, variation_num as asin_variation_num, asin_rating, 
            asin_total_comments, asin_buy_box_seller_type, account_name, account_id, updated_time, 
            category_id as top_category_id, category_first_id as top_category_first_id from dim_asin_detail 
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
            and parent_asin is not null"""
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql)
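        # default missing brand / alarm flags to 0 so downstream filters treat nulls as "no"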
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_is_brand": 0, "asin_is_alarm": 0})
        self.df_asin_detail = self.df_asin_detail.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_asin_detail.show(10, truncate=False)
        print("2. 读取dim_asin_bs_category获取分类信息")
        sql = f"""
              select asin, asin_bs_cate_1_rank as first_category_rank, asin_bs_cate_1_id as category_first_id, 
              asin_bs_cate_current_rank as current_category_rank, asin_bs_cate_current_id as category_id 
              from dim_asin_bs_info where site_name='{self.site_name}' and date_type='{self.date_type}' 
              and date_info = '{self.date_info}'"""
        print("sql:", sql)
        self.df_asin_bs_category = self.spark.sql(sqlQuery=sql)
        self.df_asin_bs_category = self.df_asin_bs_category.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_asin_bs_category.show(10, truncate=False)
        print("3. 读取dim_fd_asin_info, 获取卖家信息")
        if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (
                self.date_type == '4_week' and self.date_info >= '2024-21'):
            sql = f"""
                   select fd_unique as account_id, upper(fd_country_name) as asin_seller_country_name 
                   from dim_fd_asin_info where site_name='{self.site_name}' and fd_unique is not null 
                   group by fd_unique, fd_country_name"""
        else:
            sql = f"""
                   select asin, account_id, account_name, asin_seller_country_name  
                   from (select fd_unique as account_id, fd_account_name as account_name, 
                   upper(fd_country_name) as asin_seller_country_name, asin, 
                   ROW_NUMBER() OVER (PARTITION BY asin ORDER BY updated_at DESC) AS t_rank 
                   from dim_fd_asin_info where site_name = '{self.site_name}' and fd_unique is not null) tmp
                   where tmp.t_rank = 1"""
        print("sql:", sql)
        self.df_fd_asin_info = self.spark.sql(sqlQuery=sql)
        self.df_fd_asin_info = self.df_fd_asin_info.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_fd_asin_info.show(10, truncate=False)
        print("4. 读取dwd_asin_measure, 获取bsr销量、母体ao、母体自然流量占比等信息")
        sql = f"""
            select asin ,cast(asin_bsr_orders as int) as asin_bsr_orders, round(asin_flow_proportion_matrix, 3) as asin_flow_proportion_matrix, 
            round(asin_ao_val_matrix, 3) as asin_ao_val_matrix from dwd_asin_measure 
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
        print("sql:" + sql)
        self.df_asin_measure = self.spark.sql(sqlQuery=sql)
        self.df_asin_measure = self.df_asin_measure.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_asin_measure.show(10, truncate=False)
        print("5. 读取隐藏分类信息")
        sql = f"""
            select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base"""
        print("sql:", sql)
        mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name='us')
        if mysql_con_info is not None:
            df_hide_category = SparkUtil.read_jdbc_query(
                session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'],
                username=mysql_con_info['username'], query=sql)
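            # the hidden-category dimension is small, so broadcast it for the later join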
            self.df_hide_category = F.broadcast(df_hide_category)
            self.df_hide_category.show(10, truncate=False)
        print("6.获取ods_bsr_end,获取有效rank信息")
        sql = f"""select rank as limit_rank, category_id as category_first_id from ods_bsr_end where site_name='{self.site_name}'"""
        print("sql:", sql)
        df_bsr_end = self.spark.sql(sqlQuery=sql)
        self.df_bsr_end = F.broadcast(df_bsr_end)
        self.df_bsr_end.show(10, truncate=False)

    # keep the most recently updated asin within each parent_asin (variation group)
    def handle_latest_asin(self):
        latest_asin_window = Window.partitionBy('parent_asin').orderBy(
            F.desc_nulls_last("updated_time")
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("u_rank", F.row_number().over(window=latest_asin_window))
        self.df_asin_detail = self.df_asin_detail.filter("u_rank=1").drop("u_rank", "updated_time")
        self.df_asin_detail = self.df_asin_detail.repartition(60)

    # join seller, category and measure attributes onto the latest asin of each variation group
    def handle_latest_asin_detail(self):
        if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (
                self.date_type == '4_week' and self.date_info >= '2024-21'):
            self.df_main = self.df_asin_detail.join(self.df_fd_asin_info, on=['account_id'], how='left')
        else:
            self.df_asin_detail = self.df_asin_detail.drop("account_id", "account_name")
            self.df_main = self.df_asin_detail.join(self.df_fd_asin_info, on=['asin'], how='left')
        self.df_main = self.df_main.join(
            self.df_asin_bs_category, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        )
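        # prefer the BS category ids; fall back to the top_* ids carried over from dim_asin_detail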
        self.df_main = self.df_main.withColumn(
            "category_id", F.coalesce(F.col("category_id"), F.col("top_category_id"))
        ).withColumn(
            "category_first_id", F.coalesce(F.col("category_first_id"), F.col("top_category_first_id"))
        ).drop("top_category_id", "top_category_first_id")

    # flag whether the asin's category is a hidden category
    def handle_asin_category_is_hide(self):
        self.df_main = self.df_main.join(self.df_hide_category, on=['category_id'], how='left')
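        # hidden if explicitly flagged, in a non-exempt grocery node, in a hidden
        # first-level category, or in one of the hard-coded hidden category ids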
        self.df_main = self.df_main.withColumn("asin_is_hide", F.expr("""
        CASE WHEN hide_flag = 1 THEN 1 WHEN category_first_id = 'grocery' and category_id != '6492272011' THEN 1 
        WHEN category_first_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 
        WHEN category_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1 ELSE 0 END
        """))

    # derive basic type fields for the latest asin of each variation group
    def handle_asin_basic_type(self):
        self.df_main = self.df_main.join(self.df_bsr_end, on=['category_first_id'], how='left')
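        # bucket the first-category rank into 8 tiers; boundaries are inclusive and
        # the first matching branch wins (e.g. rank 1000 falls into tier 1)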
        self.df_main = self.df_main.withColumn(
            "asin_rank_type", F.expr("""
            CASE WHEN first_category_rank IS NOT NULL AND first_category_rank BETWEEN 0 AND 1000 THEN 1 
            WHEN first_category_rank BETWEEN 1000 AND 5000 THEN 2 WHEN first_category_rank BETWEEN 5000 AND 10000 THEN 3 
            WHEN first_category_rank BETWEEN 10000 AND 20000 THEN 4 WHEN first_category_rank BETWEEN 20000 AND 30000 THEN 5 
            WHEN first_category_rank BETWEEN 30000 AND 50000 THEN 6 WHEN first_category_rank BETWEEN 50000 AND 70000 THEN 7 
            WHEN first_category_rank >= 70000 THEN 8 ELSE 0 END""")
        ).withColumn(
            "asin_site_name_type",  F.expr("""CASE WHEN asin_buy_box_seller_type = 1  THEN 4 
            WHEN asin_buy_box_seller_type != 1 AND asin_seller_country_name is not null AND asin_seller_country_name like '%US%' THEN 1 
            WHEN asin_buy_box_seller_type != 1 AND asin_seller_country_name is not null AND asin_seller_country_name like '%CN%' THEN 2 ELSE 3 END""")
        ).withColumn(
            "asin_rating_type",
            F.expr("""
            CASE WHEN asin_rating >= 4.5 THEN 1 
            WHEN asin_rating >= 4 AND asin_rating < 4.5 THEN 2 
            WHEN asin_rating >= 3.5 AND asin_rating < 4 THEN 3 
            WHEN asin_rating >= 3 AND asin_rating < 3.5 THEN 4 
            WHEN asin_rating < 3 AND asin_rating >= 0 THEN 5 ELSE 0 END""")
        ).withColumn(
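            # bsr_type: 1 when the first-category rank falls within the valid BSR
            # window (capped at 500000 when ods_bsr_end provides no limit_rank)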
            "bsr_type", F.expr("""CASE WHEN limit_rank is null and category_first_id <= 500000 THEN 1 WHEN limit_rank is not null and category_first_id <= limit_rank THEN 1 ELSE 0 END""")
        ).drop("limit_rank")

    # normalize fields and persist the result
    def df_save(self):
        df_save = self.df_main\
            .select("parent_asin",
                    "asin",
                    "asin_one_star",
                    "asin_two_star",
                    "asin_three_star",
                    "asin_four_star",
                    "asin_five_star",
                    "asin_low_star",
                    "asin_variation_num",
                    "asin_rating",
                    "asin_rating_type",
                    "asin_total_comments",
                    "asin_buy_box_seller_type",
                    "account_name",
                    "account_id",
                    "asin_brand_name",
                    "asin_is_brand",
                    "asin_is_alarm",
                    "asin_is_hide",
                    "first_category_rank",
                    "current_category_rank",
                    "category_first_id",
                    "category_id",
                    "asin_rank_type",
                    "bsr_type",
                    "asin_seller_country_name",
                    "asin_site_name_type",
                    "asin_bsr_orders",
                    "asin_flow_proportion_matrix",
                    "asin_ao_val_matrix",
                    F.lit(self.site_name).alias('site_name'),
                    F.lit(self.date_type).alias('date_type'),
                    F.lit(self.date_info).alias('date_info')).cache()
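        # keep a single row per parent_asin (one representative per variation family)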
        df_save = df_save.drop_duplicates(['parent_asin'])
        df_save = df_save.repartition(60)
        print("dws_latest_asin_detail处理完毕, 最后的数据量为: ", df_save.count())
        df_save.show(10, truncate=False)
        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
        print("success")

    def run(self):
        self.read_data()
        self.handle_latest_asin()
        self.handle_latest_asin_detail()
        self.handle_asin_category_is_hide()
        self.handle_asin_basic_type()
        self.df_save()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
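    # example invocation (hypothetical file name): python dws_latest_asin_general_attributes.py us month 2024-05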
    latest_asin_obj = DwsLatestAsinGeneralAttributes(site_name, date_type, date_info)
    latest_asin_obj.run()